feat: implement lead notification on approval resolution and enhance logging

This commit is contained in:
Abhimanyu Saharan
2026-02-11 16:56:35 +05:30
parent c3f849ddb1
commit a954a6ae99
2 changed files with 272 additions and 0 deletions

View File

@@ -20,12 +20,16 @@ from app.api.deps import (
get_board_for_user_write,
require_admin_or_agent,
)
from app.core.logging import get_logger
from app.core.time import utcnow
from app.db.pagination import paginate
from app.db.session import async_session_maker, get_session
from app.models.agents import Agent
from app.models.approvals import Approval
from app.schemas.approvals import ApprovalCreate, ApprovalRead, ApprovalStatus, ApprovalUpdate
from app.schemas.pagination import DefaultLimitOffsetPage
from app.services.activity_log import record_activity
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
if TYPE_CHECKING:
from collections.abc import AsyncIterator
@@ -36,6 +40,7 @@ if TYPE_CHECKING:
from app.models.boards import Board
router = APIRouter(prefix="/boards/{board_id}/approvals", tags=["approvals"])
logger = get_logger(__name__)
TASK_ID_KEYS: tuple[str, ...] = ("task_id", "taskId", "taskID")
STREAM_POLL_SECONDS = 2
@@ -90,6 +95,83 @@ def _serialize_approval(approval: Approval) -> dict[str, object]:
).model_dump(mode="json")
def _approval_resolution_message(
*,
board: Board,
approval: Approval,
) -> str:
status_text = "approved" if approval.status == "approved" else "rejected"
lines = [
"APPROVAL RESOLVED",
f"Board: {board.name}",
f"Approval ID: {approval.id}",
f"Action: {approval.action_type}",
f"Decision: {status_text}",
f"Confidence: {approval.confidence}",
]
if approval.task_id is not None:
lines.append(f"Task ID: {approval.task_id}")
lines.append("")
lines.append("Take action: continue execution using the final approval decision.")
return "\n".join(lines)
async def _resolve_board_lead(
session: AsyncSession,
*,
board_id: UUID,
) -> Agent | None:
return (
await Agent.objects.filter_by(board_id=board_id)
.filter(col(Agent.is_board_lead).is_(True))
.first(session)
)
async def _notify_lead_on_approval_resolution(
*,
session: AsyncSession,
board: Board,
approval: Approval,
) -> None:
if approval.status not in {"approved", "rejected"}:
return
lead = await _resolve_board_lead(session, board_id=board.id)
if lead is None or not lead.openclaw_session_id:
return
dispatch = GatewayDispatchService(session)
config = await dispatch.optional_gateway_config_for_board(board)
if config is None:
return
message = _approval_resolution_message(board=board, approval=approval)
error = await dispatch.try_send_agent_message(
session_key=lead.openclaw_session_id,
config=config,
agent_name=lead.name,
message=message,
deliver=False,
)
if error is None:
record_activity(
session,
event_type="approval.lead_notified",
message=f"Lead agent notified for {approval.status} approval {approval.id}.",
agent_id=lead.id,
task_id=approval.task_id,
)
else:
record_activity(
session,
event_type="approval.lead_notify_failed",
message=f"Lead notify failed for approval {approval.id}: {error}",
agent_id=lead.id,
task_id=approval.task_id,
)
await session.commit()
async def _fetch_approval_events(
session: AsyncSession,
board_id: UUID,
@@ -238,6 +320,7 @@ async def update_approval(
if approval is None or approval.board_id != board.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
updates = payload.model_dump(exclude_unset=True)
prior_status = approval.status
if "status" in updates:
approval.status = updates["status"]
if approval.status != "pending":
@@ -245,4 +328,18 @@ async def update_approval(
session.add(approval)
await session.commit()
await session.refresh(approval)
if approval.status in {"approved", "rejected"} and approval.status != prior_status:
try:
await _notify_lead_on_approval_resolution(
session=session,
board=board,
approval=approval,
)
except Exception:
logger.exception(
"approval.lead_notify_unexpected board_id=%s approval_id=%s status=%s",
board.id,
approval.id,
approval.status,
)
return approval

View File

@@ -0,0 +1,175 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from uuid import UUID, uuid4
import pytest
from app.api import approvals
from app.models.agents import Agent
from app.models.approvals import Approval
from app.models.boards import Board
from app.schemas.approvals import ApprovalUpdate
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
class _ByIdQuery:
def __init__(self, approval: Approval | None) -> None:
self._approval = approval
async def first(self, _session: object) -> Approval | None:
return self._approval
class _ApprovalObjects:
def __init__(self, approval: Approval | None) -> None:
self._approval = approval
def by_id(self, _approval_id: str) -> _ByIdQuery:
return _ByIdQuery(self._approval)
@dataclass
class _FakeSession:
commits: int = 0
refreshed: int = 0
added: list[object] = None # type: ignore[assignment]
def __post_init__(self) -> None:
if self.added is None:
self.added = []
def add(self, value: object) -> None:
self.added.append(value)
async def commit(self) -> None:
self.commits += 1
async def refresh(self, _value: object) -> None:
self.refreshed += 1
def _board() -> Board:
return Board(
id=uuid4(),
organization_id=uuid4(),
name="Ops",
slug="ops",
)
def _approval(*, board_id: UUID, status: str = "pending") -> Approval:
return Approval(
id=uuid4(),
board_id=board_id,
action_type="task.execute",
confidence=91,
status=status,
payload={"target": "deployment"},
)
@pytest.mark.asyncio
async def test_update_approval_notifies_lead_when_approved(
monkeypatch: pytest.MonkeyPatch,
) -> None:
board = _board()
approval = _approval(board_id=board.id, status="pending")
lead = Agent(
id=uuid4(),
board_id=board.id,
gateway_id=uuid4(),
name="Lead Agent",
is_board_lead=True,
openclaw_session_id="agent:lead:session",
)
session = _FakeSession()
captured: dict[str, Any] = {}
fake_approval_model = type("FakeApprovalModel", (), {"objects": _ApprovalObjects(approval)})
monkeypatch.setattr(approvals, "Approval", fake_approval_model)
async def _fake_resolve_lead(*_args: Any, **_kwargs: Any) -> Agent:
return lead
async def _fake_optional_gateway_config_for_board(
self: approvals.GatewayDispatchService,
_board: Board,
) -> GatewayClientConfig:
_ = self
return GatewayClientConfig(url="ws://gateway.example/ws", token=None)
async def _fake_try_send_agent_message(
self: approvals.GatewayDispatchService,
**kwargs: Any,
) -> None:
_ = self
captured.update(kwargs)
return None
monkeypatch.setattr(approvals, "_resolve_board_lead", _fake_resolve_lead)
monkeypatch.setattr(
approvals.GatewayDispatchService,
"optional_gateway_config_for_board",
_fake_optional_gateway_config_for_board,
)
monkeypatch.setattr(
approvals.GatewayDispatchService,
"try_send_agent_message",
_fake_try_send_agent_message,
)
updated = await approvals.update_approval(
approval_id=str(approval.id),
payload=ApprovalUpdate(status="approved"),
board=board,
session=session, # type: ignore[arg-type]
)
assert updated.status == "approved"
assert captured["session_key"] == "agent:lead:session"
assert captured["agent_name"] == "Lead Agent"
assert "APPROVAL RESOLVED" in captured["message"]
assert "Decision: approved" in captured["message"]
event_types = [item.event_type for item in session.added if hasattr(item, "event_type")]
assert "approval.lead_notified" in event_types
assert session.commits >= 2
@pytest.mark.asyncio
async def test_update_approval_skips_notify_when_status_not_resolved(
monkeypatch: pytest.MonkeyPatch,
) -> None:
board = _board()
approval = _approval(board_id=board.id, status="pending")
session = _FakeSession()
called = {"notify": 0}
fake_approval_model = type("FakeApprovalModel", (), {"objects": _ApprovalObjects(approval)})
monkeypatch.setattr(approvals, "Approval", fake_approval_model)
async def _fake_notify(**_kwargs: Any) -> None:
called["notify"] += 1
monkeypatch.setattr(approvals, "_notify_lead_on_approval_resolution", _fake_notify)
updated = await approvals.update_approval(
approval_id=str(approval.id),
payload=ApprovalUpdate(status="pending"),
board=board,
session=session, # type: ignore[arg-type]
)
assert updated.status == "pending"
assert called["notify"] == 0
def test_approval_resolution_message_uses_rejected_enum_value() -> None:
board = _board()
approval = _approval(board_id=board.id, status="rejected")
message = approvals._approval_resolution_message(board=board, approval=approval)
assert "APPROVAL RESOLVED" in message
assert f"Approval ID: {approval.id}" in message
assert "Decision: rejected" in message