refactor: streamline agent lifecycle management with new DB service helpers

This commit is contained in:
Abhimanyu Saharan
2026-02-11 01:13:10 +05:30
parent f4161494d9
commit f1038acf44
17 changed files with 377 additions and 350 deletions

View File

@@ -33,10 +33,7 @@ from app.models.users import User
from app.schemas.board_group_memory import BoardGroupMemoryCreate, BoardGroupMemoryRead
from app.schemas.pagination import DefaultLimitOffsetPage
from app.services.mentions import extract_mentions, matches_agent_mention
from app.services.openclaw.shared import (
optional_gateway_config_for_board,
send_gateway_agent_message_safe,
)
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.organizations import (
is_org_admin,
list_accessible_board_ids,
@@ -206,6 +203,7 @@ def _group_header(*, is_broadcast: bool, mentioned: bool) -> str:
@dataclass(frozen=True)
class _NotifyGroupContext:
session: AsyncSession
dispatch: GatewayDispatchService
group: BoardGroup
board_by_id: dict[UUID, Board]
mentions: set[str]
@@ -226,7 +224,7 @@ async def _notify_group_target(
board = context.board_by_id.get(board_id)
if board is None:
return
config = await optional_gateway_config_for_board(context.session, board)
config = await context.dispatch.optional_gateway_config_for_board(board)
if config is None:
return
header = _group_header(
@@ -242,7 +240,7 @@ async def _notify_group_target(
f"POST {context.base_url}/api/v1/boards/{board.id}/group-memory\n"
'Body: {"content":"...","tags":["chat"]}'
)
error = await send_gateway_agent_message_safe(
error = await context.dispatch.try_send_agent_message(
session_key=session_key,
config=config,
agent_name=agent.name,
@@ -294,6 +292,7 @@ async def _notify_group_memory_targets(
context = _NotifyGroupContext(
session=session,
dispatch=GatewayDispatchService(session),
group=group,
board_by_id=board_by_id,
mentions=mentions,

View File

@@ -30,8 +30,8 @@ from app.schemas.pagination import DefaultLimitOffsetPage
from app.schemas.view_models import BoardGroupSnapshot
from app.services.board_group_snapshot import build_group_snapshot
from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG
from app.services.openclaw.gateway_rpc import OpenClawGatewayError
from app.services.openclaw.provisioning import OpenClawGatewayProvisioner
from app.services.openclaw.shared import GatewayTransportError
from app.services.organizations import (
OrganizationContext,
board_access_filter,
@@ -273,7 +273,7 @@ async def _sync_gateway_heartbeats(
gateway,
gateway_agents,
)
except GatewayTransportError:
except OpenClawGatewayError:
failed_agent_ids.extend([agent.id for agent in gateway_agents])
return failed_agent_ids

View File

@@ -28,11 +28,8 @@ from app.models.board_memory import BoardMemory
from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead
from app.schemas.pagination import DefaultLimitOffsetPage
from app.services.mentions import extract_mentions, matches_agent_mention
from app.services.openclaw.shared import (
GatewayClientConfig,
optional_gateway_config_for_board,
send_gateway_agent_message_safe,
)
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
if TYPE_CHECKING:
from collections.abc import AsyncIterator
@@ -102,6 +99,7 @@ async def _send_control_command(
session: AsyncSession,
board: Board,
actor: ActorContext,
dispatch: GatewayDispatchService,
config: GatewayClientConfig,
command: str,
) -> None:
@@ -115,7 +113,7 @@ async def _send_control_command(
continue
if not agent.openclaw_session_id:
continue
error = await send_gateway_agent_message_safe(
error = await dispatch.try_send_agent_message(
session_key=agent.openclaw_session_id,
config=config,
agent_name=agent.name,
@@ -161,7 +159,8 @@ async def _notify_chat_targets(
) -> None:
if not memory.content:
return
config = await optional_gateway_config_for_board(session, board)
dispatch = GatewayDispatchService(session)
config = await dispatch.optional_gateway_config_for_board(board)
if config is None:
return
@@ -174,6 +173,7 @@ async def _notify_chat_targets(
session=session,
board=board,
actor=actor,
dispatch=dispatch,
config=config,
command=command,
)
@@ -206,7 +206,7 @@ async def _notify_chat_targets(
f"POST {base_url}/api/v1/agent/boards/{board.id}/memory\n"
'Body: {"content":"...","tags":["chat"]}'
)
error = await send_gateway_agent_message_safe(
error = await dispatch.try_send_agent_message(
session_key=agent.openclaw_session_id,
config=config,
agent_name=agent.name,

View File

@@ -33,6 +33,7 @@ from app.schemas.board_onboarding import (
BoardOnboardingUserProfile,
)
from app.schemas.boards import BoardRead
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.openclaw.onboarding_service import BoardOnboardingMessagingService
from app.services.openclaw.policies import OpenClawAuthorizationPolicy
from app.services.openclaw.provisioning_db import (
@@ -40,7 +41,6 @@ from app.services.openclaw.provisioning_db import (
LeadAgentRequest,
OpenClawProvisioningService,
)
from app.services.openclaw.shared import require_gateway_config_for_board
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -396,7 +396,7 @@ async def confirm_onboarding(
lead_agent = _parse_draft_lead_agent(onboarding.draft_goal)
lead_options = _lead_agent_options(lead_agent)
gateway, config = await require_gateway_config_for_board(session, board)
gateway, config = await GatewayDispatchService(session).require_gateway_config_for_board(board)
session.add(board)
session.add(onboarding)
await session.commit()

View File

@@ -39,8 +39,8 @@ from app.schemas.pagination import DefaultLimitOffsetPage
from app.schemas.view_models import BoardGroupSnapshot, BoardSnapshot
from app.services.board_group_snapshot import build_board_group_snapshot
from app.services.board_snapshot import build_board_snapshot
from app.services.openclaw.gateway_rpc import OpenClawGatewayError
from app.services.openclaw.provisioning import OpenClawGatewayProvisioner
from app.services.openclaw.shared import GatewayTransportError
from app.services.organizations import OrganizationContext, board_access_filter
if TYPE_CHECKING:
@@ -291,7 +291,7 @@ async def delete_board(
agent=agent,
gateway=config,
)
except GatewayTransportError as exc:
except OpenClawGatewayError as exc:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway cleanup failed: {exc}",

View File

@@ -40,12 +40,9 @@ from app.schemas.pagination import DefaultLimitOffsetPage
from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate
from app.services.activity_log import record_activity
from app.services.mentions import extract_mentions, matches_agent_mention
from app.services.openclaw.shared import (
GatewayClientConfig,
GatewayTransportError,
optional_gateway_config_for_board,
send_gateway_agent_message_safe,
)
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
from app.services.openclaw.gateway_rpc import OpenClawGatewayError
from app.services.organizations import require_board_access
from app.services.task_dependencies import (
blocked_by_dependency_ids,
@@ -305,11 +302,12 @@ def _serialize_comment(event: ActivityEvent) -> dict[str, object]:
async def _send_lead_task_message(
*,
dispatch: GatewayDispatchService,
session_key: str,
config: GatewayClientConfig,
message: str,
) -> GatewayTransportError | None:
return await send_gateway_agent_message_safe(
) -> OpenClawGatewayError | None:
return await dispatch.try_send_agent_message(
session_key=session_key,
config=config,
agent_name="Lead Agent",
@@ -320,12 +318,13 @@ async def _send_lead_task_message(
async def _send_agent_task_message(
*,
dispatch: GatewayDispatchService,
session_key: str,
config: GatewayClientConfig,
agent_name: str,
message: str,
) -> GatewayTransportError | None:
return await send_gateway_agent_message_safe(
) -> OpenClawGatewayError | None:
return await dispatch.try_send_agent_message(
session_key=session_key,
config=config,
agent_name=agent_name,
@@ -343,7 +342,8 @@ async def _notify_agent_on_task_assign(
) -> None:
if not agent.openclaw_session_id:
return
config = await optional_gateway_config_for_board(session, board)
dispatch = GatewayDispatchService(session)
config = await dispatch.optional_gateway_config_for_board(board)
if config is None:
return
description = _truncate_snippet(task.description or "")
@@ -361,6 +361,7 @@ async def _notify_agent_on_task_assign(
+ ("\n\nTake action: open the task and begin work. " "Post updates as task comments.")
)
error = await _send_agent_task_message(
dispatch=dispatch,
session_key=agent.openclaw_session_id,
config=config,
agent_name=agent.name,
@@ -415,7 +416,8 @@ async def _notify_lead_on_task_create(
)
if lead is None or not lead.openclaw_session_id:
return
config = await optional_gateway_config_for_board(session, board)
dispatch = GatewayDispatchService(session)
config = await dispatch.optional_gateway_config_for_board(board)
if config is None:
return
description = _truncate_snippet(task.description or "")
@@ -433,6 +435,7 @@ async def _notify_lead_on_task_create(
+ "\n\nTake action: triage, assign, or plan next steps."
)
error = await _send_lead_task_message(
dispatch=dispatch,
session_key=lead.openclaw_session_id,
config=config,
message=message,
@@ -470,7 +473,8 @@ async def _notify_lead_on_task_unassigned(
)
if lead is None or not lead.openclaw_session_id:
return
config = await optional_gateway_config_for_board(session, board)
dispatch = GatewayDispatchService(session)
config = await dispatch.optional_gateway_config_for_board(board)
if config is None:
return
description = _truncate_snippet(task.description or "")
@@ -488,6 +492,7 @@ async def _notify_lead_on_task_unassigned(
+ "\n\nTake action: assign a new owner or adjust the plan."
)
error = await _send_lead_task_message(
dispatch=dispatch,
session_key=lead.openclaw_session_id,
config=config,
message=message,
@@ -1029,8 +1034,11 @@ async def _notify_task_comment_targets(
if request.task.board_id
else None
)
config = await optional_gateway_config_for_board(session, board) if board else None
if not board or not config:
if board is None:
return
dispatch = GatewayDispatchService(session)
config = await dispatch.optional_gateway_config_for_board(board)
if not config:
return
snippet = _truncate_snippet(request.message)
@@ -1057,6 +1065,7 @@ async def _notify_task_comment_targets(
"thread but do not change task status."
)
await _send_agent_task_message(
dispatch=dispatch,
session_key=agent.openclaw_session_id,
config=config,
agent_name=agent.name,

View File

@@ -2,7 +2,6 @@
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from uuid import UUID
@@ -10,7 +9,6 @@ from uuid import UUID
from fastapi import HTTPException, status
from sqlmodel import col
from app.core.agent_tokens import generate_agent_token, hash_agent_token
from app.core.auth import AuthContext
from app.core.time import utcnow
from app.db import crud
@@ -21,6 +19,12 @@ from app.models.gateways import Gateway
from app.models.tasks import Task
from app.schemas.gateways import GatewayTemplatesSyncResult
from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG
from app.services.openclaw.db_agent_state import (
mark_provision_complete,
mark_provision_requested,
mint_agent_token,
)
from app.services.openclaw.db_service import OpenClawDBService
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
from app.services.openclaw.gateway_rpc import OpenClawGatewayError, openclaw_call
from app.services.openclaw.provisioning import OpenClawGatewayProvisioner
@@ -64,7 +68,7 @@ class DefaultGatewayMainAgentManager(AbstractGatewayMainAgentManager):
}
class GatewayAdminLifecycleService:
class GatewayAdminLifecycleService(OpenClawDBService):
"""Write-side gateway lifecycle service (CRUD, main agent, template sync)."""
def __init__(
@@ -73,26 +77,9 @@ class GatewayAdminLifecycleService:
*,
main_agent_manager: AbstractGatewayMainAgentManager | None = None,
) -> None:
self._session = session
self._logger = logging.getLogger(__name__)
super().__init__(session)
self._main_agent_manager = main_agent_manager or DefaultGatewayMainAgentManager()
@property
def session(self) -> AsyncSession:
return self._session
@session.setter
def session(self, value: AsyncSession) -> None:
self._session = value
@property
def logger(self) -> logging.Logger:
return self._logger
@logger.setter
def logger(self, value: logging.Logger) -> None:
self._logger = value
@property
def main_agent_manager(self) -> AbstractGatewayMainAgentManager:
return self._main_agent_manager
@@ -206,16 +193,13 @@ class GatewayAdminLifecycleService:
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Organization owner not found (required for gateway agent USER.md rendering).",
)
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
agent.provision_requested_at = utcnow()
agent.provision_action = action
agent.updated_at = utcnow()
if agent.heartbeat_config is None:
agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy()
self.session.add(agent)
await self.session.commit()
await self.session.refresh(agent)
raw_token = mint_agent_token(agent)
mark_provision_requested(
agent,
action=action,
status="updating" if action == "update" else "provisioning",
)
await self.add_commit_refresh(agent)
if not gateway.url:
return agent
@@ -253,13 +237,8 @@ class GatewayAdminLifecycleService:
detail=f"Unexpected error {action}ing gateway provisioning.",
) from exc
agent.status = "online"
agent.provision_requested_at = None
agent.provision_action = None
agent.updated_at = utcnow()
self.session.add(agent)
await self.session.commit()
await self.session.refresh(agent)
mark_provision_complete(agent, status="online")
await self.add_commit_refresh(agent)
self.logger.info(
"gateway.main_agent.provision_success gateway_id=%s agent_id=%s action=%s",

View File

@@ -3,10 +3,9 @@
from __future__ import annotations
import json
import logging
from abc import ABC
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, TypeVar
from typing import TypeVar
from uuid import UUID
from fastapi import HTTPException, status
@@ -27,11 +26,13 @@ from app.schemas.gateway_coordination import (
GatewayMainAskUserResponse,
)
from app.services.activity_log import record_activity
from app.services.openclaw.db_service import OpenClawDBService
from app.services.openclaw.exceptions import (
GatewayOperation,
map_gateway_error_message,
map_gateway_error_to_http_exception,
)
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
from app.services.openclaw.gateway_rpc import OpenClawGatewayError, openclaw_call
from app.services.openclaw.internal.agent_key import agent_key
@@ -42,43 +43,14 @@ from app.services.openclaw.provisioning_db import (
LeadAgentRequest,
OpenClawProvisioningService,
)
from app.services.openclaw.shared import (
GatewayAgentIdentity,
require_gateway_config_for_board,
resolve_trace_id,
send_gateway_agent_message,
)
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
from app.services.openclaw.shared import GatewayAgentIdentity
_T = TypeVar("_T")
class AbstractGatewayMessagingService(ABC):
class AbstractGatewayMessagingService(OpenClawDBService, ABC):
"""Shared gateway messaging primitives with retry semantics."""
def __init__(self, session: AsyncSession) -> None:
self._session = session
self._logger = logging.getLogger(__name__)
@property
def session(self) -> AsyncSession:
return self._session
@session.setter
def session(self, value: AsyncSession) -> None:
self._session = value
@property
def logger(self) -> logging.Logger:
return self._logger
@logger.setter
def logger(self, value: logging.Logger) -> None:
self._logger = value
@staticmethod
async def _with_gateway_retry(fn: Callable[[], Awaitable[_T]]) -> _T:
return await with_coordination_gateway_retry(fn)
@@ -93,7 +65,7 @@ class AbstractGatewayMessagingService(ABC):
deliver: bool,
) -> None:
async def _do_send() -> bool:
await send_gateway_agent_message(
await GatewayDispatchService(self.session).send_agent_message(
session_key=session_key,
config=config,
agent_name=agent_name,
@@ -198,7 +170,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
message: str,
correlation_id: str | None = None,
) -> None:
trace_id = resolve_trace_id(correlation_id, prefix="coord.nudge")
trace_id = GatewayDispatchService.resolve_trace_id(correlation_id, prefix="coord.nudge")
self.logger.log(
5,
"gateway.coordination.nudge.start trace_id=%s board_id=%s actor_agent_id=%s "
@@ -214,7 +186,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Target agent has no session key",
)
_gateway, config = await require_gateway_config_for_board(self.session, board)
_gateway, config = await GatewayDispatchService(
self.session
).require_gateway_config_for_board(board)
try:
await self._dispatch_gateway_message(
session_key=target.openclaw_session_id or "",
@@ -276,7 +250,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
target_agent_id: str,
correlation_id: str | None = None,
) -> str:
trace_id = resolve_trace_id(correlation_id, prefix="coord.soul.read")
trace_id = GatewayDispatchService.resolve_trace_id(correlation_id, prefix="coord.soul.read")
self.logger.log(
5,
"gateway.coordination.soul_read.start trace_id=%s board_id=%s target_agent_id=%s",
@@ -285,7 +259,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
target_agent_id,
)
target = await self._board_agent_or_404(board=board, agent_id=target_agent_id)
_gateway, config = await require_gateway_config_for_board(self.session, board)
_gateway, config = await GatewayDispatchService(
self.session
).require_gateway_config_for_board(board)
try:
async def _do_get() -> object:
@@ -342,7 +318,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
actor_agent_id: UUID,
correlation_id: str | None = None,
) -> None:
trace_id = resolve_trace_id(correlation_id, prefix="coord.soul.write")
trace_id = GatewayDispatchService.resolve_trace_id(
correlation_id, prefix="coord.soul.write"
)
self.logger.log(
5,
"gateway.coordination.soul_write.start trace_id=%s board_id=%s target_agent_id=%s "
@@ -365,7 +343,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
self.session.add(target)
await self.session.commit()
_gateway, config = await require_gateway_config_for_board(self.session, board)
_gateway, config = await GatewayDispatchService(
self.session
).require_gateway_config_for_board(board)
try:
async def _do_set() -> object:
@@ -434,7 +414,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
payload: GatewayMainAskUserRequest,
actor_agent: Agent,
) -> GatewayMainAskUserResponse:
trace_id = resolve_trace_id(payload.correlation_id, prefix="coord.ask_user")
trace_id = GatewayDispatchService.resolve_trace_id(
payload.correlation_id, prefix="coord.ask_user"
)
self.logger.log(
5,
"gateway.coordination.ask_user.start trace_id=%s board_id=%s actor_agent_id=%s",
@@ -442,7 +424,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
board.id,
actor_agent.id,
)
gateway, config = await require_gateway_config_for_board(self.session, board)
gateway, config = await GatewayDispatchService(
self.session
).require_gateway_config_for_board(board)
main_session_key = GatewayAgentIdentity.session_key(gateway)
correlation = payload.correlation_id.strip() if payload.correlation_id else ""
@@ -575,7 +559,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
board_id: UUID,
payload: GatewayLeadMessageRequest,
) -> GatewayLeadMessageResponse:
trace_id = resolve_trace_id(payload.correlation_id, prefix="coord.lead_message")
trace_id = GatewayDispatchService.resolve_trace_id(
payload.correlation_id, prefix="coord.lead_message"
)
self.logger.log(
5,
"gateway.coordination.lead_message.start trace_id=%s board_id=%s actor_agent_id=%s",
@@ -662,7 +648,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
actor_agent: Agent,
payload: GatewayLeadBroadcastRequest,
) -> GatewayLeadBroadcastResponse:
trace_id = resolve_trace_id(payload.correlation_id, prefix="coord.lead_broadcast")
trace_id = GatewayDispatchService.resolve_trace_id(
payload.correlation_id, prefix="coord.lead_broadcast"
)
self.logger.log(
5,
"gateway.coordination.lead_broadcast.start trace_id=%s actor_agent_id=%s",

View File

@@ -0,0 +1,57 @@
"""Shared DB mutation helpers for OpenClaw agent lifecycle services."""
from __future__ import annotations
from typing import Literal
from app.core.agent_tokens import generate_agent_token, hash_agent_token
from app.core.time import utcnow
from app.models.agents import Agent
from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG
def ensure_heartbeat_config(agent: Agent) -> None:
"""Ensure an agent has a heartbeat_config dict populated."""
if agent.heartbeat_config is None:
agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy()
def mint_agent_token(agent: Agent) -> str:
"""Generate a new raw token and update the agent's token hash."""
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
return raw_token
def mark_provision_requested(
agent: Agent,
*,
action: str,
status: str | None = None,
) -> None:
"""Mark an agent as pending provisioning/update."""
ensure_heartbeat_config(agent)
agent.provision_requested_at = utcnow()
agent.provision_action = action
if status is not None:
agent.status = status
agent.updated_at = utcnow()
def mark_provision_complete(
agent: Agent,
*,
status: Literal["online", "offline", "provisioning", "updating", "deleting"] = "online",
clear_confirm_token: bool = False,
) -> None:
"""Clear provisioning fields after a successful gateway lifecycle run."""
if clear_confirm_token:
agent.provision_confirm_token_hash = None
agent.status = status
agent.provision_requested_at = None
agent.provision_action = None
agent.updated_at = utcnow()

View File

@@ -0,0 +1,47 @@
"""Shared DB-backed service base classes for OpenClaw.
These helpers are intentionally small: they reduce boilerplate (session + logger) across
OpenClaw services without adding new architectural layers.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
class OpenClawDBService:
"""Base class for OpenClaw services that require an AsyncSession."""
def __init__(self, session: AsyncSession) -> None:
self._session = session
# Use the concrete subclass module for logger naming.
self._logger = logging.getLogger(self.__class__.__module__)
@property
def session(self) -> AsyncSession:
return self._session
@session.setter
def session(self, value: AsyncSession) -> None:
self._session = value
@property
def logger(self) -> logging.Logger:
return self._logger
@logger.setter
def logger(self, value: logging.Logger) -> None:
self._logger = value
async def add_commit_refresh(self, model: object) -> None:
"""Persist a model, committing the current transaction and refreshing when supported."""
self.session.add(model)
await self.session.commit()
refresh = getattr(self.session, "refresh", None)
if callable(refresh):
await refresh(model)

View File

@@ -0,0 +1,89 @@
"""DB-backed gateway config resolution and message dispatch helpers.
This module exists to keep `app.api.*` thin: APIs should call OpenClaw services, not
directly orchestrate gateway RPC calls.
"""
from __future__ import annotations
from uuid import uuid4
from fastapi import HTTPException, status
from app.models.boards import Board
from app.models.gateways import Gateway
from app.services.openclaw.db_service import OpenClawDBService
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
from app.services.openclaw.gateway_rpc import OpenClawGatewayError, ensure_session, send_message
class GatewayDispatchService(OpenClawDBService):
"""Resolve gateway config for boards and dispatch messages to agent sessions."""
async def optional_gateway_config_for_board(
self,
board: Board,
) -> GatewayClientConfig | None:
if board.gateway_id is None:
return None
gateway = await Gateway.objects.by_id(board.gateway_id).first(self.session)
if gateway is None or not gateway.url:
return None
return GatewayClientConfig(url=gateway.url, token=gateway.token)
async def require_gateway_config_for_board(
self,
board: Board,
) -> tuple[Gateway, GatewayClientConfig]:
if board.gateway_id is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Board is not attached to a gateway",
)
gateway = await Gateway.objects.by_id(board.gateway_id).first(self.session)
if gateway is None or not gateway.url:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Gateway is not configured for this board",
)
return gateway, GatewayClientConfig(url=gateway.url, token=gateway.token)
async def send_agent_message(
self,
*,
session_key: str,
config: GatewayClientConfig,
agent_name: str,
message: str,
deliver: bool = False,
) -> None:
await ensure_session(session_key, config=config, label=agent_name)
await send_message(message, session_key=session_key, config=config, deliver=deliver)
async def try_send_agent_message(
self,
*,
session_key: str,
config: GatewayClientConfig,
agent_name: str,
message: str,
deliver: bool = False,
) -> OpenClawGatewayError | None:
try:
await self.send_agent_message(
session_key=session_key,
config=config,
agent_name=agent_name,
message=message,
deliver=deliver,
)
except OpenClawGatewayError as exc:
return exc
return None
@staticmethod
def resolve_trace_id(correlation_id: str | None, *, prefix: str) -> str:
normalized = (correlation_id or "").strip()
if normalized:
return normalized
return f"{prefix}:{uuid4().hex[:12]}"

View File

@@ -6,12 +6,9 @@ from app.models.board_onboarding import BoardOnboardingSession
from app.models.boards import Board
from app.services.openclaw.coordination_service import AbstractGatewayMessagingService
from app.services.openclaw.exceptions import GatewayOperation, map_gateway_error_to_http_exception
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.openclaw.gateway_rpc import OpenClawGatewayError
from app.services.openclaw.shared import (
GatewayAgentIdentity,
require_gateway_config_for_board,
resolve_trace_id,
)
from app.services.openclaw.shared import GatewayAgentIdentity
class BoardOnboardingMessagingService(AbstractGatewayMessagingService):
@@ -24,14 +21,18 @@ class BoardOnboardingMessagingService(AbstractGatewayMessagingService):
prompt: str,
correlation_id: str | None = None,
) -> str:
trace_id = resolve_trace_id(correlation_id, prefix="onboarding.start")
trace_id = GatewayDispatchService.resolve_trace_id(
correlation_id, prefix="onboarding.start"
)
self.logger.log(
5,
"gateway.onboarding.start_dispatch.start trace_id=%s board_id=%s",
trace_id,
board.id,
)
gateway, config = await require_gateway_config_for_board(self.session, board)
gateway, config = await GatewayDispatchService(
self.session
).require_gateway_config_for_board(board)
session_key = GatewayAgentIdentity.session_key(gateway)
try:
await self._dispatch_gateway_message(
@@ -78,7 +79,9 @@ class BoardOnboardingMessagingService(AbstractGatewayMessagingService):
answer_text: str,
correlation_id: str | None = None,
) -> None:
trace_id = resolve_trace_id(correlation_id, prefix="onboarding.answer")
trace_id = GatewayDispatchService.resolve_trace_id(
correlation_id, prefix="onboarding.answer"
)
self.logger.log(
5,
"gateway.onboarding.answer_dispatch.start trace_id=%s board_id=%s onboarding_id=%s",
@@ -86,7 +89,9 @@ class BoardOnboardingMessagingService(AbstractGatewayMessagingService):
board.id,
onboarding.id,
)
_gateway, config = await require_gateway_config_for_board(self.session, board)
_gateway, config = await GatewayDispatchService(
self.session
).require_gateway_config_for_board(board)
try:
await self._dispatch_gateway_message(
session_key=onboarding.session_key,

View File

@@ -10,7 +10,6 @@ from __future__ import annotations
import asyncio
import json
import logging
import re
from dataclasses import dataclass, field
from datetime import UTC, datetime
@@ -22,7 +21,7 @@ from sqlalchemy import asc, func, or_
from sqlmodel import col, select
from sse_starlette.sse import EventSourceResponse
from app.core.agent_tokens import generate_agent_token, hash_agent_token, verify_agent_token
from app.core.agent_tokens import verify_agent_token
from app.core.time import utcnow
from app.db import crud
from app.db.pagination import paginate
@@ -50,6 +49,12 @@ from app.services.openclaw.constants import (
DEFAULT_HEARTBEAT_CONFIG,
OFFLINE_AFTER,
)
from app.services.openclaw.db_agent_state import (
mark_provision_complete,
mark_provision_requested,
mint_agent_token,
)
from app.services.openclaw.db_service import OpenClawDBService
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
from app.services.openclaw.gateway_rpc import (
OpenClawGatewayError,
@@ -120,17 +125,13 @@ class LeadAgentRequest:
options: LeadAgentOptions = field(default_factory=LeadAgentOptions)
class OpenClawProvisioningService:
class OpenClawProvisioningService(OpenClawDBService):
"""DB-backed provisioning workflows (bulk template sync, lead-agent record)."""
def __init__(self, session: AsyncSession) -> None:
self._session = session
super().__init__(session)
self._gateway = OpenClawGatewayProvisioner()
@property
def session(self) -> AsyncSession:
return self._session
@staticmethod
def lead_session_key(board: Board) -> str:
return f"agent:lead-{board.id}:main"
@@ -191,21 +192,16 @@ class OpenClawProvisioningService:
agent = Agent(
name=config_options.agent_name or self.lead_agent_name(board),
status="provisioning",
board_id=board.id,
gateway_id=request.gateway.id,
is_board_lead=True,
heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(),
identity_profile=merged_identity_profile,
openclaw_session_id=self.lead_session_key(board),
provision_requested_at=utcnow(),
provision_action=config_options.action,
)
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
self.session.add(agent)
await self.session.commit()
await self.session.refresh(agent)
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action=config_options.action, status="provisioning")
await self.add_commit_refresh(agent)
# Strict behavior: provisioning errors surface to the caller. The DB row exists
# so a later retry can succeed with the same deterministic identity/session key.
@@ -220,13 +216,8 @@ class OpenClawProvisioningService:
deliver_wakeup=True,
)
agent.status = "online"
agent.provision_requested_at = None
agent.provision_action = None
agent.updated_at = utcnow()
self.session.add(agent)
await self.session.commit()
await self.session.refresh(agent)
mark_provision_complete(agent, status="online")
await self.add_commit_refresh(agent)
return agent, True
@@ -433,8 +424,7 @@ def _append_sync_error(
async def _rotate_agent_token(session: AsyncSession, agent: Agent) -> str:
token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(token)
token = mint_agent_token(agent)
agent.updated_at = utcnow()
session.add(agent)
await session.commit()
@@ -692,28 +682,11 @@ class AgentUpdateProvisionRequest:
force_bootstrap: bool
class AgentLifecycleService:
class AgentLifecycleService(OpenClawDBService):
"""Async service encapsulating agent lifecycle behavior for API routes."""
def __init__(self, session: AsyncSession) -> None:
self._session = session
self._logger = logging.getLogger(__name__)
@property
def session(self) -> AsyncSession:
return self._session
@session.setter
def session(self, value: AsyncSession) -> None:
self._session = value
@property
def logger(self) -> logging.Logger:
return self._logger
@logger.setter
def logger(self, value: logging.Logger) -> None:
self._logger = value
super().__init__(session)
@staticmethod
def parse_since(value: str | None) -> datetime | None:
@@ -1013,17 +986,10 @@ class AgentLifecycleService:
data: dict[str, Any],
) -> tuple[Agent, str]:
agent = Agent.model_validate(data)
agent.status = "provisioning"
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
if agent.heartbeat_config is None:
agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy()
agent.provision_requested_at = utcnow()
agent.provision_action = "provision"
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action="provision", status="provisioning")
agent.openclaw_session_id = self.resolve_session_key(agent)
self.session.add(agent)
await self.session.commit()
await self.session.refresh(agent)
await self.add_commit_refresh(agent)
return agent, raw_token
async def _apply_gateway_provisioning(
@@ -1078,11 +1044,7 @@ class AgentLifecycleService:
deliver_wakeup=True,
wakeup_verb=wakeup_verb,
)
agent.provision_confirm_token_hash = None
agent.provision_requested_at = None
agent.provision_action = None
agent.status = "online"
agent.updated_at = utcnow()
mark_provision_complete(agent, status="online", clear_confirm_token=True)
self.session.add(agent)
await self.session.commit()
record_activity(
@@ -1301,11 +1263,8 @@ class AgentLifecycleService:
@staticmethod
def mark_agent_update_pending(agent: Agent) -> str:
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
agent.provision_requested_at = utcnow()
agent.provision_action = "update"
agent.status = "updating"
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action="update", status="updating")
return raw_token
async def provision_updated_agent(
@@ -1379,15 +1338,9 @@ class AgentLifecycleService:
if agent.agent_token_hash is not None:
return
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
if agent.heartbeat_config is None:
agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy()
agent.provision_requested_at = utcnow()
agent.provision_action = "provision"
self.session.add(agent)
await self.session.commit()
await self.session.refresh(agent)
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action="provision", status="provisioning")
await self.add_commit_refresh(agent)
board = await self.require_board(
str(agent.board_id) if agent.board_id else None,
user=user,

View File

@@ -2,7 +2,6 @@
from __future__ import annotations
import logging
from collections.abc import Iterable
from dataclasses import dataclass
from typing import TYPE_CHECKING
@@ -22,6 +21,7 @@ from app.schemas.gateway_api import (
GatewaySessionsResponse,
GatewaysStatusResponse,
)
from app.services.openclaw.db_service import OpenClawDBService
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
from app.services.openclaw.gateway_rpc import (
OpenClawGatewayError,
@@ -50,28 +50,11 @@ class GatewayTemplateSyncQuery:
board_id: UUID | None
class GatewaySessionService:
class GatewaySessionService(OpenClawDBService):
"""Read/query gateway runtime session state for user-facing APIs."""
def __init__(self, session: AsyncSession) -> None:
self._session = session
self._logger = logging.getLogger(__name__)
@property
def session(self) -> AsyncSession:
return self._session
@session.setter
def session(self, value: AsyncSession) -> None:
self._session = value
@property
def logger(self) -> logging.Logger:
return self._logger
@logger.setter
def logger(self, value: logging.Logger) -> None:
self._logger = value
super().__init__(session)
@staticmethod
def to_resolve_query(

View File

@@ -2,29 +2,14 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from uuid import UUID, uuid4
from uuid import UUID
from fastapi import HTTPException, status
from app.models.boards import Board
from app.models.gateways import Gateway
from app.services.openclaw.constants import (
_GATEWAY_AGENT_PREFIX,
_GATEWAY_AGENT_SUFFIX,
_GATEWAY_OPENCLAW_AGENT_PREFIX,
)
from app.services.openclaw.gateway_rpc import GatewayConfig as _GatewayClientConfig
from app.services.openclaw.gateway_rpc import OpenClawGatewayError, ensure_session, send_message
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
GatewayClientConfig = _GatewayClientConfig
# Keep integration exceptions behind the OpenClaw service boundary.
GatewayTransportError = OpenClawGatewayError
class GatewayAgentIdentity:
@@ -45,81 +30,3 @@ class GatewayAgentIdentity:
@classmethod
def openclaw_agent_id(cls, gateway: Gateway) -> str:
return cls.openclaw_agent_id_for_id(gateway.id)
async def optional_gateway_config_for_board(
session: AsyncSession,
board: Board,
) -> GatewayClientConfig | None:
"""Return gateway client config when board has a reachable configured gateway."""
if board.gateway_id is None:
return None
gateway = await Gateway.objects.by_id(board.gateway_id).first(session)
if gateway is None or not gateway.url:
return None
return GatewayClientConfig(url=gateway.url, token=gateway.token)
async def require_gateway_config_for_board(
session: AsyncSession,
board: Board,
) -> tuple[Gateway, GatewayClientConfig]:
"""Resolve board gateway and config, raising 422 when unavailable."""
if board.gateway_id is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Board is not attached to a gateway",
)
gateway = await Gateway.objects.by_id(board.gateway_id).first(session)
if gateway is None or not gateway.url:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Gateway is not configured for this board",
)
return gateway, GatewayClientConfig(url=gateway.url, token=gateway.token)
async def send_gateway_agent_message(
*,
session_key: str,
config: GatewayClientConfig,
agent_name: str,
message: str,
deliver: bool = False,
) -> None:
"""Ensure session and dispatch a message to an agent session."""
await ensure_session(session_key, config=config, label=agent_name)
await send_message(message, session_key=session_key, config=config, deliver=deliver)
async def send_gateway_agent_message_safe(
*,
session_key: str,
config: GatewayClientConfig,
agent_name: str,
message: str,
deliver: bool = False,
) -> GatewayTransportError | None:
"""Best-effort gateway dispatch returning transport error when one occurs."""
try:
await send_gateway_agent_message(
session_key=session_key,
config=config,
agent_name=agent_name,
message=message,
deliver=deliver,
)
except GatewayTransportError as exc:
return exc
return None
def resolve_trace_id(correlation_id: str | None, *, prefix: str) -> str:
"""Resolve a stable trace id from correlation id or generate a scoped fallback."""
normalized = (correlation_id or "").strip()
if normalized:
return normalized
return f"{prefix}:{uuid4().hex[:12]}"
logger = logging.getLogger(__name__)