From ad75871e61b5281b2361f4c6f504213d1a5f9bb1 Mon Sep 17 00:00:00 2001 From: Abhimanyu Saharan Date: Tue, 10 Feb 2026 22:30:14 +0530 Subject: [PATCH] refactor: replace direct calls to provisioning functions with OpenClawProvisioningService methods --- backend/app/api/board_groups.py | 7 +- backend/app/api/board_onboarding.py | 5 +- backend/app/api/boards.py | 7 +- .../app/services/openclaw/admin_service.py | 87 +- .../app/services/openclaw/agent_service.py | 291 ++---- .../services/openclaw/coordination_service.py | 5 +- backend/app/services/openclaw/provisioning.py | 881 ++++++++---------- backend/app/services/organizations.py | 17 + backend/scripts/sync_gateway_templates.py | 7 +- backend/tests/test_agent_delete_main_agent.py | 23 +- .../tests/test_agent_provisioning_utils.py | 216 +++-- .../tests/test_authenticate_request_flow.py | 1 - 12 files changed, 703 insertions(+), 844 deletions(-) diff --git a/backend/app/api/board_groups.py b/backend/app/api/board_groups.py index 5e93ec38..2f9371f6 100644 --- a/backend/app/api/board_groups.py +++ b/backend/app/api/board_groups.py @@ -30,7 +30,7 @@ from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.view_models import BoardGroupSnapshot from app.services.board_group_snapshot import build_group_snapshot from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG -from app.services.openclaw.provisioning import sync_gateway_agent_heartbeats +from app.services.openclaw.provisioning import OpenClawProvisioningService from app.services.openclaw.shared import GatewayTransportError from app.services.organizations import ( OrganizationContext, @@ -269,7 +269,10 @@ async def _sync_gateway_heartbeats( failed_agent_ids.extend([agent.id for agent in gateway_agents]) continue try: - await sync_gateway_agent_heartbeats(gateway, gateway_agents) + await OpenClawProvisioningService().sync_gateway_agent_heartbeats( + gateway, + gateway_agents, + ) except GatewayTransportError: failed_agent_ids.extend([agent.id for agent in gateway_agents]) return failed_agent_ids diff --git a/backend/app/api/board_onboarding.py b/backend/app/api/board_onboarding.py index 201d51fc..3b91bc12 100644 --- a/backend/app/api/board_onboarding.py +++ b/backend/app/api/board_onboarding.py @@ -38,7 +38,7 @@ from app.services.openclaw.policies import OpenClawAuthorizationPolicy from app.services.openclaw.provisioning import ( LeadAgentOptions, LeadAgentRequest, - ensure_board_lead_agent, + OpenClawProvisioningService, ) from app.services.openclaw.shared import require_gateway_config_for_board @@ -401,8 +401,7 @@ async def confirm_onboarding( session.add(onboarding) await session.commit() await session.refresh(board) - await ensure_board_lead_agent( - session, + await OpenClawProvisioningService(session).ensure_board_lead_agent( request=LeadAgentRequest( board=board, gateway=gateway, diff --git a/backend/app/api/boards.py b/backend/app/api/boards.py index 1f18ef1d..c43fac84 100644 --- a/backend/app/api/boards.py +++ b/backend/app/api/boards.py @@ -39,7 +39,7 @@ from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.view_models import BoardGroupSnapshot, BoardSnapshot from app.services.board_group_snapshot import build_board_group_snapshot from app.services.board_snapshot import build_board_snapshot -from app.services.openclaw.provisioning import cleanup_agent +from app.services.openclaw.provisioning import OpenClawProvisioningService from app.services.openclaw.shared import GatewayTransportError from app.services.organizations import OrganizationContext, board_access_filter @@ -287,7 +287,10 @@ async def delete_board( if config: try: for agent in agents: - await cleanup_agent(agent, config) + await OpenClawProvisioningService().delete_agent_lifecycle( + agent=agent, + gateway=config, + ) except GatewayTransportError as exc: raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, diff --git a/backend/app/services/openclaw/admin_service.py b/backend/app/services/openclaw/admin_service.py index 35a7c193..518810e7 100644 --- a/backend/app/services/openclaw/admin_service.py +++ b/backend/app/services/openclaw/admin_service.py @@ -17,9 +17,7 @@ from app.db import crud from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig from app.integrations.openclaw_gateway import ( OpenClawGatewayError, - ensure_session, openclaw_call, - send_message, ) from app.models.activity_events import ActivityEvent from app.models.agents import Agent @@ -30,13 +28,11 @@ from app.schemas.gateways import GatewayTemplatesSyncResult from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG from app.services.openclaw.provisioning import ( GatewayTemplateSyncOptions, - MainAgentProvisionRequest, - ProvisionOptions, - provision_main_agent, - sync_gateway_templates, + OpenClawProvisioningService, ) from app.services.openclaw.session_service import GatewayTemplateSyncQuery from app.services.openclaw.shared import GatewayAgentIdentity +from app.services.organizations import get_org_owner_user if TYPE_CHECKING: from sqlmodel.ext.asyncio.session import AsyncSession @@ -132,30 +128,6 @@ class GatewayAdminLifecycleService: .first(self.session) ) - @staticmethod - def extract_agent_id_from_entry(item: object) -> str | None: - if isinstance(item, str): - value = item.strip() - return value or None - if not isinstance(item, dict): - return None - for key in ("id", "agentId", "agent_id"): - raw = item.get(key) - if isinstance(raw, str) and raw.strip(): - return raw.strip() - return None - - @staticmethod - def extract_agents_list(payload: object) -> list[object]: - if isinstance(payload, list): - return [item for item in payload] - if not isinstance(payload, dict): - return [] - agents = payload.get("agents") or [] - if not isinstance(agents, list): - return [] - return [item for item in agents] - async def upsert_main_agent_record(self, gateway: Gateway) -> tuple[Agent, bool]: changed = False session_key = GatewayAgentIdentity.session_key(gateway) @@ -210,13 +182,13 @@ class GatewayAdminLifecycleService: config = GatewayClientConfig(url=gateway.url, token=gateway.token) target_id = GatewayAgentIdentity.openclaw_agent_id(gateway) try: - payload = await openclaw_call("agents.list", config=config) - except OpenClawGatewayError: + await openclaw_call("agents.files.list", {"agentId": target_id}, config=config) + except OpenClawGatewayError as exc: + message = str(exc).lower() + if any(marker in message for marker in ("not found", "unknown agent", "no such agent")): + return False return True - for item in self.extract_agents_list(payload): - if self.extract_agent_id_from_entry(item) == target_id: - return True - return False + return True async def provision_main_agent_record( self, @@ -227,7 +199,10 @@ class GatewayAdminLifecycleService: action: str, notify: bool, ) -> Agent: - session_key = GatewayAgentIdentity.session_key(gateway) + template_user = user or await get_org_owner_user( + self.session, + organization_id=gateway.organization_id, + ) raw_token = generate_agent_token() agent.agent_token_hash = hash_agent_token(raw_token) agent.provision_requested_at = utcnow() @@ -241,33 +216,16 @@ class GatewayAdminLifecycleService: if not gateway.url: return agent try: - await provision_main_agent( - agent, - MainAgentProvisionRequest( - gateway=gateway, - auth_token=raw_token, - user=user, - session_key=session_key, - options=ProvisionOptions(action=action), - ), + await OpenClawProvisioningService().apply_agent_lifecycle( + agent=agent, + gateway=gateway, + board=None, + auth_token=raw_token, + user=template_user, + action=action, + wake=notify, + deliver_wakeup=True, ) - await ensure_session( - session_key, - config=GatewayClientConfig(url=gateway.url, token=gateway.token), - label=agent.name, - ) - if notify: - await send_message( - ( - f"Hello {agent.name}. Your gateway provisioning was updated.\n\n" - "Please re-read AGENTS.md, USER.md, HEARTBEAT.md, and TOOLS.md. " - "If BOOTSTRAP.md exists, run it once then delete it. " - "Begin heartbeats after startup." - ), - session_key=session_key, - config=GatewayClientConfig(url=gateway.url, token=gateway.token), - deliver=True, - ) self.logger.info( "gateway.main_agent.provision_success gateway_id=%s agent_id=%s action=%s", gateway.id, @@ -388,8 +346,7 @@ class GatewayAdminLifecycleService: query.include_main, ) await self.ensure_gateway_agents_exist([gateway]) - result = await sync_gateway_templates( - self.session, + result = await OpenClawProvisioningService(self.session).sync_gateway_templates( gateway, GatewayTemplateSyncOptions( user=auth.user, diff --git a/backend/app/services/openclaw/agent_service.py b/backend/app/services/openclaw/agent_service.py index b9ae2566..559c9a36 100644 --- a/backend/app/services/openclaw/agent_service.py +++ b/backend/app/services/openclaw/agent_service.py @@ -49,15 +49,7 @@ from app.services.openclaw.constants import ( OFFLINE_AFTER, ) from app.services.openclaw.policies import OpenClawAuthorizationPolicy -from app.services.openclaw.provisioning import ( - AgentProvisionRequest, - MainAgentProvisionRequest, - ProvisionOptions, - cleanup_agent, - cleanup_main_agent, - provision_agent, - provision_main_agent, -) +from app.services.openclaw.provisioning import OpenClawProvisioningService from app.services.openclaw.shared import GatewayAgentIdentity from app.services.organizations import ( OrganizationContext, @@ -176,11 +168,6 @@ class AbstractProvisionExecution(ABC): ) try: await self._provision() - await self._service.send_wakeup_message( - self.agent, - self.request.target.client_config, - verb=self._wakeup_verb, - ) self.agent.provision_confirm_token_hash = None self.agent.provision_requested_at = None self.agent.provision_action = None @@ -256,19 +243,18 @@ class BoardAgentProvisionExecution(AbstractProvisionExecution): status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="board is required for non-main agent provisioning", ) - await provision_agent( - self.agent, - AgentProvisionRequest( - board=board, - gateway=self.request.target.gateway, - auth_token=self.request.raw_token, - user=self.request.user, - options=ProvisionOptions( - action=self._action, - force_bootstrap=self.request.force_bootstrap, - reset_session=True, - ), - ), + await OpenClawProvisioningService().apply_agent_lifecycle( + agent=self.agent, + gateway=self.request.target.gateway, + board=board, + auth_token=self.request.raw_token, + user=self.request.user, + action=self._action, + force_bootstrap=self.request.force_bootstrap, + reset_session=True, + wake=True, + deliver_wakeup=True, + wakeup_verb=self._wakeup_verb, ) @@ -276,19 +262,18 @@ class MainAgentProvisionExecution(AbstractProvisionExecution): """Provision execution for gateway-main agents.""" async def _provision(self) -> None: - await provision_main_agent( - self.agent, - MainAgentProvisionRequest( - gateway=self.request.target.gateway, - auth_token=self.request.raw_token, - user=self.request.user, - session_key=self.agent.openclaw_session_id, - options=ProvisionOptions( - action=self._action, - force_bootstrap=self.request.force_bootstrap, - reset_session=True, - ), - ), + await OpenClawProvisioningService().apply_agent_lifecycle( + agent=self.agent, + gateway=self.request.target.gateway, + board=None, + auth_token=self.request.raw_token, + user=self.request.user, + action=self._action, + force_bootstrap=self.request.force_bootstrap, + reset_session=True, + wake=True, + deliver_wakeup=True, + wakeup_verb=self._wakeup_verb, ) @@ -337,8 +322,25 @@ class AgentLifecycleService: return slug or uuid4().hex @classmethod - def build_session_key(cls, agent_name: str) -> str: - return f"{AGENT_SESSION_PREFIX}:{cls.slugify(agent_name)}:main" + def resolve_session_key(cls, agent: Agent) -> str: + """Resolve the gateway session key for an agent. + + Notes: + - For board-scoped agents, default to a UUID-based key to avoid name collisions. + """ + + existing = (agent.openclaw_session_id or "").strip() + if agent.board_id is None: + # Gateway-main agents must have an explicit deterministic key (set elsewhere). + if existing: + return existing + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Gateway main agent session key is required", + ) + if agent.is_board_lead: + return f"{AGENT_SESSION_PREFIX}:lead-{agent.board_id}:main" + return f"{AGENT_SESSION_PREFIX}:mc-{agent.id}:main" @classmethod def workspace_path(cls, agent_name: str, workspace_root: str | None) -> str: @@ -439,23 +441,6 @@ class AgentLifecycleService: return None return await Gateway.objects.by_id(agent.gateway_id).first(self.session) - async def ensure_gateway_session( - self, - agent_name: str, - config: GatewayClientConfig, - ) -> tuple[str, str | None]: - session_key = self.build_session_key(agent_name) - try: - await ensure_session(session_key, config=config, label=agent_name) - except OpenClawGatewayError as exc: - self.logger.warning( - "agent.session.ensure_failed agent_name=%s error=%s", - agent_name, - str(exc), - ) - return session_key, str(exc) - return session_key, None - @classmethod def with_computed_status(cls, agent: Agent) -> Agent: now = utcnow() @@ -607,30 +592,11 @@ class AgentLifecycleService: detail="An agent with this name already exists in this gateway workspace.", ) - desired_session_key = self.build_session_key(requested_name) - existing_session_key = ( - await self.session.exec( - select(Agent) - .join(Board, col(Agent.board_id) == col(Board.id)) - .where(col(Board.gateway_id) == gateway.id) - .where(col(Agent.openclaw_session_id) == desired_session_key), - ) - ).first() - if existing_session_key: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=( - "This agent name would collide with an existing workspace " - "session key. Pick a different name." - ), - ) - async def persist_new_agent( self, *, data: dict[str, Any], - client_config: GatewayClientConfig, - ) -> tuple[Agent, str, str | None]: + ) -> tuple[Agent, str]: agent = Agent.model_validate(data) agent.status = "provisioning" raw_token = generate_agent_token() @@ -639,58 +605,21 @@ class AgentLifecycleService: agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() agent.provision_requested_at = utcnow() agent.provision_action = "provision" - session_key, session_error = await self.ensure_gateway_session( - agent.name, - client_config, - ) - agent.openclaw_session_id = session_key + agent.openclaw_session_id = self.resolve_session_key(agent) self.session.add(agent) await self.session.commit() await self.session.refresh(agent) - return agent, raw_token, session_error - - async def record_session_creation( - self, - *, - agent: Agent, - session_error: str | None, - ) -> None: - if session_error: - record_activity( - self.session, - event_type="agent.session.failed", - message=f"Session sync failed for {agent.name}: {session_error}", - agent_id=agent.id, - ) - else: - record_activity( - self.session, - event_type="agent.session.created", - message=f"Session created for {agent.name}.", - agent_id=agent.id, - ) - await self.session.commit() - - async def send_wakeup_message( - self, - agent: Agent, - config: GatewayClientConfig, - verb: str = "provisioned", - ) -> None: - session_key = agent.openclaw_session_id or self.build_session_key(agent.name) - await ensure_session(session_key, config=config, label=agent.name) - message = ( - f"Hello {agent.name}. Your workspace has been {verb}.\n\n" - "Start the agent, run BOOT.md, and if BOOTSTRAP.md exists run it once " - "then delete it. Begin heartbeats after startup." - ) - await send_message(message, session_key=session_key, config=config, deliver=True) + return agent, raw_token async def provision_new_agent( self, *, agent: Agent, - request: AgentProvisionRequest, + board: Board, + gateway: Gateway, + auth_token: str, + user: User | None, + force_bootstrap: bool, client_config: GatewayClientConfig, ) -> None: execution = BoardAgentProvisionExecution( @@ -699,13 +628,13 @@ class AgentLifecycleService: provision_request=AgentUpdateProvisionRequest( target=AgentUpdateProvisionTarget( is_main_agent=False, - board=request.board, - gateway=request.gateway, + board=board, + gateway=gateway, client_config=client_config, ), - raw_token=request.auth_token, - user=request.user, - force_bootstrap=request.options.force_bootstrap, + raw_token=auth_token, + user=user, + force_bootstrap=force_bootstrap, ), action="provision", wakeup_verb="provisioned", @@ -852,24 +781,6 @@ class AgentLifecycleService: client_config=client_config, ) - async def ensure_agent_update_session( - self, - *, - agent: Agent, - client_config: GatewayClientConfig, - ) -> None: - session_key = agent.openclaw_session_id or self.build_session_key(agent.name) - try: - await ensure_session(session_key, config=client_config, label=agent.name) - if not agent.openclaw_session_id: - agent.openclaw_session_id = session_key - self.session.add(agent) - await self.session.commit() - await self.session.refresh(agent) - except OpenClawGatewayError as exc: - self.record_instruction_failure(self.session, agent, str(exc), "update") - await self.session.commit() - @staticmethod def mark_agent_update_pending(agent: Agent) -> str: raw_token = generate_agent_token() @@ -937,23 +848,14 @@ class AgentLifecycleService: "gateway_id": gateway.id, "heartbeat_config": DEFAULT_HEARTBEAT_CONFIG.copy(), } - agent, raw_token, session_error = await self.persist_new_agent( - data=data, - client_config=client_config, - ) - await self.record_session_creation( - agent=agent, - session_error=session_error, - ) + agent, raw_token = await self.persist_new_agent(data=data) await self.provision_new_agent( agent=agent, - request=AgentProvisionRequest( - board=board, - gateway=gateway, - auth_token=raw_token, - user=actor.user, - options=ProvisionOptions(action="provision"), - ), + board=board, + gateway=gateway, + auth_token=raw_token, + user=actor.user, + force_bootstrap=False, client_config=client_config, ) return agent @@ -987,13 +889,11 @@ class AgentLifecycleService: gateway, client_config = await self.require_gateway(board) await self.provision_new_agent( agent=agent, - request=AgentProvisionRequest( - board=board, - gateway=gateway, - auth_token=raw_token, - user=user, - options=ProvisionOptions(action="provision"), - ), + board=board, + gateway=gateway, + auth_token=raw_token, + user=user, + force_bootstrap=False, client_config=client_config, ) @@ -1003,24 +903,17 @@ class AgentLifecycleService: agent: Agent, actor: ActorContextLike, ) -> None: - if agent.openclaw_session_id: + _ = actor + if agent.board_id is None: return - board = await self.require_board( - str(agent.board_id) if agent.board_id else None, - user=actor.user if actor.actor_type == "user" else None, - write=actor.actor_type == "user", - ) - _, client_config = await self.require_gateway(board) - session_key, session_error = await self.ensure_gateway_session( - agent.name, - client_config, - ) - agent.openclaw_session_id = session_key + desired = self.resolve_session_key(agent) + existing = (agent.openclaw_session_id or "").strip() + if existing == desired: + return + agent.openclaw_session_id = desired self.session.add(agent) - await self.record_session_creation( - agent=agent, - session_error=session_error, - ) + await self.session.commit() + await self.session.refresh(agent) async def commit_heartbeat( self, @@ -1162,24 +1055,14 @@ class AgentLifecycleService: gateway=gateway, requested_name=requested_name, ) - agent, raw_token, session_error = await self.persist_new_agent( - data=data, - client_config=client_config, - ) - await self.record_session_creation( + agent, raw_token = await self.persist_new_agent(data=data) + await self.provision_new_agent( agent=agent, - session_error=session_error, - ) - provision_request = AgentProvisionRequest( board=board, gateway=gateway, auth_token=raw_token, user=actor.user if actor.actor_type == "user" else None, - options=ProvisionOptions(action="provision"), - ) - await self.provision_new_agent( - agent=agent, - request=provision_request, + force_bootstrap=False, client_config=client_config, ) self.logger.info("agent.create.success agent_id=%s board_id=%s", agent.id, board.id) @@ -1229,10 +1112,6 @@ class AgentLifecycleService: main_gateway=main_gateway, gateway_for_main=gateway_for_main, ) - await self.ensure_agent_update_session( - agent=agent, - client_config=target.client_config, - ) raw_token = self.mark_agent_update_pending(agent) self.session.add(agent) await self.session.commit() @@ -1345,7 +1224,10 @@ class AgentLifecycleService: if gateway and gateway.url: client_config = GatewayClientConfig(url=gateway.url, token=gateway.token) try: - workspace_path = await cleanup_main_agent(agent, gateway) + workspace_path = await OpenClawProvisioningService().delete_agent_lifecycle( + agent=agent, + gateway=gateway, + ) except OpenClawGatewayError as exc: self.record_instruction_failure(self.session, agent, str(exc), "delete") await self.session.commit() @@ -1364,7 +1246,10 @@ class AgentLifecycleService: board = await self.require_board(str(agent.board_id)) gateway, client_config = await self.require_gateway(board) try: - workspace_path = await cleanup_agent(agent, gateway) + workspace_path = await OpenClawProvisioningService().delete_agent_lifecycle( + agent=agent, + gateway=gateway, + ) except OpenClawGatewayError as exc: self.record_instruction_failure(self.session, agent, str(exc), "delete") await self.session.commit() diff --git a/backend/app/services/openclaw/coordination_service.py b/backend/app/services/openclaw/coordination_service.py index 99f970bc..3233f152 100644 --- a/backend/app/services/openclaw/coordination_service.py +++ b/backend/app/services/openclaw/coordination_service.py @@ -39,7 +39,7 @@ from app.services.openclaw.policies import OpenClawAuthorizationPolicy from app.services.openclaw.provisioning import ( LeadAgentOptions, LeadAgentRequest, - ensure_board_lead_agent, + OpenClawProvisioningService, ) from app.services.openclaw.shared import ( GatewayAgentIdentity, @@ -542,8 +542,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): board: Board, message: str, ) -> tuple[Agent, bool]: - lead, lead_created = await ensure_board_lead_agent( - self.session, + lead, lead_created = await OpenClawProvisioningService(self.session).ensure_board_lead_agent( request=LeadAgentRequest( board=board, gateway=gateway, diff --git a/backend/app/services/openclaw/provisioning.py b/backend/app/services/openclaw/provisioning.py index 31a4eefb..1eb05290 100644 --- a/backend/app/services/openclaw/provisioning.py +++ b/backend/app/services/openclaw/provisioning.py @@ -33,12 +33,8 @@ from app.models.boards import Board from app.models.gateways import Gateway from app.schemas.gateways import GatewayTemplatesSyncError, GatewayTemplatesSyncResult from app.services.openclaw.constants import ( - _COORDINATION_GATEWAY_BASE_DELAY_S, - _COORDINATION_GATEWAY_MAX_DELAY_S, - _COORDINATION_GATEWAY_TIMEOUT_S, _NON_TRANSIENT_GATEWAY_ERROR_MARKERS, _SECURE_RANDOM, - _SESSION_KEY_PARTS_MIN, _TOOLS_KV_RE, _TRANSIENT_GATEWAY_ERROR_MARKERS, DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY, @@ -52,7 +48,9 @@ from app.services.openclaw.constants import ( MAIN_TEMPLATE_MAP, PRESERVE_AGENT_EDITABLE_FILES, ) +from app.services.openclaw.internal import agent_key as _agent_key from app.services.openclaw.shared import GatewayAgentIdentity +from app.services.organizations import get_org_owner_user if TYPE_CHECKING: from sqlmodel.ext.asyncio.session import AsyncSession @@ -69,28 +67,6 @@ class ProvisionOptions: reset_session: bool = False -@dataclass(frozen=True, slots=True) -class AgentProvisionRequest: - """Inputs required to provision a board-scoped agent.""" - - board: Board - gateway: Gateway - auth_token: str - user: User | None - options: ProvisionOptions = field(default_factory=ProvisionOptions) - - -@dataclass(frozen=True, slots=True) -class MainAgentProvisionRequest: - """Inputs required to provision a gateway main agent.""" - - gateway: Gateway - auth_token: str - user: User | None - session_key: str | None = None - options: ProvisionOptions = field(default_factory=ProvisionOptions) - - def _repo_root() -> Path: return Path(__file__).resolve().parents[3] @@ -104,62 +80,6 @@ def _slugify(value: str) -> str: return slug or uuid4().hex -def _clean_str(value: object) -> str | None: - if isinstance(value, str) and value.strip(): - return value.strip() - return None - - -def _extract_agent_id_from_item(item: object) -> str | None: - if isinstance(item, str): - return _clean_str(item) - if not isinstance(item, dict): - return None - for key in ("id", "agentId", "agent_id"): - agent_id = _clean_str(item.get(key)) - if agent_id: - return agent_id - return None - - -def _extract_agent_id_from_list(items: object) -> str | None: - if not isinstance(items, list): - return None - for item in items: - agent_id = _extract_agent_id_from_item(item) - if agent_id: - return agent_id - return None - - -def _extract_agent_id(payload: object) -> str | None: - default_keys = ("defaultId", "default_id", "defaultAgentId", "default_agent_id") - collection_keys = ("agents", "items", "list", "data") - - if isinstance(payload, list): - return _extract_agent_id_from_list(payload) - if not isinstance(payload, dict): - return None - for key in default_keys: - agent_id = _clean_str(payload.get(key)) - if agent_id: - return agent_id - for key in collection_keys: - agent_id = _extract_agent_id_from_list(payload.get(key)) - if agent_id: - return agent_id - return None - - -def _agent_key(agent: Agent) -> str: - session_key = agent.openclaw_session_id or "" - if session_key.startswith("agent:"): - parts = session_key.split(":") - if len(parts) >= _SESSION_KEY_PARTS_MIN and parts[1]: - return parts[1] - return _slugify(agent.name) - - def _heartbeat_config(agent: Agent) -> dict[str, Any]: merged = DEFAULT_HEARTBEAT_CONFIG.copy() if isinstance(agent.heartbeat_config, dict): @@ -339,9 +259,14 @@ def _build_main_context( def _session_key(agent: Agent) -> str: - if agent.openclaw_session_id: - return agent.openclaw_session_id - return f"agent:{_agent_key(agent)}:main" + """Return the deterministic session key for a board-scoped agent. + + Note: Never derive session keys from a human-provided name; use stable ids instead. + """ + + if agent.is_board_lead and agent.board_id is not None: + return f"agent:lead-{agent.board_id}:main" + return f"agent:mc-{agent.id}:main" def _render_agent_files( @@ -425,10 +350,6 @@ class GatewayControlPlane(ABC): async def delete_agent(self, agent_id: str, *, delete_files: bool = True) -> None: raise NotImplementedError - @abstractmethod - async def list_supported_files(self) -> set[str]: - raise NotImplementedError - @abstractmethod async def list_agent_files(self, agent_id: str) -> dict[str, dict[str, Any]]: raise NotImplementedError @@ -466,35 +387,11 @@ class OpenClawGatewayControlPlane(GatewayControlPlane): return await openclaw_call("sessions.delete", {"key": session_key}, config=self._config) - async def _agent_ids(self) -> set[str]: - payload = await openclaw_call("agents.list", config=self._config) - raw_agents: object = payload - if isinstance(payload, dict): - raw_agents = payload.get("agents") or [] - if not isinstance(raw_agents, list): - return set() - ids: set[str] = set() - for item in raw_agents: - agent_id = _extract_agent_id_from_item(item) - if agent_id: - ids.add(agent_id) - return ids - async def upsert_agent(self, registration: GatewayAgentRegistration) -> None: - agent_ids = await self._agent_ids() - if registration.agent_id in agent_ids: - await openclaw_call( - "agents.update", - { - "agentId": registration.agent_id, - "name": registration.name, - "workspace": registration.workspace_path, - }, - config=self._config, - ) - else: - # `agents.create` derives `agentId` from `name`, so create with the target id - # and then set the human-facing name in a follow-up update. + # Prefer an idempotent "create then update" flow. + # - Avoids a dependency on `agents.list` (which may surface gateway defaults like `main`). + # - Ensures we always hit the "create" RPC first, per lifecycle expectations. + try: await openclaw_call( "agents.create", { @@ -503,16 +400,19 @@ class OpenClawGatewayControlPlane(GatewayControlPlane): }, config=self._config, ) - if registration.name != registration.agent_id: - await openclaw_call( - "agents.update", - { - "agentId": registration.agent_id, - "name": registration.name, - "workspace": registration.workspace_path, - }, - config=self._config, - ) + except OpenClawGatewayError as exc: + message = str(exc).lower() + if not any(marker in message for marker in ("already", "exist", "duplicate", "conflict")): + raise + await openclaw_call( + "agents.update", + { + "agentId": registration.agent_id, + "name": registration.name, + "workspace": registration.workspace_path, + }, + config=self._config, + ) await self.patch_agent_heartbeats( [(registration.agent_id, registration.workspace_path, registration.heartbeat)], ) @@ -524,37 +424,6 @@ class OpenClawGatewayControlPlane(GatewayControlPlane): config=self._config, ) - async def list_supported_files(self) -> set[str]: - agents_payload = await openclaw_call("agents.list", config=self._config) - agent_id = _extract_agent_id(agents_payload) - if not agent_id: - return set(DEFAULT_GATEWAY_FILES) - files_payload = await openclaw_call( - "agents.files.list", - {"agentId": agent_id}, - config=self._config, - ) - if not isinstance(files_payload, dict): - return set(DEFAULT_GATEWAY_FILES) - files = files_payload.get("files") or [] - if not isinstance(files, list): - return set(DEFAULT_GATEWAY_FILES) - supported: set[str] = set() - for item in files: - if not isinstance(item, dict): - continue - name = item.get("name") - if not isinstance(name, str) or not name: - name = item.get("path") - if isinstance(name, str) and name: - supported.add(name) - - # Always include Mission Control's expected template files even if the gateway's default - # agent reports a different file set (e.g. `prompts/system.md`). This prevents provisioning - # from silently skipping our templates due to gateway-side defaults or version skew. - supported.update(DEFAULT_GATEWAY_FILES) - return supported - async def list_agent_files(self, agent_id: str) -> dict[str, dict[str, Any]]: payload = await openclaw_call( "agents.files.list", @@ -689,6 +558,10 @@ class BaseAgentLifecycleManager(ABC): def _template_overrides(self) -> dict[str, str] | None: return None + def _preserve_files(self) -> set[str]: + """Files that are expected to evolve inside the agent workspace.""" + return set(PRESERVE_AGENT_EDITABLE_FILES) + async def _set_agent_files( self, *, @@ -703,10 +576,15 @@ class BaseAgentLifecycleManager(ABC): # Preserve "editable" files only during updates. During first-time provisioning, # the gateway may pre-create defaults for USER/SELF/etc, and we still want to # apply Mission Control's templates. - if action == "update" and name in PRESERVE_AGENT_EDITABLE_FILES: + if action == "update" and name in self._preserve_files(): entry = existing_files.get(name) if entry and not bool(entry.get("missing")): - continue + size = entry.get("size") + if isinstance(size, int) and size == 0: + # Treat 0-byte placeholders as missing so update can fill them. + pass + else: + continue try: await self._control_plane.set_agent_file( agent_id=agent_id, @@ -732,12 +610,8 @@ class BaseAgentLifecycleManager(ABC): if not self._gateway.workspace_root: msg = "gateway_workspace_root is required" raise ValueError(msg) - if not agent.openclaw_session_id: - agent.openclaw_session_id = session_key - await self._control_plane.ensure_agent_session( - session_key, - label=session_label or agent.name, - ) + # Ensure templates render with the active deterministic session key. + agent.openclaw_session_id = session_key agent_id = self._agent_id(agent) workspace_path = _workspace_path(agent, self._gateway.workspace_root) @@ -757,8 +631,9 @@ class BaseAgentLifecycleManager(ABC): user=user, board=board, ) - supported = await self._control_plane.list_supported_files() - supported.update({"USER.md", "SELF.md", "AUTONOMY.md"}) + # Always attempt to sync Mission Control's full template set. + # Do not introspect gateway defaults (avoids touching gateway "main" agent state). + file_names = set(DEFAULT_GATEWAY_FILES) existing_files = await self._control_plane.list_agent_files(agent_id) include_bootstrap = _should_include_bootstrap( action=options.action, @@ -768,7 +643,7 @@ class BaseAgentLifecycleManager(ABC): rendered = _render_agent_files( context, agent, - supported, + file_names, include_bootstrap=include_bootstrap, template_overrides=self._template_overrides(), ) @@ -780,7 +655,9 @@ class BaseAgentLifecycleManager(ABC): action=options.action, ) if options.reset_session: - await self._control_plane.reset_agent_session(session_key) + # Session resets are useful but should never block file sync. + with suppress(OpenClawGatewayError): + await self._control_plane.reset_agent_session(session_key) class BoardAgentLifecycleManager(BaseAgentLifecycleManager): @@ -823,6 +700,13 @@ class GatewayMainAgentLifecycleManager(BaseAgentLifecycleManager): def _template_overrides(self) -> dict[str, str] | None: return MAIN_TEMPLATE_MAP + def _preserve_files(self) -> set[str]: + # For gateway-main agents, USER.md is system-managed (derived from org/user context), + # so keep it in sync even during updates. + preserved = super()._preserve_files() + preserved.discard("USER.md") + return preserved + def _control_plane_for_gateway(gateway: Gateway) -> OpenClawGatewayControlPlane: if not gateway.url: @@ -833,7 +717,7 @@ def _control_plane_for_gateway(gateway: Gateway) -> OpenClawGatewayControlPlane: ) -async def patch_gateway_agent_heartbeats( +async def _patch_gateway_agent_heartbeats( gateway: Gateway, *, entries: list[tuple[str, str, dict[str, Any]]], @@ -846,22 +730,6 @@ async def patch_gateway_agent_heartbeats( await control_plane.patch_agent_heartbeats(entries) -async def sync_gateway_agent_heartbeats(gateway: Gateway, agents: list[Agent]) -> None: - """Sync current Agent.heartbeat_config values to the gateway config.""" - if not gateway.workspace_root: - msg = "gateway workspace_root is required" - raise OpenClawGatewayError(msg) - entries: list[tuple[str, str, dict[str, Any]]] = [] - for agent in agents: - agent_id = _agent_key(agent) - workspace_path = _workspace_path(agent, gateway.workspace_root) - heartbeat = _heartbeat_config(agent) - entries.append((agent_id, workspace_path, heartbeat)) - if not entries: - return - await patch_gateway_agent_heartbeats(gateway, entries=entries) - - def _should_include_bootstrap( *, action: str, @@ -876,92 +744,334 @@ def _should_include_bootstrap( return not bool(entry and entry.get("missing")) -async def provision_agent( - agent: Agent, - request: AgentProvisionRequest, -) -> None: - """Provision or update a regular board agent workspace.""" - gateway = request.gateway - if not gateway.url: - return - session_key = _session_key(agent) - control_plane = _control_plane_for_gateway(gateway) - manager = BoardAgentLifecycleManager(gateway, control_plane) - await manager.provision( - agent=agent, - board=request.board, - session_key=session_key, - auth_token=request.auth_token, - user=request.user, - options=request.options, +def _wakeup_text(agent: Agent, *, verb: str) -> str: + return ( + f"Hello {agent.name}. Your workspace has been {verb}.\n\n" + "Start the agent, run BOOT.md, and if BOOTSTRAP.md exists run it once " + "then delete it. Begin heartbeats after startup." ) -async def provision_main_agent( - agent: Agent, - request: MainAgentProvisionRequest, -) -> None: - """Provision or update the gateway main agent workspace.""" - gateway = request.gateway - if not gateway.url: - return - session_key = (request.session_key or GatewayAgentIdentity.session_key(gateway) or "").strip() - if not session_key: - msg = "gateway main agent session_key is required" - raise ValueError(msg) - control_plane = _control_plane_for_gateway(gateway) - manager = GatewayMainAgentLifecycleManager(gateway, control_plane) - await manager.provision( - agent=agent, - session_key=session_key, - auth_token=request.auth_token, - user=request.user, - options=request.options, - session_label=agent.name or "Gateway Agent", - ) +class OpenClawProvisioningService: + """High-level agent provisioning interface (create -> files -> wake). + This is the public entrypoint for agent lifecycle orchestration. Internals are + implemented as module-private helpers and lifecycle manager classes. + """ -async def cleanup_agent( - agent: Agent, - gateway: Gateway, -) -> str | None: - """Remove an agent from gateway config and delete its session.""" - if not gateway.url: - return None - if not gateway.workspace_root: - msg = "gateway_workspace_root is required" - raise ValueError(msg) - control_plane = _control_plane_for_gateway(gateway) - agent_id = _agent_key(agent) - await control_plane.delete_agent(agent_id, delete_files=True) + def __init__(self, session: AsyncSession | None = None) -> None: + self._session = session - session_key = _session_key(agent) - with suppress(OpenClawGatewayError): - await control_plane.delete_agent_session(session_key) - return None + def _require_session(self) -> AsyncSession: + if self._session is None: + msg = "AsyncSession is required for this operation" + raise ValueError(msg) + return self._session + async def sync_gateway_agent_heartbeats(self, gateway: Gateway, agents: list[Agent]) -> None: + """Sync current Agent.heartbeat_config values to the gateway config.""" + if not gateway.workspace_root: + msg = "gateway workspace_root is required" + raise OpenClawGatewayError(msg) + entries: list[tuple[str, str, dict[str, Any]]] = [] + for agent in agents: + agent_id = _agent_key(agent) + workspace_path = _workspace_path(agent, gateway.workspace_root) + heartbeat = _heartbeat_config(agent) + entries.append((agent_id, workspace_path, heartbeat)) + if not entries: + return + await _patch_gateway_agent_heartbeats(gateway, entries=entries) -async def cleanup_main_agent( - agent: Agent, - gateway: Gateway, -) -> str | None: - """Remove the gateway-main agent from gateway config and delete its session.""" - if not gateway.url: - return None - if not gateway.workspace_root: - msg = "gateway_workspace_root is required" - raise ValueError(msg) + async def apply_agent_lifecycle( + self, + *, + agent: Agent, + gateway: Gateway, + board: Board | None, + auth_token: str, + user: User | None, + action: str = "provision", + force_bootstrap: bool = False, + reset_session: bool = False, + wake: bool = True, + deliver_wakeup: bool = True, + wakeup_verb: str | None = None, + ) -> None: + """Create/update an agent, sync all template files, and optionally wake the agent. - workspace_path = _workspace_path(agent, gateway.workspace_root) - control_plane = _control_plane_for_gateway(gateway) - agent_id = GatewayAgentIdentity.openclaw_agent_id(gateway) - await control_plane.delete_agent(agent_id, delete_files=True) + Lifecycle steps (same for all agent types): + 1) create agent (idempotent) + 2) set/update all template files (best-effort for unsupported files) + 3) wake the agent session (chat.send) + """ - session_key = (agent.openclaw_session_id or GatewayAgentIdentity.session_key(gateway) or "").strip() - if session_key: - with suppress(OpenClawGatewayError): - await control_plane.delete_agent_session(session_key) - return workspace_path + if not gateway.url: + return + + # Guard against accidental main-agent provisioning without a board. + if board is None and getattr(agent, "board_id", None) is not None: + msg = "board is required for board-scoped agent lifecycle" + raise ValueError(msg) + + # Resolve session key and agent type. + if board is None: + session_key = (agent.openclaw_session_id or GatewayAgentIdentity.session_key(gateway) or "").strip() + if not session_key: + msg = "gateway main agent session_key is required" + raise ValueError(msg) + manager_type: type[BaseAgentLifecycleManager] = GatewayMainAgentLifecycleManager + else: + session_key = _session_key(agent) + manager_type = BoardAgentLifecycleManager + + control_plane = _control_plane_for_gateway(gateway) + manager = manager_type(gateway, control_plane) + await manager.provision( + agent=agent, + board=board, + session_key=session_key, + auth_token=auth_token, + user=user, + options=ProvisionOptions( + action=action, + force_bootstrap=force_bootstrap, + reset_session=False, # handled below + ), + session_label=agent.name or "Gateway Agent", + ) + + if reset_session: + with suppress(OpenClawGatewayError): + await control_plane.reset_agent_session(session_key) + + if not wake: + return + + client_config = GatewayClientConfig(url=gateway.url, token=gateway.token) + await ensure_session(session_key, config=client_config, label=agent.name) + verb = wakeup_verb or ("provisioned" if action == "provision" else "updated") + await send_message( + _wakeup_text(agent, verb=verb), + session_key=session_key, + config=client_config, + deliver=deliver_wakeup, + ) + + async def delete_agent_lifecycle( + self, + *, + agent: Agent, + gateway: Gateway, + delete_files: bool = True, + delete_session: bool = True, + ) -> str | None: + """Remove agent runtime state from the gateway (agent + optional session).""" + + if not gateway.url: + return None + if not gateway.workspace_root: + msg = "gateway_workspace_root is required" + raise ValueError(msg) + + workspace_path = _workspace_path(agent, gateway.workspace_root) + control_plane = _control_plane_for_gateway(gateway) + + if agent.board_id is None: + agent_gateway_id = GatewayAgentIdentity.openclaw_agent_id(gateway) + else: + agent_gateway_id = _agent_key(agent) + await control_plane.delete_agent(agent_gateway_id, delete_files=delete_files) + + if delete_session: + if agent.board_id is None: + session_key = (agent.openclaw_session_id or GatewayAgentIdentity.session_key(gateway) or "").strip() + else: + session_key = _session_key(agent) + if session_key: + with suppress(OpenClawGatewayError): + await control_plane.delete_agent_session(session_key) + + return workspace_path + + async def sync_gateway_templates( + self, + gateway: Gateway, + options: GatewayTemplateSyncOptions, + ) -> GatewayTemplatesSyncResult: + """Synchronize AGENTS/TOOLS/etc templates to gateway-connected agents.""" + session = self._require_session() + template_user = options.user + if template_user is None: + template_user = await get_org_owner_user( + session, + organization_id=gateway.organization_id, + ) + options = GatewayTemplateSyncOptions( + user=template_user, + include_main=options.include_main, + reset_sessions=options.reset_sessions, + rotate_tokens=options.rotate_tokens, + force_bootstrap=options.force_bootstrap, + board_id=options.board_id, + ) + result = _base_result( + gateway, + include_main=options.include_main, + reset_sessions=options.reset_sessions, + ) + if not gateway.url: + _append_sync_error( + result, + message="Gateway URL is not configured for this gateway.", + ) + return result + + ctx = _SyncContext( + session=session, + gateway=gateway, + config=GatewayClientConfig(url=gateway.url, token=gateway.token), + backoff=_GatewayBackoff(timeout_s=10 * 60, timeout_context="template sync"), + options=options, + provisioner=self, + ) + if not await _ping_gateway(ctx, result): + return result + + boards = await Board.objects.filter_by(gateway_id=gateway.id).all(session) + boards_by_id = _boards_by_id(boards, board_id=options.board_id) + if boards_by_id is None: + _append_sync_error( + result, + message="Board does not belong to this gateway.", + ) + return result + paused_board_ids = await _paused_board_ids(session, list(boards_by_id.keys())) + if boards_by_id: + agents = await ( + Agent.objects.by_field_in("board_id", list(boards_by_id.keys())) + .order_by(col(Agent.created_at).asc()) + .all(session) + ) + else: + agents = [] + + stop_sync = False + for agent in agents: + board = boards_by_id.get(agent.board_id) if agent.board_id is not None else None + if board is None: + result.agents_skipped += 1 + _append_sync_error( + result, + agent=agent, + message="Skipping agent: board not found for agent.", + ) + continue + if board.id in paused_board_ids: + result.agents_skipped += 1 + continue + stop_sync = await _sync_one_agent(ctx, result, agent, board) + if stop_sync: + break + + if not stop_sync and options.include_main: + await _sync_main_agent(ctx, result) + return result + + @staticmethod + def lead_session_key(board: Board) -> str: + """Return the deterministic session key for a board lead agent.""" + return f"agent:lead-{board.id}:main" + + @staticmethod + def lead_agent_name(_: Board) -> str: + """Return the default display name for board lead agents.""" + return "Lead Agent" + + async def ensure_board_lead_agent( + self, + *, + request: LeadAgentRequest, + ) -> tuple[Agent, bool]: + """Ensure a board has a lead agent; return `(agent, created)`.""" + session = self._require_session() + board = request.board + config_options = request.options + existing = ( + await session.exec( + select(Agent) + .where(Agent.board_id == board.id) + .where(col(Agent.is_board_lead).is_(True)), + ) + ).first() + if existing: + desired_name = config_options.agent_name or self.lead_agent_name(board) + changed = False + if existing.name != desired_name: + existing.name = desired_name + changed = True + if existing.gateway_id != request.gateway.id: + existing.gateway_id = request.gateway.id + changed = True + desired_session_key = self.lead_session_key(board) + if existing.openclaw_session_id != desired_session_key: + existing.openclaw_session_id = desired_session_key + changed = True + if changed: + existing.updated_at = utcnow() + session.add(existing) + await session.commit() + await session.refresh(existing) + return existing, False + + merged_identity_profile: dict[str, Any] = { + "role": "Board Lead", + "communication_style": "direct, concise, practical", + "emoji": ":gear:", + } + if config_options.identity_profile: + merged_identity_profile.update( + { + key: value.strip() + for key, value in config_options.identity_profile.items() + if value.strip() + }, + ) + + agent = Agent( + name=config_options.agent_name or self.lead_agent_name(board), + status="provisioning", + board_id=board.id, + gateway_id=request.gateway.id, + is_board_lead=True, + heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(), + identity_profile=merged_identity_profile, + openclaw_session_id=self.lead_session_key(board), + provision_requested_at=utcnow(), + provision_action=config_options.action, + ) + raw_token = generate_agent_token() + agent.agent_token_hash = hash_agent_token(raw_token) + session.add(agent) + await session.commit() + await session.refresh(agent) + + try: + await self.apply_agent_lifecycle( + agent=agent, + gateway=request.gateway, + board=board, + auth_token=raw_token, + user=request.user, + action=config_options.action, + wake=True, + deliver_wakeup=True, + ) + except OpenClawGatewayError: + # Best-effort provisioning. The board/agent rows should still exist. + pass + + return agent, True _T = TypeVar("_T") @@ -988,6 +1098,7 @@ class _SyncContext: config: GatewayClientConfig backoff: _GatewayBackoff options: GatewayTemplateSyncOptions + provisioner: OpenClawProvisioningService def _is_transient_gateway_error(exc: Exception) -> bool: @@ -1091,19 +1202,6 @@ async def _with_gateway_retry( return await backoff.run(fn) -async def _with_coordination_gateway_retry(fn: Callable[[], Awaitable[_T]]) -> _T: - return await _with_gateway_retry( - fn, - backoff=_GatewayBackoff( - timeout_s=_COORDINATION_GATEWAY_TIMEOUT_S, - base_delay_s=_COORDINATION_GATEWAY_BASE_DELAY_S, - max_delay_s=_COORDINATION_GATEWAY_MAX_DELAY_S, - jitter=0.15, - timeout_context="gateway coordination", - ), - ) - - def _parse_tools_md(content: str) -> dict[str, str]: values: dict[str, str] = {} for raw in content.splitlines(): @@ -1226,7 +1324,8 @@ async def _ping_gateway(ctx: _SyncContext, result: GatewayTemplatesSyncResult) - try: async def _do_ping() -> object: - return await openclaw_call("agents.list", config=ctx.config) + # Use a lightweight health probe; avoid enumerating gateway agents. + return await openclaw_call("health", config=ctx.config) await ctx.backoff.run(_do_ping) except (TimeoutError, OpenClawGatewayError) as exc: @@ -1338,19 +1437,16 @@ async def _sync_one_agent( try: async def _do_provision() -> bool: - await provision_agent( - agent, - AgentProvisionRequest( - board=board, - gateway=ctx.gateway, - auth_token=auth_token, - user=ctx.options.user, - options=ProvisionOptions( - action="update", - force_bootstrap=ctx.options.force_bootstrap, - reset_session=ctx.options.reset_sessions, - ), - ), + await ctx.provisioner.apply_agent_lifecycle( + agent=agent, + gateway=ctx.gateway, + board=board, + auth_token=auth_token, + user=ctx.options.user, + action="update", + force_bootstrap=ctx.options.force_bootstrap, + reset_session=ctx.options.reset_sessions, + wake=False, ) return True @@ -1377,7 +1473,6 @@ async def _sync_main_agent( ctx: _SyncContext, result: GatewayTemplatesSyncResult, ) -> bool: - main_session_key = GatewayAgentIdentity.session_key(ctx.gateway) main_agent = ( await Agent.objects.all() .filter(col(Agent.gateway_id) == ctx.gateway.id) @@ -1412,19 +1507,16 @@ async def _sync_main_agent( try: async def _do_provision_main() -> bool: - await provision_main_agent( - main_agent, - MainAgentProvisionRequest( - gateway=ctx.gateway, - auth_token=token, - user=ctx.options.user, - session_key=main_session_key, - options=ProvisionOptions( - action="update", - force_bootstrap=ctx.options.force_bootstrap, - reset_session=ctx.options.reset_sessions, - ), - ), + await ctx.provisioner.apply_agent_lifecycle( + agent=main_agent, + gateway=ctx.gateway, + board=None, + auth_token=token, + user=ctx.options.user, + action="update", + force_bootstrap=ctx.options.force_bootstrap, + reset_session=ctx.options.reset_sessions, + wake=False, ) return True @@ -1443,86 +1535,6 @@ async def _sync_main_agent( return stop_sync -async def sync_gateway_templates( - session: AsyncSession, - gateway: Gateway, - options: GatewayTemplateSyncOptions, -) -> GatewayTemplatesSyncResult: - """Synchronize AGENTS/TOOLS/etc templates to gateway-connected agents.""" - result = _base_result( - gateway, - include_main=options.include_main, - reset_sessions=options.reset_sessions, - ) - if not gateway.url: - _append_sync_error( - result, - message="Gateway URL is not configured for this gateway.", - ) - return result - - ctx = _SyncContext( - session=session, - gateway=gateway, - config=GatewayClientConfig(url=gateway.url, token=gateway.token), - backoff=_GatewayBackoff(timeout_s=10 * 60, timeout_context="template sync"), - options=options, - ) - if not await _ping_gateway(ctx, result): - return result - - boards = await Board.objects.filter_by(gateway_id=gateway.id).all(session) - boards_by_id = _boards_by_id(boards, board_id=options.board_id) - if boards_by_id is None: - _append_sync_error( - result, - message="Board does not belong to this gateway.", - ) - return result - paused_board_ids = await _paused_board_ids(session, list(boards_by_id.keys())) - if boards_by_id: - agents = await ( - Agent.objects.by_field_in("board_id", list(boards_by_id.keys())) - .order_by(col(Agent.created_at).asc()) - .all(session) - ) - else: - agents = [] - - stop_sync = False - for agent in agents: - board = boards_by_id.get(agent.board_id) if agent.board_id is not None else None - if board is None: - result.agents_skipped += 1 - _append_sync_error( - result, - agent=agent, - message="Skipping agent: board not found for agent.", - ) - continue - if board.id in paused_board_ids: - result.agents_skipped += 1 - continue - stop_sync = await _sync_one_agent(ctx, result, agent, board) - if stop_sync: - break - - if not stop_sync and options.include_main: - await _sync_main_agent(ctx, result) - return result - - -# Board lead lifecycle primitives consolidated from app.services.board_leads. -def lead_session_key(board: Board) -> str: - """Return the deterministic main session key for a board lead agent.""" - return f"agent:lead-{board.id}:main" - - -def lead_agent_name(_: Board) -> str: - """Return the default display name for board lead agents.""" - return "Lead Agent" - - @dataclass(frozen=True, slots=True) class LeadAgentOptions: """Optional overrides for board-lead provisioning behavior.""" @@ -1541,104 +1553,3 @@ class LeadAgentRequest: config: GatewayClientConfig user: User | None options: LeadAgentOptions = field(default_factory=LeadAgentOptions) - - -async def ensure_board_lead_agent( - session: AsyncSession, - *, - request: LeadAgentRequest, -) -> tuple[Agent, bool]: - """Ensure a board has a lead agent; return `(agent, created)`.""" - board = request.board - config_options = request.options - existing = ( - await session.exec( - select(Agent) - .where(Agent.board_id == board.id) - .where(col(Agent.is_board_lead).is_(True)), - ) - ).first() - if existing: - desired_name = config_options.agent_name or lead_agent_name(board) - changed = False - if existing.name != desired_name: - existing.name = desired_name - changed = True - if existing.gateway_id != request.gateway.id: - existing.gateway_id = request.gateway.id - changed = True - desired_session_key = lead_session_key(board) - if not existing.openclaw_session_id: - existing.openclaw_session_id = desired_session_key - changed = True - if changed: - existing.updated_at = utcnow() - session.add(existing) - await session.commit() - await session.refresh(existing) - return existing, False - - merged_identity_profile: dict[str, Any] = { - "role": "Board Lead", - "communication_style": "direct, concise, practical", - "emoji": ":gear:", - } - if config_options.identity_profile: - merged_identity_profile.update( - { - key: value.strip() - for key, value in config_options.identity_profile.items() - if value.strip() - }, - ) - - agent = Agent( - name=config_options.agent_name or lead_agent_name(board), - status="provisioning", - board_id=board.id, - gateway_id=request.gateway.id, - is_board_lead=True, - heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(), - identity_profile=merged_identity_profile, - openclaw_session_id=lead_session_key(board), - provision_requested_at=utcnow(), - provision_action=config_options.action, - ) - raw_token = generate_agent_token() - agent.agent_token_hash = hash_agent_token(raw_token) - session.add(agent) - await session.commit() - await session.refresh(agent) - - try: - await provision_agent( - agent, - AgentProvisionRequest( - board=board, - gateway=request.gateway, - auth_token=raw_token, - user=request.user, - options=ProvisionOptions(action=config_options.action), - ), - ) - if agent.openclaw_session_id: - await ensure_session( - agent.openclaw_session_id, - config=request.config, - label=agent.name, - ) - await send_message( - ( - f"Hello {agent.name}. Your workspace has been provisioned.\n\n" - "Start the agent, run BOOT.md, and if BOOTSTRAP.md exists run " - "it once then delete it. Begin heartbeats after startup." - ), - session_key=agent.openclaw_session_id, - config=request.config, - deliver=True, - ) - except OpenClawGatewayError: - # Best-effort provisioning. The board/agent rows should still exist. - pass - - return agent, True diff --git a/backend/app/services/organizations.py b/backend/app/services/organizations.py index 15223daa..84a426d9 100644 --- a/backend/app/services/organizations.py +++ b/backend/app/services/organizations.py @@ -61,6 +61,23 @@ async def get_member( ).first(session) +async def get_org_owner_user( + session: AsyncSession, + *, + organization_id: UUID, +) -> User | None: + """Return the org owner User, if one exists.""" + owner = ( + await OrganizationMember.objects.filter_by(organization_id=organization_id) + .filter(col(OrganizationMember.role) == "owner") + .order_by(col(OrganizationMember.created_at).asc()) + .first(session) + ) + if owner is None: + return None + return await User.objects.by_id(owner.user_id).first(session) + + async def get_first_membership( session: AsyncSession, user_id: UUID, diff --git a/backend/scripts/sync_gateway_templates.py b/backend/scripts/sync_gateway_templates.py index 2312a6da..d3098d64 100644 --- a/backend/scripts/sync_gateway_templates.py +++ b/backend/scripts/sync_gateway_templates.py @@ -54,7 +54,7 @@ async def _run() -> int: from app.models.gateways import Gateway from app.services.openclaw.provisioning import ( GatewayTemplateSyncOptions, - sync_gateway_templates, + OpenClawProvisioningService, ) args = _parse_args() @@ -67,10 +67,9 @@ async def _run() -> int: message = f"Gateway not found: {gateway_id}" raise SystemExit(message) - result = await sync_gateway_templates( - session, + result = await OpenClawProvisioningService(session).sync_gateway_templates( gateway, - options=GatewayTemplateSyncOptions( + GatewayTemplateSyncOptions( user=None, include_main=bool(args.include_main), reset_sessions=bool(args.reset_sessions), diff --git a/backend/tests/test_agent_delete_main_agent.py b/backend/tests/test_agent_delete_main_agent.py index 648df6aa..6e4b8aac 100644 --- a/backend/tests/test_agent_delete_main_agent.py +++ b/backend/tests/test_agent_delete_main_agent.py @@ -88,10 +88,18 @@ async def test_delete_gateway_main_agent_does_not_require_board_id(monkeypatch: async def _should_not_be_called(*_args, **_kwargs): raise AssertionError("require_board/require_gateway should not be called for main agents") - called: dict[str, int] = {"cleanup_main": 0} + called: dict[str, int] = {"delete_lifecycle": 0} - async def _fake_cleanup_main_agent(_agent: object, _gateway: object) -> str | None: - called["cleanup_main"] += 1 + async def _fake_delete_agent_lifecycle( + _self, + *, + agent: object, + gateway: object, + delete_files: bool = True, + delete_session: bool = True, + ) -> str | None: + _ = (_self, agent, gateway, delete_files, delete_session) + called["delete_lifecycle"] += 1 return "/tmp/openclaw/workspace-gateway-x" async def _fake_update_where(*_args, **_kwargs) -> None: @@ -100,13 +108,16 @@ async def test_delete_gateway_main_agent_does_not_require_board_id(monkeypatch: monkeypatch.setattr(service, "require_agent_access", _no_access_check) monkeypatch.setattr(service, "require_board", _should_not_be_called) monkeypatch.setattr(service, "require_gateway", _should_not_be_called) - monkeypatch.setattr(agent_service, "cleanup_main_agent", _fake_cleanup_main_agent) + monkeypatch.setattr( + agent_service.OpenClawProvisioningService, + "delete_agent_lifecycle", + _fake_delete_agent_lifecycle, + ) monkeypatch.setattr(agent_service.crud, "update_where", _fake_update_where) monkeypatch.setattr(agent_service, "record_activity", lambda *_a, **_k: None) result = await service.delete_agent(agent_id=str(agent.id), ctx=ctx) # type: ignore[arg-type] assert result.ok is True - assert called["cleanup_main"] == 1 + assert called["delete_lifecycle"] == 1 assert session.deleted and session.deleted[0] == agent - diff --git a/backend/tests/test_agent_provisioning_utils.py b/backend/tests/test_agent_provisioning_utils.py index debccddb..e7a6f887 100644 --- a/backend/tests/test_agent_provisioning_utils.py +++ b/backend/tests/test_agent_provisioning_utils.py @@ -25,27 +25,6 @@ def test_slugify_falls_back_to_uuid_hex(monkeypatch): assert agent_provisioning._slugify("!!!") == "deadbeef" -def test_extract_agent_id_supports_lists_and_dicts(): - assert agent_provisioning._extract_agent_id(["", " ", "abc"]) == "abc" - assert agent_provisioning._extract_agent_id([{"agent_id": "xyz"}]) == "xyz" - - payload = { - "defaultAgentId": "dflt", - "agents": [{"id": "ignored"}], - } - assert agent_provisioning._extract_agent_id(payload) == "dflt" - - payload2 = { - "agents": [{"id": ""}, {"agentId": "foo"}], - } - assert agent_provisioning._extract_agent_id(payload2) == "foo" - - -def test_extract_agent_id_returns_none_for_unknown_shapes(): - assert agent_provisioning._extract_agent_id("nope") is None - assert agent_provisioning._extract_agent_id({"agents": "not-a-list"}) is None - - @dataclass class _AgentStub: name: str @@ -58,13 +37,13 @@ class _AgentStub: soul_template: str | None = None -def test_agent_key_uses_session_key_when_present(monkeypatch): +def test_agent_key_uses_session_key_when_present(): agent = _AgentStub(name="Alice", openclaw_session_id="agent:alice:main") assert agent_provisioning._agent_key(agent) == "alice" - monkeypatch.setattr(agent_provisioning, "_slugify", lambda value: "slugged") - agent2 = _AgentStub(name="Alice", openclaw_session_id=None) - assert agent_provisioning._agent_key(agent2) == "slugged" + agent2 = _AgentStub(name="Hello, World", openclaw_session_id=None) + assert agent_provisioning._agent_key(agent2) == "hello-world" + def test_workspace_path_preserves_tilde_in_workspace_root(): # Mission Control accepts a user-entered workspace root (from the UI) and must @@ -118,9 +97,6 @@ async def test_provision_main_agent_uses_dedicated_openclaw_agent_id(monkeypatch captured["patched_agent_id"] = registration.agent_id captured["workspace_path"] = registration.workspace_path - async def _fake_list_supported_files(self): - return set() - async def _fake_list_agent_files(self, agent_id): captured["files_index_agent_id"] = agent_id return {} @@ -141,11 +117,6 @@ async def test_provision_main_agent_uses_dedicated_openclaw_agent_id(monkeypatch "upsert_agent", _fake_upsert_agent, ) - monkeypatch.setattr( - agent_provisioning.OpenClawGatewayControlPlane, - "list_supported_files", - _fake_list_supported_files, - ) monkeypatch.setattr( agent_provisioning.OpenClawGatewayControlPlane, "list_agent_files", @@ -158,14 +129,14 @@ async def test_provision_main_agent_uses_dedicated_openclaw_agent_id(monkeypatch _fake_set_agent_files, ) - await agent_provisioning.provision_main_agent( - agent, - agent_provisioning.MainAgentProvisionRequest( - gateway=gateway, - auth_token="secret-token", - user=None, - session_key=session_key, - ), + await agent_provisioning.OpenClawProvisioningService().apply_agent_lifecycle( + agent=agent, # type: ignore[arg-type] + gateway=gateway, # type: ignore[arg-type] + board=None, + auth_token="secret-token", + user=None, + action="provision", + wake=False, ) expected_agent_id = GatewayAgentIdentity.openclaw_agent_id_for_id(gateway_id) @@ -173,31 +144,6 @@ async def test_provision_main_agent_uses_dedicated_openclaw_agent_id(monkeypatch assert captured["files_index_agent_id"] == expected_agent_id -@pytest.mark.asyncio -async def test_list_supported_files_always_includes_default_gateway_files(monkeypatch): - """Provisioning should not depend solely on whatever the gateway's default agent reports.""" - - async def _fake_openclaw_call(method, params=None, config=None): - _ = config - if method == "agents.list": - return {"defaultId": "main"} - if method == "agents.files.list": - assert params == {"agentId": "main"} - return {"files": [{"path": "prompts/system.md", "missing": True}]} - raise AssertionError(f"Unexpected method: {method}") - - monkeypatch.setattr(agent_provisioning, "openclaw_call", _fake_openclaw_call) - cp = agent_provisioning.OpenClawGatewayControlPlane( - agent_provisioning.GatewayClientConfig(url="ws://gateway.example/ws", token=None), - ) - supported = await cp.list_supported_files() - - # Newer gateways may surface other file paths, but we still must include our templates. - assert "prompts/system.md" in supported - for required in agent_provisioning.DEFAULT_GATEWAY_FILES: - assert required in supported - - @pytest.mark.asyncio async def test_provision_overwrites_user_md_on_first_provision(monkeypatch): """Gateway may pre-create USER.md; we still want MC's template on first provision.""" @@ -221,10 +167,6 @@ async def test_provision_overwrites_user_md_on_first_provision(monkeypatch): async def delete_agent(self, agent_id, *, delete_files=True): return None - async def list_supported_files(self): - # Minimal set. - return {"USER.md"} - async def list_agent_files(self, agent_id): # Pretend gateway created USER.md already. return {"USER.md": {"name": "USER.md", "missing": False}} @@ -268,3 +210,137 @@ async def test_provision_overwrites_user_md_on_first_provision(monkeypatch): action="provision", ) assert ("USER.md", "from-mc") in cp.writes + + +@pytest.mark.asyncio +async def test_set_agent_files_update_writes_zero_size_user_md(): + """Treat empty placeholder files as missing during update.""" + + class _ControlPlaneStub: + def __init__(self): + self.writes: list[tuple[str, str]] = [] + + async def ensure_agent_session(self, session_key, *, label=None): + return None + + async def reset_agent_session(self, session_key): + return None + + async def delete_agent_session(self, session_key): + return None + + async def upsert_agent(self, registration): + return None + + async def delete_agent(self, agent_id, *, delete_files=True): + return None + + async def list_agent_files(self, agent_id): + return {} + + async def set_agent_file(self, *, agent_id, name, content): + self.writes.append((name, content)) + + async def patch_agent_heartbeats(self, entries): + return None + + @dataclass + class _GatewayTiny: + id: UUID + name: str + url: str + token: str | None + workspace_root: str + + class _Manager(agent_provisioning.BaseAgentLifecycleManager): + def _agent_id(self, agent): + return "agent-x" + + def _build_context(self, *, agent, auth_token, user, board): + return {} + + gateway = _GatewayTiny( + id=uuid4(), + name="G", + url="ws://x", + token=None, + workspace_root="/tmp", + ) + cp = _ControlPlaneStub() + mgr = _Manager(gateway, cp) # type: ignore[arg-type] + + await mgr._set_agent_files( + agent_id="agent-x", + rendered={"USER.md": "filled"}, + existing_files={"USER.md": {"name": "USER.md", "missing": False, "size": 0}}, + action="update", + ) + assert ("USER.md", "filled") in cp.writes + + +@pytest.mark.asyncio +async def test_control_plane_upsert_agent_create_then_update(monkeypatch): + calls: list[tuple[str, dict[str, object] | None]] = [] + + async def _fake_openclaw_call(method, params=None, config=None): + _ = config + calls.append((method, params)) + if method == "agents.create": + return {"ok": True} + if method == "agents.update": + return {"ok": True} + if method == "config.get": + return {"hash": None, "config": {"agents": {"list": []}}} + if method == "config.patch": + return {"ok": True} + raise AssertionError(f"Unexpected method: {method}") + + monkeypatch.setattr(agent_provisioning, "openclaw_call", _fake_openclaw_call) + cp = agent_provisioning.OpenClawGatewayControlPlane( + agent_provisioning.GatewayClientConfig(url="ws://gateway.example/ws", token=None), + ) + await cp.upsert_agent( + agent_provisioning.GatewayAgentRegistration( + agent_id="board-agent-a", + name="Board Agent A", + workspace_path="/tmp/workspace-board-agent-a", + heartbeat={"every": "10m", "target": "none", "includeReasoning": False}, + ), + ) + + assert calls[0][0] == "agents.create" + assert calls[1][0] == "agents.update" + + +@pytest.mark.asyncio +async def test_control_plane_upsert_agent_handles_already_exists(monkeypatch): + calls: list[tuple[str, dict[str, object] | None]] = [] + + async def _fake_openclaw_call(method, params=None, config=None): + _ = config + calls.append((method, params)) + if method == "agents.create": + raise agent_provisioning.OpenClawGatewayError("already exists") + if method == "agents.update": + return {"ok": True} + if method == "config.get": + return {"hash": None, "config": {"agents": {"list": []}}} + if method == "config.patch": + return {"ok": True} + raise AssertionError(f"Unexpected method: {method}") + + monkeypatch.setattr(agent_provisioning, "openclaw_call", _fake_openclaw_call) + cp = agent_provisioning.OpenClawGatewayControlPlane( + agent_provisioning.GatewayClientConfig(url="ws://gateway.example/ws", token=None), + ) + await cp.upsert_agent( + agent_provisioning.GatewayAgentRegistration( + agent_id="board-agent-a", + name="Board Agent A", + workspace_path="/tmp/workspace-board-agent-a", + heartbeat={"every": "10m", "target": "none", "includeReasoning": False}, + ), + ) + + assert calls[0][0] == "agents.create" + assert calls[1][0] == "agents.update" diff --git a/backend/tests/test_authenticate_request_flow.py b/backend/tests/test_authenticate_request_flow.py index e721180a..130bf1ff 100644 --- a/backend/tests/test_authenticate_request_flow.py +++ b/backend/tests/test_authenticate_request_flow.py @@ -93,4 +93,3 @@ async def test_get_auth_context_optional_returns_none_for_agent_token( session=_FakeSession(), # type: ignore[arg-type] ) assert out is None -