12 Commits

Author SHA1 Message Date
Abhimanyu Saharan
2a923f2c32 fix: mypy redundant cast in approval check 2026-02-15 06:15:41 +00:00
Abhimanyu Saharan
11b2d9a968 fix: robust PR->task mapping for github_pr_url JSON field 2026-02-15 06:13:32 +00:00
Abhimanyu Saharan
ac0199e3b8 style: fix flake8 blank lines 2026-02-15 06:09:28 +00:00
Abhimanyu Saharan
ee05ad2f28 test: cover GitHub PR URL parsing 2026-02-15 06:08:25 +00:00
Abhimanyu Saharan
d44bbedf0e perf: bound concurrency for approval check reconciliation 2026-02-15 06:07:17 +00:00
Abhimanyu Saharan
16fe7d260c chore: sanitize GitHub API error details 2026-02-15 06:05:38 +00:00
Abhimanyu Saharan
a8c4e4b339 fix: canonicalize parsed GitHub PR URL 2026-02-15 06:03:15 +00:00
Abhimanyu Saharan
87add28339 fix: satisfy mypy on board id query 2026-02-15 06:00:02 +00:00
Abhimanyu Saharan
f3e7c6b67e fix: mypy for github approval check services 2026-02-15 05:58:17 +00:00
Abhimanyu Saharan
0c6c093736 feat(github): add approval-check reconciliation scheduler 2026-02-15 05:55:44 +00:00
Abhimanyu Saharan
de807aca25 test: fix flake8 unused imports 2026-02-15 05:51:39 +00:00
Abhimanyu Saharan
009218158b feat(github): add mission-control approval check-run gate 2026-02-15 05:50:03 +00:00
12 changed files with 1333 additions and 2 deletions

View File

@@ -19,6 +19,15 @@ CLERK_VERIFY_IAT=true
CLERK_LEEWAY=10.0
# Database
DB_AUTO_MIGRATE=false
# GitHub integration (for Check Runs / required-check enforcement)
# Used by mission-control/approval check updater.
GH_TOKEN=
# Periodic reconciliation safety net (rq-scheduler)
GITHUB_APPROVAL_CHECK_SCHEDULE_ID=mission-control-approval-check-reconcile
GITHUB_APPROVAL_CHECK_SCHEDULE_INTERVAL_SECONDS=900
GITHUB_APPROVAL_CHECK_RECONCILE_CONCURRENCY=3
GITHUB_APPROVAL_CHECK_RECONCILE_MAX_PR_URLS=500
# Webhook queue / worker
WEBHOOK_REDIS_URL=redis://localhost:6379/0
WEBHOOK_QUEUE_NAME=webhook-dispatch

View File

@@ -38,6 +38,10 @@ from app.services.approval_task_links import (
replace_approval_task_links,
task_counts_for_board,
)
from app.services.github.mission_control_approval_check import (
github_approval_check_enabled,
sync_github_approval_check_for_task_ids,
)
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
if TYPE_CHECKING:
@@ -426,6 +430,20 @@ async def create_approval(
await session.commit()
await session.refresh(approval)
title_by_id = await _task_titles_by_id(session, task_ids=set(task_ids))
if github_approval_check_enabled() and task_ids:
try:
await sync_github_approval_check_for_task_ids(
session,
board_id=board.id,
task_ids=list(task_ids),
)
except Exception:
logger.exception(
"approval.github_check_sync_failed",
extra={"board_id": str(board.id), "task_ids": [str(tid) for tid in task_ids]},
)
return _approval_to_read(
approval,
task_ids=task_ids,
@@ -481,5 +499,26 @@ async def update_approval(
approval.id,
approval.status,
)
if github_approval_check_enabled():
try:
task_ids_by_approval = await load_task_ids_by_approval(
session,
approval_ids=[approval.id],
)
approval_task_ids = task_ids_by_approval.get(approval.id) or []
if not approval_task_ids and approval.task_id is not None:
approval_task_ids = [approval.task_id]
if approval_task_ids:
await sync_github_approval_check_for_task_ids(
session,
board_id=board.id,
task_ids=list(approval_task_ids),
)
except Exception:
logger.exception(
"approval.github_check_sync_failed",
extra={"board_id": str(board.id), "approval_id": str(approval.id)},
)
reads = await _approval_reads(session, [approval])
return reads[0]

View File

@@ -23,6 +23,7 @@ from app.api.deps import (
require_admin_auth,
require_admin_or_agent,
)
from app.core.logging import get_logger
from app.core.time import utcnow
from app.db import crud
from app.db.pagination import paginate
@@ -56,6 +57,10 @@ from app.services.approval_task_links import (
load_task_ids_by_approval,
pending_approval_conflicts_by_task,
)
from app.services.github.mission_control_approval_check import (
github_approval_check_enabled,
sync_github_approval_check_for_pr_url,
)
from app.services.mentions import extract_mentions, matches_agent_mention
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
@@ -76,6 +81,8 @@ from app.services.task_dependencies import (
validate_dependency_update,
)
logger = get_logger(__name__)
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Sequence
@@ -2402,6 +2409,22 @@ async def _finalize_updated_task(
await _record_task_update_activity(session, update=update)
await _notify_task_update_assignment_changes(session, update=update)
# Sync GitHub approval gate check when a task's PR link changes.
if github_approval_check_enabled() and update.custom_field_values_set:
pr_url = update.custom_field_values.get("github_pr_url")
if isinstance(pr_url, str) and pr_url.strip():
try:
await sync_github_approval_check_for_pr_url(
session,
board_id=update.board_id,
pr_url=pr_url.strip(),
)
except Exception:
logger.exception(
"task.github_check_sync_failed",
extra={"board_id": str(update.board_id), "task_id": str(update.task.id)},
)
return await _task_read_response(
session,
task=update.task,

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from pathlib import Path
from typing import Self
from pydantic import Field, model_validator
from pydantic import AliasChoices, Field, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from app.core.auth_mode import AuthMode
@@ -50,6 +50,19 @@ class Settings(BaseSettings):
cors_origins: str = ""
base_url: str = ""
# GitHub integration
# Token used for GitHub REST API calls (checks/status updates). Supports GH_TOKEN or GITHUB_TOKEN.
github_token: str = Field(
default="",
validation_alias=AliasChoices("GH_TOKEN", "GITHUB_TOKEN"),
)
# Periodic reconciliation safety net for mission-control/approval checks.
github_approval_check_schedule_id: str = "mission-control-approval-check-reconcile"
github_approval_check_schedule_interval_seconds: int = 900
github_approval_check_reconcile_concurrency: int = Field(default=3, ge=1, le=10)
github_approval_check_reconcile_max_pr_urls: int = Field(default=500, ge=1)
# Database lifecycle
db_auto_migrate: bool = False

View File

@@ -0,0 +1 @@
"""GitHub integration services (checks, statuses, PR metadata)."""

View File

@@ -0,0 +1,221 @@
"""Minimal GitHub REST client used for merge-policy enforcement.
This module is intentionally small and purpose-built for:
- PR metadata lookup (head SHA)
- Check Runs upsert (create or update by name)
It uses a repo-scoped token (PAT or GitHub App token) provided via settings.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Literal
import httpx
from app.core.config import settings
from app.core.logging import get_logger
logger = get_logger(__name__)

# Cap on response-body text embedded in exception messages / logs,
# so a huge GitHub error payload cannot bloat logging.
_ERROR_DETAIL_MAX_CHARS = 500
def _safe_error_detail(resp: httpx.Response) -> str:
    """Produce a short, log-safe description of an HTTP error response body."""
    try:
        body = resp.text
    except Exception:
        # Reading the body can itself fail (e.g. decoding issues); stay quiet.
        return ""
    detail = (body or "").strip()
    if len(detail) > _ERROR_DETAIL_MAX_CHARS:
        detail = f"{detail[: _ERROR_DETAIL_MAX_CHARS - 3]}..."
    return detail
# Base endpoint for all GitHub REST calls made by this module.
GITHUB_API_BASE_URL = "https://api.github.com"
# Pinned REST API version, sent via the X-GitHub-Api-Version header.
GITHUB_API_VERSION = "2022-11-28"
@dataclass(frozen=True)
class ParsedPullRequest:
owner: str
repo: str
number: int
url: str
def parse_pull_request_url(url: str) -> ParsedPullRequest | None:
"""Parse a GitHub PR URL: https://github.com/<owner>/<repo>/pull/<number>."""
raw = (url or "").strip()
if not raw:
return None
if raw.startswith("http://"):
# normalize; we only accept github.com URLs
raw = "https://" + raw.removeprefix("http://")
if not raw.startswith("https://github.com/"):
return None
path = raw.removeprefix("https://github.com/")
parts = [p for p in path.split("/") if p]
if len(parts) < 4:
return None
owner, repo, kind, num = parts[0], parts[1], parts[2], parts[3]
if kind != "pull":
return None
try:
number = int(num)
except ValueError:
return None
if number <= 0:
return None
canonical_url = f"https://github.com/{owner}/{repo}/pull/{number}"
return ParsedPullRequest(owner=owner, repo=repo, number=number, url=canonical_url)
class GitHubClientError(RuntimeError):
    """Raised when a GitHub API call fails or returns an unusable payload."""

    pass
def _auth_headers() -> dict[str, str]:
    """Build the standard GitHub REST headers, including bearer auth.

    Raises GitHubClientError when no token is configured.
    """
    token = (settings.github_token or "").strip()
    if not token:
        raise GitHubClientError("GitHub token is not configured (GH_TOKEN/GITHUB_TOKEN).")
    headers = {"Authorization": f"Bearer {token}"}
    headers["Accept"] = "application/vnd.github+json"
    headers["X-GitHub-Api-Version"] = GITHUB_API_VERSION
    return headers
async def get_pull_request_head_sha(pr: ParsedPullRequest) -> str:
    """Fetch the head commit SHA for *pr* via the GitHub REST API."""
    endpoint = f"{GITHUB_API_BASE_URL}/repos/{pr.owner}/{pr.repo}/pulls/{pr.number}"
    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.get(endpoint, headers=_auth_headers())
        if response.status_code >= 400:
            detail = _safe_error_detail(response)
            message = f"GitHub PR lookup failed: {response.status_code}"
            if detail:
                message = f"{message} {detail}"
            raise GitHubClientError(message)
        body = response.json()
        head = body.get("head")
        sha = head.get("sha") if isinstance(head, dict) else None
        if not isinstance(sha, str):
            raise GitHubClientError("GitHub PR response missing head.sha")
        return sha
async def _find_check_run_id(*, owner: str, repo: str, ref: str, check_name: str) -> int | None:
    """Return the id of an existing check run named *check_name* on *ref*, if any."""
    # Docs: GET /repos/{owner}/{repo}/commits/{ref}/check-runs
    endpoint = f"{GITHUB_API_BASE_URL}/repos/{owner}/{repo}/commits/{ref}/check-runs"
    query: dict[str, str | int] = {"check_name": check_name, "per_page": 100}
    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.get(endpoint, headers=_auth_headers(), params=query)
        if response.status_code >= 400:
            detail = _safe_error_detail(response)
            message = f"GitHub check-runs lookup failed: {response.status_code}"
            if detail:
                message = f"{message} {detail}"
            raise GitHubClientError(message)
        check_runs = response.json().get("check_runs")
    if not isinstance(check_runs, list):
        return None
    candidate_ids = (
        run.get("id")
        for run in check_runs
        if isinstance(run, dict) and run.get("name") == check_name
    )
    return next((rid for rid in candidate_ids if isinstance(rid, int)), None)
# Allowed `status` values for a GitHub check run.
CheckStatus = Literal["queued", "in_progress", "completed"]
# Allowed `conclusion` values when a check run completes.
CheckConclusion = Literal[
    "success",
    "failure",
    "neutral",
    "cancelled",
    "skipped",
    "timed_out",
    "action_required",
]
async def upsert_check_run(
    *,
    owner: str,
    repo: str,
    head_sha: str,
    check_name: str,
    status: CheckStatus,
    conclusion: CheckConclusion | None,
    title: str,
    summary: str,
    details_url: str | None = None,
) -> None:
    """Create or update a check run on a commit SHA.

    If a check run with the same name exists on the ref, we patch it.
    Otherwise, we create a new one.

    Raises:
        ValueError: when status is "completed" but no conclusion is given.
        GitHubClientError: on any GitHub API failure.
    """
    payload: dict[str, Any] = {
        "name": check_name,
        "head_sha": head_sha,
        "status": status,
        "output": {
            "title": title,
            "summary": summary,
        },
    }
    if details_url:
        payload["details_url"] = details_url
    if status == "completed":
        if conclusion is None:
            raise ValueError("conclusion is required when status=completed")
        payload["conclusion"] = conclusion
    run_id = await _find_check_run_id(owner=owner, repo=repo, ref=head_sha, check_name=check_name)
    if run_id is None:
        url = f"{GITHUB_API_BASE_URL}/repos/{owner}/{repo}/check-runs"
        async with httpx.AsyncClient(timeout=10) as client:
            # _auth_headers() already includes the github+json Accept header.
            resp = await client.post(url, headers=_auth_headers(), json=payload)
            if resp.status_code >= 400:
                detail = _safe_error_detail(resp)
                suffix = f" {detail}" if detail else ""
                raise GitHubClientError(
                    f"GitHub check-run create failed: {resp.status_code}{suffix}",
                )
        logger.info(
            "github.check_run.created",
            extra={"owner": owner, "repo": repo, "sha": head_sha, "check": check_name},
        )
        return
    url = f"{GITHUB_API_BASE_URL}/repos/{owner}/{repo}/check-runs/{run_id}"
    # For updates we only send the fields that change; name/head_sha are
    # identified by the run id in the URL and stay as-is.
    patch_payload: dict[str, Any] = {
        "status": status,
        "output": payload["output"],
    }
    if details_url:
        patch_payload["details_url"] = details_url
    if status == "completed":
        # conclusion was validated non-None above for completed status.
        patch_payload["conclusion"] = conclusion
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.patch(url, headers=_auth_headers(), json=patch_payload)
        if resp.status_code >= 400:
            detail = _safe_error_detail(resp)
            suffix = f" {detail}" if detail else ""
            raise GitHubClientError(
                f"GitHub check-run update failed: {resp.status_code}{suffix}",
            )
    logger.info(
        "github.check_run.updated",
        extra={"owner": owner, "repo": repo, "sha": head_sha, "check": check_name, "id": run_id},
    )

View File

@@ -0,0 +1,453 @@
"""Mission Control approval gate → GitHub required check.
This module maintains a GitHub Check Run (recommended) named:
- `mission-control/approval`
The check is intended to be added to GitHub ruleset required checks so PRs
cannot merge unless the corresponding Mission Control task has an approved
in-app approval.
Mapping:
- PR → Task: by `custom_field_values.github_pr_url` exact match.
- Task → Approval: any linked Approval rows with status in {pending, approved, rejected}.
Triggers (implemented via API hooks):
- approval created / resolved
- task github_pr_url updated
A periodic reconciliation job should call the sync functions as a safety net.
"""
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal, cast
from uuid import UUID

from sqlmodel import col, select

from app.core.config import settings
from app.core.logging import get_logger
from app.db.session import async_session_maker
from app.models.approval_task_links import ApprovalTaskLink
from app.models.approvals import Approval
from app.models.boards import Board
from app.models.task_custom_fields import TaskCustomFieldDefinition, TaskCustomFieldValue
from app.models.tasks import Task
from app.services.github.client import (
    GitHubClientError,
    get_pull_request_head_sha,
    parse_pull_request_url,
    upsert_check_run,
)
# Idiomatic type-only import guard; sibling modules in this codebase use
# `if TYPE_CHECKING:` rather than the `if False:` trick.
if TYPE_CHECKING:  # pragma: no cover
    from sqlmodel.ext.asyncio.session import AsyncSession
logger = get_logger(__name__)

# Name of the GitHub check run this module maintains; add it to the repo's
# required checks so merging is gated on Mission Control approval.
CHECK_NAME = "mission-control/approval"
# Default action types that qualify as a "merge gate" approval.
# (Action types are free-form today; keep this conservative but configurable later.)
REQUIRED_ACTION_TYPES = {"mark_done", "mark_task_done"}
# Possible results of evaluating the approval gate for one PR URL.
CheckOutcome = Literal["success", "pending", "rejected", "missing", "error", "multiple"]
@dataclass(frozen=True)
class ApprovalGateEvaluation:
    """Outcome of evaluating the approval gate for one PR URL."""

    # Final gate outcome (see CheckOutcome).
    outcome: CheckOutcome
    # Tasks considered when producing the outcome (may be empty).
    task_ids: tuple[UUID, ...] = ()
    # Human-readable explanation surfaced in the check-run output.
    summary: str = ""
async def _board_org_id(session: AsyncSession, *, board_id: UUID) -> UUID | None:
    """Look up the organization id owning *board_id*, or None if the board is unknown."""
    statement = select(col(Board.organization_id)).where(col(Board.id) == board_id)
    result = await session.exec(statement)
    return result.first()
async def _tasks_for_pr_url(
    session: AsyncSession,
    *,
    board_id: UUID,
    pr_url: str,
) -> list[Task]:
    """Return tasks whose `github_pr_url` custom field matches the PR URL.

    NOTE: custom-field values are stored as JSON, and cross-dialect equality
    semantics can be surprising. We therefore fetch candidate rows and perform a
    strict Python-side string match.
    """
    org_id = await _board_org_id(session, board_id=board_id)
    if org_id is None:
        # Unknown board: nothing to match against.
        return []
    normalized = pr_url.strip()
    if not normalized:
        return []
    # Fetch (task, raw field value) pairs for every task on the board that has
    # a `github_pr_url` field defined in this organization.
    statement = (
        select(Task, col(TaskCustomFieldValue.value))
        .join(TaskCustomFieldValue, col(TaskCustomFieldValue.task_id) == col(Task.id))
        .join(
            TaskCustomFieldDefinition,
            col(TaskCustomFieldDefinition.id)
            == col(TaskCustomFieldValue.task_custom_field_definition_id),
        )
        .where(col(Task.board_id) == board_id)
        .where(col(TaskCustomFieldDefinition.organization_id) == org_id)
        .where(col(TaskCustomFieldDefinition.field_key) == "github_pr_url")
        .order_by(col(Task.created_at).asc())
    )
    rows = list(await session.exec(statement))
    tasks: list[Task] = []
    for task, value in rows:
        if not isinstance(task, Task):
            continue
        # Exact (trimmed) equality check happens here in Python — see NOTE above.
        if isinstance(value, str) and value.strip() == normalized:
            tasks.append(task)
    return tasks
async def _approval_rows_for_task(
    session: AsyncSession,
    *,
    board_id: UUID,
    task_id: UUID,
) -> list[Approval]:
    """Return all approvals attached to *task_id*: link-table rows plus legacy rows."""
    # Linked approvals (new style)
    linked_stmt = (
        select(Approval)
        .join(ApprovalTaskLink, col(ApprovalTaskLink.approval_id) == col(Approval.id))
        .where(col(Approval.board_id) == board_id)
        .where(col(ApprovalTaskLink.task_id) == task_id)
        .order_by(col(Approval.created_at).asc())
    )
    linked = list(await session.exec(linked_stmt))
    # Legacy approvals (Approval.task_id) not linked via ApprovalTaskLink
    legacy_stmt = (
        select(Approval)
        .where(col(Approval.board_id) == board_id)
        .where(col(Approval.task_id) == task_id)
        .order_by(col(Approval.created_at).asc())
    )
    legacy = list(await session.exec(legacy_stmt))
    # Merge unique by id; setdefault keeps the first occurrence, so a row that
    # appears in both result sets is counted once.
    by_id: dict[UUID, Approval] = {}
    for approval in [*linked, *legacy]:
        if isinstance(approval, Approval):
            by_id.setdefault(approval.id, approval)
    return list(by_id.values())
def _qualifies_for_gate(approval: Approval) -> bool:
    """Whether this approval's action type counts toward the merge gate."""
    # If action types evolve, we can broaden this; for now keep it anchored.
    action = approval.action_type
    return action in REQUIRED_ACTION_TYPES
async def evaluate_approval_gate_for_pr_url(
    session: AsyncSession,
    *,
    board_id: UUID,
    pr_url: str,
) -> ApprovalGateEvaluation:
    """Evaluate the approval gate for *pr_url* on *board_id*.

    Outcome precedence when multiple gate approvals exist:
    any "approved" wins, then "rejected", then "pending"; anything
    else is reported as "error".
    """
    tasks = await _tasks_for_pr_url(session, board_id=board_id, pr_url=pr_url)
    if not tasks:
        # No task on this board references the PR URL at all.
        return ApprovalGateEvaluation(
            outcome="missing",
            task_ids=(),
            summary=(
                "No Mission Control task is linked to this PR. Set the task custom field "
                "`github_pr_url` to this PR URL."
            ),
        )
    if len(tasks) > 1:
        # Ambiguous mapping: refuse to pick a task.
        return ApprovalGateEvaluation(
            outcome="multiple",
            task_ids=tuple(task.id for task in tasks),
            summary=(
                "Multiple Mission Control tasks are linked to this PR URL. "
                "Ensure exactly one task has `github_pr_url` set to this PR."
            ),
        )
    task = tasks[0]
    approvals = await _approval_rows_for_task(session, board_id=board_id, task_id=task.id)
    # Only approvals whose action type is in REQUIRED_ACTION_TYPES gate merges.
    gate_approvals = [a for a in approvals if _qualifies_for_gate(a)]
    if not gate_approvals:
        return ApprovalGateEvaluation(
            outcome="missing",
            task_ids=(task.id,),
            summary=(
                "No qualifying approval found for this task. Create an approval request "
                f"(action_type in {sorted(REQUIRED_ACTION_TYPES)})."
            ),
        )
    statuses = [str(a.status) for a in gate_approvals]
    if any(s == "approved" for s in statuses):
        return ApprovalGateEvaluation(
            outcome="success",
            task_ids=(task.id,),
            summary="Approval is approved. Merge is permitted.",
        )
    if any(s == "rejected" for s in statuses):
        return ApprovalGateEvaluation(
            outcome="rejected",
            task_ids=(task.id,),
            summary="Approval was rejected. Merge is blocked until a new approval is granted.",
        )
    if any(s == "pending" for s in statuses):
        return ApprovalGateEvaluation(
            outcome="pending",
            task_ids=(task.id,),
            summary="Approval is pending. Merge is blocked until approved.",
        )
    # Statuses outside {approved, rejected, pending}: surface as an error.
    return ApprovalGateEvaluation(
        outcome="error",
        task_ids=(task.id,),
        summary=f"Unexpected approval statuses: {sorted(set(statuses))}",
    )
async def sync_github_approval_check_for_pr_url(
    session: AsyncSession,
    *,
    board_id: UUID,
    pr_url: str,
) -> None:
    """Upsert the GitHub check run for a PR URL based on Mission Control approval state.

    Never raises: GitHub/API failures are logged and swallowed so callers
    (API hooks, reconciliation job) are not disrupted by this best-effort sync.
    """
    parsed = parse_pull_request_url(pr_url)
    if parsed is None:
        logger.warning(
            "github.approval_check.invalid_pr_url",
            extra={"board_id": str(board_id), "pr_url": pr_url},
        )
        return
    try:
        evaluation = await evaluate_approval_gate_for_pr_url(
            session,
            board_id=board_id,
            pr_url=pr_url,
        )
        # Check runs attach to a commit, so resolve the PR's current head SHA.
        head_sha = await get_pull_request_head_sha(parsed)
        title = "Mission Control approval gate"
        summary_lines = [
            f"PR: {parsed.url}",
            f"Board: {board_id}",
        ]
        if evaluation.task_ids:
            summary_lines.append("Task(s): " + ", ".join(str(tid) for tid in evaluation.task_ids))
        summary_lines.append("")
        summary_lines.append(evaluation.summary)
        if evaluation.outcome == "success":
            await upsert_check_run(
                owner=parsed.owner,
                repo=parsed.repo,
                head_sha=head_sha,
                check_name=CHECK_NAME,
                status="completed",
                conclusion="success",
                title=title,
                summary="\n".join(summary_lines),
            )
            return
        if evaluation.outcome == "pending":
            # Keep as in_progress to clearly signal it's waiting.
            await upsert_check_run(
                owner=parsed.owner,
                repo=parsed.repo,
                head_sha=head_sha,
                check_name=CHECK_NAME,
                status="in_progress",
                conclusion=None,
                title=title,
                summary="\n".join(summary_lines),
            )
            return
        # failure-like outcomes (rejected / missing / multiple / error)
        await upsert_check_run(
            owner=parsed.owner,
            repo=parsed.repo,
            head_sha=head_sha,
            check_name=CHECK_NAME,
            status="completed",
            conclusion="failure",
            title=title,
            summary="\n".join(summary_lines),
        )
    except GitHubClientError as exc:
        # Expected operational failures (bad token, API errors): warn only.
        logger.warning(
            "github.approval_check.github_error",
            extra={"board_id": str(board_id), "pr_url": pr_url, "error": str(exc)},
        )
    except Exception as exc:
        logger.exception(
            "github.approval_check.unexpected",
            extra={"board_id": str(board_id), "pr_url": pr_url, "error": str(exc)},
        )
async def sync_github_approval_check_for_task_ids(
    session: AsyncSession,
    *,
    board_id: UUID,
    task_ids: list[UUID],
) -> None:
    """Sync approval checks for any tasks that have github_pr_url set.

    Used by approval hooks (one approval can link multiple tasks).
    """
    if not task_ids:
        return
    # Load custom-field values for these tasks and find github_pr_url.
    # We reuse the same join approach but filter by task ids.
    org_id = await _board_org_id(session, board_id=board_id)
    if org_id is None:
        return
    stmt = (
        select(col(TaskCustomFieldValue.task_id), col(TaskCustomFieldValue.value))
        .join(
            TaskCustomFieldDefinition,
            col(TaskCustomFieldDefinition.id)
            == col(TaskCustomFieldValue.task_custom_field_definition_id),
        )
        .where(col(TaskCustomFieldDefinition.organization_id) == org_id)
        .where(col(TaskCustomFieldDefinition.field_key) == "github_pr_url")
        .where(col(TaskCustomFieldValue.task_id).in_(task_ids))
    )
    rows = list(await session.exec(stmt))
    # De-duplicate: several tasks may reference the same PR.
    pr_urls: set[str] = set()
    for _task_id, value in rows:
        if isinstance(value, str) and value.strip():
            pr_urls.add(value.strip())
    # Sorted for a deterministic processing order.
    for pr_url in sorted(pr_urls):
        await sync_github_approval_check_for_pr_url(session, board_id=board_id, pr_url=pr_url)
async def reconcile_github_approval_checks_for_board(
    session: AsyncSession,
    *,
    board_id: UUID,
) -> int:
    """Periodic reconciliation safety net.

    Returns number of distinct PR URLs processed.
    Intended to be run by a cron/worker periodically.
    """
    org_id = await _board_org_id(session, board_id=board_id)
    if org_id is None:
        return 0
    # Collect every github_pr_url custom-field value set on this board's tasks.
    stmt = (
        select(col(TaskCustomFieldValue.value))
        .join(
            TaskCustomFieldDefinition,
            col(TaskCustomFieldDefinition.id)
            == col(TaskCustomFieldValue.task_custom_field_definition_id),
        )
        .join(Task, col(Task.id) == col(TaskCustomFieldValue.task_id))
        .where(col(Task.board_id) == board_id)
        .where(col(TaskCustomFieldDefinition.organization_id) == org_id)
        .where(col(TaskCustomFieldDefinition.field_key) == "github_pr_url")
    )
    raw_rows = list(await session.exec(stmt))
    rows = cast(list[tuple[object]], raw_rows)
    pr_urls: set[str] = set()
    for (value,) in rows:
        if isinstance(value, str) and value.strip():
            pr_urls.add(value.strip())
    pr_url_list = sorted(pr_urls)
    max_urls = settings.github_approval_check_reconcile_max_pr_urls
    if len(pr_url_list) > max_urls:
        # Bound the work per run; remaining URLs are picked up next cycle.
        logger.warning(
            "github.approval_check.reconcile.truncated_pr_urls",
            extra={"board_id": str(board_id), "count": len(pr_url_list), "max": max_urls},
        )
        pr_url_list = pr_url_list[:max_urls]
    # NOTE(review): every _run task shares this one AsyncSession; with
    # concurrency > 1 the DB reads inside sync_github_approval_check_for_pr_url
    # can interleave on the same session. Confirm this is safe, or give each
    # task its own session.
    sem = asyncio.Semaphore(settings.github_approval_check_reconcile_concurrency)

    async def _run(url: str) -> None:
        # Bound concurrency to avoid overwhelming the GitHub API.
        async with sem:
            await sync_github_approval_check_for_pr_url(
                session,
                board_id=board_id,
                pr_url=url,
            )

    # Process concurrently but bounded to avoid overwhelming GitHub.
    results = await asyncio.gather(*[_run(url) for url in pr_url_list], return_exceptions=True)
    for result in results:
        if isinstance(result, Exception):
            # logger.exception here would log no traceback: there is no active
            # exception outside an `except` block. Attach the captured one.
            logger.error(
                "github.approval_check.reconcile.pr_failed",
                exc_info=result,
                extra={"board_id": str(board_id), "error": str(result)},
            )
    return len(pr_url_list)
async def reconcile_mission_control_approval_checks_for_all_boards() -> int:
    """Reconcile approval checks for every board.

    Returns total number of distinct PR URLs processed across boards.
    This is intentionally a safety net: the primary, low-latency updates happen on
    approval create/resolution and task github_pr_url updates.
    """
    async with async_session_maker() as session:
        raw_board_ids = list(
            await session.exec(
                select(col(Board.id)).order_by(col(Board.created_at).asc()),
            ),
        )
        # Defensive narrowing: exec() result rows are loosely typed.
        board_ids = [value for value in raw_board_ids if isinstance(value, UUID)]
        processed = 0
        for board_id in board_ids:
            try:
                processed += await reconcile_github_approval_checks_for_board(
                    session,
                    board_id=board_id,
                )
            except Exception:
                # One failing board must not stop reconciliation of the rest.
                logger.exception(
                    "github.approval_check.reconcile.board_failed",
                    extra={"board_id": str(board_id)},
                )
        return processed
def github_approval_check_enabled() -> bool:
    """True when a GitHub token is configured, enabling check-run updates."""
    token = settings.github_token or ""
    return token.strip() != ""

View File

@@ -0,0 +1,84 @@
"""Scheduler bootstrap for Mission Control GitHub approval check reconciliation.
This uses rq-scheduler (same pattern as webhook dispatch scheduler) to periodically
reconcile the `mission-control/approval` check run state.
The periodic job is a safety net; primary updates happen on:
- approval create / resolution
- task github_pr_url updates
"""
from __future__ import annotations
import time
from datetime import datetime, timedelta, timezone
from redis import Redis
from rq_scheduler import Scheduler # type: ignore[import-untyped]
from app.core.config import settings
from app.core.logging import get_logger
from app.services.github.worker import run_reconcile_mission_control_approval_checks
logger = get_logger(__name__)
def bootstrap_mission_control_approval_check_schedule(
    interval_seconds: int | None = None,
    *,
    max_attempts: int = 5,
    retry_sleep_seconds: float = 1.0,
) -> None:
    """Register a recurring reconciliation job for GitHub approval checks.

    Args:
        interval_seconds: Override for the run interval; defaults to
            settings.github_approval_check_schedule_interval_seconds.
        max_attempts: How many Redis/scheduler setup attempts before giving up.
        retry_sleep_seconds: Base backoff; sleep grows linearly per attempt.

    Raises:
        RuntimeError: when every attempt fails (chained to the last error).
    """
    effective_interval_seconds = (
        settings.github_approval_check_schedule_interval_seconds
        if interval_seconds is None
        else interval_seconds
    )
    last_exc: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        try:
            # NOTE(review): a connection created on a failed attempt is not
            # closed before retrying — confirm whether it should be released.
            connection = Redis.from_url(settings.webhook_redis_url)
            connection.ping()
            scheduler = Scheduler(
                queue_name=settings.webhook_queue_name,
                connection=connection,
            )
            # Cancel any stale registration so interval changes take effect.
            for job in scheduler.get_jobs():
                if job.id == settings.github_approval_check_schedule_id:
                    scheduler.cancel(job)
            scheduler.schedule(
                datetime.now(tz=timezone.utc) + timedelta(seconds=10),
                func=run_reconcile_mission_control_approval_checks,
                interval=effective_interval_seconds,
                repeat=None,  # presumably repeats indefinitely per rq-scheduler — confirm
                id=settings.github_approval_check_schedule_id,
                queue_name=settings.webhook_queue_name,
            )
            logger.info(
                "github.approval_check.scheduler.bootstrapped",
                extra={
                    "schedule_id": settings.github_approval_check_schedule_id,
                    "queue_name": settings.webhook_queue_name,
                    "interval_seconds": effective_interval_seconds,
                },
            )
            return
        except Exception as exc:
            last_exc = exc
            logger.warning(
                "github.approval_check.scheduler.bootstrap_failed",
                extra={
                    "attempt": attempt,
                    "max_attempts": max_attempts,
                    "error": str(exc),
                },
            )
            if attempt < max_attempts:
                # Linear backoff between attempts.
                time.sleep(retry_sleep_seconds * attempt)
    raise RuntimeError("Failed to bootstrap GitHub approval check schedule") from last_exc

View File

@@ -0,0 +1,30 @@
"""RQ worker entrypoints for GitHub check reconciliation."""
from __future__ import annotations
import asyncio
import time
from app.core.logging import get_logger
from app.services.github.mission_control_approval_check import (
github_approval_check_enabled,
reconcile_mission_control_approval_checks_for_all_boards,
)
logger = get_logger(__name__)
def run_reconcile_mission_control_approval_checks() -> None:
    """RQ entrypoint for periodically reconciling mission-control/approval checks.

    Skips (with a log line) when no GitHub token is configured.
    """
    if not github_approval_check_enabled():
        logger.info("github.approval_check.reconcile.skipped_missing_token")
        return
    # Use a monotonic clock for the duration: time.time() can jump (NTP/clock
    # adjustments), producing negative or inflated duration_ms values.
    start = time.perf_counter()
    logger.info("github.approval_check.reconcile.started")
    count = asyncio.run(reconcile_mission_control_approval_checks_for_all_boards())
    elapsed_ms = int((time.perf_counter() - start) * 1000)
    logger.info(
        "github.approval_check.reconcile.finished",
        extra={"duration_ms": elapsed_ms, "pr_urls": count},
    )

View File

@@ -0,0 +1,40 @@
from __future__ import annotations
from app.services.github.client import parse_pull_request_url
def test_parse_pull_request_url_empty_is_none() -> None:
    """Blank and whitespace-only input is rejected."""
    for candidate in ("", " "):
        assert parse_pull_request_url(candidate) is None
def test_parse_pull_request_url_non_github_is_none() -> None:
    """Non-github.com hosts and non-pull paths are rejected."""
    for candidate in (
        "https://example.com/a/b/pull/1",
        "https://github.com/a/b/issues/1",
    ):
        assert parse_pull_request_url(candidate) is None
def test_parse_pull_request_url_valid_https_returns_canonical() -> None:
    """A plain https PR URL parses into its owner/repo/number components."""
    parsed = parse_pull_request_url("https://github.com/acme/widgets/pull/123")
    assert parsed is not None
    assert (parsed.owner, parsed.repo, parsed.number) == ("acme", "widgets", 123)
    assert parsed.url == "https://github.com/acme/widgets/pull/123"
def test_parse_pull_request_url_http_normalizes_to_canonical() -> None:
    """http:// links are upgraded to the canonical https form."""
    parsed = parse_pull_request_url("http://github.com/acme/widgets/pull/123")
    assert parsed is not None
    assert parsed.url == "https://github.com/acme/widgets/pull/123"
def test_parse_pull_request_url_invalid_number_is_none() -> None:
    """Zero, negative, and non-numeric PR numbers are rejected."""
    for pr_number in ("0", "-1", "not-a-number"):
        url = f"https://github.com/acme/widgets/pull/{pr_number}"
        assert parse_pull_request_url(url) is None
def test_parse_pull_request_url_extra_segments_still_parses() -> None:
    """Trailing path segments (e.g. /files) do not break parsing."""
    parsed = parse_pull_request_url("https://github.com/acme/widgets/pull/123/files")
    assert parsed is not None
    assert parsed.url == "https://github.com/acme/widgets/pull/123"

View File

@@ -0,0 +1,416 @@
from __future__ import annotations
from uuid import uuid4
import pytest
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.approval_task_links import ApprovalTaskLink
from app.models.approvals import Approval
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.organizations import Organization
from app.models.task_custom_fields import TaskCustomFieldDefinition, TaskCustomFieldValue
from app.models.tasks import Task
from app.services.github.mission_control_approval_check import (
REQUIRED_ACTION_TYPES,
evaluate_approval_gate_for_pr_url,
)
async def _make_engine() -> AsyncEngine:
    """Build an in-memory SQLite engine with all SQLModel tables created."""
    db_engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with db_engine.connect() as connection, connection.begin():
        await connection.run_sync(SQLModel.metadata.create_all)
    return db_engine
async def _make_session(engine: AsyncEngine) -> AsyncSession:
    """Open a session whose objects stay readable after commit (no attribute expiry)."""
    return AsyncSession(engine, expire_on_commit=False)
@pytest.mark.asyncio
async def test_approval_gate_no_task_linked_is_missing() -> None:
    """Gate evaluates to "missing" when no task carries the PR URL."""
    engine = await _make_engine()
    try:
        async with await _make_session(engine) as session:
            org_id = uuid4()
            board_id = uuid4()
            gateway_id = uuid4()
            # Minimal org -> gateway -> board fixture chain.
            session.add(Organization(id=org_id, name="org"))
            session.add(
                Gateway(
                    id=gateway_id,
                    organization_id=org_id,
                    name="gateway",
                    url="https://gateway.local",
                    workspace_root="/tmp/workspace",
                )
            )
            session.add(
                Board(
                    id=board_id,
                    organization_id=org_id,
                    name="board",
                    slug="board",
                    gateway_id=gateway_id,
                )
            )
            # The field definition exists, but no task sets a value for it.
            session.add(
                TaskCustomFieldDefinition(
                    organization_id=org_id,
                    field_key="github_pr_url",
                    label="GitHub PR URL",
                    field_type="url",
                )
            )
            await session.commit()
            out = await evaluate_approval_gate_for_pr_url(
                session,
                board_id=board_id,
                pr_url="https://github.com/acme/repo/pull/1",
            )
            assert out.outcome == "missing"
    finally:
        await engine.dispose()
@pytest.mark.asyncio
async def test_approval_gate_multiple_tasks_is_multiple() -> None:
    """Gate evaluates to "multiple" when two tasks share the same PR URL."""
    engine = await _make_engine()
    try:
        async with await _make_session(engine) as session:
            org_id = uuid4()
            board_id = uuid4()
            gateway_id = uuid4()
            task_id_1 = uuid4()
            task_id_2 = uuid4()
            # Minimal org -> gateway -> board fixture chain.
            session.add(Organization(id=org_id, name="org"))
            session.add(
                Gateway(
                    id=gateway_id,
                    organization_id=org_id,
                    name="gateway",
                    url="https://gateway.local",
                    workspace_root="/tmp/workspace",
                )
            )
            session.add(
                Board(
                    id=board_id,
                    organization_id=org_id,
                    name="board",
                    slug="board",
                    gateway_id=gateway_id,
                )
            )
            field = TaskCustomFieldDefinition(
                organization_id=org_id,
                field_key="github_pr_url",
                label="GitHub PR URL",
                field_type="url",
            )
            session.add(field)
            session.add(Task(id=task_id_1, board_id=board_id, title="t1", description="", status="inbox"))
            session.add(Task(id=task_id_2, board_id=board_id, title="t2", description="", status="inbox"))
            # First commit so field.id is assigned before the value rows reference it.
            await session.commit()
            # Both tasks point at the same PR URL — the ambiguous case under test.
            session.add(
                TaskCustomFieldValue(
                    task_id=task_id_1,
                    task_custom_field_definition_id=field.id,
                    value="https://github.com/acme/repo/pull/2",
                )
            )
            session.add(
                TaskCustomFieldValue(
                    task_id=task_id_2,
                    task_custom_field_definition_id=field.id,
                    value="https://github.com/acme/repo/pull/2",
                )
            )
            await session.commit()
            out = await evaluate_approval_gate_for_pr_url(
                session,
                board_id=board_id,
                pr_url="https://github.com/acme/repo/pull/2",
            )
            assert out.outcome == "multiple"
            assert len(out.task_ids) == 2
    finally:
        await engine.dispose()
@pytest.mark.asyncio
async def test_approval_gate_pending_is_pending() -> None:
    """A pending qualifying approval on the linked task evaluates to 'pending'."""
    engine = await _make_engine()
    try:
        async with await _make_session(engine) as session:
            org = Organization(id=uuid4(), name="org")
            gateway = Gateway(
                id=uuid4(),
                organization_id=org.id,
                name="gateway",
                url="https://gateway.local",
                workspace_root="/tmp/workspace",
            )
            board = Board(
                id=uuid4(),
                organization_id=org.id,
                name="board",
                slug="board",
                gateway_id=gateway.id,
            )
            definition = TaskCustomFieldDefinition(
                organization_id=org.id,
                field_key="github_pr_url",
                label="GitHub PR URL",
                field_type="url",
            )
            task = Task(id=uuid4(), board_id=board.id, title="t", description="", status="inbox")
            for obj in (org, gateway, board, definition, task):
                session.add(obj)
            await session.commit()

            session.add(
                TaskCustomFieldValue(
                    task_id=task.id,
                    task_custom_field_definition_id=definition.id,
                    value="https://github.com/acme/repo/pull/3",
                )
            )
            session.add(
                Approval(
                    board_id=board.id,
                    task_id=task.id,
                    action_type=sorted(REQUIRED_ACTION_TYPES)[0],
                    confidence=90,
                    status="pending",
                )
            )
            await session.commit()

            result = await evaluate_approval_gate_for_pr_url(
                session,
                board_id=board.id,
                pr_url="https://github.com/acme/repo/pull/3",
            )
            assert result.outcome == "pending"
    finally:
        await engine.dispose()
@pytest.mark.asyncio
async def test_approval_gate_approved_is_success() -> None:
    """An approved approval linked via the join table evaluates to 'success'."""
    engine = await _make_engine()
    try:
        async with await _make_session(engine) as session:
            org = Organization(id=uuid4(), name="org")
            gateway = Gateway(
                id=uuid4(),
                organization_id=org.id,
                name="gateway",
                url="https://gateway.local",
                workspace_root="/tmp/workspace",
            )
            board = Board(
                id=uuid4(),
                organization_id=org.id,
                name="board",
                slug="board",
                gateway_id=gateway.id,
            )
            definition = TaskCustomFieldDefinition(
                organization_id=org.id,
                field_key="github_pr_url",
                label="GitHub PR URL",
                field_type="url",
            )
            task = Task(id=uuid4(), board_id=board.id, title="t", description="", status="review")
            for obj in (org, gateway, board, definition, task):
                session.add(obj)
            await session.commit()

            session.add(
                TaskCustomFieldValue(
                    task_id=task.id,
                    task_custom_field_definition_id=definition.id,
                    value="https://github.com/acme/repo/pull/4",
                )
            )
            # task_id deliberately None: the task association goes through
            # the ApprovalTaskLink join table below.
            approval = Approval(
                board_id=board.id,
                task_id=None,
                action_type=sorted(REQUIRED_ACTION_TYPES)[0],
                confidence=90,
                status="approved",
            )
            session.add(approval)
            await session.commit()

            session.add(ApprovalTaskLink(approval_id=approval.id, task_id=task.id))
            await session.commit()

            result = await evaluate_approval_gate_for_pr_url(
                session,
                board_id=board.id,
                pr_url="https://github.com/acme/repo/pull/4",
            )
            assert result.outcome == "success"
    finally:
        await engine.dispose()
@pytest.mark.asyncio
async def test_approval_gate_rejected_is_rejected() -> None:
    """A rejected qualifying approval on the linked task evaluates to 'rejected'."""
    engine = await _make_engine()
    try:
        async with await _make_session(engine) as session:
            org = Organization(id=uuid4(), name="org")
            gateway = Gateway(
                id=uuid4(),
                organization_id=org.id,
                name="gateway",
                url="https://gateway.local",
                workspace_root="/tmp/workspace",
            )
            board = Board(
                id=uuid4(),
                organization_id=org.id,
                name="board",
                slug="board",
                gateway_id=gateway.id,
            )
            definition = TaskCustomFieldDefinition(
                organization_id=org.id,
                field_key="github_pr_url",
                label="GitHub PR URL",
                field_type="url",
            )
            task = Task(id=uuid4(), board_id=board.id, title="t", description="", status="review")
            for obj in (org, gateway, board, definition, task):
                session.add(obj)
            await session.commit()

            session.add(
                TaskCustomFieldValue(
                    task_id=task.id,
                    task_custom_field_definition_id=definition.id,
                    value="https://github.com/acme/repo/pull/5",
                )
            )
            session.add(
                Approval(
                    board_id=board.id,
                    task_id=task.id,
                    action_type=sorted(REQUIRED_ACTION_TYPES)[0],
                    confidence=90,
                    status="rejected",
                )
            )
            await session.commit()

            result = await evaluate_approval_gate_for_pr_url(
                session,
                board_id=board.id,
                pr_url="https://github.com/acme/repo/pull/5",
            )
            assert result.outcome == "rejected"
    finally:
        await engine.dispose()
@pytest.mark.asyncio
async def test_approval_gate_non_qualifying_action_type_is_missing() -> None:
    """An approval whose action_type is outside REQUIRED_ACTION_TYPES is ignored."""
    engine = await _make_engine()
    try:
        async with await _make_session(engine) as session:
            org = Organization(id=uuid4(), name="org")
            gateway = Gateway(
                id=uuid4(),
                organization_id=org.id,
                name="gateway",
                url="https://gateway.local",
                workspace_root="/tmp/workspace",
            )
            board = Board(
                id=uuid4(),
                organization_id=org.id,
                name="board",
                slug="board",
                gateway_id=gateway.id,
            )
            definition = TaskCustomFieldDefinition(
                organization_id=org.id,
                field_key="github_pr_url",
                label="GitHub PR URL",
                field_type="url",
            )
            task = Task(id=uuid4(), board_id=board.id, title="t", description="", status="review")
            for obj in (org, gateway, board, definition, task):
                session.add(obj)
            await session.commit()

            session.add(
                TaskCustomFieldValue(
                    task_id=task.id,
                    task_custom_field_definition_id=definition.id,
                    value="https://github.com/acme/repo/pull/6",
                )
            )
            # Approval exists but with a non-qualifying action_type, so the
            # gate should treat the task as having no relevant approval.
            session.add(
                Approval(
                    board_id=board.id,
                    task_id=task.id,
                    action_type="some_other_action",
                    confidence=50,
                    status="approved",
                )
            )
            await session.commit()

            result = await evaluate_approval_gate_for_pr_url(
                session,
                board_id=board.id,
                pr_url="https://github.com/acme/repo/pull/6",
            )
            assert result.outcome == "missing"
    finally:
        await engine.dispose()

View File

@@ -95,7 +95,7 @@ services:
- sh
- -c
- |
python -c "from app.services.webhooks.scheduler import bootstrap_webhook_dispatch_schedule; bootstrap_webhook_dispatch_schedule()" && \
python -c "from app.services.webhooks.scheduler import bootstrap_webhook_dispatch_schedule; from app.services.github.scheduler import bootstrap_mission_control_approval_check_schedule; bootstrap_webhook_dispatch_schedule(); bootstrap_mission_control_approval_check_schedule()" && \
rqscheduler -u "${WEBHOOK_REDIS_URL:-redis://redis:6379/0}" -i 60
depends_on:
- redis
@@ -105,6 +105,8 @@ services:
WEBHOOK_QUEUE_NAME: webhook-dispatch
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS: ${WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS:-900}
GITHUB_APPROVAL_CHECK_SCHEDULE_ID: ${GITHUB_APPROVAL_CHECK_SCHEDULE_ID:-mission-control-approval-check-reconcile}
GITHUB_APPROVAL_CHECK_SCHEDULE_INTERVAL_SECONDS: ${GITHUB_APPROVAL_CHECK_SCHEDULE_INTERVAL_SECONDS:-900}
restart: unless-stopped
volumes: