feat: add notifications for board group changes and implement related tests
This commit is contained in:
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING, Literal
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
||||||
@@ -16,10 +16,12 @@ from app.api.deps import (
|
|||||||
require_org_admin,
|
require_org_admin,
|
||||||
require_org_member,
|
require_org_member,
|
||||||
)
|
)
|
||||||
|
from app.core.logging import get_logger
|
||||||
from app.core.time import utcnow
|
from app.core.time import utcnow
|
||||||
from app.db import crud
|
from app.db import crud
|
||||||
from app.db.pagination import paginate
|
from app.db.pagination import paginate
|
||||||
from app.db.session import get_session
|
from app.db.session import get_session
|
||||||
|
from app.models.agents import Agent
|
||||||
from app.models.board_groups import BoardGroup
|
from app.models.board_groups import BoardGroup
|
||||||
from app.models.boards import Board
|
from app.models.boards import Board
|
||||||
from app.models.gateways import Gateway
|
from app.models.gateways import Gateway
|
||||||
@@ -27,9 +29,13 @@ from app.schemas.boards import BoardCreate, BoardRead, BoardUpdate
|
|||||||
from app.schemas.common import OkResponse
|
from app.schemas.common import OkResponse
|
||||||
from app.schemas.pagination import DefaultLimitOffsetPage
|
from app.schemas.pagination import DefaultLimitOffsetPage
|
||||||
from app.schemas.view_models import BoardGroupSnapshot, BoardSnapshot
|
from app.schemas.view_models import BoardGroupSnapshot, BoardSnapshot
|
||||||
|
from app.services.activity_log import record_activity
|
||||||
from app.services.board_group_snapshot import build_board_group_snapshot
|
from app.services.board_group_snapshot import build_board_group_snapshot
|
||||||
from app.services.board_lifecycle import delete_board as delete_board_service
|
from app.services.board_lifecycle import delete_board as delete_board_service
|
||||||
from app.services.board_snapshot import build_board_snapshot
|
from app.services.board_snapshot import build_board_snapshot
|
||||||
|
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
|
||||||
|
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
|
||||||
|
from app.services.openclaw.gateway_rpc import OpenClawGatewayError
|
||||||
from app.services.organizations import OrganizationContext, board_access_filter
|
from app.services.organizations import OrganizationContext, board_access_filter
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@@ -37,6 +43,7 @@ if TYPE_CHECKING:
|
|||||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||||
|
|
||||||
router = APIRouter(prefix="/boards", tags=["boards"])
|
router = APIRouter(prefix="/boards", tags=["boards"])
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
SESSION_DEP = Depends(get_session)
|
SESSION_DEP = Depends(get_session)
|
||||||
ORG_ADMIN_DEP = Depends(require_org_admin)
|
ORG_ADMIN_DEP = Depends(require_org_admin)
|
||||||
@@ -156,6 +163,185 @@ async def _apply_board_update(
|
|||||||
return await crud.save(session, board)
|
return await crud.save(session, board)
|
||||||
|
|
||||||
|
|
||||||
|
def _board_group_change_message(
|
||||||
|
*,
|
||||||
|
action: Literal["join", "leave"],
|
||||||
|
changed_board: Board,
|
||||||
|
recipient_board: Board,
|
||||||
|
group: BoardGroup,
|
||||||
|
) -> str:
|
||||||
|
changed_label = "Joined Board" if action == "join" else "Left Board"
|
||||||
|
guidance = (
|
||||||
|
"1) Use cross-board discussion when work spans multiple boards.\n"
|
||||||
|
"2) Check related board activity before acting on shared concerns.\n"
|
||||||
|
"3) Explicitly coordinate ownership to avoid duplicate or conflicting work.\n"
|
||||||
|
)
|
||||||
|
if action == "leave":
|
||||||
|
guidance = (
|
||||||
|
"1) Treat cross-board coordination with the departed board as inactive.\n"
|
||||||
|
"2) Re-check dependencies and ownership that previously spanned this board.\n"
|
||||||
|
"3) Confirm no in-flight handoffs still rely on the prior group link.\n"
|
||||||
|
)
|
||||||
|
return (
|
||||||
|
"BOARD GROUP UPDATED\n"
|
||||||
|
f"{changed_label}: {changed_board.name}\n"
|
||||||
|
f"{changed_label} ID: {changed_board.id}\n"
|
||||||
|
f"Recipient Board: {recipient_board.name}\n"
|
||||||
|
f"Recipient Board ID: {recipient_board.id}\n"
|
||||||
|
f"Board Group: {group.name}\n"
|
||||||
|
f"Board Group ID: {group.id}\n\n"
|
||||||
|
"Coordination guidance:\n"
|
||||||
|
f"{guidance}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _notify_agents_on_board_group_change(
|
||||||
|
*,
|
||||||
|
session: AsyncSession,
|
||||||
|
board: Board,
|
||||||
|
group: BoardGroup,
|
||||||
|
action: Literal["join", "leave"],
|
||||||
|
) -> None:
|
||||||
|
dispatch = GatewayDispatchService(session)
|
||||||
|
group_boards = await Board.objects.filter_by(board_group_id=group.id).all(session)
|
||||||
|
board_by_id = {item.id: item for item in group_boards}
|
||||||
|
board_by_id.setdefault(board.id, board)
|
||||||
|
board_ids = list(board_by_id.keys())
|
||||||
|
if not board_ids:
|
||||||
|
return
|
||||||
|
agents = await Agent.objects.by_field_in("board_id", board_ids).all(session)
|
||||||
|
if not agents:
|
||||||
|
return
|
||||||
|
|
||||||
|
config_by_board_id: dict[UUID, GatewayClientConfig] = {}
|
||||||
|
for group_board in board_by_id.values():
|
||||||
|
config = await dispatch.optional_gateway_config_for_board(group_board)
|
||||||
|
if config is None:
|
||||||
|
logger.warning(
|
||||||
|
"board.group.%s.notify_skipped board_id=%s group_id=%s target_board_id=%s "
|
||||||
|
"reason=no_gateway_config",
|
||||||
|
action,
|
||||||
|
board.id,
|
||||||
|
group.id,
|
||||||
|
group_board.id,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
config_by_board_id[group_board.id] = config
|
||||||
|
|
||||||
|
if not config_by_board_id:
|
||||||
|
logger.warning(
|
||||||
|
"board.group.%s.notify_skipped board_id=%s group_id=%s reason=no_gateway_config_any_board",
|
||||||
|
action,
|
||||||
|
board.id,
|
||||||
|
group.id,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
message_by_board_id = {
|
||||||
|
recipient_board_id: _board_group_change_message(
|
||||||
|
action=action,
|
||||||
|
changed_board=board,
|
||||||
|
recipient_board=recipient_board,
|
||||||
|
group=group,
|
||||||
|
)
|
||||||
|
for recipient_board_id, recipient_board in board_by_id.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
notified = 0
|
||||||
|
failed = 0
|
||||||
|
skipped_missing_session = 0
|
||||||
|
skipped_missing_config = 0
|
||||||
|
skipped_missing_board = 0
|
||||||
|
for agent in agents:
|
||||||
|
if not agent.openclaw_session_id:
|
||||||
|
skipped_missing_session += 1
|
||||||
|
continue
|
||||||
|
if agent.board_id is None:
|
||||||
|
skipped_missing_board += 1
|
||||||
|
continue
|
||||||
|
config = config_by_board_id.get(agent.board_id)
|
||||||
|
message = message_by_board_id.get(agent.board_id)
|
||||||
|
recipient_board = board_by_id.get(agent.board_id)
|
||||||
|
if config is None or message is None or recipient_board is None:
|
||||||
|
skipped_missing_config += 1
|
||||||
|
continue
|
||||||
|
error = await dispatch.try_send_agent_message(
|
||||||
|
session_key=agent.openclaw_session_id,
|
||||||
|
config=config,
|
||||||
|
agent_name=agent.name,
|
||||||
|
message=message,
|
||||||
|
deliver=False,
|
||||||
|
)
|
||||||
|
if error is None:
|
||||||
|
notified += 1
|
||||||
|
record_activity(
|
||||||
|
session,
|
||||||
|
event_type=f"board.group.{action}.notified",
|
||||||
|
message=(
|
||||||
|
f"Board-group {action} notice sent to {agent.name} for board "
|
||||||
|
f"{recipient_board.name} related to {board.name} and {group.name}."
|
||||||
|
),
|
||||||
|
agent_id=agent.id,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
failed += 1
|
||||||
|
record_activity(
|
||||||
|
session,
|
||||||
|
event_type=f"board.group.{action}.notify_failed",
|
||||||
|
message=(
|
||||||
|
f"Board-group {action} notify failed for {agent.name} on board "
|
||||||
|
f"{recipient_board.name}: {error}"
|
||||||
|
),
|
||||||
|
agent_id=agent.id,
|
||||||
|
)
|
||||||
|
|
||||||
|
if notified or failed:
|
||||||
|
await session.commit()
|
||||||
|
logger.info(
|
||||||
|
"board.group.%s.notify_complete board_id=%s group_id=%s boards_total=%s agents_total=%s "
|
||||||
|
"agents_notified=%s agents_failed=%s agents_skipped_no_session=%s "
|
||||||
|
"agents_skipped_no_gateway=%s agents_skipped_no_board=%s",
|
||||||
|
action,
|
||||||
|
board.id,
|
||||||
|
group.id,
|
||||||
|
len(board_by_id),
|
||||||
|
len(agents),
|
||||||
|
notified,
|
||||||
|
failed,
|
||||||
|
skipped_missing_session,
|
||||||
|
skipped_missing_config,
|
||||||
|
skipped_missing_board,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _notify_agents_on_board_group_addition(
|
||||||
|
*,
|
||||||
|
session: AsyncSession,
|
||||||
|
board: Board,
|
||||||
|
group: BoardGroup,
|
||||||
|
) -> None:
|
||||||
|
await _notify_agents_on_board_group_change(
|
||||||
|
session=session,
|
||||||
|
board=board,
|
||||||
|
group=group,
|
||||||
|
action="join",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _notify_agents_on_board_group_removal(
|
||||||
|
*,
|
||||||
|
session: AsyncSession,
|
||||||
|
board: Board,
|
||||||
|
group: BoardGroup,
|
||||||
|
) -> None:
|
||||||
|
await _notify_agents_on_board_group_change(
|
||||||
|
session=session,
|
||||||
|
board=board,
|
||||||
|
group=group,
|
||||||
|
action="leave",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.get("", response_model=DefaultLimitOffsetPage[BoardRead])
|
@router.get("", response_model=DefaultLimitOffsetPage[BoardRead])
|
||||||
async def list_boards(
|
async def list_boards(
|
||||||
gateway_id: UUID | None = GATEWAY_ID_QUERY,
|
gateway_id: UUID | None = GATEWAY_ID_QUERY,
|
||||||
@@ -233,7 +419,40 @@ async def update_board(
|
|||||||
board: Board = BOARD_USER_WRITE_DEP,
|
board: Board = BOARD_USER_WRITE_DEP,
|
||||||
) -> Board:
|
) -> Board:
|
||||||
"""Update mutable board properties."""
|
"""Update mutable board properties."""
|
||||||
return await _apply_board_update(payload=payload, session=session, board=board)
|
previous_group_id = board.board_group_id
|
||||||
|
updated = await _apply_board_update(payload=payload, session=session, board=board)
|
||||||
|
new_group_id = updated.board_group_id
|
||||||
|
if previous_group_id is not None and previous_group_id != new_group_id:
|
||||||
|
previous_group = await crud.get_by_id(session, BoardGroup, previous_group_id)
|
||||||
|
if previous_group is not None:
|
||||||
|
try:
|
||||||
|
await _notify_agents_on_board_group_removal(
|
||||||
|
session=session,
|
||||||
|
board=updated,
|
||||||
|
group=previous_group,
|
||||||
|
)
|
||||||
|
except (OpenClawGatewayError, OSError, RuntimeError, ValueError):
|
||||||
|
logger.exception(
|
||||||
|
"board.group.leave.notify_unexpected board_id=%s group_id=%s",
|
||||||
|
updated.id,
|
||||||
|
previous_group_id,
|
||||||
|
)
|
||||||
|
if new_group_id is not None and new_group_id != previous_group_id:
|
||||||
|
board_group = await crud.get_by_id(session, BoardGroup, new_group_id)
|
||||||
|
if board_group is not None:
|
||||||
|
try:
|
||||||
|
await _notify_agents_on_board_group_addition(
|
||||||
|
session=session,
|
||||||
|
board=updated,
|
||||||
|
group=board_group,
|
||||||
|
)
|
||||||
|
except (OpenClawGatewayError, OSError, RuntimeError, ValueError):
|
||||||
|
logger.exception(
|
||||||
|
"board.group.join.notify_unexpected board_id=%s group_id=%s",
|
||||||
|
updated.id,
|
||||||
|
new_group_id,
|
||||||
|
)
|
||||||
|
return updated
|
||||||
|
|
||||||
|
|
||||||
@router.delete("/{board_id}", response_model=OkResponse)
|
@router.delete("/{board_id}", response_model=OkResponse)
|
||||||
|
|||||||
434
backend/tests/test_board_group_assignment_notifications.py
Normal file
434
backend/tests/test_board_group_assignment_notifications.py
Normal file
@@ -0,0 +1,434 @@
|
|||||||
|
# ruff: noqa: S101
|
||||||
|
"""Tests for board-group assignment notifications to board agents."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Any
|
||||||
|
from uuid import UUID, uuid4
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from app.api import boards
|
||||||
|
from app.models.agents import Agent
|
||||||
|
from app.models.board_groups import BoardGroup
|
||||||
|
from app.models.boards import Board
|
||||||
|
from app.schemas.boards import BoardUpdate
|
||||||
|
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
|
||||||
|
from app.services.openclaw.gateway_rpc import OpenClawGatewayError
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class _FakeSession:
|
||||||
|
added: list[object] = field(default_factory=list)
|
||||||
|
commits: int = 0
|
||||||
|
|
||||||
|
def add(self, value: object) -> None:
|
||||||
|
self.added.append(value)
|
||||||
|
|
||||||
|
async def commit(self) -> None:
|
||||||
|
self.commits += 1
|
||||||
|
|
||||||
|
|
||||||
|
def _board(*, board_group_id: UUID | None) -> Board:
|
||||||
|
return Board(
|
||||||
|
id=uuid4(),
|
||||||
|
organization_id=uuid4(),
|
||||||
|
name="Platform",
|
||||||
|
slug="platform",
|
||||||
|
gateway_id=uuid4(),
|
||||||
|
board_group_id=board_group_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _group(group_id: UUID, org_id: UUID) -> BoardGroup:
|
||||||
|
return BoardGroup(
|
||||||
|
id=group_id,
|
||||||
|
organization_id=org_id,
|
||||||
|
name="Execution Group",
|
||||||
|
slug="execution-group",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_update_board_notifies_agents_when_added_to_group(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
) -> None:
|
||||||
|
board = _board(board_group_id=None)
|
||||||
|
session = _FakeSession()
|
||||||
|
group_id = uuid4()
|
||||||
|
group = _group(group_id, board.organization_id)
|
||||||
|
payload = BoardUpdate(board_group_id=group_id)
|
||||||
|
calls: dict[str, int] = {"notify": 0}
|
||||||
|
|
||||||
|
async def _fake_apply_board_update(**kwargs: Any) -> Board:
|
||||||
|
target: Board = kwargs["board"]
|
||||||
|
target.board_group_id = group_id
|
||||||
|
return target
|
||||||
|
|
||||||
|
async def _fake_notify(**_kwargs: Any) -> None:
|
||||||
|
calls["notify"] += 1
|
||||||
|
|
||||||
|
async def _fake_get_by_id(*_args: Any, **_kwargs: Any) -> BoardGroup:
|
||||||
|
return group
|
||||||
|
|
||||||
|
monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update)
|
||||||
|
monkeypatch.setattr(boards, "_notify_agents_on_board_group_addition", _fake_notify)
|
||||||
|
monkeypatch.setattr(boards.crud, "get_by_id", _fake_get_by_id)
|
||||||
|
|
||||||
|
updated = await boards.update_board(
|
||||||
|
payload=payload,
|
||||||
|
session=session, # type: ignore[arg-type]
|
||||||
|
board=board,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert updated.board_group_id == group_id
|
||||||
|
assert calls["notify"] == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_update_board_notifies_agents_when_removed_from_group(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
) -> None:
|
||||||
|
group_id = uuid4()
|
||||||
|
board = _board(board_group_id=group_id)
|
||||||
|
session = _FakeSession()
|
||||||
|
group = _group(group_id, board.organization_id)
|
||||||
|
payload = BoardUpdate(board_group_id=None)
|
||||||
|
calls: dict[str, int] = {"join": 0, "leave": 0}
|
||||||
|
|
||||||
|
async def _fake_apply_board_update(**kwargs: Any) -> Board:
|
||||||
|
target: Board = kwargs["board"]
|
||||||
|
target.board_group_id = None
|
||||||
|
return target
|
||||||
|
|
||||||
|
async def _fake_join(**_kwargs: Any) -> None:
|
||||||
|
calls["join"] += 1
|
||||||
|
|
||||||
|
async def _fake_leave(**_kwargs: Any) -> None:
|
||||||
|
calls["leave"] += 1
|
||||||
|
|
||||||
|
async def _fake_get_by_id(*_args: Any, **_kwargs: Any) -> BoardGroup:
|
||||||
|
return group
|
||||||
|
|
||||||
|
monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update)
|
||||||
|
monkeypatch.setattr(boards, "_notify_agents_on_board_group_addition", _fake_join)
|
||||||
|
monkeypatch.setattr(boards, "_notify_agents_on_board_group_removal", _fake_leave)
|
||||||
|
monkeypatch.setattr(boards.crud, "get_by_id", _fake_get_by_id)
|
||||||
|
|
||||||
|
updated = await boards.update_board(
|
||||||
|
payload=payload,
|
||||||
|
session=session, # type: ignore[arg-type]
|
||||||
|
board=board,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert updated.board_group_id is None
|
||||||
|
assert calls["leave"] == 1
|
||||||
|
assert calls["join"] == 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_update_board_notifies_agents_when_moved_between_groups(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
) -> None:
|
||||||
|
old_group_id = uuid4()
|
||||||
|
new_group_id = uuid4()
|
||||||
|
board = _board(board_group_id=old_group_id)
|
||||||
|
session = _FakeSession()
|
||||||
|
old_group = _group(old_group_id, board.organization_id)
|
||||||
|
new_group = _group(new_group_id, board.organization_id)
|
||||||
|
payload = BoardUpdate(board_group_id=new_group_id)
|
||||||
|
calls: dict[str, int] = {"join": 0, "leave": 0}
|
||||||
|
|
||||||
|
async def _fake_apply_board_update(**kwargs: Any) -> Board:
|
||||||
|
target: Board = kwargs["board"]
|
||||||
|
target.board_group_id = new_group_id
|
||||||
|
return target
|
||||||
|
|
||||||
|
async def _fake_join(**_kwargs: Any) -> None:
|
||||||
|
calls["join"] += 1
|
||||||
|
|
||||||
|
async def _fake_leave(**_kwargs: Any) -> None:
|
||||||
|
calls["leave"] += 1
|
||||||
|
|
||||||
|
async def _fake_get_by_id(_session: Any, _model: Any, obj_id: UUID) -> BoardGroup | None:
|
||||||
|
if obj_id == old_group_id:
|
||||||
|
return old_group
|
||||||
|
if obj_id == new_group_id:
|
||||||
|
return new_group
|
||||||
|
return None
|
||||||
|
|
||||||
|
monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update)
|
||||||
|
monkeypatch.setattr(boards, "_notify_agents_on_board_group_addition", _fake_join)
|
||||||
|
monkeypatch.setattr(boards, "_notify_agents_on_board_group_removal", _fake_leave)
|
||||||
|
monkeypatch.setattr(boards.crud, "get_by_id", _fake_get_by_id)
|
||||||
|
|
||||||
|
updated = await boards.update_board(
|
||||||
|
payload=payload,
|
||||||
|
session=session, # type: ignore[arg-type]
|
||||||
|
board=board,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert updated.board_group_id == new_group_id
|
||||||
|
assert calls["leave"] == 1
|
||||||
|
assert calls["join"] == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_update_board_does_not_notify_when_group_unchanged(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
) -> None:
|
||||||
|
group_id = uuid4()
|
||||||
|
board = _board(board_group_id=group_id)
|
||||||
|
session = _FakeSession()
|
||||||
|
payload = BoardUpdate(name="Platform X")
|
||||||
|
calls: dict[str, int] = {"notify": 0}
|
||||||
|
|
||||||
|
async def _fake_apply_board_update(**kwargs: Any) -> Board:
|
||||||
|
target: Board = kwargs["board"]
|
||||||
|
target.name = "Platform X"
|
||||||
|
return target
|
||||||
|
|
||||||
|
async def _fake_notify(**_kwargs: Any) -> None:
|
||||||
|
calls["notify"] += 1
|
||||||
|
|
||||||
|
monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update)
|
||||||
|
monkeypatch.setattr(boards, "_notify_agents_on_board_group_addition", _fake_notify)
|
||||||
|
monkeypatch.setattr(boards, "_notify_agents_on_board_group_removal", _fake_notify)
|
||||||
|
|
||||||
|
updated = await boards.update_board(
|
||||||
|
payload=payload,
|
||||||
|
session=session, # type: ignore[arg-type]
|
||||||
|
board=board,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert updated.name == "Platform X"
|
||||||
|
assert calls["notify"] == 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_notify_agents_on_board_group_addition_fanout_and_records_results(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
) -> None:
|
||||||
|
group_id = uuid4()
|
||||||
|
board = _board(board_group_id=group_id)
|
||||||
|
peer_board = Board(
|
||||||
|
id=uuid4(),
|
||||||
|
organization_id=board.organization_id,
|
||||||
|
name="Operations",
|
||||||
|
slug="operations",
|
||||||
|
gateway_id=board.gateway_id,
|
||||||
|
board_group_id=group_id,
|
||||||
|
)
|
||||||
|
group = _group(group_id, board.organization_id)
|
||||||
|
session = _FakeSession()
|
||||||
|
sent: list[dict[str, Any]] = []
|
||||||
|
|
||||||
|
agent_ok = Agent(
|
||||||
|
id=uuid4(),
|
||||||
|
board_id=board.id,
|
||||||
|
gateway_id=board.gateway_id or uuid4(),
|
||||||
|
name="Lead",
|
||||||
|
openclaw_session_id="agent:lead:session",
|
||||||
|
)
|
||||||
|
agent_skip = Agent(
|
||||||
|
id=uuid4(),
|
||||||
|
board_id=board.id,
|
||||||
|
gateway_id=board.gateway_id or uuid4(),
|
||||||
|
name="Observer",
|
||||||
|
openclaw_session_id=None,
|
||||||
|
)
|
||||||
|
agent_fail = Agent(
|
||||||
|
id=uuid4(),
|
||||||
|
board_id=board.id,
|
||||||
|
gateway_id=board.gateway_id or uuid4(),
|
||||||
|
name="Worker",
|
||||||
|
openclaw_session_id="agent:worker:session",
|
||||||
|
)
|
||||||
|
agent_peer = Agent(
|
||||||
|
id=uuid4(),
|
||||||
|
board_id=peer_board.id,
|
||||||
|
gateway_id=peer_board.gateway_id or uuid4(),
|
||||||
|
name="Partner",
|
||||||
|
openclaw_session_id="agent:partner:session",
|
||||||
|
)
|
||||||
|
|
||||||
|
class _FakeBoardQuery:
|
||||||
|
async def all(self, _session: object) -> list[Board]:
|
||||||
|
return [board, peer_board]
|
||||||
|
|
||||||
|
class _FakeBoardObjects:
|
||||||
|
@staticmethod
|
||||||
|
def filter_by(**_kwargs: Any) -> _FakeBoardQuery:
|
||||||
|
return _FakeBoardQuery()
|
||||||
|
|
||||||
|
class _FakeBoardModel:
|
||||||
|
objects = _FakeBoardObjects()
|
||||||
|
|
||||||
|
class _FakeAgentQuery:
|
||||||
|
async def all(self, _session: object) -> list[Agent]:
|
||||||
|
return [agent_ok, agent_skip, agent_fail, agent_peer]
|
||||||
|
|
||||||
|
class _FakeAgentObjects:
|
||||||
|
@staticmethod
|
||||||
|
def by_field_in(*_args: Any, **_kwargs: Any) -> _FakeAgentQuery:
|
||||||
|
return _FakeAgentQuery()
|
||||||
|
|
||||||
|
class _FakeAgentModel:
|
||||||
|
objects = _FakeAgentObjects()
|
||||||
|
|
||||||
|
async def _fake_optional_gateway_config_for_board(
|
||||||
|
self: boards.GatewayDispatchService,
|
||||||
|
target_board: Board,
|
||||||
|
) -> GatewayClientConfig:
|
||||||
|
_ = self
|
||||||
|
return GatewayClientConfig(url=f"ws://gateway.example/ws/{target_board.id}", token=None)
|
||||||
|
|
||||||
|
async def _fake_try_send_agent_message(
|
||||||
|
self: boards.GatewayDispatchService,
|
||||||
|
**kwargs: Any,
|
||||||
|
) -> OpenClawGatewayError | None:
|
||||||
|
_ = self
|
||||||
|
sent.append(kwargs)
|
||||||
|
if kwargs["session_key"] == "agent:worker:session":
|
||||||
|
return OpenClawGatewayError("gateway down")
|
||||||
|
return None
|
||||||
|
|
||||||
|
monkeypatch.setattr(boards, "Agent", _FakeAgentModel)
|
||||||
|
monkeypatch.setattr(boards, "Board", _FakeBoardModel)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
boards.GatewayDispatchService,
|
||||||
|
"optional_gateway_config_for_board",
|
||||||
|
_fake_optional_gateway_config_for_board,
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
boards.GatewayDispatchService,
|
||||||
|
"try_send_agent_message",
|
||||||
|
_fake_try_send_agent_message,
|
||||||
|
)
|
||||||
|
|
||||||
|
await boards._notify_agents_on_board_group_addition(
|
||||||
|
session=session, # type: ignore[arg-type]
|
||||||
|
board=board,
|
||||||
|
group=group,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(sent) == 3
|
||||||
|
assert {item["agent_name"] for item in sent} == {"Lead", "Worker", "Partner"}
|
||||||
|
assert "BOARD GROUP UPDATED" in sent[0]["message"]
|
||||||
|
assert "cross-board discussion" in sent[0]["message"].lower()
|
||||||
|
assert "Joined Board: Platform" in sent[0]["message"]
|
||||||
|
|
||||||
|
peer_message = next(item["message"] for item in sent if item["agent_name"] == "Partner")
|
||||||
|
assert "Recipient Board: Operations" in peer_message
|
||||||
|
|
||||||
|
event_types = [getattr(item, "event_type", "") for item in session.added]
|
||||||
|
assert "board.group.join.notified" in event_types
|
||||||
|
assert "board.group.join.notify_failed" in event_types
|
||||||
|
assert session.commits == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_notify_agents_on_board_group_removal_fanout_and_records_results(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
) -> None:
|
||||||
|
group_id = uuid4()
|
||||||
|
board = _board(board_group_id=None)
|
||||||
|
peer_board = Board(
|
||||||
|
id=uuid4(),
|
||||||
|
organization_id=board.organization_id,
|
||||||
|
name="Operations",
|
||||||
|
slug="operations",
|
||||||
|
gateway_id=board.gateway_id,
|
||||||
|
board_group_id=group_id,
|
||||||
|
)
|
||||||
|
group = _group(group_id, board.organization_id)
|
||||||
|
session = _FakeSession()
|
||||||
|
sent: list[dict[str, Any]] = []
|
||||||
|
|
||||||
|
agent_board = Agent(
|
||||||
|
id=uuid4(),
|
||||||
|
board_id=board.id,
|
||||||
|
gateway_id=board.gateway_id or uuid4(),
|
||||||
|
name="Lead",
|
||||||
|
openclaw_session_id="agent:lead:session",
|
||||||
|
)
|
||||||
|
agent_peer = Agent(
|
||||||
|
id=uuid4(),
|
||||||
|
board_id=peer_board.id,
|
||||||
|
gateway_id=peer_board.gateway_id or uuid4(),
|
||||||
|
name="Partner",
|
||||||
|
openclaw_session_id="agent:partner:session",
|
||||||
|
)
|
||||||
|
|
||||||
|
class _FakeBoardQuery:
|
||||||
|
async def all(self, _session: object) -> list[Board]:
|
||||||
|
return [peer_board]
|
||||||
|
|
||||||
|
class _FakeBoardObjects:
|
||||||
|
@staticmethod
|
||||||
|
def filter_by(**_kwargs: Any) -> _FakeBoardQuery:
|
||||||
|
return _FakeBoardQuery()
|
||||||
|
|
||||||
|
class _FakeBoardModel:
|
||||||
|
objects = _FakeBoardObjects()
|
||||||
|
|
||||||
|
class _FakeAgentQuery:
|
||||||
|
async def all(self, _session: object) -> list[Agent]:
|
||||||
|
return [agent_board, agent_peer]
|
||||||
|
|
||||||
|
class _FakeAgentObjects:
|
||||||
|
@staticmethod
|
||||||
|
def by_field_in(*_args: Any, **_kwargs: Any) -> _FakeAgentQuery:
|
||||||
|
return _FakeAgentQuery()
|
||||||
|
|
||||||
|
class _FakeAgentModel:
|
||||||
|
objects = _FakeAgentObjects()
|
||||||
|
|
||||||
|
async def _fake_optional_gateway_config_for_board(
|
||||||
|
self: boards.GatewayDispatchService,
|
||||||
|
target_board: Board,
|
||||||
|
) -> GatewayClientConfig:
|
||||||
|
_ = self
|
||||||
|
return GatewayClientConfig(url=f"ws://gateway.example/ws/{target_board.id}", token=None)
|
||||||
|
|
||||||
|
async def _fake_try_send_agent_message(
|
||||||
|
self: boards.GatewayDispatchService,
|
||||||
|
**kwargs: Any,
|
||||||
|
) -> OpenClawGatewayError | None:
|
||||||
|
_ = self
|
||||||
|
sent.append(kwargs)
|
||||||
|
return None
|
||||||
|
|
||||||
|
monkeypatch.setattr(boards, "Agent", _FakeAgentModel)
|
||||||
|
monkeypatch.setattr(boards, "Board", _FakeBoardModel)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
boards.GatewayDispatchService,
|
||||||
|
"optional_gateway_config_for_board",
|
||||||
|
_fake_optional_gateway_config_for_board,
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
boards.GatewayDispatchService,
|
||||||
|
"try_send_agent_message",
|
||||||
|
_fake_try_send_agent_message,
|
||||||
|
)
|
||||||
|
|
||||||
|
await boards._notify_agents_on_board_group_removal(
|
||||||
|
session=session, # type: ignore[arg-type]
|
||||||
|
board=board,
|
||||||
|
group=group,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(sent) == 2
|
||||||
|
assert {item["agent_name"] for item in sent} == {"Lead", "Partner"}
|
||||||
|
assert "Left Board: Platform" in sent[0]["message"]
|
||||||
|
assert "Recipient Board: Platform" in next(
|
||||||
|
item["message"] for item in sent if item["agent_name"] == "Lead"
|
||||||
|
)
|
||||||
|
assert "Recipient Board: Operations" in next(
|
||||||
|
item["message"] for item in sent if item["agent_name"] == "Partner"
|
||||||
|
)
|
||||||
|
|
||||||
|
event_types = [getattr(item, "event_type", "") for item in session.added]
|
||||||
|
assert "board.group.leave.notified" in event_types
|
||||||
|
assert session.commits == 1
|
||||||
Reference in New Issue
Block a user