diff --git a/backend/app/api/approvals.py b/backend/app/api/approvals.py index e904e8fe..0a25cd28 100644 --- a/backend/app/api/approvals.py +++ b/backend/app/api/approvals.py @@ -20,12 +20,16 @@ from app.api.deps import ( get_board_for_user_write, require_admin_or_agent, ) +from app.core.logging import get_logger from app.core.time import utcnow from app.db.pagination import paginate from app.db.session import async_session_maker, get_session +from app.models.agents import Agent from app.models.approvals import Approval from app.schemas.approvals import ApprovalCreate, ApprovalRead, ApprovalStatus, ApprovalUpdate from app.schemas.pagination import DefaultLimitOffsetPage +from app.services.activity_log import record_activity +from app.services.openclaw.gateway_dispatch import GatewayDispatchService if TYPE_CHECKING: from collections.abc import AsyncIterator @@ -36,6 +40,7 @@ if TYPE_CHECKING: from app.models.boards import Board router = APIRouter(prefix="/boards/{board_id}/approvals", tags=["approvals"]) +logger = get_logger(__name__) TASK_ID_KEYS: tuple[str, ...] = ("task_id", "taskId", "taskID") STREAM_POLL_SECONDS = 2 @@ -90,6 +95,83 @@ def _serialize_approval(approval: Approval) -> dict[str, object]: ).model_dump(mode="json") +def _approval_resolution_message( + *, + board: Board, + approval: Approval, +) -> str: + status_text = "approved" if approval.status == "approved" else "rejected" + lines = [ + "APPROVAL RESOLVED", + f"Board: {board.name}", + f"Approval ID: {approval.id}", + f"Action: {approval.action_type}", + f"Decision: {status_text}", + f"Confidence: {approval.confidence}", + ] + if approval.task_id is not None: + lines.append(f"Task ID: {approval.task_id}") + lines.append("") + lines.append("Take action: continue execution using the final approval decision.") + return "\n".join(lines) + + +async def _resolve_board_lead( + session: AsyncSession, + *, + board_id: UUID, +) -> Agent | None: + return ( + await Agent.objects.filter_by(board_id=board_id) + .filter(col(Agent.is_board_lead).is_(True)) + .first(session) + ) + + +async def _notify_lead_on_approval_resolution( + *, + session: AsyncSession, + board: Board, + approval: Approval, +) -> None: + if approval.status not in {"approved", "rejected"}: + return + lead = await _resolve_board_lead(session, board_id=board.id) + if lead is None or not lead.openclaw_session_id: + return + + dispatch = GatewayDispatchService(session) + config = await dispatch.optional_gateway_config_for_board(board) + if config is None: + return + + message = _approval_resolution_message(board=board, approval=approval) + error = await dispatch.try_send_agent_message( + session_key=lead.openclaw_session_id, + config=config, + agent_name=lead.name, + message=message, + deliver=False, + ) + if error is None: + record_activity( + session, + event_type="approval.lead_notified", + message=f"Lead agent notified for {approval.status} approval {approval.id}.", + agent_id=lead.id, + task_id=approval.task_id, + ) + else: + record_activity( + session, + event_type="approval.lead_notify_failed", + message=f"Lead notify failed for approval {approval.id}: {error}", + agent_id=lead.id, + task_id=approval.task_id, + ) + await session.commit() + + async def _fetch_approval_events( session: AsyncSession, board_id: UUID, @@ -238,6 +320,7 @@ async def update_approval( if approval is None or approval.board_id != board.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) updates = payload.model_dump(exclude_unset=True) + prior_status = approval.status if "status" in updates: approval.status = updates["status"] if approval.status != "pending": @@ -245,4 +328,18 @@ async def update_approval( session.add(approval) await session.commit() await session.refresh(approval) + if approval.status in {"approved", "rejected"} and approval.status != prior_status: + try: + await _notify_lead_on_approval_resolution( + session=session, + board=board, + approval=approval, + ) + except Exception: + logger.exception( + "approval.lead_notify_unexpected board_id=%s approval_id=%s status=%s", + board.id, + approval.id, + approval.status, + ) return approval diff --git a/backend/tests/test_approvals_lead_notifications.py b/backend/tests/test_approvals_lead_notifications.py new file mode 100644 index 00000000..e79d399f --- /dev/null +++ b/backend/tests/test_approvals_lead_notifications.py @@ -0,0 +1,175 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any +from uuid import UUID, uuid4 + +import pytest + +from app.api import approvals +from app.models.agents import Agent +from app.models.approvals import Approval +from app.models.boards import Board +from app.schemas.approvals import ApprovalUpdate +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig + + +class _ByIdQuery: + def __init__(self, approval: Approval | None) -> None: + self._approval = approval + + async def first(self, _session: object) -> Approval | None: + return self._approval + + +class _ApprovalObjects: + def __init__(self, approval: Approval | None) -> None: + self._approval = approval + + def by_id(self, _approval_id: str) -> _ByIdQuery: + return _ByIdQuery(self._approval) + + +@dataclass +class _FakeSession: + commits: int = 0 + refreshed: int = 0 + added: list[object] = None # type: ignore[assignment] + + def __post_init__(self) -> None: + if self.added is None: + self.added = [] + + def add(self, value: object) -> None: + self.added.append(value) + + async def commit(self) -> None: + self.commits += 1 + + async def refresh(self, _value: object) -> None: + self.refreshed += 1 + + +def _board() -> Board: + return Board( + id=uuid4(), + organization_id=uuid4(), + name="Ops", + slug="ops", + ) + + +def _approval(*, board_id: UUID, status: str = "pending") -> Approval: + return Approval( + id=uuid4(), + board_id=board_id, + action_type="task.execute", + confidence=91, + status=status, + payload={"target": "deployment"}, + ) + + +@pytest.mark.asyncio +async def test_update_approval_notifies_lead_when_approved( + monkeypatch: pytest.MonkeyPatch, +) -> None: + board = _board() + approval = _approval(board_id=board.id, status="pending") + lead = Agent( + id=uuid4(), + board_id=board.id, + gateway_id=uuid4(), + name="Lead Agent", + is_board_lead=True, + openclaw_session_id="agent:lead:session", + ) + session = _FakeSession() + captured: dict[str, Any] = {} + + fake_approval_model = type("FakeApprovalModel", (), {"objects": _ApprovalObjects(approval)}) + monkeypatch.setattr(approvals, "Approval", fake_approval_model) + + async def _fake_resolve_lead(*_args: Any, **_kwargs: Any) -> Agent: + return lead + + async def _fake_optional_gateway_config_for_board( + self: approvals.GatewayDispatchService, + _board: Board, + ) -> GatewayClientConfig: + _ = self + return GatewayClientConfig(url="ws://gateway.example/ws", token=None) + + async def _fake_try_send_agent_message( + self: approvals.GatewayDispatchService, + **kwargs: Any, + ) -> None: + _ = self + captured.update(kwargs) + return None + + monkeypatch.setattr(approvals, "_resolve_board_lead", _fake_resolve_lead) + monkeypatch.setattr( + approvals.GatewayDispatchService, + "optional_gateway_config_for_board", + _fake_optional_gateway_config_for_board, + ) + monkeypatch.setattr( + approvals.GatewayDispatchService, + "try_send_agent_message", + _fake_try_send_agent_message, + ) + + updated = await approvals.update_approval( + approval_id=str(approval.id), + payload=ApprovalUpdate(status="approved"), + board=board, + session=session, # type: ignore[arg-type] + ) + + assert updated.status == "approved" + assert captured["session_key"] == "agent:lead:session" + assert captured["agent_name"] == "Lead Agent" + assert "APPROVAL RESOLVED" in captured["message"] + assert "Decision: approved" in captured["message"] + + event_types = [item.event_type for item in session.added if hasattr(item, "event_type")] + assert "approval.lead_notified" in event_types + assert session.commits >= 2 + + +@pytest.mark.asyncio +async def test_update_approval_skips_notify_when_status_not_resolved( + monkeypatch: pytest.MonkeyPatch, +) -> None: + board = _board() + approval = _approval(board_id=board.id, status="pending") + session = _FakeSession() + called = {"notify": 0} + + fake_approval_model = type("FakeApprovalModel", (), {"objects": _ApprovalObjects(approval)}) + monkeypatch.setattr(approvals, "Approval", fake_approval_model) + + async def _fake_notify(**_kwargs: Any) -> None: + called["notify"] += 1 + + monkeypatch.setattr(approvals, "_notify_lead_on_approval_resolution", _fake_notify) + + updated = await approvals.update_approval( + approval_id=str(approval.id), + payload=ApprovalUpdate(status="pending"), + board=board, + session=session, # type: ignore[arg-type] + ) + + assert updated.status == "pending" + assert called["notify"] == 0 + + +def test_approval_resolution_message_uses_rejected_enum_value() -> None: + board = _board() + approval = _approval(board_id=board.id, status="rejected") + message = approvals._approval_resolution_message(board=board, approval=approval) + assert "APPROVAL RESOLVED" in message + assert f"Approval ID: {approval.id}" in message + assert "Decision: rejected" in message