diff --git a/backend/app/api/boards.py b/backend/app/api/boards.py index 7fed49ac..b8994537 100644 --- a/backend/app/api/boards.py +++ b/backend/app/api/boards.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, status @@ -16,10 +16,12 @@ from app.api.deps import ( require_org_admin, require_org_member, ) +from app.core.logging import get_logger from app.core.time import utcnow from app.db import crud from app.db.pagination import paginate from app.db.session import get_session +from app.models.agents import Agent from app.models.board_groups import BoardGroup from app.models.boards import Board from app.models.gateways import Gateway @@ -27,9 +29,13 @@ from app.schemas.boards import BoardCreate, BoardRead, BoardUpdate from app.schemas.common import OkResponse from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.view_models import BoardGroupSnapshot, BoardSnapshot +from app.services.activity_log import record_activity from app.services.board_group_snapshot import build_board_group_snapshot from app.services.board_lifecycle import delete_board as delete_board_service from app.services.board_snapshot import build_board_snapshot +from app.services.openclaw.gateway_dispatch import GatewayDispatchService +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig +from app.services.openclaw.gateway_rpc import OpenClawGatewayError from app.services.organizations import OrganizationContext, board_access_filter if TYPE_CHECKING: @@ -37,6 +43,7 @@ if TYPE_CHECKING: from sqlmodel.ext.asyncio.session import AsyncSession router = APIRouter(prefix="/boards", tags=["boards"]) +logger = get_logger(__name__) SESSION_DEP = Depends(get_session) ORG_ADMIN_DEP = Depends(require_org_admin) @@ -156,6 +163,185 @@ async def _apply_board_update( return await crud.save(session, board) +def _board_group_change_message( + *, + action: Literal["join", "leave"], + changed_board: Board, + recipient_board: Board, + group: BoardGroup, +) -> str: + changed_label = "Joined Board" if action == "join" else "Left Board" + guidance = ( + "1) Use cross-board discussion when work spans multiple boards.\n" + "2) Check related board activity before acting on shared concerns.\n" + "3) Explicitly coordinate ownership to avoid duplicate or conflicting work.\n" + ) + if action == "leave": + guidance = ( + "1) Treat cross-board coordination with the departed board as inactive.\n" + "2) Re-check dependencies and ownership that previously spanned this board.\n" + "3) Confirm no in-flight handoffs still rely on the prior group link.\n" + ) + return ( + "BOARD GROUP UPDATED\n" + f"{changed_label}: {changed_board.name}\n" + f"{changed_label} ID: {changed_board.id}\n" + f"Recipient Board: {recipient_board.name}\n" + f"Recipient Board ID: {recipient_board.id}\n" + f"Board Group: {group.name}\n" + f"Board Group ID: {group.id}\n\n" + "Coordination guidance:\n" + f"{guidance}" + ) + + +async def _notify_agents_on_board_group_change( + *, + session: AsyncSession, + board: Board, + group: BoardGroup, + action: Literal["join", "leave"], +) -> None: + dispatch = GatewayDispatchService(session) + group_boards = await Board.objects.filter_by(board_group_id=group.id).all(session) + board_by_id = {item.id: item for item in group_boards} + board_by_id.setdefault(board.id, board) + board_ids = list(board_by_id.keys()) + if not board_ids: + return + agents = await Agent.objects.by_field_in("board_id", board_ids).all(session) + if not agents: + return + + config_by_board_id: dict[UUID, GatewayClientConfig] = {} + for group_board in board_by_id.values(): + config = await dispatch.optional_gateway_config_for_board(group_board) + if config is None: + logger.warning( + "board.group.%s.notify_skipped board_id=%s group_id=%s target_board_id=%s " + "reason=no_gateway_config", + action, + board.id, + group.id, + group_board.id, + ) + continue + config_by_board_id[group_board.id] = config + + if not config_by_board_id: + logger.warning( + "board.group.%s.notify_skipped board_id=%s group_id=%s reason=no_gateway_config_any_board", + action, + board.id, + group.id, + ) + return + + message_by_board_id = { + recipient_board_id: _board_group_change_message( + action=action, + changed_board=board, + recipient_board=recipient_board, + group=group, + ) + for recipient_board_id, recipient_board in board_by_id.items() + } + + notified = 0 + failed = 0 + skipped_missing_session = 0 + skipped_missing_config = 0 + skipped_missing_board = 0 + for agent in agents: + if not agent.openclaw_session_id: + skipped_missing_session += 1 + continue + if agent.board_id is None: + skipped_missing_board += 1 + continue + config = config_by_board_id.get(agent.board_id) + message = message_by_board_id.get(agent.board_id) + recipient_board = board_by_id.get(agent.board_id) + if config is None or message is None or recipient_board is None: + skipped_missing_config += 1 + continue + error = await dispatch.try_send_agent_message( + session_key=agent.openclaw_session_id, + config=config, + agent_name=agent.name, + message=message, + deliver=False, + ) + if error is None: + notified += 1 + record_activity( + session, + event_type=f"board.group.{action}.notified", + message=( + f"Board-group {action} notice sent to {agent.name} for board " + f"{recipient_board.name} related to {board.name} and {group.name}." + ), + agent_id=agent.id, + ) + else: + failed += 1 + record_activity( + session, + event_type=f"board.group.{action}.notify_failed", + message=( + f"Board-group {action} notify failed for {agent.name} on board " + f"{recipient_board.name}: {error}" + ), + agent_id=agent.id, + ) + + if notified or failed: + await session.commit() + logger.info( + "board.group.%s.notify_complete board_id=%s group_id=%s boards_total=%s agents_total=%s " + "agents_notified=%s agents_failed=%s agents_skipped_no_session=%s " + "agents_skipped_no_gateway=%s agents_skipped_no_board=%s", + action, + board.id, + group.id, + len(board_by_id), + len(agents), + notified, + failed, + skipped_missing_session, + skipped_missing_config, + skipped_missing_board, + ) + + +async def _notify_agents_on_board_group_addition( + *, + session: AsyncSession, + board: Board, + group: BoardGroup, +) -> None: + await _notify_agents_on_board_group_change( + session=session, + board=board, + group=group, + action="join", + ) + + +async def _notify_agents_on_board_group_removal( + *, + session: AsyncSession, + board: Board, + group: BoardGroup, +) -> None: + await _notify_agents_on_board_group_change( + session=session, + board=board, + group=group, + action="leave", + ) + + @router.get("", response_model=DefaultLimitOffsetPage[BoardRead]) async def list_boards( gateway_id: UUID | None = GATEWAY_ID_QUERY, @@ -233,7 +419,40 @@ async def update_board( board: Board = BOARD_USER_WRITE_DEP, ) -> Board: """Update mutable board properties.""" - return await _apply_board_update(payload=payload, session=session, board=board) + previous_group_id = board.board_group_id + updated = await _apply_board_update(payload=payload, session=session, board=board) + new_group_id = updated.board_group_id + if previous_group_id is not None and previous_group_id != new_group_id: + previous_group = await crud.get_by_id(session, BoardGroup, previous_group_id) + if previous_group is not None: + try: + await _notify_agents_on_board_group_removal( + session=session, + board=updated, + group=previous_group, + ) + except (OpenClawGatewayError, OSError, RuntimeError, ValueError): + logger.exception( + "board.group.leave.notify_unexpected board_id=%s group_id=%s", + updated.id, + previous_group_id, + ) + if new_group_id is not None and new_group_id != previous_group_id: + board_group = await crud.get_by_id(session, BoardGroup, new_group_id) + if board_group is not None: + try: + await _notify_agents_on_board_group_addition( + session=session, + board=updated, + group=board_group, + ) + except (OpenClawGatewayError, OSError, RuntimeError, ValueError): + logger.exception( + "board.group.join.notify_unexpected board_id=%s group_id=%s", + updated.id, + new_group_id, + ) + return updated @router.delete("/{board_id}", response_model=OkResponse) diff --git a/backend/tests/test_board_group_assignment_notifications.py b/backend/tests/test_board_group_assignment_notifications.py new file mode 100644 index 00000000..5c24fcc3 --- /dev/null +++ b/backend/tests/test_board_group_assignment_notifications.py @@ -0,0 +1,434 @@ +# ruff: noqa: S101 +"""Tests for board-group assignment notifications to board agents.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any +from uuid import UUID, uuid4 + +import pytest + +from app.api import boards +from app.models.agents import Agent +from app.models.board_groups import BoardGroup +from app.models.boards import Board +from app.schemas.boards import BoardUpdate +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig +from app.services.openclaw.gateway_rpc import OpenClawGatewayError + + +@dataclass +class _FakeSession: + added: list[object] = field(default_factory=list) + commits: int = 0 + + def add(self, value: object) -> None: + self.added.append(value) + + async def commit(self) -> None: + self.commits += 1 + + +def _board(*, board_group_id: UUID | None) -> Board: + return Board( + id=uuid4(), + organization_id=uuid4(), + name="Platform", + slug="platform", + gateway_id=uuid4(), + board_group_id=board_group_id, + ) + + +def _group(group_id: UUID, org_id: UUID) -> BoardGroup: + return BoardGroup( + id=group_id, + organization_id=org_id, + name="Execution Group", + slug="execution-group", + ) + + +@pytest.mark.asyncio +async def test_update_board_notifies_agents_when_added_to_group( + monkeypatch: pytest.MonkeyPatch, +) -> None: + board = _board(board_group_id=None) + session = _FakeSession() + group_id = uuid4() + group = _group(group_id, board.organization_id) + payload = BoardUpdate(board_group_id=group_id) + calls: dict[str, int] = {"notify": 0} + + async def _fake_apply_board_update(**kwargs: Any) -> Board: + target: Board = kwargs["board"] + target.board_group_id = group_id + return target + + async def _fake_notify(**_kwargs: Any) -> None: + calls["notify"] += 1 + + async def _fake_get_by_id(*_args: Any, **_kwargs: Any) -> BoardGroup: + return group + + monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update) + monkeypatch.setattr(boards, "_notify_agents_on_board_group_addition", _fake_notify) + monkeypatch.setattr(boards.crud, "get_by_id", _fake_get_by_id) + + updated = await boards.update_board( + payload=payload, + session=session, # type: ignore[arg-type] + board=board, + ) + + assert updated.board_group_id == group_id + assert calls["notify"] == 1 + + +@pytest.mark.asyncio +async def test_update_board_notifies_agents_when_removed_from_group( + monkeypatch: pytest.MonkeyPatch, +) -> None: + group_id = uuid4() + board = _board(board_group_id=group_id) + session = _FakeSession() + group = _group(group_id, board.organization_id) + payload = BoardUpdate(board_group_id=None) + calls: dict[str, int] = {"join": 0, "leave": 0} + + async def _fake_apply_board_update(**kwargs: Any) -> Board: + target: Board = kwargs["board"] + target.board_group_id = None + return target + + async def _fake_join(**_kwargs: Any) -> None: + calls["join"] += 1 + + async def _fake_leave(**_kwargs: Any) -> None: + calls["leave"] += 1 + + async def _fake_get_by_id(*_args: Any, **_kwargs: Any) -> BoardGroup: + return group + + monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update) + monkeypatch.setattr(boards, "_notify_agents_on_board_group_addition", _fake_join) + monkeypatch.setattr(boards, "_notify_agents_on_board_group_removal", _fake_leave) + monkeypatch.setattr(boards.crud, "get_by_id", _fake_get_by_id) + + updated = await boards.update_board( + payload=payload, + session=session, # type: ignore[arg-type] + board=board, + ) + + assert updated.board_group_id is None + assert calls["leave"] == 1 + assert calls["join"] == 0 + + +@pytest.mark.asyncio +async def test_update_board_notifies_agents_when_moved_between_groups( + monkeypatch: pytest.MonkeyPatch, +) -> None: + old_group_id = uuid4() + new_group_id = uuid4() + board = _board(board_group_id=old_group_id) + session = _FakeSession() + old_group = _group(old_group_id, board.organization_id) + new_group = _group(new_group_id, board.organization_id) + payload = BoardUpdate(board_group_id=new_group_id) + calls: dict[str, int] = {"join": 0, "leave": 0} + + async def _fake_apply_board_update(**kwargs: Any) -> Board: + target: Board = kwargs["board"] + target.board_group_id = new_group_id + return target + + async def _fake_join(**_kwargs: Any) -> None: + calls["join"] += 1 + + async def _fake_leave(**_kwargs: Any) -> None: + calls["leave"] += 1 + + async def _fake_get_by_id(_session: Any, _model: Any, obj_id: UUID) -> BoardGroup | None: + if obj_id == old_group_id: + return old_group + if obj_id == new_group_id: + return new_group + return None + + monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update) + monkeypatch.setattr(boards, "_notify_agents_on_board_group_addition", _fake_join) + monkeypatch.setattr(boards, "_notify_agents_on_board_group_removal", _fake_leave) + monkeypatch.setattr(boards.crud, "get_by_id", _fake_get_by_id) + + updated = await boards.update_board( + payload=payload, + session=session, # type: ignore[arg-type] + board=board, + ) + + assert updated.board_group_id == new_group_id + assert calls["leave"] == 1 + assert calls["join"] == 1 + + +@pytest.mark.asyncio +async def test_update_board_does_not_notify_when_group_unchanged( + monkeypatch: pytest.MonkeyPatch, +) -> None: + group_id = uuid4() + board = _board(board_group_id=group_id) + session = _FakeSession() + payload = BoardUpdate(name="Platform X") + calls: dict[str, int] = {"notify": 0} + + async def _fake_apply_board_update(**kwargs: Any) -> Board: + target: Board = kwargs["board"] + target.name = "Platform X" + return target + + async def _fake_notify(**_kwargs: Any) -> None: + calls["notify"] += 1 + + monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update) + monkeypatch.setattr(boards, "_notify_agents_on_board_group_addition", _fake_notify) + monkeypatch.setattr(boards, "_notify_agents_on_board_group_removal", _fake_notify) + + updated = await boards.update_board( + payload=payload, + session=session, # type: ignore[arg-type] + board=board, + ) + + assert updated.name == "Platform X" + assert calls["notify"] == 0 + + +@pytest.mark.asyncio +async def test_notify_agents_on_board_group_addition_fanout_and_records_results( + monkeypatch: pytest.MonkeyPatch, +) -> None: + group_id = uuid4() + board = _board(board_group_id=group_id) + peer_board = Board( + id=uuid4(), + organization_id=board.organization_id, + name="Operations", + slug="operations", + gateway_id=board.gateway_id, + board_group_id=group_id, + ) + group = _group(group_id, board.organization_id) + session = _FakeSession() + sent: list[dict[str, Any]] = [] + + agent_ok = Agent( + id=uuid4(), + board_id=board.id, + gateway_id=board.gateway_id or uuid4(), + name="Lead", + openclaw_session_id="agent:lead:session", + ) + agent_skip = Agent( + id=uuid4(), + board_id=board.id, + gateway_id=board.gateway_id or uuid4(), + name="Observer", + openclaw_session_id=None, + ) + agent_fail = Agent( + id=uuid4(), + board_id=board.id, + gateway_id=board.gateway_id or uuid4(), + name="Worker", + openclaw_session_id="agent:worker:session", + ) + agent_peer = Agent( + id=uuid4(), + board_id=peer_board.id, + gateway_id=peer_board.gateway_id or uuid4(), + name="Partner", + openclaw_session_id="agent:partner:session", + ) + + class _FakeBoardQuery: + async def all(self, _session: object) -> list[Board]: + return [board, peer_board] + + class _FakeBoardObjects: + @staticmethod + def filter_by(**_kwargs: Any) -> _FakeBoardQuery: + return _FakeBoardQuery() + + class _FakeBoardModel: + objects = _FakeBoardObjects() + + class _FakeAgentQuery: + async def all(self, _session: object) -> list[Agent]: + return [agent_ok, agent_skip, agent_fail, agent_peer] + + class _FakeAgentObjects: + @staticmethod + def by_field_in(*_args: Any, **_kwargs: Any) -> _FakeAgentQuery: + return _FakeAgentQuery() + + class _FakeAgentModel: + objects = _FakeAgentObjects() + + async def _fake_optional_gateway_config_for_board( + self: boards.GatewayDispatchService, + target_board: Board, + ) -> GatewayClientConfig: + _ = self + return GatewayClientConfig(url=f"ws://gateway.example/ws/{target_board.id}", token=None) + + async def _fake_try_send_agent_message( + self: boards.GatewayDispatchService, + **kwargs: Any, + ) -> OpenClawGatewayError | None: + _ = self + sent.append(kwargs) + if kwargs["session_key"] == "agent:worker:session": + return OpenClawGatewayError("gateway down") + return None + + monkeypatch.setattr(boards, "Agent", _FakeAgentModel) + monkeypatch.setattr(boards, "Board", _FakeBoardModel) + monkeypatch.setattr( + boards.GatewayDispatchService, + "optional_gateway_config_for_board", + _fake_optional_gateway_config_for_board, + ) + monkeypatch.setattr( + boards.GatewayDispatchService, + "try_send_agent_message", + _fake_try_send_agent_message, + ) + + await boards._notify_agents_on_board_group_addition( + session=session, # type: ignore[arg-type] + board=board, + group=group, + ) + + assert len(sent) == 3 + assert {item["agent_name"] for item in sent} == {"Lead", "Worker", "Partner"} + assert "BOARD GROUP UPDATED" in sent[0]["message"] + assert "cross-board discussion" in sent[0]["message"].lower() + assert "Joined Board: Platform" in sent[0]["message"] + + peer_message = next(item["message"] for item in sent if item["agent_name"] == "Partner") + assert "Recipient Board: Operations" in peer_message + + event_types = [getattr(item, "event_type", "") for item in session.added] + assert "board.group.join.notified" in event_types + assert "board.group.join.notify_failed" in event_types + assert session.commits == 1 + + +@pytest.mark.asyncio +async def test_notify_agents_on_board_group_removal_fanout_and_records_results( + monkeypatch: pytest.MonkeyPatch, +) -> None: + group_id = uuid4() + board = _board(board_group_id=None) + peer_board = Board( + id=uuid4(), + organization_id=board.organization_id, + name="Operations", + slug="operations", + gateway_id=board.gateway_id, + board_group_id=group_id, + ) + group = _group(group_id, board.organization_id) + session = _FakeSession() + sent: list[dict[str, Any]] = [] + + agent_board = Agent( + id=uuid4(), + board_id=board.id, + gateway_id=board.gateway_id or uuid4(), + name="Lead", + openclaw_session_id="agent:lead:session", + ) + agent_peer = Agent( + id=uuid4(), + board_id=peer_board.id, + gateway_id=peer_board.gateway_id or uuid4(), + name="Partner", + openclaw_session_id="agent:partner:session", + ) + + class _FakeBoardQuery: + async def all(self, _session: object) -> list[Board]: + return [peer_board] + + class _FakeBoardObjects: + @staticmethod + def filter_by(**_kwargs: Any) -> _FakeBoardQuery: + return _FakeBoardQuery() + + class _FakeBoardModel: + objects = _FakeBoardObjects() + + class _FakeAgentQuery: + async def all(self, _session: object) -> list[Agent]: + return [agent_board, agent_peer] + + class _FakeAgentObjects: + @staticmethod + def by_field_in(*_args: Any, **_kwargs: Any) -> _FakeAgentQuery: + return _FakeAgentQuery() + + class _FakeAgentModel: + objects = _FakeAgentObjects() + + async def _fake_optional_gateway_config_for_board( + self: boards.GatewayDispatchService, + target_board: Board, + ) -> GatewayClientConfig: + _ = self + return GatewayClientConfig(url=f"ws://gateway.example/ws/{target_board.id}", token=None) + + async def _fake_try_send_agent_message( + self: boards.GatewayDispatchService, + **kwargs: Any, + ) -> OpenClawGatewayError | None: + _ = self + sent.append(kwargs) + return None + + monkeypatch.setattr(boards, "Agent", _FakeAgentModel) + monkeypatch.setattr(boards, "Board", _FakeBoardModel) + monkeypatch.setattr( + boards.GatewayDispatchService, + "optional_gateway_config_for_board", + _fake_optional_gateway_config_for_board, + ) + monkeypatch.setattr( + boards.GatewayDispatchService, + "try_send_agent_message", + _fake_try_send_agent_message, + ) + + await boards._notify_agents_on_board_group_removal( + session=session, # type: ignore[arg-type] + board=board, + group=group, + ) + + assert len(sent) == 2 + assert {item["agent_name"] for item in sent} == {"Lead", "Partner"} + assert "Left Board: Platform" in sent[0]["message"] + assert "Recipient Board: Platform" in next( + item["message"] for item in sent if item["agent_name"] == "Lead" + ) + assert "Recipient Board: Operations" in next( + item["message"] for item in sent if item["agent_name"] == "Partner" + ) + + event_types = [getattr(item, "event_type", "") for item in session.added] + assert "board.group.leave.notified" in event_types + assert session.commits == 1