refactor: replace direct calls to provisioning functions with OpenClawProvisioningService methods

This commit is contained in:
Abhimanyu Saharan
2026-02-10 22:30:14 +05:30
parent d9199f8d8d
commit ad75871e61
12 changed files with 703 additions and 844 deletions

View File

@@ -30,7 +30,7 @@ from app.schemas.pagination import DefaultLimitOffsetPage
from app.schemas.view_models import BoardGroupSnapshot
from app.services.board_group_snapshot import build_group_snapshot
from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG
from app.services.openclaw.provisioning import sync_gateway_agent_heartbeats
from app.services.openclaw.provisioning import OpenClawProvisioningService
from app.services.openclaw.shared import GatewayTransportError
from app.services.organizations import (
OrganizationContext,
@@ -269,7 +269,10 @@ async def _sync_gateway_heartbeats(
failed_agent_ids.extend([agent.id for agent in gateway_agents])
continue
try:
await sync_gateway_agent_heartbeats(gateway, gateway_agents)
await OpenClawProvisioningService().sync_gateway_agent_heartbeats(
gateway,
gateway_agents,
)
except GatewayTransportError:
failed_agent_ids.extend([agent.id for agent in gateway_agents])
return failed_agent_ids

View File

@@ -38,7 +38,7 @@ from app.services.openclaw.policies import OpenClawAuthorizationPolicy
from app.services.openclaw.provisioning import (
LeadAgentOptions,
LeadAgentRequest,
ensure_board_lead_agent,
OpenClawProvisioningService,
)
from app.services.openclaw.shared import require_gateway_config_for_board
@@ -401,8 +401,7 @@ async def confirm_onboarding(
session.add(onboarding)
await session.commit()
await session.refresh(board)
await ensure_board_lead_agent(
session,
await OpenClawProvisioningService(session).ensure_board_lead_agent(
request=LeadAgentRequest(
board=board,
gateway=gateway,

View File

@@ -39,7 +39,7 @@ from app.schemas.pagination import DefaultLimitOffsetPage
from app.schemas.view_models import BoardGroupSnapshot, BoardSnapshot
from app.services.board_group_snapshot import build_board_group_snapshot
from app.services.board_snapshot import build_board_snapshot
from app.services.openclaw.provisioning import cleanup_agent
from app.services.openclaw.provisioning import OpenClawProvisioningService
from app.services.openclaw.shared import GatewayTransportError
from app.services.organizations import OrganizationContext, board_access_filter
@@ -287,7 +287,10 @@ async def delete_board(
if config:
try:
for agent in agents:
await cleanup_agent(agent, config)
await OpenClawProvisioningService().delete_agent_lifecycle(
agent=agent,
gateway=config,
)
except GatewayTransportError as exc:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,

View File

@@ -17,9 +17,7 @@ from app.db import crud
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import (
OpenClawGatewayError,
ensure_session,
openclaw_call,
send_message,
)
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
@@ -30,13 +28,11 @@ from app.schemas.gateways import GatewayTemplatesSyncResult
from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG
from app.services.openclaw.provisioning import (
GatewayTemplateSyncOptions,
MainAgentProvisionRequest,
ProvisionOptions,
provision_main_agent,
sync_gateway_templates,
OpenClawProvisioningService,
)
from app.services.openclaw.session_service import GatewayTemplateSyncQuery
from app.services.openclaw.shared import GatewayAgentIdentity
from app.services.organizations import get_org_owner_user
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -132,30 +128,6 @@ class GatewayAdminLifecycleService:
.first(self.session)
)
@staticmethod
def extract_agent_id_from_entry(item: object) -> str | None:
if isinstance(item, str):
value = item.strip()
return value or None
if not isinstance(item, dict):
return None
for key in ("id", "agentId", "agent_id"):
raw = item.get(key)
if isinstance(raw, str) and raw.strip():
return raw.strip()
return None
@staticmethod
def extract_agents_list(payload: object) -> list[object]:
if isinstance(payload, list):
return [item for item in payload]
if not isinstance(payload, dict):
return []
agents = payload.get("agents") or []
if not isinstance(agents, list):
return []
return [item for item in agents]
async def upsert_main_agent_record(self, gateway: Gateway) -> tuple[Agent, bool]:
changed = False
session_key = GatewayAgentIdentity.session_key(gateway)
@@ -210,13 +182,13 @@ class GatewayAdminLifecycleService:
config = GatewayClientConfig(url=gateway.url, token=gateway.token)
target_id = GatewayAgentIdentity.openclaw_agent_id(gateway)
try:
payload = await openclaw_call("agents.list", config=config)
except OpenClawGatewayError:
return True
for item in self.extract_agents_list(payload):
if self.extract_agent_id_from_entry(item) == target_id:
return True
await openclaw_call("agents.files.list", {"agentId": target_id}, config=config)
except OpenClawGatewayError as exc:
message = str(exc).lower()
if any(marker in message for marker in ("not found", "unknown agent", "no such agent")):
return False
return True
return True
async def provision_main_agent_record(
self,
@@ -227,7 +199,10 @@ class GatewayAdminLifecycleService:
action: str,
notify: bool,
) -> Agent:
session_key = GatewayAgentIdentity.session_key(gateway)
template_user = user or await get_org_owner_user(
self.session,
organization_id=gateway.organization_id,
)
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
agent.provision_requested_at = utcnow()
@@ -241,32 +216,15 @@ class GatewayAdminLifecycleService:
if not gateway.url:
return agent
try:
await provision_main_agent(
agent,
MainAgentProvisionRequest(
await OpenClawProvisioningService().apply_agent_lifecycle(
agent=agent,
gateway=gateway,
board=None,
auth_token=raw_token,
user=user,
session_key=session_key,
options=ProvisionOptions(action=action),
),
)
await ensure_session(
session_key,
config=GatewayClientConfig(url=gateway.url, token=gateway.token),
label=agent.name,
)
if notify:
await send_message(
(
f"Hello {agent.name}. Your gateway provisioning was updated.\n\n"
"Please re-read AGENTS.md, USER.md, HEARTBEAT.md, and TOOLS.md. "
"If BOOTSTRAP.md exists, run it once then delete it. "
"Begin heartbeats after startup."
),
session_key=session_key,
config=GatewayClientConfig(url=gateway.url, token=gateway.token),
deliver=True,
user=template_user,
action=action,
wake=notify,
deliver_wakeup=True,
)
self.logger.info(
"gateway.main_agent.provision_success gateway_id=%s agent_id=%s action=%s",
@@ -388,8 +346,7 @@ class GatewayAdminLifecycleService:
query.include_main,
)
await self.ensure_gateway_agents_exist([gateway])
result = await sync_gateway_templates(
self.session,
result = await OpenClawProvisioningService(self.session).sync_gateway_templates(
gateway,
GatewayTemplateSyncOptions(
user=auth.user,

View File

@@ -49,15 +49,7 @@ from app.services.openclaw.constants import (
OFFLINE_AFTER,
)
from app.services.openclaw.policies import OpenClawAuthorizationPolicy
from app.services.openclaw.provisioning import (
AgentProvisionRequest,
MainAgentProvisionRequest,
ProvisionOptions,
cleanup_agent,
cleanup_main_agent,
provision_agent,
provision_main_agent,
)
from app.services.openclaw.provisioning import OpenClawProvisioningService
from app.services.openclaw.shared import GatewayAgentIdentity
from app.services.organizations import (
OrganizationContext,
@@ -176,11 +168,6 @@ class AbstractProvisionExecution(ABC):
)
try:
await self._provision()
await self._service.send_wakeup_message(
self.agent,
self.request.target.client_config,
verb=self._wakeup_verb,
)
self.agent.provision_confirm_token_hash = None
self.agent.provision_requested_at = None
self.agent.provision_action = None
@@ -256,19 +243,18 @@ class BoardAgentProvisionExecution(AbstractProvisionExecution):
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="board is required for non-main agent provisioning",
)
await provision_agent(
self.agent,
AgentProvisionRequest(
board=board,
await OpenClawProvisioningService().apply_agent_lifecycle(
agent=self.agent,
gateway=self.request.target.gateway,
board=board,
auth_token=self.request.raw_token,
user=self.request.user,
options=ProvisionOptions(
action=self._action,
force_bootstrap=self.request.force_bootstrap,
reset_session=True,
),
),
wake=True,
deliver_wakeup=True,
wakeup_verb=self._wakeup_verb,
)
@@ -276,19 +262,18 @@ class MainAgentProvisionExecution(AbstractProvisionExecution):
"""Provision execution for gateway-main agents."""
async def _provision(self) -> None:
await provision_main_agent(
self.agent,
MainAgentProvisionRequest(
await OpenClawProvisioningService().apply_agent_lifecycle(
agent=self.agent,
gateway=self.request.target.gateway,
board=None,
auth_token=self.request.raw_token,
user=self.request.user,
session_key=self.agent.openclaw_session_id,
options=ProvisionOptions(
action=self._action,
force_bootstrap=self.request.force_bootstrap,
reset_session=True,
),
),
wake=True,
deliver_wakeup=True,
wakeup_verb=self._wakeup_verb,
)
@@ -337,8 +322,25 @@ class AgentLifecycleService:
return slug or uuid4().hex
@classmethod
def build_session_key(cls, agent_name: str) -> str:
return f"{AGENT_SESSION_PREFIX}:{cls.slugify(agent_name)}:main"
def resolve_session_key(cls, agent: Agent) -> str:
"""Resolve the gateway session key for an agent.
Notes:
- For board-scoped agents, default to a UUID-based key to avoid name collisions.
"""
existing = (agent.openclaw_session_id or "").strip()
if agent.board_id is None:
# Gateway-main agents must have an explicit deterministic key (set elsewhere).
if existing:
return existing
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Gateway main agent session key is required",
)
if agent.is_board_lead:
return f"{AGENT_SESSION_PREFIX}:lead-{agent.board_id}:main"
return f"{AGENT_SESSION_PREFIX}:mc-{agent.id}:main"
@classmethod
def workspace_path(cls, agent_name: str, workspace_root: str | None) -> str:
@@ -439,23 +441,6 @@ class AgentLifecycleService:
return None
return await Gateway.objects.by_id(agent.gateway_id).first(self.session)
async def ensure_gateway_session(
self,
agent_name: str,
config: GatewayClientConfig,
) -> tuple[str, str | None]:
session_key = self.build_session_key(agent_name)
try:
await ensure_session(session_key, config=config, label=agent_name)
except OpenClawGatewayError as exc:
self.logger.warning(
"agent.session.ensure_failed agent_name=%s error=%s",
agent_name,
str(exc),
)
return session_key, str(exc)
return session_key, None
@classmethod
def with_computed_status(cls, agent: Agent) -> Agent:
now = utcnow()
@@ -607,30 +592,11 @@ class AgentLifecycleService:
detail="An agent with this name already exists in this gateway workspace.",
)
desired_session_key = self.build_session_key(requested_name)
existing_session_key = (
await self.session.exec(
select(Agent)
.join(Board, col(Agent.board_id) == col(Board.id))
.where(col(Board.gateway_id) == gateway.id)
.where(col(Agent.openclaw_session_id) == desired_session_key),
)
).first()
if existing_session_key:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=(
"This agent name would collide with an existing workspace "
"session key. Pick a different name."
),
)
async def persist_new_agent(
self,
*,
data: dict[str, Any],
client_config: GatewayClientConfig,
) -> tuple[Agent, str, str | None]:
) -> tuple[Agent, str]:
agent = Agent.model_validate(data)
agent.status = "provisioning"
raw_token = generate_agent_token()
@@ -639,58 +605,21 @@ class AgentLifecycleService:
agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy()
agent.provision_requested_at = utcnow()
agent.provision_action = "provision"
session_key, session_error = await self.ensure_gateway_session(
agent.name,
client_config,
)
agent.openclaw_session_id = session_key
agent.openclaw_session_id = self.resolve_session_key(agent)
self.session.add(agent)
await self.session.commit()
await self.session.refresh(agent)
return agent, raw_token, session_error
async def record_session_creation(
self,
*,
agent: Agent,
session_error: str | None,
) -> None:
if session_error:
record_activity(
self.session,
event_type="agent.session.failed",
message=f"Session sync failed for {agent.name}: {session_error}",
agent_id=agent.id,
)
else:
record_activity(
self.session,
event_type="agent.session.created",
message=f"Session created for {agent.name}.",
agent_id=agent.id,
)
await self.session.commit()
async def send_wakeup_message(
self,
agent: Agent,
config: GatewayClientConfig,
verb: str = "provisioned",
) -> None:
session_key = agent.openclaw_session_id or self.build_session_key(agent.name)
await ensure_session(session_key, config=config, label=agent.name)
message = (
f"Hello {agent.name}. Your workspace has been {verb}.\n\n"
"Start the agent, run BOOT.md, and if BOOTSTRAP.md exists run it once "
"then delete it. Begin heartbeats after startup."
)
await send_message(message, session_key=session_key, config=config, deliver=True)
return agent, raw_token
async def provision_new_agent(
self,
*,
agent: Agent,
request: AgentProvisionRequest,
board: Board,
gateway: Gateway,
auth_token: str,
user: User | None,
force_bootstrap: bool,
client_config: GatewayClientConfig,
) -> None:
execution = BoardAgentProvisionExecution(
@@ -699,13 +628,13 @@ class AgentLifecycleService:
provision_request=AgentUpdateProvisionRequest(
target=AgentUpdateProvisionTarget(
is_main_agent=False,
board=request.board,
gateway=request.gateway,
board=board,
gateway=gateway,
client_config=client_config,
),
raw_token=request.auth_token,
user=request.user,
force_bootstrap=request.options.force_bootstrap,
raw_token=auth_token,
user=user,
force_bootstrap=force_bootstrap,
),
action="provision",
wakeup_verb="provisioned",
@@ -852,24 +781,6 @@ class AgentLifecycleService:
client_config=client_config,
)
async def ensure_agent_update_session(
self,
*,
agent: Agent,
client_config: GatewayClientConfig,
) -> None:
session_key = agent.openclaw_session_id or self.build_session_key(agent.name)
try:
await ensure_session(session_key, config=client_config, label=agent.name)
if not agent.openclaw_session_id:
agent.openclaw_session_id = session_key
self.session.add(agent)
await self.session.commit()
await self.session.refresh(agent)
except OpenClawGatewayError as exc:
self.record_instruction_failure(self.session, agent, str(exc), "update")
await self.session.commit()
@staticmethod
def mark_agent_update_pending(agent: Agent) -> str:
raw_token = generate_agent_token()
@@ -937,23 +848,14 @@ class AgentLifecycleService:
"gateway_id": gateway.id,
"heartbeat_config": DEFAULT_HEARTBEAT_CONFIG.copy(),
}
agent, raw_token, session_error = await self.persist_new_agent(
data=data,
client_config=client_config,
)
await self.record_session_creation(
agent=agent,
session_error=session_error,
)
agent, raw_token = await self.persist_new_agent(data=data)
await self.provision_new_agent(
agent=agent,
request=AgentProvisionRequest(
board=board,
gateway=gateway,
auth_token=raw_token,
user=actor.user,
options=ProvisionOptions(action="provision"),
),
force_bootstrap=False,
client_config=client_config,
)
return agent
@@ -987,13 +889,11 @@ class AgentLifecycleService:
gateway, client_config = await self.require_gateway(board)
await self.provision_new_agent(
agent=agent,
request=AgentProvisionRequest(
board=board,
gateway=gateway,
auth_token=raw_token,
user=user,
options=ProvisionOptions(action="provision"),
),
force_bootstrap=False,
client_config=client_config,
)
@@ -1003,24 +903,17 @@ class AgentLifecycleService:
agent: Agent,
actor: ActorContextLike,
) -> None:
if agent.openclaw_session_id:
_ = actor
if agent.board_id is None:
return
board = await self.require_board(
str(agent.board_id) if agent.board_id else None,
user=actor.user if actor.actor_type == "user" else None,
write=actor.actor_type == "user",
)
_, client_config = await self.require_gateway(board)
session_key, session_error = await self.ensure_gateway_session(
agent.name,
client_config,
)
agent.openclaw_session_id = session_key
desired = self.resolve_session_key(agent)
existing = (agent.openclaw_session_id or "").strip()
if existing == desired:
return
agent.openclaw_session_id = desired
self.session.add(agent)
await self.record_session_creation(
agent=agent,
session_error=session_error,
)
await self.session.commit()
await self.session.refresh(agent)
async def commit_heartbeat(
self,
@@ -1162,24 +1055,14 @@ class AgentLifecycleService:
gateway=gateway,
requested_name=requested_name,
)
agent, raw_token, session_error = await self.persist_new_agent(
data=data,
client_config=client_config,
)
await self.record_session_creation(
agent, raw_token = await self.persist_new_agent(data=data)
await self.provision_new_agent(
agent=agent,
session_error=session_error,
)
provision_request = AgentProvisionRequest(
board=board,
gateway=gateway,
auth_token=raw_token,
user=actor.user if actor.actor_type == "user" else None,
options=ProvisionOptions(action="provision"),
)
await self.provision_new_agent(
agent=agent,
request=provision_request,
force_bootstrap=False,
client_config=client_config,
)
self.logger.info("agent.create.success agent_id=%s board_id=%s", agent.id, board.id)
@@ -1229,10 +1112,6 @@ class AgentLifecycleService:
main_gateway=main_gateway,
gateway_for_main=gateway_for_main,
)
await self.ensure_agent_update_session(
agent=agent,
client_config=target.client_config,
)
raw_token = self.mark_agent_update_pending(agent)
self.session.add(agent)
await self.session.commit()
@@ -1345,7 +1224,10 @@ class AgentLifecycleService:
if gateway and gateway.url:
client_config = GatewayClientConfig(url=gateway.url, token=gateway.token)
try:
workspace_path = await cleanup_main_agent(agent, gateway)
workspace_path = await OpenClawProvisioningService().delete_agent_lifecycle(
agent=agent,
gateway=gateway,
)
except OpenClawGatewayError as exc:
self.record_instruction_failure(self.session, agent, str(exc), "delete")
await self.session.commit()
@@ -1364,7 +1246,10 @@ class AgentLifecycleService:
board = await self.require_board(str(agent.board_id))
gateway, client_config = await self.require_gateway(board)
try:
workspace_path = await cleanup_agent(agent, gateway)
workspace_path = await OpenClawProvisioningService().delete_agent_lifecycle(
agent=agent,
gateway=gateway,
)
except OpenClawGatewayError as exc:
self.record_instruction_failure(self.session, agent, str(exc), "delete")
await self.session.commit()

View File

@@ -39,7 +39,7 @@ from app.services.openclaw.policies import OpenClawAuthorizationPolicy
from app.services.openclaw.provisioning import (
LeadAgentOptions,
LeadAgentRequest,
ensure_board_lead_agent,
OpenClawProvisioningService,
)
from app.services.openclaw.shared import (
GatewayAgentIdentity,
@@ -542,8 +542,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
board: Board,
message: str,
) -> tuple[Agent, bool]:
lead, lead_created = await ensure_board_lead_agent(
self.session,
lead, lead_created = await OpenClawProvisioningService(self.session).ensure_board_lead_agent(
request=LeadAgentRequest(
board=board,
gateway=gateway,

View File

@@ -33,12 +33,8 @@ from app.models.boards import Board
from app.models.gateways import Gateway
from app.schemas.gateways import GatewayTemplatesSyncError, GatewayTemplatesSyncResult
from app.services.openclaw.constants import (
_COORDINATION_GATEWAY_BASE_DELAY_S,
_COORDINATION_GATEWAY_MAX_DELAY_S,
_COORDINATION_GATEWAY_TIMEOUT_S,
_NON_TRANSIENT_GATEWAY_ERROR_MARKERS,
_SECURE_RANDOM,
_SESSION_KEY_PARTS_MIN,
_TOOLS_KV_RE,
_TRANSIENT_GATEWAY_ERROR_MARKERS,
DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY,
@@ -52,7 +48,9 @@ from app.services.openclaw.constants import (
MAIN_TEMPLATE_MAP,
PRESERVE_AGENT_EDITABLE_FILES,
)
from app.services.openclaw.internal import agent_key as _agent_key
from app.services.openclaw.shared import GatewayAgentIdentity
from app.services.organizations import get_org_owner_user
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -69,28 +67,6 @@ class ProvisionOptions:
reset_session: bool = False
@dataclass(frozen=True, slots=True)
class AgentProvisionRequest:
"""Inputs required to provision a board-scoped agent."""
board: Board
gateway: Gateway
auth_token: str
user: User | None
options: ProvisionOptions = field(default_factory=ProvisionOptions)
@dataclass(frozen=True, slots=True)
class MainAgentProvisionRequest:
"""Inputs required to provision a gateway main agent."""
gateway: Gateway
auth_token: str
user: User | None
session_key: str | None = None
options: ProvisionOptions = field(default_factory=ProvisionOptions)
def _repo_root() -> Path:
return Path(__file__).resolve().parents[3]
@@ -104,62 +80,6 @@ def _slugify(value: str) -> str:
return slug or uuid4().hex
def _clean_str(value: object) -> str | None:
if isinstance(value, str) and value.strip():
return value.strip()
return None
def _extract_agent_id_from_item(item: object) -> str | None:
if isinstance(item, str):
return _clean_str(item)
if not isinstance(item, dict):
return None
for key in ("id", "agentId", "agent_id"):
agent_id = _clean_str(item.get(key))
if agent_id:
return agent_id
return None
def _extract_agent_id_from_list(items: object) -> str | None:
if not isinstance(items, list):
return None
for item in items:
agent_id = _extract_agent_id_from_item(item)
if agent_id:
return agent_id
return None
def _extract_agent_id(payload: object) -> str | None:
default_keys = ("defaultId", "default_id", "defaultAgentId", "default_agent_id")
collection_keys = ("agents", "items", "list", "data")
if isinstance(payload, list):
return _extract_agent_id_from_list(payload)
if not isinstance(payload, dict):
return None
for key in default_keys:
agent_id = _clean_str(payload.get(key))
if agent_id:
return agent_id
for key in collection_keys:
agent_id = _extract_agent_id_from_list(payload.get(key))
if agent_id:
return agent_id
return None
def _agent_key(agent: Agent) -> str:
session_key = agent.openclaw_session_id or ""
if session_key.startswith("agent:"):
parts = session_key.split(":")
if len(parts) >= _SESSION_KEY_PARTS_MIN and parts[1]:
return parts[1]
return _slugify(agent.name)
def _heartbeat_config(agent: Agent) -> dict[str, Any]:
merged = DEFAULT_HEARTBEAT_CONFIG.copy()
if isinstance(agent.heartbeat_config, dict):
@@ -339,9 +259,14 @@ def _build_main_context(
def _session_key(agent: Agent) -> str:
if agent.openclaw_session_id:
return agent.openclaw_session_id
return f"agent:{_agent_key(agent)}:main"
"""Return the deterministic session key for a board-scoped agent.
Note: Never derive session keys from a human-provided name; use stable ids instead.
"""
if agent.is_board_lead and agent.board_id is not None:
return f"agent:lead-{agent.board_id}:main"
return f"agent:mc-{agent.id}:main"
def _render_agent_files(
@@ -425,10 +350,6 @@ class GatewayControlPlane(ABC):
async def delete_agent(self, agent_id: str, *, delete_files: bool = True) -> None:
raise NotImplementedError
@abstractmethod
async def list_supported_files(self) -> set[str]:
raise NotImplementedError
@abstractmethod
async def list_agent_files(self, agent_id: str) -> dict[str, dict[str, Any]]:
raise NotImplementedError
@@ -466,35 +387,11 @@ class OpenClawGatewayControlPlane(GatewayControlPlane):
return
await openclaw_call("sessions.delete", {"key": session_key}, config=self._config)
async def _agent_ids(self) -> set[str]:
payload = await openclaw_call("agents.list", config=self._config)
raw_agents: object = payload
if isinstance(payload, dict):
raw_agents = payload.get("agents") or []
if not isinstance(raw_agents, list):
return set()
ids: set[str] = set()
for item in raw_agents:
agent_id = _extract_agent_id_from_item(item)
if agent_id:
ids.add(agent_id)
return ids
async def upsert_agent(self, registration: GatewayAgentRegistration) -> None:
agent_ids = await self._agent_ids()
if registration.agent_id in agent_ids:
await openclaw_call(
"agents.update",
{
"agentId": registration.agent_id,
"name": registration.name,
"workspace": registration.workspace_path,
},
config=self._config,
)
else:
# `agents.create` derives `agentId` from `name`, so create with the target id
# and then set the human-facing name in a follow-up update.
# Prefer an idempotent "create then update" flow.
# - Avoids a dependency on `agents.list` (which may surface gateway defaults like `main`).
# - Ensures we always hit the "create" RPC first, per lifecycle expectations.
try:
await openclaw_call(
"agents.create",
{
@@ -503,7 +400,10 @@ class OpenClawGatewayControlPlane(GatewayControlPlane):
},
config=self._config,
)
if registration.name != registration.agent_id:
except OpenClawGatewayError as exc:
message = str(exc).lower()
if not any(marker in message for marker in ("already", "exist", "duplicate", "conflict")):
raise
await openclaw_call(
"agents.update",
{
@@ -524,37 +424,6 @@ class OpenClawGatewayControlPlane(GatewayControlPlane):
config=self._config,
)
async def list_supported_files(self) -> set[str]:
agents_payload = await openclaw_call("agents.list", config=self._config)
agent_id = _extract_agent_id(agents_payload)
if not agent_id:
return set(DEFAULT_GATEWAY_FILES)
files_payload = await openclaw_call(
"agents.files.list",
{"agentId": agent_id},
config=self._config,
)
if not isinstance(files_payload, dict):
return set(DEFAULT_GATEWAY_FILES)
files = files_payload.get("files") or []
if not isinstance(files, list):
return set(DEFAULT_GATEWAY_FILES)
supported: set[str] = set()
for item in files:
if not isinstance(item, dict):
continue
name = item.get("name")
if not isinstance(name, str) or not name:
name = item.get("path")
if isinstance(name, str) and name:
supported.add(name)
# Always include Mission Control's expected template files even if the gateway's default
# agent reports a different file set (e.g. `prompts/system.md`). This prevents provisioning
# from silently skipping our templates due to gateway-side defaults or version skew.
supported.update(DEFAULT_GATEWAY_FILES)
return supported
async def list_agent_files(self, agent_id: str) -> dict[str, dict[str, Any]]:
payload = await openclaw_call(
"agents.files.list",
@@ -689,6 +558,10 @@ class BaseAgentLifecycleManager(ABC):
def _template_overrides(self) -> dict[str, str] | None:
return None
def _preserve_files(self) -> set[str]:
"""Files that are expected to evolve inside the agent workspace."""
return set(PRESERVE_AGENT_EDITABLE_FILES)
async def _set_agent_files(
self,
*,
@@ -703,9 +576,14 @@ class BaseAgentLifecycleManager(ABC):
# Preserve "editable" files only during updates. During first-time provisioning,
# the gateway may pre-create defaults for USER/SELF/etc, and we still want to
# apply Mission Control's templates.
if action == "update" and name in PRESERVE_AGENT_EDITABLE_FILES:
if action == "update" and name in self._preserve_files():
entry = existing_files.get(name)
if entry and not bool(entry.get("missing")):
size = entry.get("size")
if isinstance(size, int) and size == 0:
# Treat 0-byte placeholders as missing so update can fill them.
pass
else:
continue
try:
await self._control_plane.set_agent_file(
@@ -732,12 +610,8 @@ class BaseAgentLifecycleManager(ABC):
if not self._gateway.workspace_root:
msg = "gateway_workspace_root is required"
raise ValueError(msg)
if not agent.openclaw_session_id:
# Ensure templates render with the active deterministic session key.
agent.openclaw_session_id = session_key
await self._control_plane.ensure_agent_session(
session_key,
label=session_label or agent.name,
)
agent_id = self._agent_id(agent)
workspace_path = _workspace_path(agent, self._gateway.workspace_root)
@@ -757,8 +631,9 @@ class BaseAgentLifecycleManager(ABC):
user=user,
board=board,
)
supported = await self._control_plane.list_supported_files()
supported.update({"USER.md", "SELF.md", "AUTONOMY.md"})
# Always attempt to sync Mission Control's full template set.
# Do not introspect gateway defaults (avoids touching gateway "main" agent state).
file_names = set(DEFAULT_GATEWAY_FILES)
existing_files = await self._control_plane.list_agent_files(agent_id)
include_bootstrap = _should_include_bootstrap(
action=options.action,
@@ -768,7 +643,7 @@ class BaseAgentLifecycleManager(ABC):
rendered = _render_agent_files(
context,
agent,
supported,
file_names,
include_bootstrap=include_bootstrap,
template_overrides=self._template_overrides(),
)
@@ -780,6 +655,8 @@ class BaseAgentLifecycleManager(ABC):
action=options.action,
)
if options.reset_session:
# Session resets are useful but should never block file sync.
with suppress(OpenClawGatewayError):
await self._control_plane.reset_agent_session(session_key)
@@ -823,6 +700,13 @@ class GatewayMainAgentLifecycleManager(BaseAgentLifecycleManager):
def _template_overrides(self) -> dict[str, str] | None:
return MAIN_TEMPLATE_MAP
def _preserve_files(self) -> set[str]:
# For gateway-main agents, USER.md is system-managed (derived from org/user context),
# so keep it in sync even during updates.
preserved = super()._preserve_files()
preserved.discard("USER.md")
return preserved
def _control_plane_for_gateway(gateway: Gateway) -> OpenClawGatewayControlPlane:
if not gateway.url:
@@ -833,7 +717,7 @@ def _control_plane_for_gateway(gateway: Gateway) -> OpenClawGatewayControlPlane:
)
async def patch_gateway_agent_heartbeats(
async def _patch_gateway_agent_heartbeats(
gateway: Gateway,
*,
entries: list[tuple[str, str, dict[str, Any]]],
@@ -846,22 +730,6 @@ async def patch_gateway_agent_heartbeats(
await control_plane.patch_agent_heartbeats(entries)
async def sync_gateway_agent_heartbeats(gateway: Gateway, agents: list[Agent]) -> None:
"""Sync current Agent.heartbeat_config values to the gateway config."""
if not gateway.workspace_root:
msg = "gateway workspace_root is required"
raise OpenClawGatewayError(msg)
entries: list[tuple[str, str, dict[str, Any]]] = []
for agent in agents:
agent_id = _agent_key(agent)
workspace_path = _workspace_path(agent, gateway.workspace_root)
heartbeat = _heartbeat_config(agent)
entries.append((agent_id, workspace_path, heartbeat))
if not entries:
return
await patch_gateway_agent_heartbeats(gateway, entries=entries)
def _should_include_bootstrap(
*,
action: str,
@@ -876,76 +744,130 @@ def _should_include_bootstrap(
return not bool(entry and entry.get("missing"))
async def provision_agent(
agent: Agent,
request: AgentProvisionRequest,
) -> None:
"""Provision or update a regular board agent workspace."""
gateway = request.gateway
if not gateway.url:
return
session_key = _session_key(agent)
control_plane = _control_plane_for_gateway(gateway)
manager = BoardAgentLifecycleManager(gateway, control_plane)
await manager.provision(
agent=agent,
board=request.board,
session_key=session_key,
auth_token=request.auth_token,
user=request.user,
options=request.options,
def _wakeup_text(agent: Agent, *, verb: str) -> str:
return (
f"Hello {agent.name}. Your workspace has been {verb}.\n\n"
"Start the agent, run BOOT.md, and if BOOTSTRAP.md exists run it once "
"then delete it. Begin heartbeats after startup."
)
async def provision_main_agent(
class OpenClawProvisioningService:
"""High-level agent provisioning interface (create -> files -> wake).
This is the public entrypoint for agent lifecycle orchestration. Internals are
implemented as module-private helpers and lifecycle manager classes.
"""
def __init__(self, session: AsyncSession | None = None) -> None:
self._session = session
def _require_session(self) -> AsyncSession:
if self._session is None:
msg = "AsyncSession is required for this operation"
raise ValueError(msg)
return self._session
async def sync_gateway_agent_heartbeats(self, gateway: Gateway, agents: list[Agent]) -> None:
"""Sync current Agent.heartbeat_config values to the gateway config."""
if not gateway.workspace_root:
msg = "gateway workspace_root is required"
raise OpenClawGatewayError(msg)
entries: list[tuple[str, str, dict[str, Any]]] = []
for agent in agents:
agent_id = _agent_key(agent)
workspace_path = _workspace_path(agent, gateway.workspace_root)
heartbeat = _heartbeat_config(agent)
entries.append((agent_id, workspace_path, heartbeat))
if not entries:
return
await _patch_gateway_agent_heartbeats(gateway, entries=entries)
async def apply_agent_lifecycle(
self,
*,
agent: Agent,
request: MainAgentProvisionRequest,
gateway: Gateway,
board: Board | None,
auth_token: str,
user: User | None,
action: str = "provision",
force_bootstrap: bool = False,
reset_session: bool = False,
wake: bool = True,
deliver_wakeup: bool = True,
wakeup_verb: str | None = None,
) -> None:
"""Provision or update the gateway main agent workspace."""
gateway = request.gateway
"""Create/update an agent, sync all template files, and optionally wake the agent.
Lifecycle steps (same for all agent types):
1) create agent (idempotent)
2) set/update all template files (best-effort for unsupported files)
3) wake the agent session (chat.send)
"""
if not gateway.url:
return
session_key = (request.session_key or GatewayAgentIdentity.session_key(gateway) or "").strip()
# Guard against accidental main-agent provisioning without a board.
if board is None and getattr(agent, "board_id", None) is not None:
msg = "board is required for board-scoped agent lifecycle"
raise ValueError(msg)
# Resolve session key and agent type.
if board is None:
session_key = (agent.openclaw_session_id or GatewayAgentIdentity.session_key(gateway) or "").strip()
if not session_key:
msg = "gateway main agent session_key is required"
raise ValueError(msg)
manager_type: type[BaseAgentLifecycleManager] = GatewayMainAgentLifecycleManager
else:
session_key = _session_key(agent)
manager_type = BoardAgentLifecycleManager
control_plane = _control_plane_for_gateway(gateway)
manager = GatewayMainAgentLifecycleManager(gateway, control_plane)
manager = manager_type(gateway, control_plane)
await manager.provision(
agent=agent,
board=board,
session_key=session_key,
auth_token=request.auth_token,
user=request.user,
options=request.options,
auth_token=auth_token,
user=user,
options=ProvisionOptions(
action=action,
force_bootstrap=force_bootstrap,
reset_session=False, # handled below
),
session_label=agent.name or "Gateway Agent",
)
async def cleanup_agent(
agent: Agent,
gateway: Gateway,
) -> str | None:
"""Remove an agent from gateway config and delete its session."""
if not gateway.url:
return None
if not gateway.workspace_root:
msg = "gateway_workspace_root is required"
raise ValueError(msg)
control_plane = _control_plane_for_gateway(gateway)
agent_id = _agent_key(agent)
await control_plane.delete_agent(agent_id, delete_files=True)
session_key = _session_key(agent)
if reset_session:
with suppress(OpenClawGatewayError):
await control_plane.delete_agent_session(session_key)
return None
await control_plane.reset_agent_session(session_key)
if not wake:
return
async def cleanup_main_agent(
client_config = GatewayClientConfig(url=gateway.url, token=gateway.token)
await ensure_session(session_key, config=client_config, label=agent.name)
verb = wakeup_verb or ("provisioned" if action == "provision" else "updated")
await send_message(
_wakeup_text(agent, verb=verb),
session_key=session_key,
config=client_config,
deliver=deliver_wakeup,
)
async def delete_agent_lifecycle(
self,
*,
agent: Agent,
gateway: Gateway,
delete_files: bool = True,
delete_session: bool = True,
) -> str | None:
"""Remove the gateway-main agent from gateway config and delete its session."""
"""Remove agent runtime state from the gateway (agent + optional session)."""
if not gateway.url:
return None
if not gateway.workspace_root:
@@ -954,15 +876,203 @@ async def cleanup_main_agent(
workspace_path = _workspace_path(agent, gateway.workspace_root)
control_plane = _control_plane_for_gateway(gateway)
agent_id = GatewayAgentIdentity.openclaw_agent_id(gateway)
await control_plane.delete_agent(agent_id, delete_files=True)
if agent.board_id is None:
agent_gateway_id = GatewayAgentIdentity.openclaw_agent_id(gateway)
else:
agent_gateway_id = _agent_key(agent)
await control_plane.delete_agent(agent_gateway_id, delete_files=delete_files)
if delete_session:
if agent.board_id is None:
session_key = (agent.openclaw_session_id or GatewayAgentIdentity.session_key(gateway) or "").strip()
else:
session_key = _session_key(agent)
if session_key:
with suppress(OpenClawGatewayError):
await control_plane.delete_agent_session(session_key)
return workspace_path
async def sync_gateway_templates(
self,
gateway: Gateway,
options: GatewayTemplateSyncOptions,
) -> GatewayTemplatesSyncResult:
"""Synchronize AGENTS/TOOLS/etc templates to gateway-connected agents."""
session = self._require_session()
template_user = options.user
if template_user is None:
template_user = await get_org_owner_user(
session,
organization_id=gateway.organization_id,
)
options = GatewayTemplateSyncOptions(
user=template_user,
include_main=options.include_main,
reset_sessions=options.reset_sessions,
rotate_tokens=options.rotate_tokens,
force_bootstrap=options.force_bootstrap,
board_id=options.board_id,
)
result = _base_result(
gateway,
include_main=options.include_main,
reset_sessions=options.reset_sessions,
)
if not gateway.url:
_append_sync_error(
result,
message="Gateway URL is not configured for this gateway.",
)
return result
ctx = _SyncContext(
session=session,
gateway=gateway,
config=GatewayClientConfig(url=gateway.url, token=gateway.token),
backoff=_GatewayBackoff(timeout_s=10 * 60, timeout_context="template sync"),
options=options,
provisioner=self,
)
if not await _ping_gateway(ctx, result):
return result
boards = await Board.objects.filter_by(gateway_id=gateway.id).all(session)
boards_by_id = _boards_by_id(boards, board_id=options.board_id)
if boards_by_id is None:
_append_sync_error(
result,
message="Board does not belong to this gateway.",
)
return result
paused_board_ids = await _paused_board_ids(session, list(boards_by_id.keys()))
if boards_by_id:
agents = await (
Agent.objects.by_field_in("board_id", list(boards_by_id.keys()))
.order_by(col(Agent.created_at).asc())
.all(session)
)
else:
agents = []
stop_sync = False
for agent in agents:
board = boards_by_id.get(agent.board_id) if agent.board_id is not None else None
if board is None:
result.agents_skipped += 1
_append_sync_error(
result,
agent=agent,
message="Skipping agent: board not found for agent.",
)
continue
if board.id in paused_board_ids:
result.agents_skipped += 1
continue
stop_sync = await _sync_one_agent(ctx, result, agent, board)
if stop_sync:
break
if not stop_sync and options.include_main:
await _sync_main_agent(ctx, result)
return result
@staticmethod
def lead_session_key(board: Board) -> str:
"""Return the deterministic session key for a board lead agent."""
return f"agent:lead-{board.id}:main"
@staticmethod
def lead_agent_name(_: Board) -> str:
"""Return the default display name for board lead agents."""
return "Lead Agent"
async def ensure_board_lead_agent(
self,
*,
request: LeadAgentRequest,
) -> tuple[Agent, bool]:
"""Ensure a board has a lead agent; return `(agent, created)`."""
session = self._require_session()
board = request.board
config_options = request.options
existing = (
await session.exec(
select(Agent)
.where(Agent.board_id == board.id)
.where(col(Agent.is_board_lead).is_(True)),
)
).first()
if existing:
desired_name = config_options.agent_name or self.lead_agent_name(board)
changed = False
if existing.name != desired_name:
existing.name = desired_name
changed = True
if existing.gateway_id != request.gateway.id:
existing.gateway_id = request.gateway.id
changed = True
desired_session_key = self.lead_session_key(board)
if existing.openclaw_session_id != desired_session_key:
existing.openclaw_session_id = desired_session_key
changed = True
if changed:
existing.updated_at = utcnow()
session.add(existing)
await session.commit()
await session.refresh(existing)
return existing, False
merged_identity_profile: dict[str, Any] = {
"role": "Board Lead",
"communication_style": "direct, concise, practical",
"emoji": ":gear:",
}
if config_options.identity_profile:
merged_identity_profile.update(
{
key: value.strip()
for key, value in config_options.identity_profile.items()
if value.strip()
},
)
agent = Agent(
name=config_options.agent_name or self.lead_agent_name(board),
status="provisioning",
board_id=board.id,
gateway_id=request.gateway.id,
is_board_lead=True,
heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(),
identity_profile=merged_identity_profile,
openclaw_session_id=self.lead_session_key(board),
provision_requested_at=utcnow(),
provision_action=config_options.action,
)
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
session.add(agent)
await session.commit()
await session.refresh(agent)
try:
await self.apply_agent_lifecycle(
agent=agent,
gateway=request.gateway,
board=board,
auth_token=raw_token,
user=request.user,
action=config_options.action,
wake=True,
deliver_wakeup=True,
)
except OpenClawGatewayError:
# Best-effort provisioning. The board/agent rows should still exist.
pass
return agent, True
_T = TypeVar("_T")
@@ -988,6 +1098,7 @@ class _SyncContext:
config: GatewayClientConfig
backoff: _GatewayBackoff
options: GatewayTemplateSyncOptions
provisioner: OpenClawProvisioningService
def _is_transient_gateway_error(exc: Exception) -> bool:
@@ -1091,19 +1202,6 @@ async def _with_gateway_retry(
return await backoff.run(fn)
async def _with_coordination_gateway_retry(fn: Callable[[], Awaitable[_T]]) -> _T:
return await _with_gateway_retry(
fn,
backoff=_GatewayBackoff(
timeout_s=_COORDINATION_GATEWAY_TIMEOUT_S,
base_delay_s=_COORDINATION_GATEWAY_BASE_DELAY_S,
max_delay_s=_COORDINATION_GATEWAY_MAX_DELAY_S,
jitter=0.15,
timeout_context="gateway coordination",
),
)
def _parse_tools_md(content: str) -> dict[str, str]:
values: dict[str, str] = {}
for raw in content.splitlines():
@@ -1226,7 +1324,8 @@ async def _ping_gateway(ctx: _SyncContext, result: GatewayTemplatesSyncResult) -
try:
async def _do_ping() -> object:
return await openclaw_call("agents.list", config=ctx.config)
# Use a lightweight health probe; avoid enumerating gateway agents.
return await openclaw_call("health", config=ctx.config)
await ctx.backoff.run(_do_ping)
except (TimeoutError, OpenClawGatewayError) as exc:
@@ -1338,19 +1437,16 @@ async def _sync_one_agent(
try:
async def _do_provision() -> bool:
await provision_agent(
agent,
AgentProvisionRequest(
board=board,
await ctx.provisioner.apply_agent_lifecycle(
agent=agent,
gateway=ctx.gateway,
board=board,
auth_token=auth_token,
user=ctx.options.user,
options=ProvisionOptions(
action="update",
force_bootstrap=ctx.options.force_bootstrap,
reset_session=ctx.options.reset_sessions,
),
),
wake=False,
)
return True
@@ -1377,7 +1473,6 @@ async def _sync_main_agent(
ctx: _SyncContext,
result: GatewayTemplatesSyncResult,
) -> bool:
main_session_key = GatewayAgentIdentity.session_key(ctx.gateway)
main_agent = (
await Agent.objects.all()
.filter(col(Agent.gateway_id) == ctx.gateway.id)
@@ -1412,19 +1507,16 @@ async def _sync_main_agent(
try:
async def _do_provision_main() -> bool:
await provision_main_agent(
main_agent,
MainAgentProvisionRequest(
await ctx.provisioner.apply_agent_lifecycle(
agent=main_agent,
gateway=ctx.gateway,
board=None,
auth_token=token,
user=ctx.options.user,
session_key=main_session_key,
options=ProvisionOptions(
action="update",
force_bootstrap=ctx.options.force_bootstrap,
reset_session=ctx.options.reset_sessions,
),
),
wake=False,
)
return True
@@ -1443,86 +1535,6 @@ async def _sync_main_agent(
return stop_sync
async def sync_gateway_templates(
session: AsyncSession,
gateway: Gateway,
options: GatewayTemplateSyncOptions,
) -> GatewayTemplatesSyncResult:
"""Synchronize AGENTS/TOOLS/etc templates to gateway-connected agents."""
result = _base_result(
gateway,
include_main=options.include_main,
reset_sessions=options.reset_sessions,
)
if not gateway.url:
_append_sync_error(
result,
message="Gateway URL is not configured for this gateway.",
)
return result
ctx = _SyncContext(
session=session,
gateway=gateway,
config=GatewayClientConfig(url=gateway.url, token=gateway.token),
backoff=_GatewayBackoff(timeout_s=10 * 60, timeout_context="template sync"),
options=options,
)
if not await _ping_gateway(ctx, result):
return result
boards = await Board.objects.filter_by(gateway_id=gateway.id).all(session)
boards_by_id = _boards_by_id(boards, board_id=options.board_id)
if boards_by_id is None:
_append_sync_error(
result,
message="Board does not belong to this gateway.",
)
return result
paused_board_ids = await _paused_board_ids(session, list(boards_by_id.keys()))
if boards_by_id:
agents = await (
Agent.objects.by_field_in("board_id", list(boards_by_id.keys()))
.order_by(col(Agent.created_at).asc())
.all(session)
)
else:
agents = []
stop_sync = False
for agent in agents:
board = boards_by_id.get(agent.board_id) if agent.board_id is not None else None
if board is None:
result.agents_skipped += 1
_append_sync_error(
result,
agent=agent,
message="Skipping agent: board not found for agent.",
)
continue
if board.id in paused_board_ids:
result.agents_skipped += 1
continue
stop_sync = await _sync_one_agent(ctx, result, agent, board)
if stop_sync:
break
if not stop_sync and options.include_main:
await _sync_main_agent(ctx, result)
return result
# Board lead lifecycle primitives consolidated from app.services.board_leads.
def lead_session_key(board: Board) -> str:
"""Return the deterministic main session key for a board lead agent."""
return f"agent:lead-{board.id}:main"
def lead_agent_name(_: Board) -> str:
"""Return the default display name for board lead agents."""
return "Lead Agent"
@dataclass(frozen=True, slots=True)
class LeadAgentOptions:
"""Optional overrides for board-lead provisioning behavior."""
@@ -1541,104 +1553,3 @@ class LeadAgentRequest:
config: GatewayClientConfig
user: User | None
options: LeadAgentOptions = field(default_factory=LeadAgentOptions)
async def ensure_board_lead_agent(
session: AsyncSession,
*,
request: LeadAgentRequest,
) -> tuple[Agent, bool]:
"""Ensure a board has a lead agent; return `(agent, created)`."""
board = request.board
config_options = request.options
existing = (
await session.exec(
select(Agent)
.where(Agent.board_id == board.id)
.where(col(Agent.is_board_lead).is_(True)),
)
).first()
if existing:
desired_name = config_options.agent_name or lead_agent_name(board)
changed = False
if existing.name != desired_name:
existing.name = desired_name
changed = True
if existing.gateway_id != request.gateway.id:
existing.gateway_id = request.gateway.id
changed = True
desired_session_key = lead_session_key(board)
if not existing.openclaw_session_id:
existing.openclaw_session_id = desired_session_key
changed = True
if changed:
existing.updated_at = utcnow()
session.add(existing)
await session.commit()
await session.refresh(existing)
return existing, False
merged_identity_profile: dict[str, Any] = {
"role": "Board Lead",
"communication_style": "direct, concise, practical",
"emoji": ":gear:",
}
if config_options.identity_profile:
merged_identity_profile.update(
{
key: value.strip()
for key, value in config_options.identity_profile.items()
if value.strip()
},
)
agent = Agent(
name=config_options.agent_name or lead_agent_name(board),
status="provisioning",
board_id=board.id,
gateway_id=request.gateway.id,
is_board_lead=True,
heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(),
identity_profile=merged_identity_profile,
openclaw_session_id=lead_session_key(board),
provision_requested_at=utcnow(),
provision_action=config_options.action,
)
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
session.add(agent)
await session.commit()
await session.refresh(agent)
try:
await provision_agent(
agent,
AgentProvisionRequest(
board=board,
gateway=request.gateway,
auth_token=raw_token,
user=request.user,
options=ProvisionOptions(action=config_options.action),
),
)
if agent.openclaw_session_id:
await ensure_session(
agent.openclaw_session_id,
config=request.config,
label=agent.name,
)
await send_message(
(
f"Hello {agent.name}. Your workspace has been provisioned.\n\n"
"Start the agent, run BOOT.md, and if BOOTSTRAP.md exists run "
"it once then delete it. Begin heartbeats after startup."
),
session_key=agent.openclaw_session_id,
config=request.config,
deliver=True,
)
except OpenClawGatewayError:
# Best-effort provisioning. The board/agent rows should still exist.
pass
return agent, True

View File

@@ -61,6 +61,23 @@ async def get_member(
).first(session)
async def get_org_owner_user(
session: AsyncSession,
*,
organization_id: UUID,
) -> User | None:
"""Return the org owner User, if one exists."""
owner = (
await OrganizationMember.objects.filter_by(organization_id=organization_id)
.filter(col(OrganizationMember.role) == "owner")
.order_by(col(OrganizationMember.created_at).asc())
.first(session)
)
if owner is None:
return None
return await User.objects.by_id(owner.user_id).first(session)
async def get_first_membership(
session: AsyncSession,
user_id: UUID,

View File

@@ -54,7 +54,7 @@ async def _run() -> int:
from app.models.gateways import Gateway
from app.services.openclaw.provisioning import (
GatewayTemplateSyncOptions,
sync_gateway_templates,
OpenClawProvisioningService,
)
args = _parse_args()
@@ -67,10 +67,9 @@ async def _run() -> int:
message = f"Gateway not found: {gateway_id}"
raise SystemExit(message)
result = await sync_gateway_templates(
session,
result = await OpenClawProvisioningService(session).sync_gateway_templates(
gateway,
options=GatewayTemplateSyncOptions(
GatewayTemplateSyncOptions(
user=None,
include_main=bool(args.include_main),
reset_sessions=bool(args.reset_sessions),

View File

@@ -88,10 +88,18 @@ async def test_delete_gateway_main_agent_does_not_require_board_id(monkeypatch:
async def _should_not_be_called(*_args, **_kwargs):
raise AssertionError("require_board/require_gateway should not be called for main agents")
called: dict[str, int] = {"cleanup_main": 0}
called: dict[str, int] = {"delete_lifecycle": 0}
async def _fake_cleanup_main_agent(_agent: object, _gateway: object) -> str | None:
called["cleanup_main"] += 1
async def _fake_delete_agent_lifecycle(
_self,
*,
agent: object,
gateway: object,
delete_files: bool = True,
delete_session: bool = True,
) -> str | None:
_ = (_self, agent, gateway, delete_files, delete_session)
called["delete_lifecycle"] += 1
return "/tmp/openclaw/workspace-gateway-x"
async def _fake_update_where(*_args, **_kwargs) -> None:
@@ -100,13 +108,16 @@ async def test_delete_gateway_main_agent_does_not_require_board_id(monkeypatch:
monkeypatch.setattr(service, "require_agent_access", _no_access_check)
monkeypatch.setattr(service, "require_board", _should_not_be_called)
monkeypatch.setattr(service, "require_gateway", _should_not_be_called)
monkeypatch.setattr(agent_service, "cleanup_main_agent", _fake_cleanup_main_agent)
monkeypatch.setattr(
agent_service.OpenClawProvisioningService,
"delete_agent_lifecycle",
_fake_delete_agent_lifecycle,
)
monkeypatch.setattr(agent_service.crud, "update_where", _fake_update_where)
monkeypatch.setattr(agent_service, "record_activity", lambda *_a, **_k: None)
result = await service.delete_agent(agent_id=str(agent.id), ctx=ctx) # type: ignore[arg-type]
assert result.ok is True
assert called["cleanup_main"] == 1
assert called["delete_lifecycle"] == 1
assert session.deleted and session.deleted[0] == agent

View File

@@ -25,27 +25,6 @@ def test_slugify_falls_back_to_uuid_hex(monkeypatch):
assert agent_provisioning._slugify("!!!") == "deadbeef"
def test_extract_agent_id_supports_lists_and_dicts():
assert agent_provisioning._extract_agent_id(["", " ", "abc"]) == "abc"
assert agent_provisioning._extract_agent_id([{"agent_id": "xyz"}]) == "xyz"
payload = {
"defaultAgentId": "dflt",
"agents": [{"id": "ignored"}],
}
assert agent_provisioning._extract_agent_id(payload) == "dflt"
payload2 = {
"agents": [{"id": ""}, {"agentId": "foo"}],
}
assert agent_provisioning._extract_agent_id(payload2) == "foo"
def test_extract_agent_id_returns_none_for_unknown_shapes():
assert agent_provisioning._extract_agent_id("nope") is None
assert agent_provisioning._extract_agent_id({"agents": "not-a-list"}) is None
@dataclass
class _AgentStub:
name: str
@@ -58,13 +37,13 @@ class _AgentStub:
soul_template: str | None = None
def test_agent_key_uses_session_key_when_present(monkeypatch):
def test_agent_key_uses_session_key_when_present():
agent = _AgentStub(name="Alice", openclaw_session_id="agent:alice:main")
assert agent_provisioning._agent_key(agent) == "alice"
monkeypatch.setattr(agent_provisioning, "_slugify", lambda value: "slugged")
agent2 = _AgentStub(name="Alice", openclaw_session_id=None)
assert agent_provisioning._agent_key(agent2) == "slugged"
agent2 = _AgentStub(name="Hello, World", openclaw_session_id=None)
assert agent_provisioning._agent_key(agent2) == "hello-world"
def test_workspace_path_preserves_tilde_in_workspace_root():
# Mission Control accepts a user-entered workspace root (from the UI) and must
@@ -118,9 +97,6 @@ async def test_provision_main_agent_uses_dedicated_openclaw_agent_id(monkeypatch
captured["patched_agent_id"] = registration.agent_id
captured["workspace_path"] = registration.workspace_path
async def _fake_list_supported_files(self):
return set()
async def _fake_list_agent_files(self, agent_id):
captured["files_index_agent_id"] = agent_id
return {}
@@ -141,11 +117,6 @@ async def test_provision_main_agent_uses_dedicated_openclaw_agent_id(monkeypatch
"upsert_agent",
_fake_upsert_agent,
)
monkeypatch.setattr(
agent_provisioning.OpenClawGatewayControlPlane,
"list_supported_files",
_fake_list_supported_files,
)
monkeypatch.setattr(
agent_provisioning.OpenClawGatewayControlPlane,
"list_agent_files",
@@ -158,14 +129,14 @@ async def test_provision_main_agent_uses_dedicated_openclaw_agent_id(monkeypatch
_fake_set_agent_files,
)
await agent_provisioning.provision_main_agent(
agent,
agent_provisioning.MainAgentProvisionRequest(
gateway=gateway,
await agent_provisioning.OpenClawProvisioningService().apply_agent_lifecycle(
agent=agent, # type: ignore[arg-type]
gateway=gateway, # type: ignore[arg-type]
board=None,
auth_token="secret-token",
user=None,
session_key=session_key,
),
action="provision",
wake=False,
)
expected_agent_id = GatewayAgentIdentity.openclaw_agent_id_for_id(gateway_id)
@@ -173,31 +144,6 @@ async def test_provision_main_agent_uses_dedicated_openclaw_agent_id(monkeypatch
assert captured["files_index_agent_id"] == expected_agent_id
@pytest.mark.asyncio
async def test_list_supported_files_always_includes_default_gateway_files(monkeypatch):
"""Provisioning should not depend solely on whatever the gateway's default agent reports."""
async def _fake_openclaw_call(method, params=None, config=None):
_ = config
if method == "agents.list":
return {"defaultId": "main"}
if method == "agents.files.list":
assert params == {"agentId": "main"}
return {"files": [{"path": "prompts/system.md", "missing": True}]}
raise AssertionError(f"Unexpected method: {method}")
monkeypatch.setattr(agent_provisioning, "openclaw_call", _fake_openclaw_call)
cp = agent_provisioning.OpenClawGatewayControlPlane(
agent_provisioning.GatewayClientConfig(url="ws://gateway.example/ws", token=None),
)
supported = await cp.list_supported_files()
# Newer gateways may surface other file paths, but we still must include our templates.
assert "prompts/system.md" in supported
for required in agent_provisioning.DEFAULT_GATEWAY_FILES:
assert required in supported
@pytest.mark.asyncio
async def test_provision_overwrites_user_md_on_first_provision(monkeypatch):
"""Gateway may pre-create USER.md; we still want MC's template on first provision."""
@@ -221,10 +167,6 @@ async def test_provision_overwrites_user_md_on_first_provision(monkeypatch):
async def delete_agent(self, agent_id, *, delete_files=True):
return None
async def list_supported_files(self):
# Minimal set.
return {"USER.md"}
async def list_agent_files(self, agent_id):
# Pretend gateway created USER.md already.
return {"USER.md": {"name": "USER.md", "missing": False}}
@@ -268,3 +210,137 @@ async def test_provision_overwrites_user_md_on_first_provision(monkeypatch):
action="provision",
)
assert ("USER.md", "from-mc") in cp.writes
@pytest.mark.asyncio
async def test_set_agent_files_update_writes_zero_size_user_md():
"""Treat empty placeholder files as missing during update."""
class _ControlPlaneStub:
def __init__(self):
self.writes: list[tuple[str, str]] = []
async def ensure_agent_session(self, session_key, *, label=None):
return None
async def reset_agent_session(self, session_key):
return None
async def delete_agent_session(self, session_key):
return None
async def upsert_agent(self, registration):
return None
async def delete_agent(self, agent_id, *, delete_files=True):
return None
async def list_agent_files(self, agent_id):
return {}
async def set_agent_file(self, *, agent_id, name, content):
self.writes.append((name, content))
async def patch_agent_heartbeats(self, entries):
return None
@dataclass
class _GatewayTiny:
id: UUID
name: str
url: str
token: str | None
workspace_root: str
class _Manager(agent_provisioning.BaseAgentLifecycleManager):
def _agent_id(self, agent):
return "agent-x"
def _build_context(self, *, agent, auth_token, user, board):
return {}
gateway = _GatewayTiny(
id=uuid4(),
name="G",
url="ws://x",
token=None,
workspace_root="/tmp",
)
cp = _ControlPlaneStub()
mgr = _Manager(gateway, cp) # type: ignore[arg-type]
await mgr._set_agent_files(
agent_id="agent-x",
rendered={"USER.md": "filled"},
existing_files={"USER.md": {"name": "USER.md", "missing": False, "size": 0}},
action="update",
)
assert ("USER.md", "filled") in cp.writes
@pytest.mark.asyncio
async def test_control_plane_upsert_agent_create_then_update(monkeypatch):
calls: list[tuple[str, dict[str, object] | None]] = []
async def _fake_openclaw_call(method, params=None, config=None):
_ = config
calls.append((method, params))
if method == "agents.create":
return {"ok": True}
if method == "agents.update":
return {"ok": True}
if method == "config.get":
return {"hash": None, "config": {"agents": {"list": []}}}
if method == "config.patch":
return {"ok": True}
raise AssertionError(f"Unexpected method: {method}")
monkeypatch.setattr(agent_provisioning, "openclaw_call", _fake_openclaw_call)
cp = agent_provisioning.OpenClawGatewayControlPlane(
agent_provisioning.GatewayClientConfig(url="ws://gateway.example/ws", token=None),
)
await cp.upsert_agent(
agent_provisioning.GatewayAgentRegistration(
agent_id="board-agent-a",
name="Board Agent A",
workspace_path="/tmp/workspace-board-agent-a",
heartbeat={"every": "10m", "target": "none", "includeReasoning": False},
),
)
assert calls[0][0] == "agents.create"
assert calls[1][0] == "agents.update"
@pytest.mark.asyncio
async def test_control_plane_upsert_agent_handles_already_exists(monkeypatch):
calls: list[tuple[str, dict[str, object] | None]] = []
async def _fake_openclaw_call(method, params=None, config=None):
_ = config
calls.append((method, params))
if method == "agents.create":
raise agent_provisioning.OpenClawGatewayError("already exists")
if method == "agents.update":
return {"ok": True}
if method == "config.get":
return {"hash": None, "config": {"agents": {"list": []}}}
if method == "config.patch":
return {"ok": True}
raise AssertionError(f"Unexpected method: {method}")
monkeypatch.setattr(agent_provisioning, "openclaw_call", _fake_openclaw_call)
cp = agent_provisioning.OpenClawGatewayControlPlane(
agent_provisioning.GatewayClientConfig(url="ws://gateway.example/ws", token=None),
)
await cp.upsert_agent(
agent_provisioning.GatewayAgentRegistration(
agent_id="board-agent-a",
name="Board Agent A",
workspace_path="/tmp/workspace-board-agent-a",
heartbeat={"every": "10m", "target": "none", "includeReasoning": False},
),
)
assert calls[0][0] == "agents.create"
assert calls[1][0] == "agents.update"

View File

@@ -93,4 +93,3 @@ async def test_get_auth_context_optional_returns_none_for_agent_token(
session=_FakeSession(), # type: ignore[arg-type]
)
assert out is None