From 997d21c9131f335f35b093c2600415a1eabe9d13 Mon Sep 17 00:00:00 2001 From: Abhimanyu Saharan Date: Tue, 10 Feb 2026 23:31:14 +0530 Subject: [PATCH] refactor: update provisioning service references to use OpenClawGatewayProvisioner --- backend/app/api/board_groups.py | 4 +- backend/app/api/board_onboarding.py | 2 +- backend/app/api/boards.py | 4 +- backend/app/api/gateways.py | 4 - .../app/services/openclaw/admin_service.py | 51 +- .../app/services/openclaw/agent_service.py | 360 +++----- .../services/openclaw/coordination_service.py | 6 +- backend/app/services/openclaw/provisioning.py | 800 ++---------------- .../app/services/openclaw/provisioning_db.py | 718 ++++++++++++++++ backend/scripts/sync_gateway_templates.py | 2 +- backend/tests/test_agent_delete_main_agent.py | 10 +- .../tests/test_agent_provisioning_utils.py | 2 +- 12 files changed, 985 insertions(+), 978 deletions(-) create mode 100644 backend/app/services/openclaw/provisioning_db.py diff --git a/backend/app/api/board_groups.py b/backend/app/api/board_groups.py index 2f9371f6..d70dacc2 100644 --- a/backend/app/api/board_groups.py +++ b/backend/app/api/board_groups.py @@ -30,7 +30,7 @@ from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.view_models import BoardGroupSnapshot from app.services.board_group_snapshot import build_group_snapshot from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG -from app.services.openclaw.provisioning import OpenClawProvisioningService +from app.services.openclaw.provisioning import OpenClawGatewayProvisioner from app.services.openclaw.shared import GatewayTransportError from app.services.organizations import ( OrganizationContext, @@ -269,7 +269,7 @@ async def _sync_gateway_heartbeats( failed_agent_ids.extend([agent.id for agent in gateway_agents]) continue try: - await OpenClawProvisioningService().sync_gateway_agent_heartbeats( + await OpenClawGatewayProvisioner().sync_gateway_agent_heartbeats( gateway, gateway_agents, ) diff --git a/backend/app/api/board_onboarding.py b/backend/app/api/board_onboarding.py index 3b91bc12..d96707b4 100644 --- a/backend/app/api/board_onboarding.py +++ b/backend/app/api/board_onboarding.py @@ -35,7 +35,7 @@ from app.schemas.board_onboarding import ( from app.schemas.boards import BoardRead from app.services.openclaw.onboarding_service import BoardOnboardingMessagingService from app.services.openclaw.policies import OpenClawAuthorizationPolicy -from app.services.openclaw.provisioning import ( +from app.services.openclaw.provisioning_db import ( LeadAgentOptions, LeadAgentRequest, OpenClawProvisioningService, diff --git a/backend/app/api/boards.py b/backend/app/api/boards.py index c43fac84..34bf2c52 100644 --- a/backend/app/api/boards.py +++ b/backend/app/api/boards.py @@ -39,7 +39,7 @@ from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.view_models import BoardGroupSnapshot, BoardSnapshot from app.services.board_group_snapshot import build_board_group_snapshot from app.services.board_snapshot import build_board_snapshot -from app.services.openclaw.provisioning import OpenClawProvisioningService +from app.services.openclaw.provisioning import OpenClawGatewayProvisioner from app.services.openclaw.shared import GatewayTransportError from app.services.organizations import OrganizationContext, board_access_filter @@ -287,7 +287,7 @@ async def delete_board( if config: try: for agent in agents: - await OpenClawProvisioningService().delete_agent_lifecycle( + await OpenClawGatewayProvisioner().delete_agent_lifecycle( agent=agent, gateway=config, ) diff --git a/backend/app/api/gateways.py b/backend/app/api/gateways.py index ffb14478..53548396 100644 --- a/backend/app/api/gateways.py +++ b/backend/app/api/gateways.py @@ -70,9 +70,6 @@ async def list_gateways( ctx: OrganizationContext = ORG_ADMIN_DEP, ) -> LimitOffsetPage[GatewayRead]: """List gateways for the caller's organization.""" - service = GatewayAdminLifecycleService(session) - gateways = await Gateway.objects.filter_by(organization_id=ctx.organization.id).all(session) - await service.ensure_gateway_agents_exist(gateways) statement = ( Gateway.objects.filter_by(organization_id=ctx.organization.id) .order_by(col(Gateway.created_at).desc()) @@ -111,7 +108,6 @@ async def get_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) - await service.ensure_gateway_agents_exist([gateway]) return gateway diff --git a/backend/app/services/openclaw/admin_service.py b/backend/app/services/openclaw/admin_service.py index 518810e7..62633734 100644 --- a/backend/app/services/openclaw/admin_service.py +++ b/backend/app/services/openclaw/admin_service.py @@ -26,7 +26,8 @@ from app.models.gateways import Gateway from app.models.tasks import Task from app.schemas.gateways import GatewayTemplatesSyncResult from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG -from app.services.openclaw.provisioning import ( +from app.services.openclaw.provisioning import OpenClawGatewayProvisioner +from app.services.openclaw.provisioning_db import ( GatewayTemplateSyncOptions, OpenClawProvisioningService, ) @@ -203,6 +204,11 @@ class GatewayAdminLifecycleService: self.session, organization_id=gateway.organization_id, ) + if template_user is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Organization owner not found (required for gateway agent USER.md rendering).", + ) raw_token = generate_agent_token() agent.agent_token_hash = hash_agent_token(raw_token) agent.provision_requested_at = utcnow() @@ -215,8 +221,9 @@ class GatewayAdminLifecycleService: await self.session.refresh(agent) if not gateway.url: return agent + try: - await OpenClawProvisioningService().apply_agent_lifecycle( + await OpenClawGatewayProvisioner().apply_agent_lifecycle( agent=agent, gateway=gateway, board=None, @@ -226,19 +233,17 @@ class GatewayAdminLifecycleService: wake=notify, deliver_wakeup=True, ) - self.logger.info( - "gateway.main_agent.provision_success gateway_id=%s agent_id=%s action=%s", - gateway.id, - agent.id, - action, - ) except OpenClawGatewayError as exc: - self.logger.warning( + self.logger.error( "gateway.main_agent.provision_failed_gateway gateway_id=%s agent_id=%s error=%s", gateway.id, agent.id, str(exc), ) + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"Gateway {action} failed: {exc}", + ) from exc except (OSError, RuntimeError, ValueError) as exc: self.logger.error( "gateway.main_agent.provision_failed gateway_id=%s agent_id=%s error=%s", @@ -246,15 +251,25 @@ class GatewayAdminLifecycleService: agent.id, str(exc), ) - except Exception as exc: # pragma: no cover - defensive fallback - self.logger.critical( - "gateway.main_agent.provision_failed_unexpected gateway_id=%s agent_id=%s " - "error_type=%s error=%s", - gateway.id, - agent.id, - exc.__class__.__name__, - str(exc), - ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Unexpected error {action}ing gateway provisioning.", + ) from exc + + agent.status = "online" + agent.provision_requested_at = None + agent.provision_action = None + agent.updated_at = utcnow() + self.session.add(agent) + await self.session.commit() + await self.session.refresh(agent) + + self.logger.info( + "gateway.main_agent.provision_success gateway_id=%s agent_id=%s action=%s", + gateway.id, + agent.id, + action, + ) return agent async def ensure_main_agent( diff --git a/backend/app/services/openclaw/agent_service.py b/backend/app/services/openclaw/agent_service.py index 559c9a36..cbd018f9 100644 --- a/backend/app/services/openclaw/agent_service.py +++ b/backend/app/services/openclaw/agent_service.py @@ -6,7 +6,6 @@ import asyncio import json import logging import re -from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import UTC, datetime from typing import TYPE_CHECKING, Any, Literal, Protocol @@ -49,11 +48,12 @@ from app.services.openclaw.constants import ( OFFLINE_AFTER, ) from app.services.openclaw.policies import OpenClawAuthorizationPolicy -from app.services.openclaw.provisioning import OpenClawProvisioningService +from app.services.openclaw.provisioning import OpenClawGatewayProvisioner from app.services.openclaw.shared import GatewayAgentIdentity from app.services.organizations import ( OrganizationContext, get_active_membership, + get_org_owner_user, has_board_access, is_org_admin, list_accessible_board_ids, @@ -95,7 +95,6 @@ class AgentUpdateProvisionTarget: is_main_agent: bool board: Board | None gateway: Gateway - client_config: GatewayClientConfig @dataclass(frozen=True, slots=True) @@ -108,175 +107,6 @@ class AgentUpdateProvisionRequest: force_bootstrap: bool -class AbstractProvisionExecution(ABC): - """Shared async execution contract for board/main agent provisioning actions.""" - - def __init__( - self, - *, - service: AgentLifecycleService, - agent: Agent, - provision_request: AgentUpdateProvisionRequest, - action: str, - wakeup_verb: str, - raise_gateway_errors: bool, - ) -> None: - self._service = service - self._agent = agent - self._request = provision_request - self._action = action - self._wakeup_verb = wakeup_verb - self._raise_gateway_errors = raise_gateway_errors - - @property - def agent(self) -> Agent: - return self._agent - - @agent.setter - def agent(self, value: Agent) -> None: - if not isinstance(value, Agent): - msg = "agent must be an Agent model" - raise TypeError(msg) - self._agent = value - - @property - def request(self) -> AgentUpdateProvisionRequest: - return self._request - - @request.setter - def request(self, value: AgentUpdateProvisionRequest) -> None: - if not isinstance(value, AgentUpdateProvisionRequest): - msg = "request must be an AgentUpdateProvisionRequest" - raise TypeError(msg) - self._request = value - - @property - def logger(self) -> logging.Logger: - return self._service.logger - - @abstractmethod - async def _provision(self) -> None: - raise NotImplementedError - - async def execute(self) -> None: - self.logger.log( - 5, - "agent.provision.start action=%s agent_id=%s target_main=%s", - self._action, - self.agent.id, - self.request.target.is_main_agent, - ) - try: - await self._provision() - self.agent.provision_confirm_token_hash = None - self.agent.provision_requested_at = None - self.agent.provision_action = None - self.agent.status = "online" - self.agent.updated_at = utcnow() - self._service.session.add(self.agent) - await self._service.session.commit() - record_activity( - self._service.session, - event_type=f"agent.{self._action}.direct", - message=f"{self._action.capitalize()}d directly for {self.agent.name}.", - agent_id=self.agent.id, - ) - record_activity( - self._service.session, - event_type="agent.wakeup.sent", - message=f"Wakeup message sent to {self.agent.name}.", - agent_id=self.agent.id, - ) - await self._service.session.commit() - self.logger.info( - "agent.provision.success action=%s agent_id=%s", - self._action, - self.agent.id, - ) - except OpenClawGatewayError as exc: - self._service.record_instruction_failure( - self._service.session, - self.agent, - str(exc), - self._action, - ) - await self._service.session.commit() - self.logger.error( - "agent.provision.gateway_error action=%s agent_id=%s error=%s", - self._action, - self.agent.id, - str(exc), - ) - if self._raise_gateway_errors: - raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, - detail=f"Gateway {self._action} failed: {exc}", - ) from exc - except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover - self._service.record_instruction_failure( - self._service.session, - self.agent, - str(exc), - self._action, - ) - await self._service.session.commit() - self.logger.critical( - "agent.provision.runtime_error action=%s agent_id=%s error=%s", - self._action, - self.agent.id, - str(exc), - ) - if self._raise_gateway_errors: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Unexpected error {self._action}ing agent provisioning.", - ) from exc - - -class BoardAgentProvisionExecution(AbstractProvisionExecution): - """Provision execution for board-scoped agents.""" - - async def _provision(self) -> None: - board = self.request.target.board - if board is None: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="board is required for non-main agent provisioning", - ) - await OpenClawProvisioningService().apply_agent_lifecycle( - agent=self.agent, - gateway=self.request.target.gateway, - board=board, - auth_token=self.request.raw_token, - user=self.request.user, - action=self._action, - force_bootstrap=self.request.force_bootstrap, - reset_session=True, - wake=True, - deliver_wakeup=True, - wakeup_verb=self._wakeup_verb, - ) - - -class MainAgentProvisionExecution(AbstractProvisionExecution): - """Provision execution for gateway-main agents.""" - - async def _provision(self) -> None: - await OpenClawProvisioningService().apply_agent_lifecycle( - agent=self.agent, - gateway=self.request.target.gateway, - board=None, - auth_token=self.request.raw_token, - user=self.request.user, - action=self._action, - force_bootstrap=self.request.force_bootstrap, - reset_session=True, - wake=True, - deliver_wakeup=True, - wakeup_verb=self._wakeup_verb, - ) - - class AgentLifecycleService: """Async service encapsulating agent lifecycle behavior for API routes.""" @@ -611,6 +441,122 @@ class AgentLifecycleService: await self.session.refresh(agent) return agent, raw_token + async def _apply_gateway_provisioning( + self, + *, + agent: Agent, + target: AgentUpdateProvisionTarget, + auth_token: str, + user: User | None, + action: str, + wakeup_verb: str, + force_bootstrap: bool, + raise_gateway_errors: bool, + ) -> None: + self.logger.log( + 5, + "agent.provision.start action=%s agent_id=%s target_main=%s", + action, + agent.id, + target.is_main_agent, + ) + try: + if not target.is_main_agent and target.board is None: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="board is required for non-main agent provisioning", + ) + template_user = user + if target.is_main_agent and template_user is None: + template_user = await get_org_owner_user( + self.session, + organization_id=target.gateway.organization_id, + ) + if template_user is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=( + "User context is required to provision the gateway main agent " + "(org owner not found)." + ), + ) + await OpenClawGatewayProvisioner().apply_agent_lifecycle( + agent=agent, + gateway=target.gateway, + board=target.board if not target.is_main_agent else None, + auth_token=auth_token, + user=template_user, + action=action, + force_bootstrap=force_bootstrap, + reset_session=True, + wake=True, + deliver_wakeup=True, + wakeup_verb=wakeup_verb, + ) + agent.provision_confirm_token_hash = None + agent.provision_requested_at = None + agent.provision_action = None + agent.status = "online" + agent.updated_at = utcnow() + self.session.add(agent) + await self.session.commit() + record_activity( + self.session, + event_type=f"agent.{action}.direct", + message=f"{action.capitalize()}d directly for {agent.name}.", + agent_id=agent.id, + ) + record_activity( + self.session, + event_type="agent.wakeup.sent", + message=f"Wakeup message sent to {agent.name}.", + agent_id=agent.id, + ) + await self.session.commit() + self.logger.info( + "agent.provision.success action=%s agent_id=%s", + action, + agent.id, + ) + except OpenClawGatewayError as exc: + self.record_instruction_failure( + self.session, + agent, + str(exc), + action, + ) + await self.session.commit() + self.logger.error( + "agent.provision.gateway_error action=%s agent_id=%s error=%s", + action, + agent.id, + str(exc), + ) + if raise_gateway_errors: + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"Gateway {action} failed: {exc}", + ) from exc + except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover + self.record_instruction_failure( + self.session, + agent, + str(exc), + action, + ) + await self.session.commit() + self.logger.critical( + "agent.provision.runtime_error action=%s agent_id=%s error=%s", + action, + agent.id, + str(exc), + ) + if raise_gateway_errors: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Unexpected error {action}ing agent provisioning.", + ) from exc + async def provision_new_agent( self, *, @@ -620,27 +566,17 @@ class AgentLifecycleService: auth_token: str, user: User | None, force_bootstrap: bool, - client_config: GatewayClientConfig, ) -> None: - execution = BoardAgentProvisionExecution( - service=self, + await self._apply_gateway_provisioning( agent=agent, - provision_request=AgentUpdateProvisionRequest( - target=AgentUpdateProvisionTarget( - is_main_agent=False, - board=board, - gateway=gateway, - client_config=client_config, - ), - raw_token=auth_token, - user=user, - force_bootstrap=force_bootstrap, - ), + target=AgentUpdateProvisionTarget(is_main_agent=False, board=board, gateway=gateway), + auth_token=auth_token, + user=user, action="provision", wakeup_verb="provisioned", - raise_gateway_errors=False, + force_bootstrap=force_bootstrap, + raise_gateway_errors=True, ) - await execution.execute() async def validate_agent_update_inputs( self, @@ -756,7 +692,6 @@ class AgentLifecycleService: is_main_agent=True, board=None, gateway=gateway_for_main, - client_config=self.gateway_client_config(gateway_for_main), ) if make_main is None and agent.board_id is None and main_gateway is not None: @@ -764,7 +699,6 @@ class AgentLifecycleService: is_main_agent=True, board=None, gateway=main_gateway, - client_config=self.gateway_client_config(main_gateway), ) if agent.board_id is None: @@ -773,12 +707,11 @@ class AgentLifecycleService: detail="board_id is required for non-main agents", ) board = await self.require_board(agent.board_id) - gateway, client_config = await self.require_gateway(board) + gateway, _client_config = await self.require_gateway(board) return AgentUpdateProvisionTarget( is_main_agent=False, board=board, gateway=gateway, - client_config=client_config, ) @staticmethod @@ -796,26 +729,16 @@ class AgentLifecycleService: agent: Agent, request: AgentUpdateProvisionRequest, ) -> None: - execution: AbstractProvisionExecution - if request.target.is_main_agent: - execution = MainAgentProvisionExecution( - service=self, - agent=agent, - provision_request=request, - action="update", - wakeup_verb="updated", - raise_gateway_errors=True, - ) - else: - execution = BoardAgentProvisionExecution( - service=self, - agent=agent, - provision_request=request, - action="update", - wakeup_verb="updated", - raise_gateway_errors=True, - ) - await execution.execute() + await self._apply_gateway_provisioning( + agent=agent, + target=request.target, + auth_token=request.raw_token, + user=request.user, + action="update", + wakeup_verb="updated", + force_bootstrap=request.force_bootstrap, + raise_gateway_errors=True, + ) @staticmethod def heartbeat_lookup_statement(payload: AgentHeartbeatCreate) -> SelectOfScalar[Agent]: @@ -841,7 +764,7 @@ class AgentLifecycleService: user=actor.user, write=True, ) - gateway, client_config = await self.require_gateway(board) + gateway, _client_config = await self.require_gateway(board) data: dict[str, Any] = { "name": payload.name, "board_id": board.id, @@ -856,7 +779,6 @@ class AgentLifecycleService: auth_token=raw_token, user=actor.user, force_bootstrap=False, - client_config=client_config, ) return agent @@ -886,7 +808,7 @@ class AgentLifecycleService: user=user, write=True, ) - gateway, client_config = await self.require_gateway(board) + gateway, _client_config = await self.require_gateway(board) await self.provision_new_agent( agent=agent, board=board, @@ -894,7 +816,6 @@ class AgentLifecycleService: auth_token=raw_token, user=user, force_bootstrap=False, - client_config=client_config, ) async def ensure_heartbeat_session_key( @@ -1046,7 +967,7 @@ class AgentLifecycleService: user=actor.user if actor.actor_type == "user" else None, write=actor.actor_type == "user", ) - gateway, client_config = await self.require_gateway(board) + gateway, _client_config = await self.require_gateway(board) data = payload.model_dump() data["gateway_id"] = gateway.id requested_name = (data.get("name") or "").strip() @@ -1063,7 +984,6 @@ class AgentLifecycleService: auth_token=raw_token, user=actor.user if actor.actor_type == "user" else None, force_bootstrap=False, - client_config=client_config, ) self.logger.info("agent.create.success agent_id=%s board_id=%s", agent.id, board.id) return self.to_agent_read(self.with_computed_status(agent)) @@ -1224,7 +1144,7 @@ class AgentLifecycleService: if gateway and gateway.url: client_config = GatewayClientConfig(url=gateway.url, token=gateway.token) try: - workspace_path = await OpenClawProvisioningService().delete_agent_lifecycle( + workspace_path = await OpenClawGatewayProvisioner().delete_agent_lifecycle( agent=agent, gateway=gateway, ) @@ -1246,7 +1166,7 @@ class AgentLifecycleService: board = await self.require_board(str(agent.board_id)) gateway, client_config = await self.require_gateway(board) try: - workspace_path = await OpenClawProvisioningService().delete_agent_lifecycle( + workspace_path = await OpenClawGatewayProvisioner().delete_agent_lifecycle( agent=agent, gateway=gateway, ) diff --git a/backend/app/services/openclaw/coordination_service.py b/backend/app/services/openclaw/coordination_service.py index 3233f152..1aefd238 100644 --- a/backend/app/services/openclaw/coordination_service.py +++ b/backend/app/services/openclaw/coordination_service.py @@ -36,7 +36,7 @@ from app.services.openclaw.exceptions import ( ) from app.services.openclaw.internal import agent_key, with_coordination_gateway_retry from app.services.openclaw.policies import OpenClawAuthorizationPolicy -from app.services.openclaw.provisioning import ( +from app.services.openclaw.provisioning_db import ( LeadAgentOptions, LeadAgentRequest, OpenClawProvisioningService, @@ -542,7 +542,9 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): board: Board, message: str, ) -> tuple[Agent, bool]: - lead, lead_created = await OpenClawProvisioningService(self.session).ensure_board_lead_agent( + lead, lead_created = await OpenClawProvisioningService( + self.session + ).ensure_board_lead_agent( request=LeadAgentRequest( board=board, gateway=gateway, diff --git a/backend/app/services/openclaw/provisioning.py b/backend/app/services/openclaw/provisioning.py index 1eb05290..63146fe4 100644 --- a/backend/app/services/openclaw/provisioning.py +++ b/backend/app/services/openclaw/provisioning.py @@ -1,25 +1,23 @@ -"""Provisioning, template sync, and board-lead lifecycle orchestration.""" +"""Gateway-only provisioning and lifecycle orchestration. + +This module is the low-level layer that talks to the OpenClaw gateway RPC surface. +DB-backed workflows (template sync, lead-agent record creation) live in +`app.services.openclaw.provisioning_db`. +""" from __future__ import annotations -import asyncio import json import re from abc import ABC, abstractmethod -from collections.abc import Awaitable, Callable -from contextlib import suppress -from dataclasses import dataclass, field +from dataclasses import dataclass from pathlib import Path -from typing import TYPE_CHECKING, Any, TypeVar -from uuid import UUID, uuid4 +from typing import TYPE_CHECKING, Any +from uuid import uuid4 from jinja2 import Environment, FileSystemLoader, StrictUndefined, select_autoescape -from sqlalchemy import func -from sqlmodel import col, select -from app.core.agent_tokens import generate_agent_token, hash_agent_token, verify_agent_token from app.core.config import settings -from app.core.time import utcnow from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig from app.integrations.openclaw_gateway import ( OpenClawGatewayError, @@ -28,15 +26,9 @@ from app.integrations.openclaw_gateway import ( send_message, ) from app.models.agents import Agent -from app.models.board_memory import BoardMemory from app.models.boards import Board from app.models.gateways import Gateway -from app.schemas.gateways import GatewayTemplatesSyncError, GatewayTemplatesSyncResult from app.services.openclaw.constants import ( - _NON_TRANSIENT_GATEWAY_ERROR_MARKERS, - _SECURE_RANDOM, - _TOOLS_KV_RE, - _TRANSIENT_GATEWAY_ERROR_MARKERS, DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY, DEFAULT_GATEWAY_FILES, DEFAULT_HEARTBEAT_CONFIG, @@ -50,11 +42,8 @@ from app.services.openclaw.constants import ( ) from app.services.openclaw.internal import agent_key as _agent_key from app.services.openclaw.shared import GatewayAgentIdentity -from app.services.organizations import get_org_owner_user if TYPE_CHECKING: - from sqlmodel.ext.asyncio.session import AsyncSession - from app.models.users import User @@ -64,7 +53,21 @@ class ProvisionOptions: action: str = "provision" force_bootstrap: bool = False - reset_session: bool = False + + +def _is_missing_session_error(exc: OpenClawGatewayError) -> bool: + message = str(exc).lower() + if not message: + return False + return any( + marker in message + for marker in ( + "not found", + "unknown session", + "no such session", + "session does not exist", + ) + ) def _repo_root() -> Path: @@ -295,9 +298,11 @@ def _render_agent_files( else _heartbeat_template_name(agent) ) heartbeat_path = _templates_root() / heartbeat_template - if heartbeat_path.exists(): - rendered[name] = env.get_template(heartbeat_template).render(**context).strip() - continue + if not heartbeat_path.exists(): + msg = f"Missing template file: {heartbeat_template}" + raise FileNotFoundError(msg) + rendered[name] = env.get_template(heartbeat_template).render(**context).strip() + continue override = overrides.get(name) if override: rendered[name] = env.from_string(override).render(**context).strip() @@ -306,14 +311,10 @@ def _render_agent_files( template_overrides[name] if template_overrides and name in template_overrides else name ) path = _templates_root() / template_name - if path.exists(): - rendered[name] = env.get_template(template_name).render(**context).strip() - continue - if name == "MEMORY.md": - # Back-compat fallback for gateways that do not ship MEMORY.md. - rendered[name] = "# MEMORY\n\nBootstrap pending.\n" - continue - rendered[name] = "" + if not path.exists(): + msg = f"Missing template file: {template_name}" + raise FileNotFoundError(msg) + rendered[name] = env.get_template(template_name).render(**context).strip() return rendered @@ -330,6 +331,10 @@ class GatewayAgentRegistration: class GatewayControlPlane(ABC): """Abstract gateway runtime interface used by agent lifecycle managers.""" + @abstractmethod + async def health(self) -> object: + raise NotImplementedError + @abstractmethod async def ensure_agent_session(self, session_key: str, *, label: str | None = None) -> None: raise NotImplementedError @@ -354,6 +359,10 @@ class GatewayControlPlane(ABC): async def list_agent_files(self, agent_id: str) -> dict[str, dict[str, Any]]: raise NotImplementedError + @abstractmethod + async def get_agent_file_payload(self, *, agent_id: str, name: str) -> object: + raise NotImplementedError + @abstractmethod async def set_agent_file(self, *, agent_id: str, name: str, content: str) -> None: raise NotImplementedError @@ -372,6 +381,9 @@ class OpenClawGatewayControlPlane(GatewayControlPlane): def __init__(self, config: GatewayClientConfig) -> None: self._config = config + async def health(self) -> object: + return await openclaw_call("health", config=self._config) + async def ensure_agent_session(self, session_key: str, *, label: str | None = None) -> None: if not session_key: return @@ -389,7 +401,7 @@ class OpenClawGatewayControlPlane(GatewayControlPlane): async def upsert_agent(self, registration: GatewayAgentRegistration) -> None: # Prefer an idempotent "create then update" flow. - # - Avoids a dependency on `agents.list` (which may surface gateway defaults like `main`). + # - Avoids enumerating gateway agents for existence checks. # - Ensures we always hit the "create" RPC first, per lifecycle expectations. try: await openclaw_call( @@ -402,7 +414,9 @@ class OpenClawGatewayControlPlane(GatewayControlPlane): ) except OpenClawGatewayError as exc: message = str(exc).lower() - if not any(marker in message for marker in ("already", "exist", "duplicate", "conflict")): + if not any( + marker in message for marker in ("already", "exist", "duplicate", "conflict") + ): raise await openclaw_call( "agents.update", @@ -446,6 +460,13 @@ class OpenClawGatewayControlPlane(GatewayControlPlane): index[name] = dict(item) return index + async def get_agent_file_payload(self, *, agent_id: str, name: str) -> object: + return await openclaw_call( + "agents.files.get", + {"agentId": agent_id, "name": name}, + config=self._config, + ) + async def set_agent_file(self, *, agent_id: str, name: str, content: str) -> None: await openclaw_call( "agents.files.set", @@ -654,10 +675,6 @@ class BaseAgentLifecycleManager(ABC): existing_files=existing_files, action=options.action, ) - if options.reset_session: - # Session resets are useful but should never block file sync. - with suppress(OpenClawGatewayError): - await self._control_plane.reset_agent_session(session_key) class BoardAgentLifecycleManager(BaseAgentLifecycleManager): @@ -752,21 +769,8 @@ def _wakeup_text(agent: Agent, *, verb: str) -> str: ) -class OpenClawProvisioningService: - """High-level agent provisioning interface (create -> files -> wake). - - This is the public entrypoint for agent lifecycle orchestration. Internals are - implemented as module-private helpers and lifecycle manager classes. - """ - - def __init__(self, session: AsyncSession | None = None) -> None: - self._session = session - - def _require_session(self) -> AsyncSession: - if self._session is None: - msg = "AsyncSession is required for this operation" - raise ValueError(msg) - return self._session +class OpenClawGatewayProvisioner: + """Gateway-only agent lifecycle interface (create -> files -> wake).""" async def sync_gateway_agent_heartbeats(self, gateway: Gateway, agents: list[Agent]) -> None: """Sync current Agent.heartbeat_config values to the gateway config.""" @@ -807,7 +811,8 @@ class OpenClawProvisioningService: """ if not gateway.url: - return + msg = "Gateway url is required" + raise ValueError(msg) # Guard against accidental main-agent provisioning without a board. if board is None and getattr(agent, "board_id", None) is not None: @@ -816,7 +821,9 @@ class OpenClawProvisioningService: # Resolve session key and agent type. if board is None: - session_key = (agent.openclaw_session_id or GatewayAgentIdentity.session_key(gateway) or "").strip() + session_key = ( + agent.openclaw_session_id or GatewayAgentIdentity.session_key(gateway) or "" + ).strip() if not session_key: msg = "gateway main agent session_key is required" raise ValueError(msg) @@ -833,17 +840,16 @@ class OpenClawProvisioningService: session_key=session_key, auth_token=auth_token, user=user, - options=ProvisionOptions( - action=action, - force_bootstrap=force_bootstrap, - reset_session=False, # handled below - ), + options=ProvisionOptions(action=action, force_bootstrap=force_bootstrap), session_label=agent.name or "Gateway Agent", ) if reset_session: - with suppress(OpenClawGatewayError): + try: await control_plane.reset_agent_session(session_key) + except OpenClawGatewayError as exc: + if not _is_missing_session_error(exc): + raise if not wake: return @@ -869,7 +875,8 @@ class OpenClawProvisioningService: """Remove agent runtime state from the gateway (agent + optional session).""" if not gateway.url: - return None + msg = "Gateway url is required" + raise ValueError(msg) if not gateway.workspace_root: msg = "gateway_workspace_root is required" raise ValueError(msg) @@ -885,671 +892,16 @@ class OpenClawProvisioningService: if delete_session: if agent.board_id is None: - session_key = (agent.openclaw_session_id or GatewayAgentIdentity.session_key(gateway) or "").strip() + session_key = ( + agent.openclaw_session_id or GatewayAgentIdentity.session_key(gateway) or "" + ).strip() else: session_key = _session_key(agent) if session_key: - with suppress(OpenClawGatewayError): + try: await control_plane.delete_agent_session(session_key) + except OpenClawGatewayError as exc: + if not _is_missing_session_error(exc): + raise return workspace_path - - async def sync_gateway_templates( - self, - gateway: Gateway, - options: GatewayTemplateSyncOptions, - ) -> GatewayTemplatesSyncResult: - """Synchronize AGENTS/TOOLS/etc templates to gateway-connected agents.""" - session = self._require_session() - template_user = options.user - if template_user is None: - template_user = await get_org_owner_user( - session, - organization_id=gateway.organization_id, - ) - options = GatewayTemplateSyncOptions( - user=template_user, - include_main=options.include_main, - reset_sessions=options.reset_sessions, - rotate_tokens=options.rotate_tokens, - force_bootstrap=options.force_bootstrap, - board_id=options.board_id, - ) - result = _base_result( - gateway, - include_main=options.include_main, - reset_sessions=options.reset_sessions, - ) - if not gateway.url: - _append_sync_error( - result, - message="Gateway URL is not configured for this gateway.", - ) - return result - - ctx = _SyncContext( - session=session, - gateway=gateway, - config=GatewayClientConfig(url=gateway.url, token=gateway.token), - backoff=_GatewayBackoff(timeout_s=10 * 60, timeout_context="template sync"), - options=options, - provisioner=self, - ) - if not await _ping_gateway(ctx, result): - return result - - boards = await Board.objects.filter_by(gateway_id=gateway.id).all(session) - boards_by_id = _boards_by_id(boards, board_id=options.board_id) - if boards_by_id is None: - _append_sync_error( - result, - message="Board does not belong to this gateway.", - ) - return result - paused_board_ids = await _paused_board_ids(session, list(boards_by_id.keys())) - if boards_by_id: - agents = await ( - Agent.objects.by_field_in("board_id", list(boards_by_id.keys())) - .order_by(col(Agent.created_at).asc()) - .all(session) - ) - else: - agents = [] - - stop_sync = False - for agent in agents: - board = boards_by_id.get(agent.board_id) if agent.board_id is not None else None - if board is None: - result.agents_skipped += 1 - _append_sync_error( - result, - agent=agent, - message="Skipping agent: board not found for agent.", - ) - continue - if board.id in paused_board_ids: - result.agents_skipped += 1 - continue - stop_sync = await _sync_one_agent(ctx, result, agent, board) - if stop_sync: - break - - if not stop_sync and options.include_main: - await _sync_main_agent(ctx, result) - return result - - @staticmethod - def lead_session_key(board: Board) -> str: - """Return the deterministic session key for a board lead agent.""" - return f"agent:lead-{board.id}:main" - - @staticmethod - def lead_agent_name(_: Board) -> str: - """Return the default display name for board lead agents.""" - return "Lead Agent" - - async def ensure_board_lead_agent( - self, - *, - request: LeadAgentRequest, - ) -> tuple[Agent, bool]: - """Ensure a board has a lead agent; return `(agent, created)`.""" - session = self._require_session() - board = request.board - config_options = request.options - existing = ( - await session.exec( - select(Agent) - .where(Agent.board_id == board.id) - .where(col(Agent.is_board_lead).is_(True)), - ) - ).first() - if existing: - desired_name = config_options.agent_name or self.lead_agent_name(board) - changed = False - if existing.name != desired_name: - existing.name = desired_name - changed = True - if existing.gateway_id != request.gateway.id: - existing.gateway_id = request.gateway.id - changed = True - desired_session_key = self.lead_session_key(board) - if existing.openclaw_session_id != desired_session_key: - existing.openclaw_session_id = desired_session_key - changed = True - if changed: - existing.updated_at = utcnow() - session.add(existing) - await session.commit() - await session.refresh(existing) - return existing, False - - merged_identity_profile: dict[str, Any] = { - "role": "Board Lead", - "communication_style": "direct, concise, practical", - "emoji": ":gear:", - } - if config_options.identity_profile: - merged_identity_profile.update( - { - key: value.strip() - for key, value in config_options.identity_profile.items() - if value.strip() - }, - ) - - agent = Agent( - name=config_options.agent_name or self.lead_agent_name(board), - status="provisioning", - board_id=board.id, - gateway_id=request.gateway.id, - is_board_lead=True, - heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(), - identity_profile=merged_identity_profile, - openclaw_session_id=self.lead_session_key(board), - provision_requested_at=utcnow(), - provision_action=config_options.action, - ) - raw_token = generate_agent_token() - agent.agent_token_hash = hash_agent_token(raw_token) - session.add(agent) - await session.commit() - await session.refresh(agent) - - try: - await self.apply_agent_lifecycle( - agent=agent, - gateway=request.gateway, - board=board, - auth_token=raw_token, - user=request.user, - action=config_options.action, - wake=True, - deliver_wakeup=True, - ) - except OpenClawGatewayError: - # Best-effort provisioning. The board/agent rows should still exist. - pass - - return agent, True - - -_T = TypeVar("_T") - - -@dataclass(frozen=True) -class GatewayTemplateSyncOptions: - """Runtime options controlling gateway template synchronization.""" - - user: User | None - include_main: bool = True - reset_sessions: bool = False - rotate_tokens: bool = False - force_bootstrap: bool = False - board_id: UUID | None = None - - -@dataclass(frozen=True) -class _SyncContext: - """Shared state passed to sync helper functions.""" - - session: AsyncSession - gateway: Gateway - config: GatewayClientConfig - backoff: _GatewayBackoff - options: GatewayTemplateSyncOptions - provisioner: OpenClawProvisioningService - - -def _is_transient_gateway_error(exc: Exception) -> bool: - if not isinstance(exc, OpenClawGatewayError): - return False - message = str(exc).lower() - if not message: - return False - if any(marker in message for marker in _NON_TRANSIENT_GATEWAY_ERROR_MARKERS): - return False - return ("503" in message and "websocket" in message) or any( - marker in message for marker in _TRANSIENT_GATEWAY_ERROR_MARKERS - ) - - -def _gateway_timeout_message( - exc: OpenClawGatewayError, - *, - timeout_s: float, - context: str, -) -> str: - rounded_timeout = int(timeout_s) - timeout_text = f"{rounded_timeout} seconds" - if rounded_timeout >= 120: - timeout_text = f"{rounded_timeout // 60} minutes" - return f"Gateway unreachable after {timeout_text} ({context} timeout). Last error: {exc}" - - -class _GatewayBackoff: - def __init__( - self, - *, - timeout_s: float = 10 * 60, - base_delay_s: float = 0.75, - max_delay_s: float = 30.0, - jitter: float = 0.2, - timeout_context: str = "gateway operation", - ) -> None: - self._timeout_s = timeout_s - self._base_delay_s = base_delay_s - self._max_delay_s = max_delay_s - self._jitter = jitter - self._timeout_context = timeout_context - self._delay_s = base_delay_s - - def reset(self) -> None: - self._delay_s = self._base_delay_s - - @staticmethod - async def _attempt( - fn: Callable[[], Awaitable[_T]], - ) -> tuple[_T | None, OpenClawGatewayError | None]: - try: - return await fn(), None - except OpenClawGatewayError as exc: - return None, exc - - async def run(self, fn: Callable[[], Awaitable[_T]]) -> _T: - # Use per-call deadlines so long-running syncs can still tolerate a later - # gateway restart without having an already-expired retry window. - deadline_s = asyncio.get_running_loop().time() + self._timeout_s - while True: - value, error = await self._attempt(fn) - if error is not None: - exc = error - if not _is_transient_gateway_error(exc): - raise exc - now = asyncio.get_running_loop().time() - remaining = deadline_s - now - if remaining <= 0: - raise TimeoutError( - _gateway_timeout_message( - exc, - timeout_s=self._timeout_s, - context=self._timeout_context, - ), - ) from exc - - sleep_s = min(self._delay_s, remaining) - if self._jitter: - sleep_s *= 1.0 + _SECURE_RANDOM.uniform( - -self._jitter, - self._jitter, - ) - sleep_s = max(0.0, min(sleep_s, remaining)) - await asyncio.sleep(sleep_s) - self._delay_s = min(self._delay_s * 2.0, self._max_delay_s) - continue - self.reset() - if value is None: - msg = "Gateway retry produced no value without an error" - raise RuntimeError(msg) - return value - - -async def _with_gateway_retry( - fn: Callable[[], Awaitable[_T]], - *, - backoff: _GatewayBackoff, -) -> _T: - return await backoff.run(fn) - - -def _parse_tools_md(content: str) -> dict[str, str]: - values: dict[str, str] = {} - for raw in content.splitlines(): - line = raw.strip() - if not line or line.startswith("#"): - continue - match = _TOOLS_KV_RE.match(line) - if not match: - continue - values[match.group("key")] = match.group("value").strip() - return values - - -async def _get_agent_file( - *, - agent_gateway_id: str, - name: str, - config: GatewayClientConfig, - backoff: _GatewayBackoff | None = None, -) -> str | None: - try: - - async def _do_get() -> object: - return await openclaw_call( - "agents.files.get", - {"agentId": agent_gateway_id, "name": name}, - config=config, - ) - - payload = await (backoff.run(_do_get) if backoff else _do_get()) - except OpenClawGatewayError: - return None - if isinstance(payload, str): - return payload - if isinstance(payload, dict): - content = payload.get("content") - if isinstance(content, str): - return content - file_obj = payload.get("file") - if isinstance(file_obj, dict): - nested = file_obj.get("content") - if isinstance(nested, str): - return nested - return None - - -async def _get_existing_auth_token( - *, - agent_gateway_id: str, - config: GatewayClientConfig, - backoff: _GatewayBackoff | None = None, -) -> str | None: - tools = await _get_agent_file( - agent_gateway_id=agent_gateway_id, - name="TOOLS.md", - config=config, - backoff=backoff, - ) - if not tools: - return None - values = _parse_tools_md(tools) - token = values.get("AUTH_TOKEN") - if not token: - return None - token = token.strip() - return token or None - - -async def _paused_board_ids(session: AsyncSession, board_ids: list[UUID]) -> set[UUID]: - if not board_ids: - return set() - - commands = {"/pause", "/resume"} - statement = ( - select(BoardMemory.board_id, BoardMemory.content) - .where(col(BoardMemory.board_id).in_(board_ids)) - .where(col(BoardMemory.is_chat).is_(True)) - .where(func.lower(func.trim(col(BoardMemory.content))).in_(commands)) - .order_by(col(BoardMemory.board_id), col(BoardMemory.created_at).desc()) - # Postgres: DISTINCT ON (board_id) to get latest command per board. - .distinct(col(BoardMemory.board_id)) - ) - - paused: set[UUID] = set() - for board_id, content in await session.exec(statement): - cmd = (content or "").strip().lower() - if cmd == "/pause": - paused.add(board_id) - return paused - - -def _append_sync_error( - result: GatewayTemplatesSyncResult, - *, - message: str, - agent: Agent | None = None, - board: Board | None = None, -) -> None: - result.errors.append( - GatewayTemplatesSyncError( - agent_id=agent.id if agent else None, - agent_name=agent.name if agent else None, - board_id=board.id if board else None, - message=message, - ), - ) - - -async def _rotate_agent_token(session: AsyncSession, agent: Agent) -> str: - token = generate_agent_token() - agent.agent_token_hash = hash_agent_token(token) - agent.updated_at = utcnow() - session.add(agent) - await session.commit() - await session.refresh(agent) - return token - - -async def _ping_gateway(ctx: _SyncContext, result: GatewayTemplatesSyncResult) -> bool: - try: - - async def _do_ping() -> object: - # Use a lightweight health probe; avoid enumerating gateway agents. - return await openclaw_call("health", config=ctx.config) - - await ctx.backoff.run(_do_ping) - except (TimeoutError, OpenClawGatewayError) as exc: - _append_sync_error(result, message=str(exc)) - return False - else: - return True - - -def _base_result( - gateway: Gateway, - *, - include_main: bool, - reset_sessions: bool, -) -> GatewayTemplatesSyncResult: - return GatewayTemplatesSyncResult( - gateway_id=gateway.id, - include_main=include_main, - reset_sessions=reset_sessions, - agents_updated=0, - agents_skipped=0, - main_updated=False, - ) - - -def _boards_by_id( - boards: list[Board], - *, - board_id: UUID | None, -) -> dict[UUID, Board] | None: - boards_by_id = {board.id: board for board in boards} - if board_id is None: - return boards_by_id - board = boards_by_id.get(board_id) - if board is None: - return None - return {board_id: board} - - -async def _resolve_agent_auth_token( - ctx: _SyncContext, - result: GatewayTemplatesSyncResult, - agent: Agent, - board: Board | None, - *, - agent_gateway_id: str, -) -> tuple[str | None, bool]: - try: - auth_token = await _get_existing_auth_token( - agent_gateway_id=agent_gateway_id, - config=ctx.config, - backoff=ctx.backoff, - ) - except TimeoutError as exc: - _append_sync_error(result, agent=agent, board=board, message=str(exc)) - return None, True - - if not auth_token: - if not ctx.options.rotate_tokens: - result.agents_skipped += 1 - _append_sync_error( - result, - agent=agent, - board=board, - message=( - "Skipping agent: unable to read AUTH_TOKEN from TOOLS.md " - "(run with rotate_tokens=true to re-key)." - ), - ) - return None, False - auth_token = await _rotate_agent_token(ctx.session, agent) - - if agent.agent_token_hash and not verify_agent_token( - auth_token, - agent.agent_token_hash, - ): - if ctx.options.rotate_tokens: - auth_token = await _rotate_agent_token(ctx.session, agent) - else: - _append_sync_error( - result, - agent=agent, - board=board, - message=( - "Warning: AUTH_TOKEN in TOOLS.md does not match backend " - "token hash (agent auth may be broken)." - ), - ) - return auth_token, False - - -async def _sync_one_agent( - ctx: _SyncContext, - result: GatewayTemplatesSyncResult, - agent: Agent, - board: Board, -) -> bool: - auth_token, fatal = await _resolve_agent_auth_token( - ctx, - result, - agent, - board, - agent_gateway_id=_agent_key(agent), - ) - if fatal: - return True - if not auth_token: - return False - try: - - async def _do_provision() -> bool: - await ctx.provisioner.apply_agent_lifecycle( - agent=agent, - gateway=ctx.gateway, - board=board, - auth_token=auth_token, - user=ctx.options.user, - action="update", - force_bootstrap=ctx.options.force_bootstrap, - reset_session=ctx.options.reset_sessions, - wake=False, - ) - return True - - await _with_gateway_retry(_do_provision, backoff=ctx.backoff) - result.agents_updated += 1 - except TimeoutError as exc: # pragma: no cover - gateway/network dependent - result.agents_skipped += 1 - _append_sync_error(result, agent=agent, board=board, message=str(exc)) - return True - except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover - result.agents_skipped += 1 - _append_sync_error( - result, - agent=agent, - board=board, - message=f"Failed to sync templates: {exc}", - ) - return False - else: - return False - - -async def _sync_main_agent( - ctx: _SyncContext, - result: GatewayTemplatesSyncResult, -) -> bool: - main_agent = ( - await Agent.objects.all() - .filter(col(Agent.gateway_id) == ctx.gateway.id) - .filter(col(Agent.board_id).is_(None)) - .first(ctx.session) - ) - if main_agent is None: - _append_sync_error( - result, - message="Gateway agent record not found; " "skipping gateway agent template sync.", - ) - return True - main_gateway_agent_id = GatewayAgentIdentity.openclaw_agent_id(ctx.gateway) - - token, fatal = await _resolve_agent_auth_token( - ctx, - result, - main_agent, - board=None, - agent_gateway_id=main_gateway_agent_id, - ) - if fatal: - return True - if not token: - _append_sync_error( - result, - agent=main_agent, - message="Skipping gateway agent: unable to read AUTH_TOKEN from TOOLS.md.", - ) - return True - stop_sync = False - try: - - async def _do_provision_main() -> bool: - await ctx.provisioner.apply_agent_lifecycle( - agent=main_agent, - gateway=ctx.gateway, - board=None, - auth_token=token, - user=ctx.options.user, - action="update", - force_bootstrap=ctx.options.force_bootstrap, - reset_session=ctx.options.reset_sessions, - wake=False, - ) - return True - - await _with_gateway_retry(_do_provision_main, backoff=ctx.backoff) - except TimeoutError as exc: # pragma: no cover - gateway/network dependent - _append_sync_error(result, agent=main_agent, message=str(exc)) - stop_sync = True - except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover - _append_sync_error( - result, - agent=main_agent, - message=f"Failed to sync gateway agent templates: {exc}", - ) - else: - result.main_updated = True - return stop_sync - - -@dataclass(frozen=True, slots=True) -class LeadAgentOptions: - """Optional overrides for board-lead provisioning behavior.""" - - agent_name: str | None = None - identity_profile: dict[str, str] | None = None - action: str = "provision" - - -@dataclass(frozen=True, slots=True) -class LeadAgentRequest: - """Inputs required to ensure or provision a board lead agent.""" - - board: Board - gateway: Gateway - config: GatewayClientConfig - user: User | None - options: LeadAgentOptions = field(default_factory=LeadAgentOptions) diff --git a/backend/app/services/openclaw/provisioning_db.py b/backend/app/services/openclaw/provisioning_db.py new file mode 100644 index 00000000..127a6771 --- /dev/null +++ b/backend/app/services/openclaw/provisioning_db.py @@ -0,0 +1,718 @@ +"""DB-backed OpenClaw provisioning orchestration. + +Layering: +- `app.services.openclaw.provisioning` contains gateway-only lifecycle operations (no DB calls). +- This module builds on top of that layer using AsyncSession for token rotation, lead-agent records, + and bulk template synchronization. +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, TypeVar +from uuid import UUID + +from sqlalchemy import func +from sqlmodel import col, select + +from app.core.agent_tokens import generate_agent_token, hash_agent_token, verify_agent_token +from app.core.time import utcnow +from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig +from app.integrations.openclaw_gateway import OpenClawGatewayError +from app.models.agents import Agent +from app.models.board_memory import BoardMemory +from app.models.boards import Board +from app.models.gateways import Gateway +from app.schemas.gateways import GatewayTemplatesSyncError, GatewayTemplatesSyncResult +from app.services.openclaw.constants import ( + _NON_TRANSIENT_GATEWAY_ERROR_MARKERS, + _SECURE_RANDOM, + _TOOLS_KV_RE, + _TRANSIENT_GATEWAY_ERROR_MARKERS, + DEFAULT_HEARTBEAT_CONFIG, +) +from app.services.openclaw.internal import agent_key as _agent_key +from app.services.openclaw.provisioning import ( + OpenClawGatewayControlPlane, + OpenClawGatewayProvisioner, +) +from app.services.openclaw.shared import GatewayAgentIdentity +from app.services.organizations import get_org_owner_user + +if TYPE_CHECKING: + from collections.abc import Awaitable, Callable + + from sqlmodel.ext.asyncio.session import AsyncSession + + from app.models.users import User + + +_T = TypeVar("_T") + + +@dataclass(frozen=True) +class GatewayTemplateSyncOptions: + """Runtime options controlling gateway template synchronization.""" + + user: User | None + include_main: bool = True + reset_sessions: bool = False + rotate_tokens: bool = False + force_bootstrap: bool = False + board_id: UUID | None = None + + +@dataclass(frozen=True, slots=True) +class LeadAgentOptions: + """Optional overrides for board-lead provisioning behavior.""" + + agent_name: str | None = None + identity_profile: dict[str, str] | None = None + action: str = "provision" + + +@dataclass(frozen=True, slots=True) +class LeadAgentRequest: + """Inputs required to ensure or provision a board lead agent.""" + + board: Board + gateway: Gateway + config: GatewayClientConfig + user: User | None + options: LeadAgentOptions = field(default_factory=LeadAgentOptions) + + +class OpenClawProvisioningService: + """DB-backed provisioning workflows (bulk template sync, lead-agent record).""" + + def __init__(self, session: AsyncSession) -> None: + self._session = session + self._gateway = OpenClawGatewayProvisioner() + + @property + def session(self) -> AsyncSession: + return self._session + + @staticmethod + def lead_session_key(board: Board) -> str: + return f"agent:lead-{board.id}:main" + + @staticmethod + def lead_agent_name(_: Board) -> str: + return "Lead Agent" + + async def ensure_board_lead_agent( + self, + *, + request: LeadAgentRequest, + ) -> tuple[Agent, bool]: + """Ensure a board has a lead agent; return `(agent, created)`.""" + board = request.board + config_options = request.options + + existing = ( + await self.session.exec( + select(Agent) + .where(Agent.board_id == board.id) + .where(col(Agent.is_board_lead).is_(True)), + ) + ).first() + if existing: + desired_name = config_options.agent_name or self.lead_agent_name(board) + changed = False + if existing.name != desired_name: + existing.name = desired_name + changed = True + if existing.gateway_id != request.gateway.id: + existing.gateway_id = request.gateway.id + changed = True + desired_session_key = self.lead_session_key(board) + if existing.openclaw_session_id != desired_session_key: + existing.openclaw_session_id = desired_session_key + changed = True + if changed: + existing.updated_at = utcnow() + self.session.add(existing) + await self.session.commit() + await self.session.refresh(existing) + return existing, False + + merged_identity_profile: dict[str, Any] = { + "role": "Board Lead", + "communication_style": "direct, concise, practical", + "emoji": ":gear:", + } + if config_options.identity_profile: + merged_identity_profile.update( + { + key: value.strip() + for key, value in config_options.identity_profile.items() + if value.strip() + }, + ) + + agent = Agent( + name=config_options.agent_name or self.lead_agent_name(board), + status="provisioning", + board_id=board.id, + gateway_id=request.gateway.id, + is_board_lead=True, + heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(), + identity_profile=merged_identity_profile, + openclaw_session_id=self.lead_session_key(board), + provision_requested_at=utcnow(), + provision_action=config_options.action, + ) + raw_token = generate_agent_token() + agent.agent_token_hash = hash_agent_token(raw_token) + self.session.add(agent) + await self.session.commit() + await self.session.refresh(agent) + + # Strict behavior: provisioning errors surface to the caller. The DB row exists + # so a later retry can succeed with the same deterministic identity/session key. + await self._gateway.apply_agent_lifecycle( + agent=agent, + gateway=request.gateway, + board=board, + auth_token=raw_token, + user=request.user, + action=config_options.action, + wake=True, + deliver_wakeup=True, + ) + + agent.status = "online" + agent.provision_requested_at = None + agent.provision_action = None + agent.updated_at = utcnow() + self.session.add(agent) + await self.session.commit() + await self.session.refresh(agent) + + return agent, True + + async def sync_gateway_templates( + self, + gateway: Gateway, + options: GatewayTemplateSyncOptions, + ) -> GatewayTemplatesSyncResult: + """Synchronize AGENTS/TOOLS/etc templates to gateway-connected agents.""" + template_user = options.user + if template_user is None: + template_user = await get_org_owner_user( + self.session, + organization_id=gateway.organization_id, + ) + options = GatewayTemplateSyncOptions( + user=template_user, + include_main=options.include_main, + reset_sessions=options.reset_sessions, + rotate_tokens=options.rotate_tokens, + force_bootstrap=options.force_bootstrap, + board_id=options.board_id, + ) + + result = _base_result( + gateway, + include_main=options.include_main, + reset_sessions=options.reset_sessions, + ) + if not gateway.url: + _append_sync_error( + result, + message="Gateway URL is not configured for this gateway.", + ) + return result + + control_plane = OpenClawGatewayControlPlane( + GatewayClientConfig(url=gateway.url, token=gateway.token), + ) + ctx = _SyncContext( + session=self.session, + gateway=gateway, + control_plane=control_plane, + backoff=_GatewayBackoff(timeout_s=10 * 60, timeout_context="template sync"), + options=options, + provisioner=self._gateway, + ) + if not await _ping_gateway(ctx, result): + return result + + boards = await Board.objects.filter_by(gateway_id=gateway.id).all(self.session) + boards_by_id = _boards_by_id(boards, board_id=options.board_id) + if boards_by_id is None: + _append_sync_error( + result, + message="Board does not belong to this gateway.", + ) + return result + paused_board_ids = await _paused_board_ids(self.session, list(boards_by_id.keys())) + if boards_by_id: + agents = await ( + Agent.objects.by_field_in("board_id", list(boards_by_id.keys())) + .order_by(col(Agent.created_at).asc()) + .all(self.session) + ) + else: + agents = [] + + stop_sync = False + for agent in agents: + board = boards_by_id.get(agent.board_id) if agent.board_id is not None else None + if board is None: + result.agents_skipped += 1 + _append_sync_error( + result, + agent=agent, + message="Skipping agent: board not found for agent.", + ) + continue + if board.id in paused_board_ids: + result.agents_skipped += 1 + continue + stop_sync = await _sync_one_agent(ctx, result, agent, board) + if stop_sync: + break + + if not stop_sync and options.include_main: + await _sync_main_agent(ctx, result) + return result + + +@dataclass(frozen=True) +class _SyncContext: + session: AsyncSession + gateway: Gateway + control_plane: OpenClawGatewayControlPlane + backoff: _GatewayBackoff + options: GatewayTemplateSyncOptions + provisioner: OpenClawGatewayProvisioner + + +def _is_transient_gateway_error(exc: Exception) -> bool: + if not isinstance(exc, OpenClawGatewayError): + return False + message = str(exc).lower() + if not message: + return False + if any(marker in message for marker in _NON_TRANSIENT_GATEWAY_ERROR_MARKERS): + return False + return ("503" in message and "websocket" in message) or any( + marker in message for marker in _TRANSIENT_GATEWAY_ERROR_MARKERS + ) + + +def _gateway_timeout_message( + exc: OpenClawGatewayError, + *, + timeout_s: float, + context: str, +) -> str: + rounded_timeout = int(timeout_s) + timeout_text = f"{rounded_timeout} seconds" + if rounded_timeout >= 120: + timeout_text = f"{rounded_timeout // 60} minutes" + return f"Gateway unreachable after {timeout_text} ({context} timeout). Last error: {exc}" + + +class _GatewayBackoff: + def __init__( + self, + *, + timeout_s: float = 10 * 60, + base_delay_s: float = 0.75, + max_delay_s: float = 30.0, + jitter: float = 0.2, + timeout_context: str = "gateway operation", + ) -> None: + self._timeout_s = timeout_s + self._base_delay_s = base_delay_s + self._max_delay_s = max_delay_s + self._jitter = jitter + self._timeout_context = timeout_context + self._delay_s = base_delay_s + + def reset(self) -> None: + self._delay_s = self._base_delay_s + + @staticmethod + async def _attempt( + fn: Callable[[], Awaitable[_T]], + ) -> tuple[_T | None, OpenClawGatewayError | None]: + try: + return await fn(), None + except OpenClawGatewayError as exc: + return None, exc + + async def run(self, fn: Callable[[], Awaitable[_T]]) -> _T: + deadline_s = asyncio.get_running_loop().time() + self._timeout_s + while True: + value, error = await self._attempt(fn) + if error is not None: + exc = error + if not _is_transient_gateway_error(exc): + raise exc + now = asyncio.get_running_loop().time() + remaining = deadline_s - now + if remaining <= 0: + raise TimeoutError( + _gateway_timeout_message( + exc, + timeout_s=self._timeout_s, + context=self._timeout_context, + ), + ) from exc + + sleep_s = min(self._delay_s, remaining) + if self._jitter: + sleep_s *= 1.0 + _SECURE_RANDOM.uniform( + -self._jitter, + self._jitter, + ) + sleep_s = max(0.0, min(sleep_s, remaining)) + await asyncio.sleep(sleep_s) + self._delay_s = min(self._delay_s * 2.0, self._max_delay_s) + continue + self.reset() + if value is None: + msg = "Gateway retry produced no value without an error" + raise RuntimeError(msg) + return value + + +async def _with_gateway_retry( + fn: Callable[[], Awaitable[_T]], + *, + backoff: _GatewayBackoff, +) -> _T: + return await backoff.run(fn) + + +def _parse_tools_md(content: str) -> dict[str, str]: + values: dict[str, str] = {} + for raw in content.splitlines(): + line = raw.strip() + if not line or line.startswith("#"): + continue + match = _TOOLS_KV_RE.match(line) + if not match: + continue + values[match.group("key")] = match.group("value").strip() + return values + + +async def _get_agent_file( + *, + agent_gateway_id: str, + name: str, + control_plane: OpenClawGatewayControlPlane, + backoff: _GatewayBackoff | None = None, +) -> str | None: + try: + + async def _do_get() -> object: + return await control_plane.get_agent_file_payload(agent_id=agent_gateway_id, name=name) + + payload = await (backoff.run(_do_get) if backoff else _do_get()) + except OpenClawGatewayError: + return None + if isinstance(payload, str): + return payload + if isinstance(payload, dict): + content = payload.get("content") + if isinstance(content, str): + return content + file_obj = payload.get("file") + if isinstance(file_obj, dict): + nested = file_obj.get("content") + if isinstance(nested, str): + return nested + return None + + +async def _get_existing_auth_token( + *, + agent_gateway_id: str, + control_plane: OpenClawGatewayControlPlane, + backoff: _GatewayBackoff | None = None, +) -> str | None: + tools = await _get_agent_file( + agent_gateway_id=agent_gateway_id, + name="TOOLS.md", + control_plane=control_plane, + backoff=backoff, + ) + if not tools: + return None + values = _parse_tools_md(tools) + token = values.get("AUTH_TOKEN") + if not token: + return None + token = token.strip() + return token or None + + +async def _paused_board_ids(session: AsyncSession, board_ids: list[UUID]) -> set[UUID]: + if not board_ids: + return set() + + commands = {"/pause", "/resume"} + statement = ( + select(BoardMemory.board_id, BoardMemory.content) + .where(col(BoardMemory.board_id).in_(board_ids)) + .where(col(BoardMemory.is_chat).is_(True)) + .where(func.lower(func.trim(col(BoardMemory.content))).in_(commands)) + .order_by(col(BoardMemory.board_id), col(BoardMemory.created_at).desc()) + # Postgres: DISTINCT ON (board_id) to get latest command per board. + .distinct(col(BoardMemory.board_id)) + ) + + paused: set[UUID] = set() + for board_id, content in await session.exec(statement): + cmd = (content or "").strip().lower() + if cmd == "/pause": + paused.add(board_id) + return paused + + +def _append_sync_error( + result: GatewayTemplatesSyncResult, + *, + message: str, + agent: Agent | None = None, + board: Board | None = None, +) -> None: + result.errors.append( + GatewayTemplatesSyncError( + agent_id=agent.id if agent else None, + agent_name=agent.name if agent else None, + board_id=board.id if board else None, + message=message, + ), + ) + + +async def _rotate_agent_token(session: AsyncSession, agent: Agent) -> str: + token = generate_agent_token() + agent.agent_token_hash = hash_agent_token(token) + agent.updated_at = utcnow() + session.add(agent) + await session.commit() + await session.refresh(agent) + return token + + +async def _ping_gateway(ctx: _SyncContext, result: GatewayTemplatesSyncResult) -> bool: + try: + + async def _do_ping() -> object: + return await ctx.control_plane.health() + + await ctx.backoff.run(_do_ping) + except (TimeoutError, OpenClawGatewayError) as exc: + _append_sync_error(result, message=str(exc)) + return False + else: + return True + + +def _base_result( + gateway: Gateway, + *, + include_main: bool, + reset_sessions: bool, +) -> GatewayTemplatesSyncResult: + return GatewayTemplatesSyncResult( + gateway_id=gateway.id, + include_main=include_main, + reset_sessions=reset_sessions, + agents_updated=0, + agents_skipped=0, + main_updated=False, + ) + + +def _boards_by_id( + boards: list[Board], + *, + board_id: UUID | None, +) -> dict[UUID, Board] | None: + boards_by_id = {board.id: board for board in boards} + if board_id is None: + return boards_by_id + board = boards_by_id.get(board_id) + if board is None: + return None + return {board_id: board} + + +async def _resolve_agent_auth_token( + ctx: _SyncContext, + result: GatewayTemplatesSyncResult, + agent: Agent, + board: Board | None, + *, + agent_gateway_id: str, +) -> tuple[str | None, bool]: + try: + auth_token = await _get_existing_auth_token( + agent_gateway_id=agent_gateway_id, + control_plane=ctx.control_plane, + backoff=ctx.backoff, + ) + except TimeoutError as exc: + _append_sync_error(result, agent=agent, board=board, message=str(exc)) + return None, True + + if not auth_token: + if not ctx.options.rotate_tokens: + result.agents_skipped += 1 + _append_sync_error( + result, + agent=agent, + board=board, + message=( + "Skipping agent: unable to read AUTH_TOKEN from TOOLS.md " + "(run with rotate_tokens=true to re-key)." + ), + ) + return None, False + auth_token = await _rotate_agent_token(ctx.session, agent) + + if agent.agent_token_hash and not verify_agent_token( + auth_token, + agent.agent_token_hash, + ): + if ctx.options.rotate_tokens: + auth_token = await _rotate_agent_token(ctx.session, agent) + else: + _append_sync_error( + result, + agent=agent, + board=board, + message=( + "Warning: AUTH_TOKEN in TOOLS.md does not match backend " + "token hash (agent auth may be broken)." + ), + ) + return auth_token, False + + +async def _sync_one_agent( + ctx: _SyncContext, + result: GatewayTemplatesSyncResult, + agent: Agent, + board: Board, +) -> bool: + auth_token, fatal = await _resolve_agent_auth_token( + ctx, + result, + agent, + board, + agent_gateway_id=_agent_key(agent), + ) + if fatal: + return True + if not auth_token: + return False + try: + + async def _do_provision() -> bool: + await ctx.provisioner.apply_agent_lifecycle( + agent=agent, + gateway=ctx.gateway, + board=board, + auth_token=auth_token, + user=ctx.options.user, + action="update", + force_bootstrap=ctx.options.force_bootstrap, + reset_session=ctx.options.reset_sessions, + wake=False, + ) + return True + + await _with_gateway_retry(_do_provision, backoff=ctx.backoff) + result.agents_updated += 1 + except TimeoutError as exc: # pragma: no cover - gateway/network dependent + result.agents_skipped += 1 + _append_sync_error(result, agent=agent, board=board, message=str(exc)) + return True + except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover + result.agents_skipped += 1 + _append_sync_error( + result, + agent=agent, + board=board, + message=f"Failed to sync templates: {exc}", + ) + return False + else: + return False + + +async def _sync_main_agent( + ctx: _SyncContext, + result: GatewayTemplatesSyncResult, +) -> bool: + main_agent = ( + await Agent.objects.all() + .filter(col(Agent.gateway_id) == ctx.gateway.id) + .filter(col(Agent.board_id).is_(None)) + .first(ctx.session) + ) + if main_agent is None: + _append_sync_error( + result, + message="Gateway agent record not found; skipping gateway agent template sync.", + ) + return True + + main_gateway_agent_id = GatewayAgentIdentity.openclaw_agent_id(ctx.gateway) + token, fatal = await _resolve_agent_auth_token( + ctx, + result, + main_agent, + board=None, + agent_gateway_id=main_gateway_agent_id, + ) + if fatal: + return True + if not token: + _append_sync_error( + result, + agent=main_agent, + message="Skipping gateway agent: unable to read AUTH_TOKEN from TOOLS.md.", + ) + return True + stop_sync = False + try: + + async def _do_provision_main() -> bool: + await ctx.provisioner.apply_agent_lifecycle( + agent=main_agent, + gateway=ctx.gateway, + board=None, + auth_token=token, + user=ctx.options.user, + action="update", + force_bootstrap=ctx.options.force_bootstrap, + reset_session=ctx.options.reset_sessions, + wake=False, + ) + return True + + await _with_gateway_retry(_do_provision_main, backoff=ctx.backoff) + except TimeoutError as exc: # pragma: no cover - gateway/network dependent + _append_sync_error(result, agent=main_agent, message=str(exc)) + stop_sync = True + except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover + _append_sync_error( + result, + agent=main_agent, + message=f"Failed to sync gateway agent templates: {exc}", + ) + else: + result.main_updated = True + return stop_sync diff --git a/backend/scripts/sync_gateway_templates.py b/backend/scripts/sync_gateway_templates.py index d3098d64..f6cec160 100644 --- a/backend/scripts/sync_gateway_templates.py +++ b/backend/scripts/sync_gateway_templates.py @@ -52,7 +52,7 @@ def _parse_args() -> argparse.Namespace: async def _run() -> int: from app.db.session import async_session_maker from app.models.gateways import Gateway - from app.services.openclaw.provisioning import ( + from app.services.openclaw.provisioning_db import ( GatewayTemplateSyncOptions, OpenClawProvisioningService, ) diff --git a/backend/tests/test_agent_delete_main_agent.py b/backend/tests/test_agent_delete_main_agent.py index 6e4b8aac..9a3da74f 100644 --- a/backend/tests/test_agent_delete_main_agent.py +++ b/backend/tests/test_agent_delete_main_agent.py @@ -45,7 +45,9 @@ class _GatewayStub: @pytest.mark.asyncio -async def test_delete_gateway_main_agent_does_not_require_board_id(monkeypatch: pytest.MonkeyPatch) -> None: +async def test_delete_gateway_main_agent_does_not_require_board_id( + monkeypatch: pytest.MonkeyPatch, +) -> None: session = _FakeSession() service = agent_service.AgentLifecycleService(session) # type: ignore[arg-type] @@ -63,7 +65,9 @@ async def test_delete_gateway_main_agent_does_not_require_board_id(monkeypatch: token=None, workspace_root="/tmp/openclaw", ) - ctx = SimpleNamespace(organization=SimpleNamespace(id=uuid4()), member=SimpleNamespace(id=uuid4())) + ctx = SimpleNamespace( + organization=SimpleNamespace(id=uuid4()), member=SimpleNamespace(id=uuid4()) + ) async def _fake_first_agent(_session: object) -> _AgentStub: return agent @@ -109,7 +113,7 @@ async def test_delete_gateway_main_agent_does_not_require_board_id(monkeypatch: monkeypatch.setattr(service, "require_board", _should_not_be_called) monkeypatch.setattr(service, "require_gateway", _should_not_be_called) monkeypatch.setattr( - agent_service.OpenClawProvisioningService, + agent_service.OpenClawGatewayProvisioner, "delete_agent_lifecycle", _fake_delete_agent_lifecycle, ) diff --git a/backend/tests/test_agent_provisioning_utils.py b/backend/tests/test_agent_provisioning_utils.py index e7a6f887..cca862ce 100644 --- a/backend/tests/test_agent_provisioning_utils.py +++ b/backend/tests/test_agent_provisioning_utils.py @@ -129,7 +129,7 @@ async def test_provision_main_agent_uses_dedicated_openclaw_agent_id(monkeypatch _fake_set_agent_files, ) - await agent_provisioning.OpenClawProvisioningService().apply_agent_lifecycle( + await agent_provisioning.OpenClawGatewayProvisioner().apply_agent_lifecycle( agent=agent, # type: ignore[arg-type] gateway=gateway, # type: ignore[arg-type] board=None,