From f1038acf44adf1729f8874c718bb4d54d0c7b542 Mon Sep 17 00:00:00 2001 From: Abhimanyu Saharan Date: Wed, 11 Feb 2026 01:13:10 +0530 Subject: [PATCH] refactor: streamline agent lifecycle management with new DB service helpers --- backend/app/api/board_group_memory.py | 11 +-- backend/app/api/board_groups.py | 4 +- backend/app/api/board_memory.py | 16 +-- backend/app/api/board_onboarding.py | 4 +- backend/app/api/boards.py | 4 +- backend/app/api/tasks.py | 39 +++++--- .../app/services/openclaw/admin_service.py | 55 ++++------- .../services/openclaw/coordination_service.py | 76 ++++++-------- .../app/services/openclaw/db_agent_state.py | 57 +++++++++++ backend/app/services/openclaw/db_service.py | 47 +++++++++ .../app/services/openclaw/gateway_dispatch.py | 89 +++++++++++++++++ .../services/openclaw/onboarding_service.py | 23 +++-- .../app/services/openclaw/provisioning_db.py | 99 +++++-------------- .../app/services/openclaw/session_service.py | 23 +---- backend/app/services/openclaw/shared.py | 95 +----------------- .../test_api_openclaw_integration_boundary.py | 15 ++- backend/tests/test_lifecycle_services.py | 70 +++++++------ 17 files changed, 377 insertions(+), 350 deletions(-) create mode 100644 backend/app/services/openclaw/db_agent_state.py create mode 100644 backend/app/services/openclaw/db_service.py create mode 100644 backend/app/services/openclaw/gateway_dispatch.py diff --git a/backend/app/api/board_group_memory.py b/backend/app/api/board_group_memory.py index 1de961e0..3279dec8 100644 --- a/backend/app/api/board_group_memory.py +++ b/backend/app/api/board_group_memory.py @@ -33,10 +33,7 @@ from app.models.users import User from app.schemas.board_group_memory import BoardGroupMemoryCreate, BoardGroupMemoryRead from app.schemas.pagination import DefaultLimitOffsetPage from app.services.mentions import extract_mentions, matches_agent_mention -from app.services.openclaw.shared import ( - optional_gateway_config_for_board, - send_gateway_agent_message_safe, -) +from app.services.openclaw.gateway_dispatch import GatewayDispatchService from app.services.organizations import ( is_org_admin, list_accessible_board_ids, @@ -206,6 +203,7 @@ def _group_header(*, is_broadcast: bool, mentioned: bool) -> str: @dataclass(frozen=True) class _NotifyGroupContext: session: AsyncSession + dispatch: GatewayDispatchService group: BoardGroup board_by_id: dict[UUID, Board] mentions: set[str] @@ -226,7 +224,7 @@ async def _notify_group_target( board = context.board_by_id.get(board_id) if board is None: return - config = await optional_gateway_config_for_board(context.session, board) + config = await context.dispatch.optional_gateway_config_for_board(board) if config is None: return header = _group_header( @@ -242,7 +240,7 @@ async def _notify_group_target( f"POST {context.base_url}/api/v1/boards/{board.id}/group-memory\n" 'Body: {"content":"...","tags":["chat"]}' ) - error = await send_gateway_agent_message_safe( + error = await context.dispatch.try_send_agent_message( session_key=session_key, config=config, agent_name=agent.name, @@ -294,6 +292,7 @@ async def _notify_group_memory_targets( context = _NotifyGroupContext( session=session, + dispatch=GatewayDispatchService(session), group=group, board_by_id=board_by_id, mentions=mentions, diff --git a/backend/app/api/board_groups.py b/backend/app/api/board_groups.py index d70dacc2..3c0eaeca 100644 --- a/backend/app/api/board_groups.py +++ b/backend/app/api/board_groups.py @@ -30,8 +30,8 @@ from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.view_models import BoardGroupSnapshot from app.services.board_group_snapshot import build_group_snapshot from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG +from app.services.openclaw.gateway_rpc import OpenClawGatewayError from app.services.openclaw.provisioning import OpenClawGatewayProvisioner -from app.services.openclaw.shared import GatewayTransportError from app.services.organizations import ( OrganizationContext, board_access_filter, @@ -273,7 +273,7 @@ async def _sync_gateway_heartbeats( gateway, gateway_agents, ) - except GatewayTransportError: + except OpenClawGatewayError: failed_agent_ids.extend([agent.id for agent in gateway_agents]) return failed_agent_ids diff --git a/backend/app/api/board_memory.py b/backend/app/api/board_memory.py index 79207919..bf234be3 100644 --- a/backend/app/api/board_memory.py +++ b/backend/app/api/board_memory.py @@ -28,11 +28,8 @@ from app.models.board_memory import BoardMemory from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead from app.schemas.pagination import DefaultLimitOffsetPage from app.services.mentions import extract_mentions, matches_agent_mention -from app.services.openclaw.shared import ( - GatewayClientConfig, - optional_gateway_config_for_board, - send_gateway_agent_message_safe, -) +from app.services.openclaw.gateway_dispatch import GatewayDispatchService +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig if TYPE_CHECKING: from collections.abc import AsyncIterator @@ -102,6 +99,7 @@ async def _send_control_command( session: AsyncSession, board: Board, actor: ActorContext, + dispatch: GatewayDispatchService, config: GatewayClientConfig, command: str, ) -> None: @@ -115,7 +113,7 @@ async def _send_control_command( continue if not agent.openclaw_session_id: continue - error = await send_gateway_agent_message_safe( + error = await dispatch.try_send_agent_message( session_key=agent.openclaw_session_id, config=config, agent_name=agent.name, @@ -161,7 +159,8 @@ async def _notify_chat_targets( ) -> None: if not memory.content: return - config = await optional_gateway_config_for_board(session, board) + dispatch = GatewayDispatchService(session) + config = await dispatch.optional_gateway_config_for_board(board) if config is None: return @@ -174,6 +173,7 @@ async def _notify_chat_targets( session=session, board=board, actor=actor, + dispatch=dispatch, config=config, command=command, ) @@ -206,7 +206,7 @@ async def _notify_chat_targets( f"POST {base_url}/api/v1/agent/boards/{board.id}/memory\n" 'Body: {"content":"...","tags":["chat"]}' ) - error = await send_gateway_agent_message_safe( + error = await dispatch.try_send_agent_message( session_key=agent.openclaw_session_id, config=config, agent_name=agent.name, diff --git a/backend/app/api/board_onboarding.py b/backend/app/api/board_onboarding.py index d96707b4..9de6f0e8 100644 --- a/backend/app/api/board_onboarding.py +++ b/backend/app/api/board_onboarding.py @@ -33,6 +33,7 @@ from app.schemas.board_onboarding import ( BoardOnboardingUserProfile, ) from app.schemas.boards import BoardRead +from app.services.openclaw.gateway_dispatch import GatewayDispatchService from app.services.openclaw.onboarding_service import BoardOnboardingMessagingService from app.services.openclaw.policies import OpenClawAuthorizationPolicy from app.services.openclaw.provisioning_db import ( @@ -40,7 +41,6 @@ from app.services.openclaw.provisioning_db import ( LeadAgentRequest, OpenClawProvisioningService, ) -from app.services.openclaw.shared import require_gateway_config_for_board if TYPE_CHECKING: from sqlmodel.ext.asyncio.session import AsyncSession @@ -396,7 +396,7 @@ async def confirm_onboarding( lead_agent = _parse_draft_lead_agent(onboarding.draft_goal) lead_options = _lead_agent_options(lead_agent) - gateway, config = await require_gateway_config_for_board(session, board) + gateway, config = await GatewayDispatchService(session).require_gateway_config_for_board(board) session.add(board) session.add(onboarding) await session.commit() diff --git a/backend/app/api/boards.py b/backend/app/api/boards.py index 34bf2c52..acfe7454 100644 --- a/backend/app/api/boards.py +++ b/backend/app/api/boards.py @@ -39,8 +39,8 @@ from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.view_models import BoardGroupSnapshot, BoardSnapshot from app.services.board_group_snapshot import build_board_group_snapshot from app.services.board_snapshot import build_board_snapshot +from app.services.openclaw.gateway_rpc import OpenClawGatewayError from app.services.openclaw.provisioning import OpenClawGatewayProvisioner -from app.services.openclaw.shared import GatewayTransportError from app.services.organizations import OrganizationContext, board_access_filter if TYPE_CHECKING: @@ -291,7 +291,7 @@ async def delete_board( agent=agent, gateway=config, ) - except GatewayTransportError as exc: + except OpenClawGatewayError as exc: raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"Gateway cleanup failed: {exc}", diff --git a/backend/app/api/tasks.py b/backend/app/api/tasks.py index bc84aae3..90c2853c 100644 --- a/backend/app/api/tasks.py +++ b/backend/app/api/tasks.py @@ -40,12 +40,9 @@ from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate from app.services.activity_log import record_activity from app.services.mentions import extract_mentions, matches_agent_mention -from app.services.openclaw.shared import ( - GatewayClientConfig, - GatewayTransportError, - optional_gateway_config_for_board, - send_gateway_agent_message_safe, -) +from app.services.openclaw.gateway_dispatch import GatewayDispatchService +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig +from app.services.openclaw.gateway_rpc import OpenClawGatewayError from app.services.organizations import require_board_access from app.services.task_dependencies import ( blocked_by_dependency_ids, @@ -305,11 +302,12 @@ def _serialize_comment(event: ActivityEvent) -> dict[str, object]: async def _send_lead_task_message( *, + dispatch: GatewayDispatchService, session_key: str, config: GatewayClientConfig, message: str, -) -> GatewayTransportError | None: - return await send_gateway_agent_message_safe( +) -> OpenClawGatewayError | None: + return await dispatch.try_send_agent_message( session_key=session_key, config=config, agent_name="Lead Agent", @@ -320,12 +318,13 @@ async def _send_lead_task_message( async def _send_agent_task_message( *, + dispatch: GatewayDispatchService, session_key: str, config: GatewayClientConfig, agent_name: str, message: str, -) -> GatewayTransportError | None: - return await send_gateway_agent_message_safe( +) -> OpenClawGatewayError | None: + return await dispatch.try_send_agent_message( session_key=session_key, config=config, agent_name=agent_name, @@ -343,7 +342,8 @@ async def _notify_agent_on_task_assign( ) -> None: if not agent.openclaw_session_id: return - config = await optional_gateway_config_for_board(session, board) + dispatch = GatewayDispatchService(session) + config = await dispatch.optional_gateway_config_for_board(board) if config is None: return description = _truncate_snippet(task.description or "") @@ -361,6 +361,7 @@ async def _notify_agent_on_task_assign( + ("\n\nTake action: open the task and begin work. " "Post updates as task comments.") ) error = await _send_agent_task_message( + dispatch=dispatch, session_key=agent.openclaw_session_id, config=config, agent_name=agent.name, @@ -415,7 +416,8 @@ async def _notify_lead_on_task_create( ) if lead is None or not lead.openclaw_session_id: return - config = await optional_gateway_config_for_board(session, board) + dispatch = GatewayDispatchService(session) + config = await dispatch.optional_gateway_config_for_board(board) if config is None: return description = _truncate_snippet(task.description or "") @@ -433,6 +435,7 @@ async def _notify_lead_on_task_create( + "\n\nTake action: triage, assign, or plan next steps." ) error = await _send_lead_task_message( + dispatch=dispatch, session_key=lead.openclaw_session_id, config=config, message=message, @@ -470,7 +473,8 @@ async def _notify_lead_on_task_unassigned( ) if lead is None or not lead.openclaw_session_id: return - config = await optional_gateway_config_for_board(session, board) + dispatch = GatewayDispatchService(session) + config = await dispatch.optional_gateway_config_for_board(board) if config is None: return description = _truncate_snippet(task.description or "") @@ -488,6 +492,7 @@ async def _notify_lead_on_task_unassigned( + "\n\nTake action: assign a new owner or adjust the plan." ) error = await _send_lead_task_message( + dispatch=dispatch, session_key=lead.openclaw_session_id, config=config, message=message, @@ -1029,8 +1034,11 @@ async def _notify_task_comment_targets( if request.task.board_id else None ) - config = await optional_gateway_config_for_board(session, board) if board else None - if not board or not config: + if board is None: + return + dispatch = GatewayDispatchService(session) + config = await dispatch.optional_gateway_config_for_board(board) + if not config: return snippet = _truncate_snippet(request.message) @@ -1057,6 +1065,7 @@ async def _notify_task_comment_targets( "thread but do not change task status." ) await _send_agent_task_message( + dispatch=dispatch, session_key=agent.openclaw_session_id, config=config, agent_name=agent.name, diff --git a/backend/app/services/openclaw/admin_service.py b/backend/app/services/openclaw/admin_service.py index 9c8d0463..2d8cc09e 100644 --- a/backend/app/services/openclaw/admin_service.py +++ b/backend/app/services/openclaw/admin_service.py @@ -2,7 +2,6 @@ from __future__ import annotations -import logging from abc import ABC, abstractmethod from typing import TYPE_CHECKING from uuid import UUID @@ -10,7 +9,6 @@ from uuid import UUID from fastapi import HTTPException, status from sqlmodel import col -from app.core.agent_tokens import generate_agent_token, hash_agent_token from app.core.auth import AuthContext from app.core.time import utcnow from app.db import crud @@ -21,6 +19,12 @@ from app.models.gateways import Gateway from app.models.tasks import Task from app.schemas.gateways import GatewayTemplatesSyncResult from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG +from app.services.openclaw.db_agent_state import ( + mark_provision_complete, + mark_provision_requested, + mint_agent_token, +) +from app.services.openclaw.db_service import OpenClawDBService from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig from app.services.openclaw.gateway_rpc import OpenClawGatewayError, openclaw_call from app.services.openclaw.provisioning import OpenClawGatewayProvisioner @@ -64,7 +68,7 @@ class DefaultGatewayMainAgentManager(AbstractGatewayMainAgentManager): } -class GatewayAdminLifecycleService: +class GatewayAdminLifecycleService(OpenClawDBService): """Write-side gateway lifecycle service (CRUD, main agent, template sync).""" def __init__( @@ -73,26 +77,9 @@ class GatewayAdminLifecycleService: *, main_agent_manager: AbstractGatewayMainAgentManager | None = None, ) -> None: - self._session = session - self._logger = logging.getLogger(__name__) + super().__init__(session) self._main_agent_manager = main_agent_manager or DefaultGatewayMainAgentManager() - @property - def session(self) -> AsyncSession: - return self._session - - @session.setter - def session(self, value: AsyncSession) -> None: - self._session = value - - @property - def logger(self) -> logging.Logger: - return self._logger - - @logger.setter - def logger(self, value: logging.Logger) -> None: - self._logger = value - @property def main_agent_manager(self) -> AbstractGatewayMainAgentManager: return self._main_agent_manager @@ -206,16 +193,13 @@ class GatewayAdminLifecycleService: status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Organization owner not found (required for gateway agent USER.md rendering).", ) - raw_token = generate_agent_token() - agent.agent_token_hash = hash_agent_token(raw_token) - agent.provision_requested_at = utcnow() - agent.provision_action = action - agent.updated_at = utcnow() - if agent.heartbeat_config is None: - agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() - self.session.add(agent) - await self.session.commit() - await self.session.refresh(agent) + raw_token = mint_agent_token(agent) + mark_provision_requested( + agent, + action=action, + status="updating" if action == "update" else "provisioning", + ) + await self.add_commit_refresh(agent) if not gateway.url: return agent @@ -253,13 +237,8 @@ class GatewayAdminLifecycleService: detail=f"Unexpected error {action}ing gateway provisioning.", ) from exc - agent.status = "online" - agent.provision_requested_at = None - agent.provision_action = None - agent.updated_at = utcnow() - self.session.add(agent) - await self.session.commit() - await self.session.refresh(agent) + mark_provision_complete(agent, status="online") + await self.add_commit_refresh(agent) self.logger.info( "gateway.main_agent.provision_success gateway_id=%s agent_id=%s action=%s", diff --git a/backend/app/services/openclaw/coordination_service.py b/backend/app/services/openclaw/coordination_service.py index 2264dcce..d4be349d 100644 --- a/backend/app/services/openclaw/coordination_service.py +++ b/backend/app/services/openclaw/coordination_service.py @@ -3,10 +3,9 @@ from __future__ import annotations import json -import logging from abc import ABC from collections.abc import Awaitable, Callable -from typing import TYPE_CHECKING, TypeVar +from typing import TypeVar from uuid import UUID from fastapi import HTTPException, status @@ -27,11 +26,13 @@ from app.schemas.gateway_coordination import ( GatewayMainAskUserResponse, ) from app.services.activity_log import record_activity +from app.services.openclaw.db_service import OpenClawDBService from app.services.openclaw.exceptions import ( GatewayOperation, map_gateway_error_message, map_gateway_error_to_http_exception, ) +from app.services.openclaw.gateway_dispatch import GatewayDispatchService from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig from app.services.openclaw.gateway_rpc import OpenClawGatewayError, openclaw_call from app.services.openclaw.internal.agent_key import agent_key @@ -42,43 +43,14 @@ from app.services.openclaw.provisioning_db import ( LeadAgentRequest, OpenClawProvisioningService, ) -from app.services.openclaw.shared import ( - GatewayAgentIdentity, - require_gateway_config_for_board, - resolve_trace_id, - send_gateway_agent_message, -) - -if TYPE_CHECKING: - from sqlmodel.ext.asyncio.session import AsyncSession - +from app.services.openclaw.shared import GatewayAgentIdentity _T = TypeVar("_T") -class AbstractGatewayMessagingService(ABC): +class AbstractGatewayMessagingService(OpenClawDBService, ABC): """Shared gateway messaging primitives with retry semantics.""" - def __init__(self, session: AsyncSession) -> None: - self._session = session - self._logger = logging.getLogger(__name__) - - @property - def session(self) -> AsyncSession: - return self._session - - @session.setter - def session(self, value: AsyncSession) -> None: - self._session = value - - @property - def logger(self) -> logging.Logger: - return self._logger - - @logger.setter - def logger(self, value: logging.Logger) -> None: - self._logger = value - @staticmethod async def _with_gateway_retry(fn: Callable[[], Awaitable[_T]]) -> _T: return await with_coordination_gateway_retry(fn) @@ -93,7 +65,7 @@ class AbstractGatewayMessagingService(ABC): deliver: bool, ) -> None: async def _do_send() -> bool: - await send_gateway_agent_message( + await GatewayDispatchService(self.session).send_agent_message( session_key=session_key, config=config, agent_name=agent_name, @@ -198,7 +170,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): message: str, correlation_id: str | None = None, ) -> None: - trace_id = resolve_trace_id(correlation_id, prefix="coord.nudge") + trace_id = GatewayDispatchService.resolve_trace_id(correlation_id, prefix="coord.nudge") self.logger.log( 5, "gateway.coordination.nudge.start trace_id=%s board_id=%s actor_agent_id=%s " @@ -214,7 +186,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Target agent has no session key", ) - _gateway, config = await require_gateway_config_for_board(self.session, board) + _gateway, config = await GatewayDispatchService( + self.session + ).require_gateway_config_for_board(board) try: await self._dispatch_gateway_message( session_key=target.openclaw_session_id or "", @@ -276,7 +250,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): target_agent_id: str, correlation_id: str | None = None, ) -> str: - trace_id = resolve_trace_id(correlation_id, prefix="coord.soul.read") + trace_id = GatewayDispatchService.resolve_trace_id(correlation_id, prefix="coord.soul.read") self.logger.log( 5, "gateway.coordination.soul_read.start trace_id=%s board_id=%s target_agent_id=%s", @@ -285,7 +259,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): target_agent_id, ) target = await self._board_agent_or_404(board=board, agent_id=target_agent_id) - _gateway, config = await require_gateway_config_for_board(self.session, board) + _gateway, config = await GatewayDispatchService( + self.session + ).require_gateway_config_for_board(board) try: async def _do_get() -> object: @@ -342,7 +318,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): actor_agent_id: UUID, correlation_id: str | None = None, ) -> None: - trace_id = resolve_trace_id(correlation_id, prefix="coord.soul.write") + trace_id = GatewayDispatchService.resolve_trace_id( + correlation_id, prefix="coord.soul.write" + ) self.logger.log( 5, "gateway.coordination.soul_write.start trace_id=%s board_id=%s target_agent_id=%s " @@ -365,7 +343,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): self.session.add(target) await self.session.commit() - _gateway, config = await require_gateway_config_for_board(self.session, board) + _gateway, config = await GatewayDispatchService( + self.session + ).require_gateway_config_for_board(board) try: async def _do_set() -> object: @@ -434,7 +414,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): payload: GatewayMainAskUserRequest, actor_agent: Agent, ) -> GatewayMainAskUserResponse: - trace_id = resolve_trace_id(payload.correlation_id, prefix="coord.ask_user") + trace_id = GatewayDispatchService.resolve_trace_id( + payload.correlation_id, prefix="coord.ask_user" + ) self.logger.log( 5, "gateway.coordination.ask_user.start trace_id=%s board_id=%s actor_agent_id=%s", @@ -442,7 +424,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): board.id, actor_agent.id, ) - gateway, config = await require_gateway_config_for_board(self.session, board) + gateway, config = await GatewayDispatchService( + self.session + ).require_gateway_config_for_board(board) main_session_key = GatewayAgentIdentity.session_key(gateway) correlation = payload.correlation_id.strip() if payload.correlation_id else "" @@ -575,7 +559,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): board_id: UUID, payload: GatewayLeadMessageRequest, ) -> GatewayLeadMessageResponse: - trace_id = resolve_trace_id(payload.correlation_id, prefix="coord.lead_message") + trace_id = GatewayDispatchService.resolve_trace_id( + payload.correlation_id, prefix="coord.lead_message" + ) self.logger.log( 5, "gateway.coordination.lead_message.start trace_id=%s board_id=%s actor_agent_id=%s", @@ -662,7 +648,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): actor_agent: Agent, payload: GatewayLeadBroadcastRequest, ) -> GatewayLeadBroadcastResponse: - trace_id = resolve_trace_id(payload.correlation_id, prefix="coord.lead_broadcast") + trace_id = GatewayDispatchService.resolve_trace_id( + payload.correlation_id, prefix="coord.lead_broadcast" + ) self.logger.log( 5, "gateway.coordination.lead_broadcast.start trace_id=%s actor_agent_id=%s", diff --git a/backend/app/services/openclaw/db_agent_state.py b/backend/app/services/openclaw/db_agent_state.py new file mode 100644 index 00000000..9d0a7723 --- /dev/null +++ b/backend/app/services/openclaw/db_agent_state.py @@ -0,0 +1,57 @@ +"""Shared DB mutation helpers for OpenClaw agent lifecycle services.""" + +from __future__ import annotations + +from typing import Literal + +from app.core.agent_tokens import generate_agent_token, hash_agent_token +from app.core.time import utcnow +from app.models.agents import Agent +from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG + + +def ensure_heartbeat_config(agent: Agent) -> None: + """Ensure an agent has a heartbeat_config dict populated.""" + + if agent.heartbeat_config is None: + agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() + + +def mint_agent_token(agent: Agent) -> str: + """Generate a new raw token and update the agent's token hash.""" + + raw_token = generate_agent_token() + agent.agent_token_hash = hash_agent_token(raw_token) + return raw_token + + +def mark_provision_requested( + agent: Agent, + *, + action: str, + status: str | None = None, +) -> None: + """Mark an agent as pending provisioning/update.""" + + ensure_heartbeat_config(agent) + agent.provision_requested_at = utcnow() + agent.provision_action = action + if status is not None: + agent.status = status + agent.updated_at = utcnow() + + +def mark_provision_complete( + agent: Agent, + *, + status: Literal["online", "offline", "provisioning", "updating", "deleting"] = "online", + clear_confirm_token: bool = False, +) -> None: + """Clear provisioning fields after a successful gateway lifecycle run.""" + + if clear_confirm_token: + agent.provision_confirm_token_hash = None + agent.status = status + agent.provision_requested_at = None + agent.provision_action = None + agent.updated_at = utcnow() diff --git a/backend/app/services/openclaw/db_service.py b/backend/app/services/openclaw/db_service.py new file mode 100644 index 00000000..b224697d --- /dev/null +++ b/backend/app/services/openclaw/db_service.py @@ -0,0 +1,47 @@ +"""Shared DB-backed service base classes for OpenClaw. + +These helpers are intentionally small: they reduce boilerplate (session + logger) across +OpenClaw services without adding new architectural layers. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from sqlmodel.ext.asyncio.session import AsyncSession + + +class OpenClawDBService: + """Base class for OpenClaw services that require an AsyncSession.""" + + def __init__(self, session: AsyncSession) -> None: + self._session = session + # Use the concrete subclass module for logger naming. + self._logger = logging.getLogger(self.__class__.__module__) + + @property + def session(self) -> AsyncSession: + return self._session + + @session.setter + def session(self, value: AsyncSession) -> None: + self._session = value + + @property + def logger(self) -> logging.Logger: + return self._logger + + @logger.setter + def logger(self, value: logging.Logger) -> None: + self._logger = value + + async def add_commit_refresh(self, model: object) -> None: + """Persist a model, committing the current transaction and refreshing when supported.""" + + self.session.add(model) + await self.session.commit() + refresh = getattr(self.session, "refresh", None) + if callable(refresh): + await refresh(model) diff --git a/backend/app/services/openclaw/gateway_dispatch.py b/backend/app/services/openclaw/gateway_dispatch.py new file mode 100644 index 00000000..6fad5e69 --- /dev/null +++ b/backend/app/services/openclaw/gateway_dispatch.py @@ -0,0 +1,89 @@ +"""DB-backed gateway config resolution and message dispatch helpers. + +This module exists to keep `app.api.*` thin: APIs should call OpenClaw services, not +directly orchestrate gateway RPC calls. +""" + +from __future__ import annotations + +from uuid import uuid4 + +from fastapi import HTTPException, status + +from app.models.boards import Board +from app.models.gateways import Gateway +from app.services.openclaw.db_service import OpenClawDBService +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig +from app.services.openclaw.gateway_rpc import OpenClawGatewayError, ensure_session, send_message + + +class GatewayDispatchService(OpenClawDBService): + """Resolve gateway config for boards and dispatch messages to agent sessions.""" + + async def optional_gateway_config_for_board( + self, + board: Board, + ) -> GatewayClientConfig | None: + if board.gateway_id is None: + return None + gateway = await Gateway.objects.by_id(board.gateway_id).first(self.session) + if gateway is None or not gateway.url: + return None + return GatewayClientConfig(url=gateway.url, token=gateway.token) + + async def require_gateway_config_for_board( + self, + board: Board, + ) -> tuple[Gateway, GatewayClientConfig]: + if board.gateway_id is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Board is not attached to a gateway", + ) + gateway = await Gateway.objects.by_id(board.gateway_id).first(self.session) + if gateway is None or not gateway.url: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Gateway is not configured for this board", + ) + return gateway, GatewayClientConfig(url=gateway.url, token=gateway.token) + + async def send_agent_message( + self, + *, + session_key: str, + config: GatewayClientConfig, + agent_name: str, + message: str, + deliver: bool = False, + ) -> None: + await ensure_session(session_key, config=config, label=agent_name) + await send_message(message, session_key=session_key, config=config, deliver=deliver) + + async def try_send_agent_message( + self, + *, + session_key: str, + config: GatewayClientConfig, + agent_name: str, + message: str, + deliver: bool = False, + ) -> OpenClawGatewayError | None: + try: + await self.send_agent_message( + session_key=session_key, + config=config, + agent_name=agent_name, + message=message, + deliver=deliver, + ) + except OpenClawGatewayError as exc: + return exc + return None + + @staticmethod + def resolve_trace_id(correlation_id: str | None, *, prefix: str) -> str: + normalized = (correlation_id or "").strip() + if normalized: + return normalized + return f"{prefix}:{uuid4().hex[:12]}" diff --git a/backend/app/services/openclaw/onboarding_service.py b/backend/app/services/openclaw/onboarding_service.py index 2dbc99a4..7e443df0 100644 --- a/backend/app/services/openclaw/onboarding_service.py +++ b/backend/app/services/openclaw/onboarding_service.py @@ -6,12 +6,9 @@ from app.models.board_onboarding import BoardOnboardingSession from app.models.boards import Board from app.services.openclaw.coordination_service import AbstractGatewayMessagingService from app.services.openclaw.exceptions import GatewayOperation, map_gateway_error_to_http_exception +from app.services.openclaw.gateway_dispatch import GatewayDispatchService from app.services.openclaw.gateway_rpc import OpenClawGatewayError -from app.services.openclaw.shared import ( - GatewayAgentIdentity, - require_gateway_config_for_board, - resolve_trace_id, -) +from app.services.openclaw.shared import GatewayAgentIdentity class BoardOnboardingMessagingService(AbstractGatewayMessagingService): @@ -24,14 +21,18 @@ class BoardOnboardingMessagingService(AbstractGatewayMessagingService): prompt: str, correlation_id: str | None = None, ) -> str: - trace_id = resolve_trace_id(correlation_id, prefix="onboarding.start") + trace_id = GatewayDispatchService.resolve_trace_id( + correlation_id, prefix="onboarding.start" + ) self.logger.log( 5, "gateway.onboarding.start_dispatch.start trace_id=%s board_id=%s", trace_id, board.id, ) - gateway, config = await require_gateway_config_for_board(self.session, board) + gateway, config = await GatewayDispatchService( + self.session + ).require_gateway_config_for_board(board) session_key = GatewayAgentIdentity.session_key(gateway) try: await self._dispatch_gateway_message( @@ -78,7 +79,9 @@ class BoardOnboardingMessagingService(AbstractGatewayMessagingService): answer_text: str, correlation_id: str | None = None, ) -> None: - trace_id = resolve_trace_id(correlation_id, prefix="onboarding.answer") + trace_id = GatewayDispatchService.resolve_trace_id( + correlation_id, prefix="onboarding.answer" + ) self.logger.log( 5, "gateway.onboarding.answer_dispatch.start trace_id=%s board_id=%s onboarding_id=%s", @@ -86,7 +89,9 @@ class BoardOnboardingMessagingService(AbstractGatewayMessagingService): board.id, onboarding.id, ) - _gateway, config = await require_gateway_config_for_board(self.session, board) + _gateway, config = await GatewayDispatchService( + self.session + ).require_gateway_config_for_board(board) try: await self._dispatch_gateway_message( session_key=onboarding.session_key, diff --git a/backend/app/services/openclaw/provisioning_db.py b/backend/app/services/openclaw/provisioning_db.py index 5c3ca04e..4d48172e 100644 --- a/backend/app/services/openclaw/provisioning_db.py +++ b/backend/app/services/openclaw/provisioning_db.py @@ -10,7 +10,6 @@ from __future__ import annotations import asyncio import json -import logging import re from dataclasses import dataclass, field from datetime import UTC, datetime @@ -22,7 +21,7 @@ from sqlalchemy import asc, func, or_ from sqlmodel import col, select from sse_starlette.sse import EventSourceResponse -from app.core.agent_tokens import generate_agent_token, hash_agent_token, verify_agent_token +from app.core.agent_tokens import verify_agent_token from app.core.time import utcnow from app.db import crud from app.db.pagination import paginate @@ -50,6 +49,12 @@ from app.services.openclaw.constants import ( DEFAULT_HEARTBEAT_CONFIG, OFFLINE_AFTER, ) +from app.services.openclaw.db_agent_state import ( + mark_provision_complete, + mark_provision_requested, + mint_agent_token, +) +from app.services.openclaw.db_service import OpenClawDBService from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig from app.services.openclaw.gateway_rpc import ( OpenClawGatewayError, @@ -120,17 +125,13 @@ class LeadAgentRequest: options: LeadAgentOptions = field(default_factory=LeadAgentOptions) -class OpenClawProvisioningService: +class OpenClawProvisioningService(OpenClawDBService): """DB-backed provisioning workflows (bulk template sync, lead-agent record).""" def __init__(self, session: AsyncSession) -> None: - self._session = session + super().__init__(session) self._gateway = OpenClawGatewayProvisioner() - @property - def session(self) -> AsyncSession: - return self._session - @staticmethod def lead_session_key(board: Board) -> str: return f"agent:lead-{board.id}:main" @@ -191,21 +192,16 @@ class OpenClawProvisioningService: agent = Agent( name=config_options.agent_name or self.lead_agent_name(board), - status="provisioning", board_id=board.id, gateway_id=request.gateway.id, is_board_lead=True, heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(), identity_profile=merged_identity_profile, openclaw_session_id=self.lead_session_key(board), - provision_requested_at=utcnow(), - provision_action=config_options.action, ) - raw_token = generate_agent_token() - agent.agent_token_hash = hash_agent_token(raw_token) - self.session.add(agent) - await self.session.commit() - await self.session.refresh(agent) + raw_token = mint_agent_token(agent) + mark_provision_requested(agent, action=config_options.action, status="provisioning") + await self.add_commit_refresh(agent) # Strict behavior: provisioning errors surface to the caller. The DB row exists # so a later retry can succeed with the same deterministic identity/session key. @@ -220,13 +216,8 @@ class OpenClawProvisioningService: deliver_wakeup=True, ) - agent.status = "online" - agent.provision_requested_at = None - agent.provision_action = None - agent.updated_at = utcnow() - self.session.add(agent) - await self.session.commit() - await self.session.refresh(agent) + mark_provision_complete(agent, status="online") + await self.add_commit_refresh(agent) return agent, True @@ -433,8 +424,7 @@ def _append_sync_error( async def _rotate_agent_token(session: AsyncSession, agent: Agent) -> str: - token = generate_agent_token() - agent.agent_token_hash = hash_agent_token(token) + token = mint_agent_token(agent) agent.updated_at = utcnow() session.add(agent) await session.commit() @@ -692,28 +682,11 @@ class AgentUpdateProvisionRequest: force_bootstrap: bool -class AgentLifecycleService: +class AgentLifecycleService(OpenClawDBService): """Async service encapsulating agent lifecycle behavior for API routes.""" def __init__(self, session: AsyncSession) -> None: - self._session = session - self._logger = logging.getLogger(__name__) - - @property - def session(self) -> AsyncSession: - return self._session - - @session.setter - def session(self, value: AsyncSession) -> None: - self._session = value - - @property - def logger(self) -> logging.Logger: - return self._logger - - @logger.setter - def logger(self, value: logging.Logger) -> None: - self._logger = value + super().__init__(session) @staticmethod def parse_since(value: str | None) -> datetime | None: @@ -1013,17 +986,10 @@ class AgentLifecycleService: data: dict[str, Any], ) -> tuple[Agent, str]: agent = Agent.model_validate(data) - agent.status = "provisioning" - raw_token = generate_agent_token() - agent.agent_token_hash = hash_agent_token(raw_token) - if agent.heartbeat_config is None: - agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() - agent.provision_requested_at = utcnow() - agent.provision_action = "provision" + raw_token = mint_agent_token(agent) + mark_provision_requested(agent, action="provision", status="provisioning") agent.openclaw_session_id = self.resolve_session_key(agent) - self.session.add(agent) - await self.session.commit() - await self.session.refresh(agent) + await self.add_commit_refresh(agent) return agent, raw_token async def _apply_gateway_provisioning( @@ -1078,11 +1044,7 @@ class AgentLifecycleService: deliver_wakeup=True, wakeup_verb=wakeup_verb, ) - agent.provision_confirm_token_hash = None - agent.provision_requested_at = None - agent.provision_action = None - agent.status = "online" - agent.updated_at = utcnow() + mark_provision_complete(agent, status="online", clear_confirm_token=True) self.session.add(agent) await self.session.commit() record_activity( @@ -1301,11 +1263,8 @@ class AgentLifecycleService: @staticmethod def mark_agent_update_pending(agent: Agent) -> str: - raw_token = generate_agent_token() - agent.agent_token_hash = hash_agent_token(raw_token) - agent.provision_requested_at = utcnow() - agent.provision_action = "update" - agent.status = "updating" + raw_token = mint_agent_token(agent) + mark_provision_requested(agent, action="update", status="updating") return raw_token async def provision_updated_agent( @@ -1379,15 +1338,9 @@ class AgentLifecycleService: if agent.agent_token_hash is not None: return - raw_token = generate_agent_token() - agent.agent_token_hash = hash_agent_token(raw_token) - if agent.heartbeat_config is None: - agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() - agent.provision_requested_at = utcnow() - agent.provision_action = "provision" - self.session.add(agent) - await self.session.commit() - await self.session.refresh(agent) + raw_token = mint_agent_token(agent) + mark_provision_requested(agent, action="provision", status="provisioning") + await self.add_commit_refresh(agent) board = await self.require_board( str(agent.board_id) if agent.board_id else None, user=user, diff --git a/backend/app/services/openclaw/session_service.py b/backend/app/services/openclaw/session_service.py index 17fd6345..f8aad4a5 100644 --- a/backend/app/services/openclaw/session_service.py +++ b/backend/app/services/openclaw/session_service.py @@ -2,7 +2,6 @@ from __future__ import annotations -import logging from collections.abc import Iterable from dataclasses import dataclass from typing import TYPE_CHECKING @@ -22,6 +21,7 @@ from app.schemas.gateway_api import ( GatewaySessionsResponse, GatewaysStatusResponse, ) +from app.services.openclaw.db_service import OpenClawDBService from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig from app.services.openclaw.gateway_rpc import ( OpenClawGatewayError, @@ -50,28 +50,11 @@ class GatewayTemplateSyncQuery: board_id: UUID | None -class GatewaySessionService: +class GatewaySessionService(OpenClawDBService): """Read/query gateway runtime session state for user-facing APIs.""" def __init__(self, session: AsyncSession) -> None: - self._session = session - self._logger = logging.getLogger(__name__) - - @property - def session(self) -> AsyncSession: - return self._session - - @session.setter - def session(self, value: AsyncSession) -> None: - self._session = value - - @property - def logger(self) -> logging.Logger: - return self._logger - - @logger.setter - def logger(self, value: logging.Logger) -> None: - self._logger = value + super().__init__(session) @staticmethod def to_resolve_query( diff --git a/backend/app/services/openclaw/shared.py b/backend/app/services/openclaw/shared.py index d87ce474..f48c1ad1 100644 --- a/backend/app/services/openclaw/shared.py +++ b/backend/app/services/openclaw/shared.py @@ -2,29 +2,14 @@ from __future__ import annotations -import logging -from typing import TYPE_CHECKING -from uuid import UUID, uuid4 +from uuid import UUID -from fastapi import HTTPException, status - -from app.models.boards import Board from app.models.gateways import Gateway from app.services.openclaw.constants import ( _GATEWAY_AGENT_PREFIX, _GATEWAY_AGENT_SUFFIX, _GATEWAY_OPENCLAW_AGENT_PREFIX, ) -from app.services.openclaw.gateway_rpc import GatewayConfig as _GatewayClientConfig -from app.services.openclaw.gateway_rpc import OpenClawGatewayError, ensure_session, send_message - -if TYPE_CHECKING: - from sqlmodel.ext.asyncio.session import AsyncSession - - -GatewayClientConfig = _GatewayClientConfig -# Keep integration exceptions behind the OpenClaw service boundary. -GatewayTransportError = OpenClawGatewayError class GatewayAgentIdentity: @@ -45,81 +30,3 @@ class GatewayAgentIdentity: @classmethod def openclaw_agent_id(cls, gateway: Gateway) -> str: return cls.openclaw_agent_id_for_id(gateway.id) - - -async def optional_gateway_config_for_board( - session: AsyncSession, - board: Board, -) -> GatewayClientConfig | None: - """Return gateway client config when board has a reachable configured gateway.""" - if board.gateway_id is None: - return None - gateway = await Gateway.objects.by_id(board.gateway_id).first(session) - if gateway is None or not gateway.url: - return None - return GatewayClientConfig(url=gateway.url, token=gateway.token) - - -async def require_gateway_config_for_board( - session: AsyncSession, - board: Board, -) -> tuple[Gateway, GatewayClientConfig]: - """Resolve board gateway and config, raising 422 when unavailable.""" - if board.gateway_id is None: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Board is not attached to a gateway", - ) - gateway = await Gateway.objects.by_id(board.gateway_id).first(session) - if gateway is None or not gateway.url: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Gateway is not configured for this board", - ) - return gateway, GatewayClientConfig(url=gateway.url, token=gateway.token) - - -async def send_gateway_agent_message( - *, - session_key: str, - config: GatewayClientConfig, - agent_name: str, - message: str, - deliver: bool = False, -) -> None: - """Ensure session and dispatch a message to an agent session.""" - await ensure_session(session_key, config=config, label=agent_name) - await send_message(message, session_key=session_key, config=config, deliver=deliver) - - -async def send_gateway_agent_message_safe( - *, - session_key: str, - config: GatewayClientConfig, - agent_name: str, - message: str, - deliver: bool = False, -) -> GatewayTransportError | None: - """Best-effort gateway dispatch returning transport error when one occurs.""" - try: - await send_gateway_agent_message( - session_key=session_key, - config=config, - agent_name=agent_name, - message=message, - deliver=deliver, - ) - except GatewayTransportError as exc: - return exc - return None - - -def resolve_trace_id(correlation_id: str | None, *, prefix: str) -> str: - """Resolve a stable trace id from correlation id or generate a scoped fallback.""" - normalized = (correlation_id or "").strip() - if normalized: - return normalized - return f"{prefix}:{uuid4().hex[:12]}" - - -logger = logging.getLogger(__name__) diff --git a/backend/tests/test_api_openclaw_integration_boundary.py b/backend/tests/test_api_openclaw_integration_boundary.py index e4f038cb..05702e42 100644 --- a/backend/tests/test_api_openclaw_integration_boundary.py +++ b/backend/tests/test_api_openclaw_integration_boundary.py @@ -30,24 +30,23 @@ def test_api_does_not_import_openclaw_gateway_client_directly() -> None: def test_api_uses_safe_gateway_dispatch_helper() -> None: - """API modules should use `send_gateway_agent_message_safe`, not direct send.""" + """API modules should not call low-level gateway RPC helpers directly.""" repo_root = Path(__file__).resolve().parents[2] api_root = repo_root / "backend" / "app" / "api" - direct_send_pattern = re.compile(r"\bsend_gateway_agent_message\b") + forbidden = {"ensure_session", "send_message", "openclaw_call"} violations: list[str] = [] for path in api_root.rglob("*.py"): rel = path.relative_to(repo_root) for lineno, raw_line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1): line = raw_line.strip() - if not direct_send_pattern.search(line): + if not line.startswith("from app.services.openclaw.gateway_rpc import "): continue - if "send_gateway_agent_message_safe" in line: - continue - violations.append(f"{rel}:{lineno}") + if any(re.search(rf"\\b{name}\\b", line) for name in forbidden): + violations.append(f"{rel}:{lineno}") assert not violations, ( - "Use `send_gateway_agent_message_safe` from `app.services.openclaw.shared` " - "for API-level gateway notification dispatch. " + "Use OpenClaw service modules (for example `app.services.openclaw.gateway_dispatch`) " + "instead of calling low-level gateway RPC helpers from `app.api`." f"Violations: {', '.join(violations)}" ) diff --git a/backend/tests/test_lifecycle_services.py b/backend/tests/test_lifecycle_services.py index f0d96418..89b81398 100644 --- a/backend/tests/test_lifecycle_services.py +++ b/backend/tests/test_lifecycle_services.py @@ -69,15 +69,17 @@ async def test_gateway_coordination_nudge_success(monkeypatch: pytest.MonkeyPatc return target async def _fake_require_gateway_config_for_board( - _session: object, + self: coordination_lifecycle.GatewayDispatchService, _board: object, ) -> tuple[object, GatewayClientConfig]: + _ = self gateway = SimpleNamespace(id=uuid4(), url="ws://gateway.example/ws") return gateway, GatewayClientConfig(url="ws://gateway.example/ws", token=None) - async def _fake_send_gateway_agent_message(**kwargs: Any) -> dict[str, bool]: + async def _fake_send_agent_message(self, **kwargs: Any) -> None: + _ = self captured.append(kwargs) - return {"ok": True} + return None monkeypatch.setattr( coordination_lifecycle.GatewayCoordinationService, @@ -85,14 +87,14 @@ async def test_gateway_coordination_nudge_success(monkeypatch: pytest.MonkeyPatc _fake_board_agent_or_404, ) monkeypatch.setattr( - coordination_lifecycle, + coordination_lifecycle.GatewayDispatchService, "require_gateway_config_for_board", _fake_require_gateway_config_for_board, ) monkeypatch.setattr( - coordination_lifecycle, - "send_gateway_agent_message", - _fake_send_gateway_agent_message, + coordination_lifecycle.GatewayDispatchService, + "send_agent_message", + _fake_send_agent_message, ) await service.nudge_board_agent( @@ -135,13 +137,15 @@ async def test_gateway_coordination_nudge_maps_gateway_error( return target async def _fake_require_gateway_config_for_board( - _session: object, + self: coordination_lifecycle.GatewayDispatchService, _board: object, ) -> tuple[object, GatewayClientConfig]: + _ = self gateway = SimpleNamespace(id=uuid4(), url="ws://gateway.example/ws") return gateway, GatewayClientConfig(url="ws://gateway.example/ws", token=None) - async def _fake_send_gateway_agent_message(**_kwargs: Any) -> None: + async def _fake_send_agent_message(self, **_kwargs: Any) -> None: + _ = self raise OpenClawGatewayError("dial tcp: connection refused") monkeypatch.setattr( @@ -150,14 +154,14 @@ async def test_gateway_coordination_nudge_maps_gateway_error( _fake_board_agent_or_404, ) monkeypatch.setattr( - coordination_lifecycle, + coordination_lifecycle.GatewayDispatchService, "require_gateway_config_for_board", _fake_require_gateway_config_for_board, ) monkeypatch.setattr( - coordination_lifecycle, - "send_gateway_agent_message", - _fake_send_gateway_agent_message, + coordination_lifecycle.GatewayDispatchService, + "send_agent_message", + _fake_send_agent_message, ) with pytest.raises(HTTPException) as exc_info: @@ -185,25 +189,27 @@ async def test_board_onboarding_dispatch_start_returns_session_key( captured: list[dict[str, Any]] = [] async def _fake_require_gateway_config_for_board( - _session: object, + self: onboarding_lifecycle.GatewayDispatchService, _board: object, ) -> tuple[object, GatewayClientConfig]: + _ = self gateway = SimpleNamespace(id=gateway_id, url="ws://gateway.example/ws") return gateway, GatewayClientConfig(url="ws://gateway.example/ws", token=None) - async def _fake_send_gateway_agent_message(**kwargs: Any) -> dict[str, bool]: + async def _fake_send_agent_message(self, **kwargs: Any) -> None: + _ = self captured.append(kwargs) - return {"ok": True} + return None monkeypatch.setattr( - onboarding_lifecycle, + onboarding_lifecycle.GatewayDispatchService, "require_gateway_config_for_board", _fake_require_gateway_config_for_board, ) monkeypatch.setattr( - coordination_lifecycle, - "send_gateway_agent_message", - _fake_send_gateway_agent_message, + coordination_lifecycle.GatewayDispatchService, + "send_agent_message", + _fake_send_agent_message, ) session_key = await service.dispatch_start_prompt( @@ -224,28 +230,34 @@ async def test_board_onboarding_dispatch_answer_maps_timeout_error( ) -> None: session = _FakeSession() service = onboarding_lifecycle.BoardOnboardingMessagingService(session) # type: ignore[arg-type] - board = _BoardStub(id=uuid4(), gateway_id=uuid4(), name="Roadmap") - onboarding = SimpleNamespace(id=uuid4(), session_key="agent:gateway-main:main") + gateway_id = uuid4() + board = _BoardStub(id=uuid4(), gateway_id=gateway_id, name="Roadmap") + onboarding = SimpleNamespace( + id=uuid4(), + session_key=GatewayAgentIdentity.session_key_for_id(gateway_id), + ) async def _fake_require_gateway_config_for_board( - _session: object, + self: onboarding_lifecycle.GatewayDispatchService, _board: object, ) -> tuple[object, GatewayClientConfig]: - gateway = SimpleNamespace(id=uuid4(), url="ws://gateway.example/ws") + _ = self + gateway = SimpleNamespace(id=gateway_id, url="ws://gateway.example/ws") return gateway, GatewayClientConfig(url="ws://gateway.example/ws", token=None) - async def _fake_send_gateway_agent_message(**_kwargs: Any) -> None: + async def _fake_send_agent_message(self, **_kwargs: Any) -> None: + _ = self raise TimeoutError("gateway timeout") monkeypatch.setattr( - onboarding_lifecycle, + onboarding_lifecycle.GatewayDispatchService, "require_gateway_config_for_board", _fake_require_gateway_config_for_board, ) monkeypatch.setattr( - coordination_lifecycle, - "send_gateway_agent_message", - _fake_send_gateway_agent_message, + coordination_lifecycle.GatewayDispatchService, + "send_agent_message", + _fake_send_agent_message, ) with pytest.raises(HTTPException) as exc_info: