410 lines
13 KiB
Python
410 lines
13 KiB
Python
"""Mission Control approval gate → GitHub required check.
|
|
|
|
This module maintains a GitHub Check Run (recommended) named:
|
|
- `mission-control/approval`
|
|
|
|
The check is intended to be added to GitHub ruleset required checks so PRs
|
|
cannot merge unless the corresponding Mission Control task has an approved
|
|
in-app approval.
|
|
|
|
Mapping:
|
|
- PR → Task: by `custom_field_values.github_pr_url` exact match.
|
|
- Task → Approval: any linked Approval rows with status in {pending, approved, rejected}.
|
|
|
|
Triggers (implemented via API hooks):
|
|
- approval created / resolved
|
|
- task github_pr_url updated
|
|
|
|
A periodic reconciliation job should call the sync functions as a safety net.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Literal, cast
|
|
from uuid import UUID
|
|
|
|
from sqlmodel import col, select
|
|
|
|
from app.core.config import settings
|
|
from app.core.logging import get_logger
|
|
from app.db.session import async_session_maker
|
|
from app.models.approval_task_links import ApprovalTaskLink
|
|
from app.models.approvals import Approval
|
|
from app.models.boards import Board
|
|
from app.models.task_custom_fields import TaskCustomFieldDefinition, TaskCustomFieldValue
|
|
from app.models.tasks import Task
|
|
from app.services.github.client import (
|
|
GitHubClientError,
|
|
get_pull_request_head_sha,
|
|
parse_pull_request_url,
|
|
upsert_check_run,
|
|
)
|
|
|
|
if False: # pragma: no cover
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
CHECK_NAME = "mission-control/approval"
|
|
|
|
# Default action types that qualify as a "merge gate" approval.
|
|
# (Action types are free-form today; keep this conservative but configurable later.)
|
|
REQUIRED_ACTION_TYPES = {"mark_done", "mark_task_done"}
|
|
|
|
|
|
CheckOutcome = Literal["success", "pending", "rejected", "missing", "error", "multiple"]
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ApprovalGateEvaluation:
|
|
outcome: CheckOutcome
|
|
task_ids: tuple[UUID, ...] = ()
|
|
summary: str = ""
|
|
|
|
|
|
async def _board_org_id(session: AsyncSession, *, board_id: UUID) -> UUID | None:
|
|
return (
|
|
await session.exec(
|
|
select(col(Board.organization_id)).where(col(Board.id) == board_id),
|
|
)
|
|
).first()
|
|
|
|
|
|
async def _tasks_for_pr_url(
|
|
session: AsyncSession,
|
|
*,
|
|
board_id: UUID,
|
|
pr_url: str,
|
|
) -> list[Task]:
|
|
org_id = await _board_org_id(session, board_id=board_id)
|
|
if org_id is None:
|
|
return []
|
|
|
|
statement = (
|
|
select(Task)
|
|
.join(TaskCustomFieldValue, col(TaskCustomFieldValue.task_id) == col(Task.id))
|
|
.join(
|
|
TaskCustomFieldDefinition,
|
|
col(TaskCustomFieldDefinition.id)
|
|
== col(TaskCustomFieldValue.task_custom_field_definition_id),
|
|
)
|
|
.where(col(Task.board_id) == board_id)
|
|
.where(col(TaskCustomFieldDefinition.organization_id) == org_id)
|
|
.where(col(TaskCustomFieldDefinition.field_key) == "github_pr_url")
|
|
.where(col(TaskCustomFieldValue.value) == pr_url)
|
|
.order_by(col(Task.created_at).asc())
|
|
)
|
|
rows = list(await session.exec(statement))
|
|
return [row for row in rows if isinstance(row, Task)]
|
|
|
|
|
|
async def _approval_rows_for_task(
|
|
session: AsyncSession,
|
|
*,
|
|
board_id: UUID,
|
|
task_id: UUID,
|
|
) -> list[Approval]:
|
|
# Linked approvals (new style)
|
|
linked_stmt = (
|
|
select(Approval)
|
|
.join(ApprovalTaskLink, col(ApprovalTaskLink.approval_id) == col(Approval.id))
|
|
.where(col(Approval.board_id) == board_id)
|
|
.where(col(ApprovalTaskLink.task_id) == task_id)
|
|
.order_by(col(Approval.created_at).asc())
|
|
)
|
|
linked = list(await session.exec(linked_stmt))
|
|
|
|
# Legacy approvals (Approval.task_id) not linked via ApprovalTaskLink
|
|
legacy_stmt = (
|
|
select(Approval)
|
|
.where(col(Approval.board_id) == board_id)
|
|
.where(col(Approval.task_id) == task_id)
|
|
.order_by(col(Approval.created_at).asc())
|
|
)
|
|
legacy = list(await session.exec(legacy_stmt))
|
|
|
|
# Merge unique by id
|
|
by_id: dict[UUID, Approval] = {}
|
|
for approval in [*linked, *legacy]:
|
|
if isinstance(approval, Approval):
|
|
by_id.setdefault(approval.id, approval)
|
|
return list(by_id.values())
|
|
|
|
|
|
def _qualifies_for_gate(approval: Approval) -> bool:
|
|
# If action types evolve, we can broaden this; for now keep it anchored.
|
|
return approval.action_type in REQUIRED_ACTION_TYPES
|
|
|
|
|
|
async def evaluate_approval_gate_for_pr_url(
|
|
session: AsyncSession,
|
|
*,
|
|
board_id: UUID,
|
|
pr_url: str,
|
|
) -> ApprovalGateEvaluation:
|
|
tasks = await _tasks_for_pr_url(session, board_id=board_id, pr_url=pr_url)
|
|
if not tasks:
|
|
return ApprovalGateEvaluation(
|
|
outcome="missing",
|
|
task_ids=(),
|
|
summary=(
|
|
"No Mission Control task is linked to this PR. Set the task custom field "
|
|
"`github_pr_url` to this PR URL."
|
|
),
|
|
)
|
|
if len(tasks) > 1:
|
|
return ApprovalGateEvaluation(
|
|
outcome="multiple",
|
|
task_ids=tuple(task.id for task in tasks),
|
|
summary=(
|
|
"Multiple Mission Control tasks are linked to this PR URL. "
|
|
"Ensure exactly one task has `github_pr_url` set to this PR."
|
|
),
|
|
)
|
|
|
|
task = tasks[0]
|
|
approvals = await _approval_rows_for_task(session, board_id=board_id, task_id=task.id)
|
|
gate_approvals = [a for a in approvals if _qualifies_for_gate(a)]
|
|
|
|
if not gate_approvals:
|
|
return ApprovalGateEvaluation(
|
|
outcome="missing",
|
|
task_ids=(task.id,),
|
|
summary=(
|
|
"No qualifying approval found for this task. Create an approval request "
|
|
f"(action_type in {sorted(REQUIRED_ACTION_TYPES)})."
|
|
),
|
|
)
|
|
|
|
statuses = [str(a.status) for a in gate_approvals]
|
|
if any(s == "approved" for s in statuses):
|
|
return ApprovalGateEvaluation(
|
|
outcome="success",
|
|
task_ids=(task.id,),
|
|
summary="Approval is approved. Merge is permitted.",
|
|
)
|
|
if any(s == "rejected" for s in statuses):
|
|
return ApprovalGateEvaluation(
|
|
outcome="rejected",
|
|
task_ids=(task.id,),
|
|
summary="Approval was rejected. Merge is blocked until a new approval is granted.",
|
|
)
|
|
if any(s == "pending" for s in statuses):
|
|
return ApprovalGateEvaluation(
|
|
outcome="pending",
|
|
task_ids=(task.id,),
|
|
summary="Approval is pending. Merge is blocked until approved.",
|
|
)
|
|
|
|
return ApprovalGateEvaluation(
|
|
outcome="error",
|
|
task_ids=(task.id,),
|
|
summary=f"Unexpected approval statuses: {sorted(set(statuses))}",
|
|
)
|
|
|
|
|
|
async def sync_github_approval_check_for_pr_url(
|
|
session: AsyncSession,
|
|
*,
|
|
board_id: UUID,
|
|
pr_url: str,
|
|
) -> None:
|
|
"""Upsert the GitHub check run for a PR URL based on Mission Control approval state."""
|
|
|
|
parsed = parse_pull_request_url(pr_url)
|
|
if parsed is None:
|
|
logger.warning(
|
|
"github.approval_check.invalid_pr_url",
|
|
extra={"board_id": str(board_id), "pr_url": pr_url},
|
|
)
|
|
return
|
|
|
|
try:
|
|
evaluation = await evaluate_approval_gate_for_pr_url(
|
|
session,
|
|
board_id=board_id,
|
|
pr_url=pr_url,
|
|
)
|
|
|
|
head_sha = await get_pull_request_head_sha(parsed)
|
|
|
|
title = "Mission Control approval gate"
|
|
summary_lines = [
|
|
f"PR: {parsed.url}",
|
|
f"Board: {board_id}",
|
|
]
|
|
if evaluation.task_ids:
|
|
summary_lines.append("Task(s): " + ", ".join(str(tid) for tid in evaluation.task_ids))
|
|
summary_lines.append("")
|
|
summary_lines.append(evaluation.summary)
|
|
|
|
if evaluation.outcome == "success":
|
|
await upsert_check_run(
|
|
owner=parsed.owner,
|
|
repo=parsed.repo,
|
|
head_sha=head_sha,
|
|
check_name=CHECK_NAME,
|
|
status="completed",
|
|
conclusion="success",
|
|
title=title,
|
|
summary="\n".join(summary_lines),
|
|
)
|
|
return
|
|
|
|
if evaluation.outcome == "pending":
|
|
# Keep as in_progress to clearly signal it's waiting.
|
|
await upsert_check_run(
|
|
owner=parsed.owner,
|
|
repo=parsed.repo,
|
|
head_sha=head_sha,
|
|
check_name=CHECK_NAME,
|
|
status="in_progress",
|
|
conclusion=None,
|
|
title=title,
|
|
summary="\n".join(summary_lines),
|
|
)
|
|
return
|
|
|
|
# failure-like outcomes
|
|
await upsert_check_run(
|
|
owner=parsed.owner,
|
|
repo=parsed.repo,
|
|
head_sha=head_sha,
|
|
check_name=CHECK_NAME,
|
|
status="completed",
|
|
conclusion="failure",
|
|
title=title,
|
|
summary="\n".join(summary_lines),
|
|
)
|
|
|
|
except GitHubClientError as exc:
|
|
logger.warning(
|
|
"github.approval_check.github_error",
|
|
extra={"board_id": str(board_id), "pr_url": pr_url, "error": str(exc)},
|
|
)
|
|
except Exception as exc:
|
|
logger.exception(
|
|
"github.approval_check.unexpected",
|
|
extra={"board_id": str(board_id), "pr_url": pr_url, "error": str(exc)},
|
|
)
|
|
|
|
|
|
async def sync_github_approval_check_for_task_ids(
|
|
session: AsyncSession,
|
|
*,
|
|
board_id: UUID,
|
|
task_ids: list[UUID],
|
|
) -> None:
|
|
"""Sync approval checks for any tasks that have github_pr_url set.
|
|
|
|
Used by approval hooks (one approval can link multiple tasks).
|
|
"""
|
|
|
|
if not task_ids:
|
|
return
|
|
|
|
# Load custom-field values for these tasks and find github_pr_url.
|
|
# We reuse the same join approach but filter by task ids.
|
|
org_id = await _board_org_id(session, board_id=board_id)
|
|
if org_id is None:
|
|
return
|
|
|
|
stmt = (
|
|
select(col(TaskCustomFieldValue.task_id), col(TaskCustomFieldValue.value))
|
|
.join(
|
|
TaskCustomFieldDefinition,
|
|
col(TaskCustomFieldDefinition.id)
|
|
== col(TaskCustomFieldValue.task_custom_field_definition_id),
|
|
)
|
|
.where(col(TaskCustomFieldDefinition.organization_id) == org_id)
|
|
.where(col(TaskCustomFieldDefinition.field_key) == "github_pr_url")
|
|
.where(col(TaskCustomFieldValue.task_id).in_(task_ids))
|
|
)
|
|
rows = list(await session.exec(stmt))
|
|
|
|
pr_urls: set[str] = set()
|
|
for _task_id, value in rows:
|
|
if isinstance(value, str) and value.strip():
|
|
pr_urls.add(value.strip())
|
|
|
|
for pr_url in sorted(pr_urls):
|
|
await sync_github_approval_check_for_pr_url(session, board_id=board_id, pr_url=pr_url)
|
|
|
|
|
|
async def reconcile_github_approval_checks_for_board(
|
|
session: AsyncSession,
|
|
*,
|
|
board_id: UUID,
|
|
) -> int:
|
|
"""Periodic reconciliation safety net.
|
|
|
|
Returns number of distinct PR URLs processed.
|
|
|
|
Intended to be run by a cron/worker periodically.
|
|
"""
|
|
|
|
org_id = await _board_org_id(session, board_id=board_id)
|
|
if org_id is None:
|
|
return 0
|
|
|
|
stmt = (
|
|
select(col(TaskCustomFieldValue.value))
|
|
.join(
|
|
TaskCustomFieldDefinition,
|
|
col(TaskCustomFieldDefinition.id)
|
|
== col(TaskCustomFieldValue.task_custom_field_definition_id),
|
|
)
|
|
.join(Task, col(Task.id) == col(TaskCustomFieldValue.task_id))
|
|
.where(col(Task.board_id) == board_id)
|
|
.where(col(TaskCustomFieldDefinition.organization_id) == org_id)
|
|
.where(col(TaskCustomFieldDefinition.field_key) == "github_pr_url")
|
|
)
|
|
raw_rows = list(await session.exec(stmt))
|
|
rows = cast(list[tuple[object]], raw_rows)
|
|
|
|
pr_urls: set[str] = set()
|
|
for (value,) in rows:
|
|
if isinstance(value, str) and value.strip():
|
|
pr_urls.add(value.strip())
|
|
|
|
for pr_url in sorted(pr_urls):
|
|
await sync_github_approval_check_for_pr_url(session, board_id=board_id, pr_url=pr_url)
|
|
|
|
return len(pr_urls)
|
|
|
|
|
|
async def reconcile_mission_control_approval_checks_for_all_boards() -> int:
|
|
"""Reconcile approval checks for every board.
|
|
|
|
Returns total number of distinct PR URLs processed across boards.
|
|
|
|
This is intentionally a safety net: the primary, low-latency updates happen on
|
|
approval create/resolution and task github_pr_url updates.
|
|
"""
|
|
|
|
async with async_session_maker() as session:
|
|
raw_board_ids = list(
|
|
await session.exec(
|
|
select(col(Board.id)).order_by(col(Board.created_at).asc()),
|
|
),
|
|
)
|
|
board_ids = [value for value in raw_board_ids if isinstance(value, UUID)]
|
|
processed = 0
|
|
for board_id in board_ids:
|
|
try:
|
|
processed += await reconcile_github_approval_checks_for_board(
|
|
session,
|
|
board_id=board_id,
|
|
)
|
|
except Exception:
|
|
logger.exception(
|
|
"github.approval_check.reconcile.board_failed",
|
|
extra={"board_id": str(board_id)},
|
|
)
|
|
return processed
|
|
|
|
|
|
def github_approval_check_enabled() -> bool:
|
|
return bool((settings.github_token or "").strip())
|