diff --git a/backend/app/api/agent.py b/backend/app/api/agent.py index 929863a8..bf0ce86c 100644 --- a/backend/app/api/agent.py +++ b/backend/app/api/agent.py @@ -44,9 +44,9 @@ from app.schemas.gateway_coordination import ( from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate from app.services.activity_log import record_activity -from app.services.openclaw.agent_service import AgentLifecycleService from app.services.openclaw.coordination_service import GatewayCoordinationService from app.services.openclaw.policies import OpenClawAuthorizationPolicy +from app.services.openclaw.provisioning_db import AgentLifecycleService from app.services.task_dependencies import ( blocked_by_dependency_ids, dependency_status_by_id, diff --git a/backend/app/api/agents.py b/backend/app/api/agents.py index 7b50b41c..4400beb7 100644 --- a/backend/app/api/agents.py +++ b/backend/app/api/agents.py @@ -21,7 +21,7 @@ from app.schemas.agents import ( ) from app.schemas.common import OkResponse from app.schemas.pagination import DefaultLimitOffsetPage -from app.services.openclaw.agent_service import AgentLifecycleService, AgentUpdateOptions +from app.services.openclaw.provisioning_db import AgentLifecycleService, AgentUpdateOptions from app.services.organizations import OrganizationContext if TYPE_CHECKING: diff --git a/backend/app/api/gateway.py b/backend/app/api/gateway.py index 718715c9..12b24117 100644 --- a/backend/app/api/gateway.py +++ b/backend/app/api/gateway.py @@ -9,11 +9,6 @@ from fastapi import APIRouter, Depends, Query from app.api.deps import require_org_admin from app.core.auth import AuthContext, get_auth_context from app.db.session import get_session -from app.integrations.openclaw_gateway_protocol import ( - GATEWAY_EVENTS, - GATEWAY_METHODS, - PROTOCOL_VERSION, -) from app.schemas.common import OkResponse from app.schemas.gateway_api import ( GatewayCommandsResponse, @@ -24,6 +19,7 @@ from app.schemas.gateway_api import ( GatewaySessionsResponse, GatewaysStatusResponse, ) +from app.services.openclaw.gateway_rpc import GATEWAY_EVENTS, GATEWAY_METHODS, PROTOCOL_VERSION from app.services.openclaw.session_service import GatewaySessionService from app.services.organizations import OrganizationContext diff --git a/backend/app/integrations/openclaw_gateway_protocol.py b/backend/app/integrations/openclaw_gateway_protocol.py deleted file mode 100644 index b72a1afe..00000000 --- a/backend/app/integrations/openclaw_gateway_protocol.py +++ /dev/null @@ -1,125 +0,0 @@ -"""OpenClaw gateway protocol constants shared across integration layers.""" - -from __future__ import annotations - -PROTOCOL_VERSION = 3 - -# NOTE: These are the base gateway methods from the OpenClaw gateway repo. -# The gateway can expose additional methods at runtime via channel plugins. -GATEWAY_METHODS = [ - "health", - "logs.tail", - "channels.status", - "channels.logout", - "status", - "usage.status", - "usage.cost", - "tts.status", - "tts.providers", - "tts.enable", - "tts.disable", - "tts.convert", - "tts.setProvider", - "config.get", - "config.set", - "config.apply", - "config.patch", - "config.schema", - "exec.approvals.get", - "exec.approvals.set", - "exec.approvals.node.get", - "exec.approvals.node.set", - "exec.approval.request", - "exec.approval.resolve", - "wizard.start", - "wizard.next", - "wizard.cancel", - "wizard.status", - "talk.mode", - "models.list", - "agents.list", - "agents.create", - "agents.update", - "agents.delete", - "agents.files.list", - "agents.files.get", - "agents.files.set", - "skills.status", - "skills.bins", - "skills.install", - "skills.update", - "update.run", - "voicewake.get", - "voicewake.set", - "sessions.list", - "sessions.preview", - "sessions.patch", - "sessions.reset", - "sessions.delete", - "sessions.compact", - "last-heartbeat", - "set-heartbeats", - "wake", - "node.pair.request", - "node.pair.list", - "node.pair.approve", - "node.pair.reject", - "node.pair.verify", - "device.pair.list", - "device.pair.approve", - "device.pair.reject", - "device.token.rotate", - "device.token.revoke", - "node.rename", - "node.list", - "node.describe", - "node.invoke", - "node.invoke.result", - "node.event", - "cron.list", - "cron.status", - "cron.add", - "cron.update", - "cron.remove", - "cron.run", - "cron.runs", - "system-presence", - "system-event", - "send", - "agent", - "agent.identity.get", - "agent.wait", - "browser.request", - "chat.history", - "chat.abort", - "chat.send", -] - -GATEWAY_EVENTS = [ - "connect.challenge", - "agent", - "chat", - "presence", - "tick", - "talk.mode", - "shutdown", - "health", - "heartbeat", - "cron", - "node.pair.requested", - "node.pair.resolved", - "node.invoke.request", - "device.pair.requested", - "device.pair.resolved", - "voicewake.changed", - "exec.approval.requested", - "exec.approval.resolved", -] - -GATEWAY_METHODS_SET = frozenset(GATEWAY_METHODS) -GATEWAY_EVENTS_SET = frozenset(GATEWAY_EVENTS) - - -def is_known_gateway_method(method: str) -> bool: - """Return whether a method name is part of the known base gateway methods.""" - return method in GATEWAY_METHODS_SET diff --git a/backend/app/services/board_snapshot.py b/backend/app/services/board_snapshot.py index 56f2a342..aadce967 100644 --- a/backend/app/services/board_snapshot.py +++ b/backend/app/services/board_snapshot.py @@ -15,7 +15,7 @@ from app.schemas.approvals import ApprovalRead from app.schemas.board_memory import BoardMemoryRead from app.schemas.boards import BoardRead from app.schemas.view_models import BoardSnapshot, TaskCardRead -from app.services.openclaw.agent_service import AgentLifecycleService +from app.services.openclaw.provisioning_db import AgentLifecycleService from app.services.task_dependencies import ( blocked_by_dependency_ids, dependency_ids_by_task_id, diff --git a/backend/app/services/openclaw/__init__.py b/backend/app/services/openclaw/__init__.py index e2517cc5..a3241805 100644 --- a/backend/app/services/openclaw/__init__.py +++ b/backend/app/services/openclaw/__init__.py @@ -1,6 +1,6 @@ """OpenClaw lifecycle package. -Import concrete modules directly (for example: ``app.services.openclaw.agent_service``) +Import concrete modules directly (for example: ``app.services.openclaw.provisioning_db``) to keep architectural boundaries explicit. """ diff --git a/backend/app/services/openclaw/admin_service.py b/backend/app/services/openclaw/admin_service.py index 62633734..9c8d0463 100644 --- a/backend/app/services/openclaw/admin_service.py +++ b/backend/app/services/openclaw/admin_service.py @@ -14,11 +14,6 @@ from app.core.agent_tokens import generate_agent_token, hash_agent_token from app.core.auth import AuthContext from app.core.time import utcnow from app.db import crud -from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig -from app.integrations.openclaw_gateway import ( - OpenClawGatewayError, - openclaw_call, -) from app.models.activity_events import ActivityEvent from app.models.agents import Agent from app.models.approvals import Approval @@ -26,6 +21,8 @@ from app.models.gateways import Gateway from app.models.tasks import Task from app.schemas.gateways import GatewayTemplatesSyncResult from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig +from app.services.openclaw.gateway_rpc import OpenClawGatewayError, openclaw_call from app.services.openclaw.provisioning import OpenClawGatewayProvisioner from app.services.openclaw.provisioning_db import ( GatewayTemplateSyncOptions, diff --git a/backend/app/services/openclaw/agent_service.py b/backend/app/services/openclaw/agent_service.py deleted file mode 100644 index cbd018f9..00000000 --- a/backend/app/services/openclaw/agent_service.py +++ /dev/null @@ -1,1251 +0,0 @@ -"""Async agent lifecycle management service.""" - -from __future__ import annotations - -import asyncio -import json -import logging -import re -from dataclasses import dataclass -from datetime import UTC, datetime -from typing import TYPE_CHECKING, Any, Literal, Protocol -from uuid import UUID, uuid4 - -from fastapi import HTTPException, Request, status -from sqlalchemy import asc, or_ -from sqlmodel import col, select -from sse_starlette.sse import EventSourceResponse - -from app.core.agent_tokens import generate_agent_token, hash_agent_token -from app.core.time import utcnow -from app.db import crud -from app.db.pagination import paginate -from app.db.session import async_session_maker -from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig -from app.integrations.openclaw_gateway import ( - OpenClawGatewayError, - ensure_session, - send_message, -) -from app.models.activity_events import ActivityEvent -from app.models.agents import Agent -from app.models.boards import Board -from app.models.gateways import Gateway -from app.models.organizations import Organization -from app.models.tasks import Task -from app.schemas.agents import ( - AgentCreate, - AgentHeartbeat, - AgentHeartbeatCreate, - AgentRead, - AgentUpdate, -) -from app.schemas.common import OkResponse -from app.services.activity_log import record_activity -from app.services.openclaw.constants import ( - AGENT_SESSION_PREFIX, - DEFAULT_HEARTBEAT_CONFIG, - OFFLINE_AFTER, -) -from app.services.openclaw.policies import OpenClawAuthorizationPolicy -from app.services.openclaw.provisioning import OpenClawGatewayProvisioner -from app.services.openclaw.shared import GatewayAgentIdentity -from app.services.organizations import ( - OrganizationContext, - get_active_membership, - get_org_owner_user, - has_board_access, - is_org_admin, - list_accessible_board_ids, - require_board_access, -) - -if TYPE_CHECKING: - from collections.abc import AsyncIterator, Sequence - - from fastapi_pagination.limit_offset import LimitOffsetPage - from sqlalchemy.sql.elements import ColumnElement - from sqlmodel.ext.asyncio.session import AsyncSession - from sqlmodel.sql.expression import SelectOfScalar - - from app.models.users import User - - -class ActorContextLike(Protocol): - """Minimal actor context contract consumed by lifecycle APIs.""" - - actor_type: Literal["user", "agent"] - user: User | None - agent: Agent | None - - -@dataclass(frozen=True, slots=True) -class AgentUpdateOptions: - """Runtime options for update-and-reprovision flows.""" - - force: bool - user: User | None - context: OrganizationContext - - -@dataclass(frozen=True, slots=True) -class AgentUpdateProvisionTarget: - """Resolved target for an update provision operation.""" - - is_main_agent: bool - board: Board | None - gateway: Gateway - - -@dataclass(frozen=True, slots=True) -class AgentUpdateProvisionRequest: - """Provision request payload for agent updates.""" - - target: AgentUpdateProvisionTarget - raw_token: str - user: User | None - force_bootstrap: bool - - -class AgentLifecycleService: - """Async service encapsulating agent lifecycle behavior for API routes.""" - - def __init__(self, session: AsyncSession) -> None: - self._session = session - self._logger = logging.getLogger(__name__) - - @property - def session(self) -> AsyncSession: - return self._session - - @session.setter - def session(self, value: AsyncSession) -> None: - self._session = value - - @property - def logger(self) -> logging.Logger: - return self._logger - - @logger.setter - def logger(self, value: logging.Logger) -> None: - self._logger = value - - @staticmethod - def parse_since(value: str | None) -> datetime | None: - if not value: - return None - normalized = value.strip() - if not normalized: - return None - normalized = normalized.replace("Z", "+00:00") - try: - parsed = datetime.fromisoformat(normalized) - except ValueError: - return None - if parsed.tzinfo is not None: - return parsed.astimezone(UTC).replace(tzinfo=None) - return parsed - - @staticmethod - def slugify(value: str) -> str: - slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") - return slug or uuid4().hex - - @classmethod - def resolve_session_key(cls, agent: Agent) -> str: - """Resolve the gateway session key for an agent. - - Notes: - - For board-scoped agents, default to a UUID-based key to avoid name collisions. - """ - - existing = (agent.openclaw_session_id or "").strip() - if agent.board_id is None: - # Gateway-main agents must have an explicit deterministic key (set elsewhere). - if existing: - return existing - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Gateway main agent session key is required", - ) - if agent.is_board_lead: - return f"{AGENT_SESSION_PREFIX}:lead-{agent.board_id}:main" - return f"{AGENT_SESSION_PREFIX}:mc-{agent.id}:main" - - @classmethod - def workspace_path(cls, agent_name: str, workspace_root: str | None) -> str: - if not workspace_root: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Gateway workspace_root is required", - ) - root = workspace_root.rstrip("/") - return f"{root}/workspace-{cls.slugify(agent_name)}" - - async def require_board( - self, - board_id: UUID | str | None, - *, - user: User | None = None, - write: bool = False, - ) -> Board: - if not board_id: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="board_id is required", - ) - board = await Board.objects.by_id(board_id).first(self.session) - if board is None: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Board not found", - ) - if user is not None: - await require_board_access(self.session, user=user, board=board, write=write) - return board - - async def require_gateway( - self, - board: Board, - ) -> tuple[Gateway, GatewayClientConfig]: - if not board.gateway_id: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Board gateway_id is required", - ) - gateway = await Gateway.objects.by_id(board.gateway_id).first(self.session) - if gateway is None: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Board gateway_id is invalid", - ) - if gateway.organization_id != board.organization_id: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Board gateway_id is invalid", - ) - if not gateway.url: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Gateway url is required", - ) - if not gateway.workspace_root: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Gateway workspace_root is required", - ) - return gateway, GatewayClientConfig(url=gateway.url, token=gateway.token) - - @staticmethod - def gateway_client_config(gateway: Gateway) -> GatewayClientConfig: - if not gateway.url: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Gateway url is required", - ) - return GatewayClientConfig(url=gateway.url, token=gateway.token) - - @staticmethod - def is_gateway_main(agent: Agent) -> bool: - return agent.board_id is None - - @classmethod - def to_agent_read(cls, agent: Agent) -> AgentRead: - model = AgentRead.model_validate(agent, from_attributes=True) - return model.model_copy( - update={"is_gateway_main": cls.is_gateway_main(agent)}, - ) - - @staticmethod - def coerce_agent_items(items: Sequence[Any]) -> list[Agent]: - agents: list[Agent] = [] - for item in items: - if not isinstance(item, Agent): - msg = "Expected Agent items from paginated query" - raise TypeError(msg) - agents.append(item) - return agents - - async def get_main_agent_gateway(self, agent: Agent) -> Gateway | None: - if agent.board_id is not None: - return None - return await Gateway.objects.by_id(agent.gateway_id).first(self.session) - - @classmethod - def with_computed_status(cls, agent: Agent) -> Agent: - now = utcnow() - if agent.status in {"deleting", "updating"}: - return agent - if agent.last_seen_at is None: - agent.status = "provisioning" - elif now - agent.last_seen_at > OFFLINE_AFTER: - agent.status = "offline" - return agent - - @classmethod - def serialize_agent(cls, agent: Agent) -> dict[str, object]: - return cls.to_agent_read(cls.with_computed_status(agent)).model_dump(mode="json") - - async def fetch_agent_events( - self, - board_id: UUID | None, - since: datetime, - ) -> list[Agent]: - statement = select(Agent) - if board_id: - statement = statement.where(col(Agent.board_id) == board_id) - statement = statement.where( - or_( - col(Agent.updated_at) >= since, - col(Agent.last_seen_at) >= since, - ), - ).order_by(asc(col(Agent.updated_at))) - return list(await self.session.exec(statement)) - - async def require_user_context(self, user: User | None) -> OrganizationContext: - if user is None: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) - member = await get_active_membership(self.session, user) - if member is None: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) - organization = await Organization.objects.by_id(member.organization_id).first(self.session) - if organization is None: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) - return OrganizationContext(organization=organization, member=member) - - async def require_agent_access( - self, - *, - agent: Agent, - ctx: OrganizationContext, - write: bool, - ) -> None: - if agent.board_id is None: - OpenClawAuthorizationPolicy.require_org_admin(is_admin=is_org_admin(ctx.member)) - gateway = await self.get_main_agent_gateway(agent) - OpenClawAuthorizationPolicy.require_gateway_in_org( - gateway=gateway, - organization_id=ctx.organization.id, - ) - return - - board = await Board.objects.by_id(agent.board_id).first(self.session) - board = OpenClawAuthorizationPolicy.require_board_in_org( - board=board, - organization_id=ctx.organization.id, - ) - allowed = await has_board_access( - self.session, - member=ctx.member, - board=board, - write=write, - ) - OpenClawAuthorizationPolicy.require_board_write_access(allowed=allowed) - - @staticmethod - def record_heartbeat(session: AsyncSession, agent: Agent) -> None: - record_activity( - session, - event_type="agent.heartbeat", - message=f"Heartbeat received from {agent.name}.", - agent_id=agent.id, - ) - - @staticmethod - def record_instruction_failure( - session: AsyncSession, - agent: Agent, - error: str, - action: str, - ) -> None: - action_label = action.replace("_", " ").capitalize() - record_activity( - session, - event_type=f"agent.{action}.failed", - message=f"{action_label} message failed: {error}", - agent_id=agent.id, - ) - - async def coerce_agent_create_payload( - self, - payload: AgentCreate, - actor: ActorContextLike, - ) -> AgentCreate: - if actor.actor_type == "user": - ctx = await self.require_user_context(actor.user) - OpenClawAuthorizationPolicy.require_org_admin(is_admin=is_org_admin(ctx.member)) - return payload - - if actor.actor_type == "agent": - board_id = OpenClawAuthorizationPolicy.resolve_board_lead_create_board_id( - actor_agent=actor.agent, - requested_board_id=payload.board_id, - ) - return AgentCreate(**{**payload.model_dump(), "board_id": board_id}) - - return payload - - async def ensure_unique_agent_name( - self, - *, - board: Board, - gateway: Gateway, - requested_name: str, - ) -> None: - if not requested_name: - return - - existing = ( - await self.session.exec( - select(Agent) - .where(Agent.board_id == board.id) - .where(col(Agent.name).ilike(requested_name)), - ) - ).first() - if existing: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail="An agent with this name already exists on this board.", - ) - - existing_gateway = ( - await self.session.exec( - select(Agent) - .join(Board, col(Agent.board_id) == col(Board.id)) - .where(col(Board.gateway_id) == gateway.id) - .where(col(Agent.name).ilike(requested_name)), - ) - ).first() - if existing_gateway: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail="An agent with this name already exists in this gateway workspace.", - ) - - async def persist_new_agent( - self, - *, - data: dict[str, Any], - ) -> tuple[Agent, str]: - agent = Agent.model_validate(data) - agent.status = "provisioning" - raw_token = generate_agent_token() - agent.agent_token_hash = hash_agent_token(raw_token) - if agent.heartbeat_config is None: - agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() - agent.provision_requested_at = utcnow() - agent.provision_action = "provision" - agent.openclaw_session_id = self.resolve_session_key(agent) - self.session.add(agent) - await self.session.commit() - await self.session.refresh(agent) - return agent, raw_token - - async def _apply_gateway_provisioning( - self, - *, - agent: Agent, - target: AgentUpdateProvisionTarget, - auth_token: str, - user: User | None, - action: str, - wakeup_verb: str, - force_bootstrap: bool, - raise_gateway_errors: bool, - ) -> None: - self.logger.log( - 5, - "agent.provision.start action=%s agent_id=%s target_main=%s", - action, - agent.id, - target.is_main_agent, - ) - try: - if not target.is_main_agent and target.board is None: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="board is required for non-main agent provisioning", - ) - template_user = user - if target.is_main_agent and template_user is None: - template_user = await get_org_owner_user( - self.session, - organization_id=target.gateway.organization_id, - ) - if template_user is None: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=( - "User context is required to provision the gateway main agent " - "(org owner not found)." - ), - ) - await OpenClawGatewayProvisioner().apply_agent_lifecycle( - agent=agent, - gateway=target.gateway, - board=target.board if not target.is_main_agent else None, - auth_token=auth_token, - user=template_user, - action=action, - force_bootstrap=force_bootstrap, - reset_session=True, - wake=True, - deliver_wakeup=True, - wakeup_verb=wakeup_verb, - ) - agent.provision_confirm_token_hash = None - agent.provision_requested_at = None - agent.provision_action = None - agent.status = "online" - agent.updated_at = utcnow() - self.session.add(agent) - await self.session.commit() - record_activity( - self.session, - event_type=f"agent.{action}.direct", - message=f"{action.capitalize()}d directly for {agent.name}.", - agent_id=agent.id, - ) - record_activity( - self.session, - event_type="agent.wakeup.sent", - message=f"Wakeup message sent to {agent.name}.", - agent_id=agent.id, - ) - await self.session.commit() - self.logger.info( - "agent.provision.success action=%s agent_id=%s", - action, - agent.id, - ) - except OpenClawGatewayError as exc: - self.record_instruction_failure( - self.session, - agent, - str(exc), - action, - ) - await self.session.commit() - self.logger.error( - "agent.provision.gateway_error action=%s agent_id=%s error=%s", - action, - agent.id, - str(exc), - ) - if raise_gateway_errors: - raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, - detail=f"Gateway {action} failed: {exc}", - ) from exc - except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover - self.record_instruction_failure( - self.session, - agent, - str(exc), - action, - ) - await self.session.commit() - self.logger.critical( - "agent.provision.runtime_error action=%s agent_id=%s error=%s", - action, - agent.id, - str(exc), - ) - if raise_gateway_errors: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Unexpected error {action}ing agent provisioning.", - ) from exc - - async def provision_new_agent( - self, - *, - agent: Agent, - board: Board, - gateway: Gateway, - auth_token: str, - user: User | None, - force_bootstrap: bool, - ) -> None: - await self._apply_gateway_provisioning( - agent=agent, - target=AgentUpdateProvisionTarget(is_main_agent=False, board=board, gateway=gateway), - auth_token=auth_token, - user=user, - action="provision", - wakeup_verb="provisioned", - force_bootstrap=force_bootstrap, - raise_gateway_errors=True, - ) - - async def validate_agent_update_inputs( - self, - *, - ctx: OrganizationContext, - updates: dict[str, Any], - make_main: bool | None, - ) -> None: - if make_main: - OpenClawAuthorizationPolicy.require_org_admin(is_admin=is_org_admin(ctx.member)) - if "status" in updates: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="status is controlled by agent heartbeat", - ) - if "board_id" in updates and updates["board_id"] is not None: - new_board = await self.require_board(updates["board_id"]) - OpenClawAuthorizationPolicy.require_board_in_org( - board=new_board, - organization_id=ctx.organization.id, - ) - allowed = await has_board_access( - self.session, - member=ctx.member, - board=new_board, - write=True, - ) - OpenClawAuthorizationPolicy.require_board_write_access(allowed=allowed) - - async def apply_agent_update_mutations( - self, - *, - agent: Agent, - updates: dict[str, Any], - make_main: bool | None, - ) -> tuple[Gateway | None, Gateway | None]: - main_gateway = await self.get_main_agent_gateway(agent) - gateway_for_main: Gateway | None = None - - if make_main: - board_source = updates.get("board_id") or agent.board_id - board_for_main = await self.require_board(board_source) - gateway_for_main, _ = await self.require_gateway(board_for_main) - updates["board_id"] = None - updates["gateway_id"] = gateway_for_main.id - agent.is_board_lead = False - agent.openclaw_session_id = GatewayAgentIdentity.session_key(gateway_for_main) - main_gateway = gateway_for_main - elif make_main is not None: - if "board_id" not in updates or updates["board_id"] is None: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=( - "board_id is required when converting a gateway-main agent " - "to board scope" - ), - ) - board = await self.require_board(updates["board_id"]) - if board.gateway_id is None: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Board gateway_id is required", - ) - updates["gateway_id"] = board.gateway_id - agent.openclaw_session_id = None - - if make_main is None and "board_id" in updates: - board = await self.require_board(updates["board_id"]) - if board.gateway_id is None: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Board gateway_id is required", - ) - updates["gateway_id"] = board.gateway_id - for key, value in updates.items(): - setattr(agent, key, value) - - if make_main is None and main_gateway is not None: - agent.board_id = None - agent.gateway_id = main_gateway.id - agent.is_board_lead = False - if make_main is False and agent.board_id is not None: - board = await self.require_board(agent.board_id) - if board.gateway_id is None: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Board gateway_id is required", - ) - agent.gateway_id = board.gateway_id - agent.updated_at = utcnow() - if agent.heartbeat_config is None: - agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() - self.session.add(agent) - await self.session.commit() - await self.session.refresh(agent) - return main_gateway, gateway_for_main - - async def resolve_agent_update_target( - self, - *, - agent: Agent, - make_main: bool | None, - main_gateway: Gateway | None, - gateway_for_main: Gateway | None, - ) -> AgentUpdateProvisionTarget: - if make_main: - if gateway_for_main is None: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Gateway agent requires a gateway configuration", - ) - return AgentUpdateProvisionTarget( - is_main_agent=True, - board=None, - gateway=gateway_for_main, - ) - - if make_main is None and agent.board_id is None and main_gateway is not None: - return AgentUpdateProvisionTarget( - is_main_agent=True, - board=None, - gateway=main_gateway, - ) - - if agent.board_id is None: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="board_id is required for non-main agents", - ) - board = await self.require_board(agent.board_id) - gateway, _client_config = await self.require_gateway(board) - return AgentUpdateProvisionTarget( - is_main_agent=False, - board=board, - gateway=gateway, - ) - - @staticmethod - def mark_agent_update_pending(agent: Agent) -> str: - raw_token = generate_agent_token() - agent.agent_token_hash = hash_agent_token(raw_token) - agent.provision_requested_at = utcnow() - agent.provision_action = "update" - agent.status = "updating" - return raw_token - - async def provision_updated_agent( - self, - *, - agent: Agent, - request: AgentUpdateProvisionRequest, - ) -> None: - await self._apply_gateway_provisioning( - agent=agent, - target=request.target, - auth_token=request.raw_token, - user=request.user, - action="update", - wakeup_verb="updated", - force_bootstrap=request.force_bootstrap, - raise_gateway_errors=True, - ) - - @staticmethod - def heartbeat_lookup_statement(payload: AgentHeartbeatCreate) -> SelectOfScalar[Agent]: - statement = Agent.objects.filter_by(name=payload.name).statement - if payload.board_id is not None: - statement = statement.where(Agent.board_id == payload.board_id) - return statement - - async def create_agent_from_heartbeat( - self, - *, - payload: AgentHeartbeatCreate, - actor: ActorContextLike, - ) -> Agent: - if actor.actor_type == "agent": - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) - if actor.actor_type == "user": - ctx = await self.require_user_context(actor.user) - OpenClawAuthorizationPolicy.require_org_admin(is_admin=is_org_admin(ctx.member)) - - board = await self.require_board( - payload.board_id, - user=actor.user, - write=True, - ) - gateway, _client_config = await self.require_gateway(board) - data: dict[str, Any] = { - "name": payload.name, - "board_id": board.id, - "gateway_id": gateway.id, - "heartbeat_config": DEFAULT_HEARTBEAT_CONFIG.copy(), - } - agent, raw_token = await self.persist_new_agent(data=data) - await self.provision_new_agent( - agent=agent, - board=board, - gateway=gateway, - auth_token=raw_token, - user=actor.user, - force_bootstrap=False, - ) - return agent - - async def handle_existing_user_heartbeat_agent( - self, - *, - agent: Agent, - user: User | None, - ) -> None: - ctx = await self.require_user_context(user) - await self.require_agent_access(agent=agent, ctx=ctx, write=True) - - if agent.agent_token_hash is not None: - return - - raw_token = generate_agent_token() - agent.agent_token_hash = hash_agent_token(raw_token) - if agent.heartbeat_config is None: - agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() - agent.provision_requested_at = utcnow() - agent.provision_action = "provision" - self.session.add(agent) - await self.session.commit() - await self.session.refresh(agent) - board = await self.require_board( - str(agent.board_id) if agent.board_id else None, - user=user, - write=True, - ) - gateway, _client_config = await self.require_gateway(board) - await self.provision_new_agent( - agent=agent, - board=board, - gateway=gateway, - auth_token=raw_token, - user=user, - force_bootstrap=False, - ) - - async def ensure_heartbeat_session_key( - self, - *, - agent: Agent, - actor: ActorContextLike, - ) -> None: - _ = actor - if agent.board_id is None: - return - desired = self.resolve_session_key(agent) - existing = (agent.openclaw_session_id or "").strip() - if existing == desired: - return - agent.openclaw_session_id = desired - self.session.add(agent) - await self.session.commit() - await self.session.refresh(agent) - - async def commit_heartbeat( - self, - *, - agent: Agent, - status_value: str | None, - ) -> AgentRead: - if status_value: - agent.status = status_value - elif agent.status == "provisioning": - agent.status = "online" - agent.last_seen_at = utcnow() - agent.updated_at = utcnow() - self.record_heartbeat(self.session, agent) - self.session.add(agent) - await self.session.commit() - await self.session.refresh(agent) - return self.to_agent_read(self.with_computed_status(agent)) - - async def list_agents( - self, - *, - board_id: UUID | None, - gateway_id: UUID | None, - ctx: OrganizationContext, - ) -> LimitOffsetPage[AgentRead]: - board_ids = await list_accessible_board_ids(self.session, member=ctx.member, write=False) - if board_id is not None: - OpenClawAuthorizationPolicy.require_board_write_access( - allowed=board_id in set(board_ids), - ) - base_filters: list[ColumnElement[bool]] = [] - if board_ids: - base_filters.append(col(Agent.board_id).in_(board_ids)) - if is_org_admin(ctx.member): - gateways = await Gateway.objects.filter_by( - organization_id=ctx.organization.id, - ).all(self.session) - gateway_ids = [gateway.id for gateway in gateways] - if gateway_ids: - base_filters.append( - (col(Agent.gateway_id).in_(gateway_ids)) & (col(Agent.board_id).is_(None)), - ) - if base_filters: - if len(base_filters) == 1: - statement = select(Agent).where(base_filters[0]) - else: - statement = select(Agent).where(or_(*base_filters)) - else: - statement = select(Agent).where(col(Agent.id).is_(None)) - if board_id is not None: - statement = statement.where(col(Agent.board_id) == board_id) - if gateway_id is not None: - gateway = await Gateway.objects.by_id(gateway_id).first(self.session) - if gateway is None or gateway.organization_id != ctx.organization.id: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) - gateway_board_ids = select(Board.id).where(col(Board.gateway_id) == gateway_id) - statement = statement.where( - or_( - col(Agent.board_id).in_(gateway_board_ids), - (col(Agent.gateway_id) == gateway_id) & (col(Agent.board_id).is_(None)), - ), - ) - statement = statement.order_by(col(Agent.created_at).desc()) - - def _transform(items: Sequence[Any]) -> Sequence[Any]: - agents = self.coerce_agent_items(items) - return [self.to_agent_read(self.with_computed_status(agent)) for agent in agents] - - return await paginate(self.session, statement, transformer=_transform) - - async def stream_agents( - self, - *, - request: Request, - board_id: UUID | None, - since: str | None, - ctx: OrganizationContext, - ) -> EventSourceResponse: - since_dt = self.parse_since(since) or utcnow() - last_seen = since_dt - board_ids = await list_accessible_board_ids(self.session, member=ctx.member, write=False) - allowed_ids = set(board_ids) - if board_id is not None: - OpenClawAuthorizationPolicy.require_board_write_access(allowed=board_id in allowed_ids) - - async def event_generator() -> AsyncIterator[dict[str, str]]: - nonlocal last_seen - while True: - if await request.is_disconnected(): - break - async with async_session_maker() as stream_session: - stream_service = AgentLifecycleService(stream_session) - stream_service.logger = self.logger - if board_id is not None: - agents = await stream_service.fetch_agent_events( - board_id, - last_seen, - ) - elif allowed_ids: - agents = await stream_service.fetch_agent_events(None, last_seen) - agents = [agent for agent in agents if agent.board_id in allowed_ids] - else: - agents = [] - for agent in agents: - updated_at = agent.updated_at or agent.last_seen_at or utcnow() - last_seen = max(updated_at, last_seen) - payload = {"agent": self.serialize_agent(agent)} - yield {"event": "agent", "data": json.dumps(payload)} - await asyncio.sleep(2) - - return EventSourceResponse(event_generator(), ping=15) - - async def create_agent( - self, - *, - payload: AgentCreate, - actor: ActorContextLike, - ) -> AgentRead: - self.logger.log( - 5, - "agent.create.start actor_type=%s board_id=%s", - actor.actor_type, - payload.board_id, - ) - payload = await self.coerce_agent_create_payload(payload, actor) - - board = await self.require_board( - payload.board_id, - user=actor.user if actor.actor_type == "user" else None, - write=actor.actor_type == "user", - ) - gateway, _client_config = await self.require_gateway(board) - data = payload.model_dump() - data["gateway_id"] = gateway.id - requested_name = (data.get("name") or "").strip() - await self.ensure_unique_agent_name( - board=board, - gateway=gateway, - requested_name=requested_name, - ) - agent, raw_token = await self.persist_new_agent(data=data) - await self.provision_new_agent( - agent=agent, - board=board, - gateway=gateway, - auth_token=raw_token, - user=actor.user if actor.actor_type == "user" else None, - force_bootstrap=False, - ) - self.logger.info("agent.create.success agent_id=%s board_id=%s", agent.id, board.id) - return self.to_agent_read(self.with_computed_status(agent)) - - async def get_agent( - self, - *, - agent_id: str, - ctx: OrganizationContext, - ) -> AgentRead: - agent = await Agent.objects.by_id(agent_id).first(self.session) - if agent is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) - await self.require_agent_access(agent=agent, ctx=ctx, write=False) - return self.to_agent_read(self.with_computed_status(agent)) - - async def update_agent( - self, - *, - agent_id: str, - payload: AgentUpdate, - options: AgentUpdateOptions, - ) -> AgentRead: - self.logger.log(5, "agent.update.start agent_id=%s force=%s", agent_id, options.force) - agent = await Agent.objects.by_id(agent_id).first(self.session) - if agent is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) - await self.require_agent_access(agent=agent, ctx=options.context, write=True) - updates = payload.model_dump(exclude_unset=True) - make_main = updates.pop("is_gateway_main", None) - await self.validate_agent_update_inputs( - ctx=options.context, - updates=updates, - make_main=make_main, - ) - if not updates and not options.force and make_main is None: - return self.to_agent_read(self.with_computed_status(agent)) - main_gateway, gateway_for_main = await self.apply_agent_update_mutations( - agent=agent, - updates=updates, - make_main=make_main, - ) - target = await self.resolve_agent_update_target( - agent=agent, - make_main=make_main, - main_gateway=main_gateway, - gateway_for_main=gateway_for_main, - ) - raw_token = self.mark_agent_update_pending(agent) - self.session.add(agent) - await self.session.commit() - await self.session.refresh(agent) - provision_request = AgentUpdateProvisionRequest( - target=target, - raw_token=raw_token, - user=options.user, - force_bootstrap=options.force, - ) - await self.provision_updated_agent( - agent=agent, - request=provision_request, - ) - self.logger.info("agent.update.success agent_id=%s", agent.id) - return self.to_agent_read(self.with_computed_status(agent)) - - async def heartbeat_agent( - self, - *, - agent_id: str, - payload: AgentHeartbeat, - actor: ActorContextLike, - ) -> AgentRead: - self.logger.log( - 5, "agent.heartbeat.start agent_id=%s actor_type=%s", agent_id, actor.actor_type - ) - agent = await Agent.objects.by_id(agent_id).first(self.session) - if agent is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) - if actor.actor_type == "agent": - OpenClawAuthorizationPolicy.require_same_agent_actor( - actor_agent_id=actor.agent.id if actor.agent else None, - target_agent_id=agent.id, - ) - if actor.actor_type == "user": - ctx = await self.require_user_context(actor.user) - OpenClawAuthorizationPolicy.require_org_admin(is_admin=is_org_admin(ctx.member)) - await self.require_agent_access(agent=agent, ctx=ctx, write=True) - return await self.commit_heartbeat( - agent=agent, - status_value=payload.status, - ) - - async def heartbeat_or_create_agent( - self, - *, - payload: AgentHeartbeatCreate, - actor: ActorContextLike, - ) -> AgentRead: - self.logger.log( - 5, - "agent.heartbeat_or_create.start actor_type=%s name=%s board_id=%s", - actor.actor_type, - payload.name, - payload.board_id, - ) - if actor.actor_type == "agent" and actor.agent: - return await self.heartbeat_agent( - agent_id=str(actor.agent.id), - payload=AgentHeartbeat(status=payload.status), - actor=actor, - ) - - agent = (await self.session.exec(self.heartbeat_lookup_statement(payload))).first() - if agent is None: - agent = await self.create_agent_from_heartbeat( - payload=payload, - actor=actor, - ) - elif actor.actor_type == "user": - await self.handle_existing_user_heartbeat_agent( - agent=agent, - user=actor.user, - ) - elif actor.actor_type == "agent": - OpenClawAuthorizationPolicy.require_same_agent_actor( - actor_agent_id=actor.agent.id if actor.agent else None, - target_agent_id=agent.id, - ) - - await self.ensure_heartbeat_session_key( - agent=agent, - actor=actor, - ) - return await self.commit_heartbeat( - agent=agent, - status_value=payload.status, - ) - - async def delete_agent( - self, - *, - agent_id: str, - ctx: OrganizationContext, - ) -> OkResponse: - self.logger.log(5, "agent.delete.start agent_id=%s", agent_id) - agent = await Agent.objects.by_id(agent_id).first(self.session) - if agent is None: - return OkResponse() - await self.require_agent_access(agent=agent, ctx=ctx, write=True) - - gateway: Gateway | None = None - client_config: GatewayClientConfig | None = None - workspace_path: str | None = None - - if agent.board_id is None: - # Gateway-main agents are not tied to a board; resolve via agent.gateway_id. - gateway = await Gateway.objects.by_id(agent.gateway_id).first(self.session) - if gateway and gateway.url: - client_config = GatewayClientConfig(url=gateway.url, token=gateway.token) - try: - workspace_path = await OpenClawGatewayProvisioner().delete_agent_lifecycle( - agent=agent, - gateway=gateway, - ) - except OpenClawGatewayError as exc: - self.record_instruction_failure(self.session, agent, str(exc), "delete") - await self.session.commit() - raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, - detail=f"Gateway cleanup failed: {exc}", - ) from exc - except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover - self.record_instruction_failure(self.session, agent, str(exc), "delete") - await self.session.commit() - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Workspace cleanup failed: {exc}", - ) from exc - else: - board = await self.require_board(str(agent.board_id)) - gateway, client_config = await self.require_gateway(board) - try: - workspace_path = await OpenClawGatewayProvisioner().delete_agent_lifecycle( - agent=agent, - gateway=gateway, - ) - except OpenClawGatewayError as exc: - self.record_instruction_failure(self.session, agent, str(exc), "delete") - await self.session.commit() - raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, - detail=f"Gateway cleanup failed: {exc}", - ) from exc - except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover - self.record_instruction_failure(self.session, agent, str(exc), "delete") - await self.session.commit() - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Workspace cleanup failed: {exc}", - ) from exc - - record_activity( - self.session, - event_type="agent.delete.direct", - message=f"Deleted agent {agent.name}.", - agent_id=None, - ) - now = utcnow() - await crud.update_where( - self.session, - Task, - col(Task.assigned_agent_id) == agent.id, - col(Task.status) == "in_progress", - assigned_agent_id=None, - status="inbox", - in_progress_at=None, - updated_at=now, - commit=False, - ) - await crud.update_where( - self.session, - Task, - col(Task.assigned_agent_id) == agent.id, - col(Task.status) != "in_progress", - assigned_agent_id=None, - updated_at=now, - commit=False, - ) - await crud.update_where( - self.session, - ActivityEvent, - col(ActivityEvent.agent_id) == agent.id, - agent_id=None, - commit=False, - ) - await self.session.delete(agent) - await self.session.commit() - - try: - # Notify the gateway-main agent about cleanup for board-scoped deletes. - # Skip when deleting the gateway-main agent itself. - if gateway is None or client_config is None or agent.board_id is None: - raise ValueError("skip main agent cleanup notification") - main_session = GatewayAgentIdentity.session_key(gateway) - if main_session and workspace_path: - cleanup_message = ( - "Cleanup request for deleted agent.\n\n" - f"Agent name: {agent.name}\n" - f"Agent id: {agent.id}\n" - f"Workspace path: {workspace_path}\n\n" - "Actions:\n" - "1) Remove the workspace directory.\n" - "2) Reply NO_REPLY.\n" - ) - await ensure_session(main_session, config=client_config, label="Gateway Agent") - await send_message( - cleanup_message, - session_key=main_session, - config=client_config, - deliver=False, - ) - except (OSError, OpenClawGatewayError, ValueError): - pass - self.logger.info("agent.delete.success agent_id=%s", agent_id) - return OkResponse() diff --git a/backend/app/services/openclaw/coordination_service.py b/backend/app/services/openclaw/coordination_service.py index 1aefd238..06fa8735 100644 --- a/backend/app/services/openclaw/coordination_service.py +++ b/backend/app/services/openclaw/coordination_service.py @@ -14,8 +14,6 @@ from sqlmodel import col, select from app.core.config import settings from app.core.time import utcnow -from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig -from app.integrations.openclaw_gateway import OpenClawGatewayError, openclaw_call from app.models.agents import Agent from app.models.boards import Board from app.models.gateways import Gateway @@ -34,6 +32,8 @@ from app.services.openclaw.exceptions import ( map_gateway_error_message, map_gateway_error_to_http_exception, ) +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig +from app.services.openclaw.gateway_rpc import OpenClawGatewayError, openclaw_call from app.services.openclaw.internal import agent_key, with_coordination_gateway_retry from app.services.openclaw.policies import OpenClawAuthorizationPolicy from app.services.openclaw.provisioning_db import ( diff --git a/backend/app/integrations/openclaw_gateway.py b/backend/app/services/openclaw/gateway_rpc.py similarity index 64% rename from backend/app/integrations/openclaw_gateway.py rename to backend/app/services/openclaw/gateway_rpc.py index 0ec2a006..bd870a68 100644 --- a/backend/app/integrations/openclaw_gateway.py +++ b/backend/app/services/openclaw/gateway_rpc.py @@ -1,4 +1,9 @@ -"""OpenClaw gateway client helpers for websocket RPC calls.""" +"""OpenClaw gateway websocket RPC client and protocol constants. + +This is the low-level, DB-free interface for talking to the OpenClaw gateway. +Keep gateway RPC protocol details and client helpers here so OpenClaw services +operate within a single scope (no `app.integrations.*` plumbing). +""" from __future__ import annotations @@ -12,20 +17,133 @@ from uuid import uuid4 import websockets from websockets.exceptions import WebSocketException -from app.integrations.openclaw_gateway_protocol import PROTOCOL_VERSION +PROTOCOL_VERSION = 3 + +# NOTE: These are the base gateway methods from the OpenClaw gateway repo. +# The gateway can expose additional methods at runtime via channel plugins. +GATEWAY_METHODS = [ + "health", + "logs.tail", + "channels.status", + "channels.logout", + "status", + "usage.status", + "usage.cost", + "tts.status", + "tts.providers", + "tts.enable", + "tts.disable", + "tts.convert", + "tts.setProvider", + "config.get", + "config.set", + "config.apply", + "config.patch", + "config.schema", + "exec.approvals.get", + "exec.approvals.set", + "exec.approvals.node.get", + "exec.approvals.node.set", + "exec.approval.request", + "exec.approval.resolve", + "wizard.start", + "wizard.next", + "wizard.cancel", + "wizard.status", + "talk.mode", + "models.list", + "agents.list", + "agents.create", + "agents.update", + "agents.delete", + "agents.files.list", + "agents.files.get", + "agents.files.set", + "skills.status", + "skills.bins", + "skills.install", + "skills.update", + "update.run", + "voicewake.get", + "voicewake.set", + "sessions.list", + "sessions.preview", + "sessions.patch", + "sessions.reset", + "sessions.delete", + "sessions.compact", + "last-heartbeat", + "set-heartbeats", + "wake", + "node.pair.request", + "node.pair.list", + "node.pair.approve", + "node.pair.reject", + "node.pair.verify", + "device.pair.list", + "device.pair.approve", + "device.pair.reject", + "device.token.rotate", + "device.token.revoke", + "node.rename", + "node.list", + "node.describe", + "node.invoke", + "node.invoke.result", + "node.event", + "cron.list", + "cron.status", + "cron.add", + "cron.update", + "cron.remove", + "cron.run", + "cron.runs", + "system-presence", + "system-event", + "send", + "agent", + "agent.identity.get", + "agent.wait", + "browser.request", + "chat.history", + "chat.abort", + "chat.send", +] + +GATEWAY_EVENTS = [ + "connect.challenge", + "agent", + "chat", + "presence", + "tick", + "talk.mode", + "shutdown", + "health", + "heartbeat", + "cron", + "node.pair.requested", + "node.pair.resolved", + "node.invoke.request", + "device.pair.requested", + "device.pair.resolved", + "voicewake.changed", + "exec.approval.requested", + "exec.approval.resolved", +] + +GATEWAY_METHODS_SET = frozenset(GATEWAY_METHODS) +GATEWAY_EVENTS_SET = frozenset(GATEWAY_EVENTS) + + +def is_known_gateway_method(method: str) -> bool: + """Return whether a method name is part of the known base gateway methods.""" + return method in GATEWAY_METHODS_SET class OpenClawGatewayError(RuntimeError): """Raised when OpenClaw gateway calls fail.""" -@dataclass -class OpenClawResponse: - """Container for raw OpenClaw payloads.""" - - payload: Any - - @dataclass(frozen=True) class GatewayConfig: """Connection configuration for the OpenClaw gateway.""" @@ -37,7 +155,7 @@ class GatewayConfig: def _build_gateway_url(config: GatewayConfig) -> str: base_url: str = (config.url or "").strip() if not base_url: - message = "Gateway URL is not configured for this board." + message = "Gateway URL is not configured." raise OpenClawGatewayError(message) token = config.token if not token: diff --git a/backend/app/services/openclaw/internal/retry.py b/backend/app/services/openclaw/internal/retry.py index df097061..ce05064c 100644 --- a/backend/app/services/openclaw/internal/retry.py +++ b/backend/app/services/openclaw/internal/retry.py @@ -6,7 +6,6 @@ import asyncio from collections.abc import Awaitable, Callable from typing import TypeVar -from app.integrations.openclaw_gateway import OpenClawGatewayError from app.services.openclaw.constants import ( _COORDINATION_GATEWAY_BASE_DELAY_S, _COORDINATION_GATEWAY_MAX_DELAY_S, @@ -15,6 +14,7 @@ from app.services.openclaw.constants import ( _SECURE_RANDOM, _TRANSIENT_GATEWAY_ERROR_MARKERS, ) +from app.services.openclaw.gateway_rpc import OpenClawGatewayError _T = TypeVar("_T") diff --git a/backend/app/services/openclaw/onboarding_service.py b/backend/app/services/openclaw/onboarding_service.py index 9cc8351c..2dbc99a4 100644 --- a/backend/app/services/openclaw/onboarding_service.py +++ b/backend/app/services/openclaw/onboarding_service.py @@ -2,11 +2,11 @@ from __future__ import annotations -from app.integrations.openclaw_gateway import OpenClawGatewayError from app.models.board_onboarding import BoardOnboardingSession from app.models.boards import Board from app.services.openclaw.coordination_service import AbstractGatewayMessagingService from app.services.openclaw.exceptions import GatewayOperation, map_gateway_error_to_http_exception +from app.services.openclaw.gateway_rpc import OpenClawGatewayError from app.services.openclaw.shared import ( GatewayAgentIdentity, require_gateway_config_for_board, diff --git a/backend/app/services/openclaw/provisioning.py b/backend/app/services/openclaw/provisioning.py index 63146fe4..57efd741 100644 --- a/backend/app/services/openclaw/provisioning.py +++ b/backend/app/services/openclaw/provisioning.py @@ -18,13 +18,6 @@ from uuid import uuid4 from jinja2 import Environment, FileSystemLoader, StrictUndefined, select_autoescape from app.core.config import settings -from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig -from app.integrations.openclaw_gateway import ( - OpenClawGatewayError, - ensure_session, - openclaw_call, - send_message, -) from app.models.agents import Agent from app.models.boards import Board from app.models.gateways import Gateway @@ -40,6 +33,13 @@ from app.services.openclaw.constants import ( MAIN_TEMPLATE_MAP, PRESERVE_AGENT_EDITABLE_FILES, ) +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig +from app.services.openclaw.gateway_rpc import ( + OpenClawGatewayError, + ensure_session, + openclaw_call, + send_message, +) from app.services.openclaw.internal import agent_key as _agent_key from app.services.openclaw.shared import GatewayAgentIdentity diff --git a/backend/app/services/openclaw/provisioning_db.py b/backend/app/services/openclaw/provisioning_db.py index 127a6771..c0658927 100644 --- a/backend/app/services/openclaw/provisioning_db.py +++ b/backend/app/services/openclaw/provisioning_db.py @@ -1,49 +1,88 @@ -"""DB-backed OpenClaw provisioning orchestration. +"""DB-backed OpenClaw orchestration and agent lifecycle services. Layering: - `app.services.openclaw.provisioning` contains gateway-only lifecycle operations (no DB calls). - This module builds on top of that layer using AsyncSession for token rotation, lead-agent records, - and bulk template synchronization. + bulk template synchronization, and API-facing agent lifecycle flows. """ from __future__ import annotations import asyncio +import json +import logging +import re from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any, TypeVar -from uuid import UUID +from datetime import UTC, datetime +from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar +from uuid import UUID, uuid4 -from sqlalchemy import func +from fastapi import HTTPException, Request, status +from sqlalchemy import asc, func, or_ from sqlmodel import col, select +from sse_starlette.sse import EventSourceResponse from app.core.agent_tokens import generate_agent_token, hash_agent_token, verify_agent_token from app.core.time import utcnow -from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig -from app.integrations.openclaw_gateway import OpenClawGatewayError +from app.db import crud +from app.db.pagination import paginate +from app.db.session import async_session_maker +from app.models.activity_events import ActivityEvent from app.models.agents import Agent from app.models.board_memory import BoardMemory from app.models.boards import Board from app.models.gateways import Gateway +from app.models.organizations import Organization +from app.models.tasks import Task +from app.schemas.agents import ( + AgentCreate, + AgentHeartbeat, + AgentHeartbeatCreate, + AgentRead, + AgentUpdate, +) +from app.schemas.common import OkResponse from app.schemas.gateways import GatewayTemplatesSyncError, GatewayTemplatesSyncResult +from app.services.activity_log import record_activity from app.services.openclaw.constants import ( _NON_TRANSIENT_GATEWAY_ERROR_MARKERS, _SECURE_RANDOM, _TOOLS_KV_RE, _TRANSIENT_GATEWAY_ERROR_MARKERS, + AGENT_SESSION_PREFIX, DEFAULT_HEARTBEAT_CONFIG, + OFFLINE_AFTER, +) +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig +from app.services.openclaw.gateway_rpc import ( + OpenClawGatewayError, + ensure_session, + send_message, ) from app.services.openclaw.internal import agent_key as _agent_key +from app.services.openclaw.policies import OpenClawAuthorizationPolicy from app.services.openclaw.provisioning import ( OpenClawGatewayControlPlane, OpenClawGatewayProvisioner, ) from app.services.openclaw.shared import GatewayAgentIdentity -from app.services.organizations import get_org_owner_user +from app.services.organizations import ( + OrganizationContext, + get_active_membership, + get_org_owner_user, + has_board_access, + is_org_admin, + list_accessible_board_ids, + require_board_access, +) if TYPE_CHECKING: - from collections.abc import Awaitable, Callable + from collections.abc import AsyncIterator, Awaitable, Callable, Sequence + from fastapi_pagination.limit_offset import LimitOffsetPage + from sqlalchemy.sql.elements import ColumnElement from sqlmodel.ext.asyncio.session import AsyncSession + from sqlmodel.sql.expression import SelectOfScalar from app.models.users import User @@ -716,3 +755,1183 @@ async def _sync_main_agent( else: result.main_updated = True return stop_sync + + +class ActorContextLike(Protocol): + """Minimal actor context contract consumed by lifecycle APIs.""" + + actor_type: Literal["user", "agent"] + user: User | None + agent: Agent | None + + +@dataclass(frozen=True, slots=True) +class AgentUpdateOptions: + """Runtime options for update-and-reprovision flows.""" + + force: bool + user: User | None + context: OrganizationContext + + +@dataclass(frozen=True, slots=True) +class AgentUpdateProvisionTarget: + """Resolved target for an update provision operation.""" + + is_main_agent: bool + board: Board | None + gateway: Gateway + + +@dataclass(frozen=True, slots=True) +class AgentUpdateProvisionRequest: + """Provision request payload for agent updates.""" + + target: AgentUpdateProvisionTarget + raw_token: str + user: User | None + force_bootstrap: bool + + +class AgentLifecycleService: + """Async service encapsulating agent lifecycle behavior for API routes.""" + + def __init__(self, session: AsyncSession) -> None: + self._session = session + self._logger = logging.getLogger(__name__) + + @property + def session(self) -> AsyncSession: + return self._session + + @session.setter + def session(self, value: AsyncSession) -> None: + self._session = value + + @property + def logger(self) -> logging.Logger: + return self._logger + + @logger.setter + def logger(self, value: logging.Logger) -> None: + self._logger = value + + @staticmethod + def parse_since(value: str | None) -> datetime | None: + if not value: + return None + normalized = value.strip() + if not normalized: + return None + normalized = normalized.replace("Z", "+00:00") + try: + parsed = datetime.fromisoformat(normalized) + except ValueError: + return None + if parsed.tzinfo is not None: + return parsed.astimezone(UTC).replace(tzinfo=None) + return parsed + + @staticmethod + def slugify(value: str) -> str: + slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") + return slug or uuid4().hex + + @classmethod + def resolve_session_key(cls, agent: Agent) -> str: + """Resolve the gateway session key for an agent. + + Notes: + - For board-scoped agents, default to a UUID-based key to avoid name collisions. + """ + + existing = (agent.openclaw_session_id or "").strip() + if agent.board_id is None: + # Gateway-main agents must have an explicit deterministic key (set elsewhere). + if existing: + return existing + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Gateway main agent session key is required", + ) + if agent.is_board_lead: + return f"{AGENT_SESSION_PREFIX}:lead-{agent.board_id}:main" + return f"{AGENT_SESSION_PREFIX}:mc-{agent.id}:main" + + @classmethod + def workspace_path(cls, agent_name: str, workspace_root: str | None) -> str: + if not workspace_root: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Gateway workspace_root is required", + ) + root = workspace_root.rstrip("/") + return f"{root}/workspace-{cls.slugify(agent_name)}" + + async def require_board( + self, + board_id: UUID | str | None, + *, + user: User | None = None, + write: bool = False, + ) -> Board: + if not board_id: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="board_id is required", + ) + board = await Board.objects.by_id(board_id).first(self.session) + if board is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Board not found", + ) + if user is not None: + await require_board_access(self.session, user=user, board=board, write=write) + return board + + async def require_gateway( + self, + board: Board, + ) -> tuple[Gateway, GatewayClientConfig]: + if not board.gateway_id: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Board gateway_id is required", + ) + gateway = await Gateway.objects.by_id(board.gateway_id).first(self.session) + if gateway is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Board gateway_id is invalid", + ) + if gateway.organization_id != board.organization_id: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Board gateway_id is invalid", + ) + if not gateway.url: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Gateway url is required", + ) + if not gateway.workspace_root: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Gateway workspace_root is required", + ) + return gateway, GatewayClientConfig(url=gateway.url, token=gateway.token) + + @staticmethod + def gateway_client_config(gateway: Gateway) -> GatewayClientConfig: + if not gateway.url: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Gateway url is required", + ) + return GatewayClientConfig(url=gateway.url, token=gateway.token) + + @staticmethod + def is_gateway_main(agent: Agent) -> bool: + return agent.board_id is None + + @classmethod + def to_agent_read(cls, agent: Agent) -> AgentRead: + model = AgentRead.model_validate(agent, from_attributes=True) + return model.model_copy( + update={"is_gateway_main": cls.is_gateway_main(agent)}, + ) + + @staticmethod + def coerce_agent_items(items: Sequence[Any]) -> list[Agent]: + agents: list[Agent] = [] + for item in items: + if not isinstance(item, Agent): + msg = "Expected Agent items from paginated query" + raise TypeError(msg) + agents.append(item) + return agents + + async def get_main_agent_gateway(self, agent: Agent) -> Gateway | None: + if agent.board_id is not None: + return None + return await Gateway.objects.by_id(agent.gateway_id).first(self.session) + + @classmethod + def with_computed_status(cls, agent: Agent) -> Agent: + now = utcnow() + if agent.status in {"deleting", "updating"}: + return agent + if agent.last_seen_at is None: + agent.status = "provisioning" + elif now - agent.last_seen_at > OFFLINE_AFTER: + agent.status = "offline" + return agent + + @classmethod + def serialize_agent(cls, agent: Agent) -> dict[str, object]: + return cls.to_agent_read(cls.with_computed_status(agent)).model_dump(mode="json") + + async def fetch_agent_events( + self, + board_id: UUID | None, + since: datetime, + ) -> list[Agent]: + statement = select(Agent) + if board_id: + statement = statement.where(col(Agent.board_id) == board_id) + statement = statement.where( + or_( + col(Agent.updated_at) >= since, + col(Agent.last_seen_at) >= since, + ), + ).order_by(asc(col(Agent.updated_at))) + return list(await self.session.exec(statement)) + + async def require_user_context(self, user: User | None) -> OrganizationContext: + if user is None: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) + member = await get_active_membership(self.session, user) + if member is None: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + organization = await Organization.objects.by_id(member.organization_id).first(self.session) + if organization is None: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + return OrganizationContext(organization=organization, member=member) + + async def require_agent_access( + self, + *, + agent: Agent, + ctx: OrganizationContext, + write: bool, + ) -> None: + if agent.board_id is None: + OpenClawAuthorizationPolicy.require_org_admin(is_admin=is_org_admin(ctx.member)) + gateway = await self.get_main_agent_gateway(agent) + OpenClawAuthorizationPolicy.require_gateway_in_org( + gateway=gateway, + organization_id=ctx.organization.id, + ) + return + + board = await Board.objects.by_id(agent.board_id).first(self.session) + board = OpenClawAuthorizationPolicy.require_board_in_org( + board=board, + organization_id=ctx.organization.id, + ) + allowed = await has_board_access( + self.session, + member=ctx.member, + board=board, + write=write, + ) + OpenClawAuthorizationPolicy.require_board_write_access(allowed=allowed) + + @staticmethod + def record_heartbeat(session: AsyncSession, agent: Agent) -> None: + record_activity( + session, + event_type="agent.heartbeat", + message=f"Heartbeat received from {agent.name}.", + agent_id=agent.id, + ) + + @staticmethod + def record_instruction_failure( + session: AsyncSession, + agent: Agent, + error: str, + action: str, + ) -> None: + action_label = action.replace("_", " ").capitalize() + record_activity( + session, + event_type=f"agent.{action}.failed", + message=f"{action_label} message failed: {error}", + agent_id=agent.id, + ) + + async def coerce_agent_create_payload( + self, + payload: AgentCreate, + actor: ActorContextLike, + ) -> AgentCreate: + if actor.actor_type == "user": + ctx = await self.require_user_context(actor.user) + OpenClawAuthorizationPolicy.require_org_admin(is_admin=is_org_admin(ctx.member)) + return payload + + if actor.actor_type == "agent": + board_id = OpenClawAuthorizationPolicy.resolve_board_lead_create_board_id( + actor_agent=actor.agent, + requested_board_id=payload.board_id, + ) + return AgentCreate(**{**payload.model_dump(), "board_id": board_id}) + + return payload + + async def ensure_unique_agent_name( + self, + *, + board: Board, + gateway: Gateway, + requested_name: str, + ) -> None: + if not requested_name: + return + + existing = ( + await self.session.exec( + select(Agent) + .where(Agent.board_id == board.id) + .where(col(Agent.name).ilike(requested_name)), + ) + ).first() + if existing: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="An agent with this name already exists on this board.", + ) + + existing_gateway = ( + await self.session.exec( + select(Agent) + .join(Board, col(Agent.board_id) == col(Board.id)) + .where(col(Board.gateway_id) == gateway.id) + .where(col(Agent.name).ilike(requested_name)), + ) + ).first() + if existing_gateway: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="An agent with this name already exists in this gateway workspace.", + ) + + async def persist_new_agent( + self, + *, + data: dict[str, Any], + ) -> tuple[Agent, str]: + agent = Agent.model_validate(data) + agent.status = "provisioning" + raw_token = generate_agent_token() + agent.agent_token_hash = hash_agent_token(raw_token) + if agent.heartbeat_config is None: + agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() + agent.provision_requested_at = utcnow() + agent.provision_action = "provision" + agent.openclaw_session_id = self.resolve_session_key(agent) + self.session.add(agent) + await self.session.commit() + await self.session.refresh(agent) + return agent, raw_token + + async def _apply_gateway_provisioning( + self, + *, + agent: Agent, + target: AgentUpdateProvisionTarget, + auth_token: str, + user: User | None, + action: str, + wakeup_verb: str, + force_bootstrap: bool, + raise_gateway_errors: bool, + ) -> None: + self.logger.log( + 5, + "agent.provision.start action=%s agent_id=%s target_main=%s", + action, + agent.id, + target.is_main_agent, + ) + try: + if not target.is_main_agent and target.board is None: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="board is required for non-main agent provisioning", + ) + template_user = user + if target.is_main_agent and template_user is None: + template_user = await get_org_owner_user( + self.session, + organization_id=target.gateway.organization_id, + ) + if template_user is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=( + "User context is required to provision the gateway main agent " + "(org owner not found)." + ), + ) + await OpenClawGatewayProvisioner().apply_agent_lifecycle( + agent=agent, + gateway=target.gateway, + board=target.board if not target.is_main_agent else None, + auth_token=auth_token, + user=template_user, + action=action, + force_bootstrap=force_bootstrap, + reset_session=True, + wake=True, + deliver_wakeup=True, + wakeup_verb=wakeup_verb, + ) + agent.provision_confirm_token_hash = None + agent.provision_requested_at = None + agent.provision_action = None + agent.status = "online" + agent.updated_at = utcnow() + self.session.add(agent) + await self.session.commit() + record_activity( + self.session, + event_type=f"agent.{action}.direct", + message=f"{action.capitalize()}d directly for {agent.name}.", + agent_id=agent.id, + ) + record_activity( + self.session, + event_type="agent.wakeup.sent", + message=f"Wakeup message sent to {agent.name}.", + agent_id=agent.id, + ) + await self.session.commit() + self.logger.info( + "agent.provision.success action=%s agent_id=%s", + action, + agent.id, + ) + except OpenClawGatewayError as exc: + self.record_instruction_failure( + self.session, + agent, + str(exc), + action, + ) + await self.session.commit() + self.logger.error( + "agent.provision.gateway_error action=%s agent_id=%s error=%s", + action, + agent.id, + str(exc), + ) + if raise_gateway_errors: + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"Gateway {action} failed: {exc}", + ) from exc + except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover + self.record_instruction_failure( + self.session, + agent, + str(exc), + action, + ) + await self.session.commit() + self.logger.critical( + "agent.provision.runtime_error action=%s agent_id=%s error=%s", + action, + agent.id, + str(exc), + ) + if raise_gateway_errors: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Unexpected error {action}ing agent provisioning.", + ) from exc + + async def provision_new_agent( + self, + *, + agent: Agent, + board: Board, + gateway: Gateway, + auth_token: str, + user: User | None, + force_bootstrap: bool, + ) -> None: + await self._apply_gateway_provisioning( + agent=agent, + target=AgentUpdateProvisionTarget(is_main_agent=False, board=board, gateway=gateway), + auth_token=auth_token, + user=user, + action="provision", + wakeup_verb="provisioned", + force_bootstrap=force_bootstrap, + raise_gateway_errors=True, + ) + + async def validate_agent_update_inputs( + self, + *, + ctx: OrganizationContext, + updates: dict[str, Any], + make_main: bool | None, + ) -> None: + if make_main: + OpenClawAuthorizationPolicy.require_org_admin(is_admin=is_org_admin(ctx.member)) + if "status" in updates: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="status is controlled by agent heartbeat", + ) + if "board_id" in updates and updates["board_id"] is not None: + new_board = await self.require_board(updates["board_id"]) + OpenClawAuthorizationPolicy.require_board_in_org( + board=new_board, + organization_id=ctx.organization.id, + ) + allowed = await has_board_access( + self.session, + member=ctx.member, + board=new_board, + write=True, + ) + OpenClawAuthorizationPolicy.require_board_write_access(allowed=allowed) + + async def apply_agent_update_mutations( + self, + *, + agent: Agent, + updates: dict[str, Any], + make_main: bool | None, + ) -> tuple[Gateway | None, Gateway | None]: + main_gateway = await self.get_main_agent_gateway(agent) + gateway_for_main: Gateway | None = None + + if make_main: + board_source = updates.get("board_id") or agent.board_id + board_for_main = await self.require_board(board_source) + gateway_for_main, _ = await self.require_gateway(board_for_main) + updates["board_id"] = None + updates["gateway_id"] = gateway_for_main.id + agent.is_board_lead = False + agent.openclaw_session_id = GatewayAgentIdentity.session_key(gateway_for_main) + main_gateway = gateway_for_main + elif make_main is not None: + if "board_id" not in updates or updates["board_id"] is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=( + "board_id is required when converting a gateway-main agent " + "to board scope" + ), + ) + board = await self.require_board(updates["board_id"]) + if board.gateway_id is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Board gateway_id is required", + ) + updates["gateway_id"] = board.gateway_id + agent.openclaw_session_id = None + + if make_main is None and "board_id" in updates: + board = await self.require_board(updates["board_id"]) + if board.gateway_id is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Board gateway_id is required", + ) + updates["gateway_id"] = board.gateway_id + for key, value in updates.items(): + setattr(agent, key, value) + + if make_main is None and main_gateway is not None: + agent.board_id = None + agent.gateway_id = main_gateway.id + agent.is_board_lead = False + if make_main is False and agent.board_id is not None: + board = await self.require_board(agent.board_id) + if board.gateway_id is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Board gateway_id is required", + ) + agent.gateway_id = board.gateway_id + agent.updated_at = utcnow() + if agent.heartbeat_config is None: + agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() + self.session.add(agent) + await self.session.commit() + await self.session.refresh(agent) + return main_gateway, gateway_for_main + + async def resolve_agent_update_target( + self, + *, + agent: Agent, + make_main: bool | None, + main_gateway: Gateway | None, + gateway_for_main: Gateway | None, + ) -> AgentUpdateProvisionTarget: + if make_main: + if gateway_for_main is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Gateway agent requires a gateway configuration", + ) + return AgentUpdateProvisionTarget( + is_main_agent=True, + board=None, + gateway=gateway_for_main, + ) + + if make_main is None and agent.board_id is None and main_gateway is not None: + return AgentUpdateProvisionTarget( + is_main_agent=True, + board=None, + gateway=main_gateway, + ) + + if agent.board_id is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="board_id is required for non-main agents", + ) + board = await self.require_board(agent.board_id) + gateway, _client_config = await self.require_gateway(board) + return AgentUpdateProvisionTarget( + is_main_agent=False, + board=board, + gateway=gateway, + ) + + @staticmethod + def mark_agent_update_pending(agent: Agent) -> str: + raw_token = generate_agent_token() + agent.agent_token_hash = hash_agent_token(raw_token) + agent.provision_requested_at = utcnow() + agent.provision_action = "update" + agent.status = "updating" + return raw_token + + async def provision_updated_agent( + self, + *, + agent: Agent, + request: AgentUpdateProvisionRequest, + ) -> None: + await self._apply_gateway_provisioning( + agent=agent, + target=request.target, + auth_token=request.raw_token, + user=request.user, + action="update", + wakeup_verb="updated", + force_bootstrap=request.force_bootstrap, + raise_gateway_errors=True, + ) + + @staticmethod + def heartbeat_lookup_statement(payload: AgentHeartbeatCreate) -> SelectOfScalar[Agent]: + statement = Agent.objects.filter_by(name=payload.name).statement + if payload.board_id is not None: + statement = statement.where(Agent.board_id == payload.board_id) + return statement + + async def create_agent_from_heartbeat( + self, + *, + payload: AgentHeartbeatCreate, + actor: ActorContextLike, + ) -> Agent: + if actor.actor_type == "agent": + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) + if actor.actor_type == "user": + ctx = await self.require_user_context(actor.user) + OpenClawAuthorizationPolicy.require_org_admin(is_admin=is_org_admin(ctx.member)) + + board = await self.require_board( + payload.board_id, + user=actor.user, + write=True, + ) + gateway, _client_config = await self.require_gateway(board) + data: dict[str, Any] = { + "name": payload.name, + "board_id": board.id, + "gateway_id": gateway.id, + "heartbeat_config": DEFAULT_HEARTBEAT_CONFIG.copy(), + } + agent, raw_token = await self.persist_new_agent(data=data) + await self.provision_new_agent( + agent=agent, + board=board, + gateway=gateway, + auth_token=raw_token, + user=actor.user, + force_bootstrap=False, + ) + return agent + + async def handle_existing_user_heartbeat_agent( + self, + *, + agent: Agent, + user: User | None, + ) -> None: + ctx = await self.require_user_context(user) + await self.require_agent_access(agent=agent, ctx=ctx, write=True) + + if agent.agent_token_hash is not None: + return + + raw_token = generate_agent_token() + agent.agent_token_hash = hash_agent_token(raw_token) + if agent.heartbeat_config is None: + agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() + agent.provision_requested_at = utcnow() + agent.provision_action = "provision" + self.session.add(agent) + await self.session.commit() + await self.session.refresh(agent) + board = await self.require_board( + str(agent.board_id) if agent.board_id else None, + user=user, + write=True, + ) + gateway, _client_config = await self.require_gateway(board) + await self.provision_new_agent( + agent=agent, + board=board, + gateway=gateway, + auth_token=raw_token, + user=user, + force_bootstrap=False, + ) + + async def ensure_heartbeat_session_key( + self, + *, + agent: Agent, + actor: ActorContextLike, + ) -> None: + _ = actor + if agent.board_id is None: + return + desired = self.resolve_session_key(agent) + existing = (agent.openclaw_session_id or "").strip() + if existing == desired: + return + agent.openclaw_session_id = desired + self.session.add(agent) + await self.session.commit() + await self.session.refresh(agent) + + async def commit_heartbeat( + self, + *, + agent: Agent, + status_value: str | None, + ) -> AgentRead: + if status_value: + agent.status = status_value + elif agent.status == "provisioning": + agent.status = "online" + agent.last_seen_at = utcnow() + agent.updated_at = utcnow() + self.record_heartbeat(self.session, agent) + self.session.add(agent) + await self.session.commit() + await self.session.refresh(agent) + return self.to_agent_read(self.with_computed_status(agent)) + + async def list_agents( + self, + *, + board_id: UUID | None, + gateway_id: UUID | None, + ctx: OrganizationContext, + ) -> LimitOffsetPage[AgentRead]: + board_ids = await list_accessible_board_ids(self.session, member=ctx.member, write=False) + if board_id is not None: + OpenClawAuthorizationPolicy.require_board_write_access( + allowed=board_id in set(board_ids), + ) + base_filters: list[ColumnElement[bool]] = [] + if board_ids: + base_filters.append(col(Agent.board_id).in_(board_ids)) + if is_org_admin(ctx.member): + gateways = await Gateway.objects.filter_by( + organization_id=ctx.organization.id, + ).all(self.session) + gateway_ids = [gateway.id for gateway in gateways] + if gateway_ids: + base_filters.append( + (col(Agent.gateway_id).in_(gateway_ids)) & (col(Agent.board_id).is_(None)), + ) + if base_filters: + if len(base_filters) == 1: + statement = select(Agent).where(base_filters[0]) + else: + statement = select(Agent).where(or_(*base_filters)) + else: + statement = select(Agent).where(col(Agent.id).is_(None)) + if board_id is not None: + statement = statement.where(col(Agent.board_id) == board_id) + if gateway_id is not None: + gateway = await Gateway.objects.by_id(gateway_id).first(self.session) + if gateway is None or gateway.organization_id != ctx.organization.id: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) + gateway_board_ids = select(Board.id).where(col(Board.gateway_id) == gateway_id) + statement = statement.where( + or_( + col(Agent.board_id).in_(gateway_board_ids), + (col(Agent.gateway_id) == gateway_id) & (col(Agent.board_id).is_(None)), + ), + ) + statement = statement.order_by(col(Agent.created_at).desc()) + + def _transform(items: Sequence[Any]) -> Sequence[Any]: + agents = self.coerce_agent_items(items) + return [self.to_agent_read(self.with_computed_status(agent)) for agent in agents] + + return await paginate(self.session, statement, transformer=_transform) + + async def stream_agents( + self, + *, + request: Request, + board_id: UUID | None, + since: str | None, + ctx: OrganizationContext, + ) -> EventSourceResponse: + since_dt = self.parse_since(since) or utcnow() + last_seen = since_dt + board_ids = await list_accessible_board_ids(self.session, member=ctx.member, write=False) + allowed_ids = set(board_ids) + if board_id is not None: + OpenClawAuthorizationPolicy.require_board_write_access(allowed=board_id in allowed_ids) + + async def event_generator() -> AsyncIterator[dict[str, str]]: + nonlocal last_seen + while True: + if await request.is_disconnected(): + break + async with async_session_maker() as stream_session: + stream_service = AgentLifecycleService(stream_session) + stream_service.logger = self.logger + if board_id is not None: + agents = await stream_service.fetch_agent_events( + board_id, + last_seen, + ) + elif allowed_ids: + agents = await stream_service.fetch_agent_events(None, last_seen) + agents = [agent for agent in agents if agent.board_id in allowed_ids] + else: + agents = [] + for agent in agents: + updated_at = agent.updated_at or agent.last_seen_at or utcnow() + last_seen = max(updated_at, last_seen) + payload = {"agent": self.serialize_agent(agent)} + yield {"event": "agent", "data": json.dumps(payload)} + await asyncio.sleep(2) + + return EventSourceResponse(event_generator(), ping=15) + + async def create_agent( + self, + *, + payload: AgentCreate, + actor: ActorContextLike, + ) -> AgentRead: + self.logger.log( + 5, + "agent.create.start actor_type=%s board_id=%s", + actor.actor_type, + payload.board_id, + ) + payload = await self.coerce_agent_create_payload(payload, actor) + + board = await self.require_board( + payload.board_id, + user=actor.user if actor.actor_type == "user" else None, + write=actor.actor_type == "user", + ) + gateway, _client_config = await self.require_gateway(board) + data = payload.model_dump() + data["gateway_id"] = gateway.id + requested_name = (data.get("name") or "").strip() + await self.ensure_unique_agent_name( + board=board, + gateway=gateway, + requested_name=requested_name, + ) + agent, raw_token = await self.persist_new_agent(data=data) + await self.provision_new_agent( + agent=agent, + board=board, + gateway=gateway, + auth_token=raw_token, + user=actor.user if actor.actor_type == "user" else None, + force_bootstrap=False, + ) + self.logger.info("agent.create.success agent_id=%s board_id=%s", agent.id, board.id) + return self.to_agent_read(self.with_computed_status(agent)) + + async def get_agent( + self, + *, + agent_id: str, + ctx: OrganizationContext, + ) -> AgentRead: + agent = await Agent.objects.by_id(agent_id).first(self.session) + if agent is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) + await self.require_agent_access(agent=agent, ctx=ctx, write=False) + return self.to_agent_read(self.with_computed_status(agent)) + + async def update_agent( + self, + *, + agent_id: str, + payload: AgentUpdate, + options: AgentUpdateOptions, + ) -> AgentRead: + self.logger.log(5, "agent.update.start agent_id=%s force=%s", agent_id, options.force) + agent = await Agent.objects.by_id(agent_id).first(self.session) + if agent is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) + await self.require_agent_access(agent=agent, ctx=options.context, write=True) + updates = payload.model_dump(exclude_unset=True) + make_main = updates.pop("is_gateway_main", None) + await self.validate_agent_update_inputs( + ctx=options.context, + updates=updates, + make_main=make_main, + ) + if not updates and not options.force and make_main is None: + return self.to_agent_read(self.with_computed_status(agent)) + main_gateway, gateway_for_main = await self.apply_agent_update_mutations( + agent=agent, + updates=updates, + make_main=make_main, + ) + target = await self.resolve_agent_update_target( + agent=agent, + make_main=make_main, + main_gateway=main_gateway, + gateway_for_main=gateway_for_main, + ) + raw_token = self.mark_agent_update_pending(agent) + self.session.add(agent) + await self.session.commit() + await self.session.refresh(agent) + provision_request = AgentUpdateProvisionRequest( + target=target, + raw_token=raw_token, + user=options.user, + force_bootstrap=options.force, + ) + await self.provision_updated_agent( + agent=agent, + request=provision_request, + ) + self.logger.info("agent.update.success agent_id=%s", agent.id) + return self.to_agent_read(self.with_computed_status(agent)) + + async def heartbeat_agent( + self, + *, + agent_id: str, + payload: AgentHeartbeat, + actor: ActorContextLike, + ) -> AgentRead: + self.logger.log( + 5, "agent.heartbeat.start agent_id=%s actor_type=%s", agent_id, actor.actor_type + ) + agent = await Agent.objects.by_id(agent_id).first(self.session) + if agent is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) + if actor.actor_type == "agent": + OpenClawAuthorizationPolicy.require_same_agent_actor( + actor_agent_id=actor.agent.id if actor.agent else None, + target_agent_id=agent.id, + ) + if actor.actor_type == "user": + ctx = await self.require_user_context(actor.user) + OpenClawAuthorizationPolicy.require_org_admin(is_admin=is_org_admin(ctx.member)) + await self.require_agent_access(agent=agent, ctx=ctx, write=True) + return await self.commit_heartbeat( + agent=agent, + status_value=payload.status, + ) + + async def heartbeat_or_create_agent( + self, + *, + payload: AgentHeartbeatCreate, + actor: ActorContextLike, + ) -> AgentRead: + self.logger.log( + 5, + "agent.heartbeat_or_create.start actor_type=%s name=%s board_id=%s", + actor.actor_type, + payload.name, + payload.board_id, + ) + if actor.actor_type == "agent" and actor.agent: + return await self.heartbeat_agent( + agent_id=str(actor.agent.id), + payload=AgentHeartbeat(status=payload.status), + actor=actor, + ) + + agent = (await self.session.exec(self.heartbeat_lookup_statement(payload))).first() + if agent is None: + agent = await self.create_agent_from_heartbeat( + payload=payload, + actor=actor, + ) + elif actor.actor_type == "user": + await self.handle_existing_user_heartbeat_agent( + agent=agent, + user=actor.user, + ) + elif actor.actor_type == "agent": + OpenClawAuthorizationPolicy.require_same_agent_actor( + actor_agent_id=actor.agent.id if actor.agent else None, + target_agent_id=agent.id, + ) + + await self.ensure_heartbeat_session_key( + agent=agent, + actor=actor, + ) + return await self.commit_heartbeat( + agent=agent, + status_value=payload.status, + ) + + async def delete_agent( + self, + *, + agent_id: str, + ctx: OrganizationContext, + ) -> OkResponse: + self.logger.log(5, "agent.delete.start agent_id=%s", agent_id) + agent = await Agent.objects.by_id(agent_id).first(self.session) + if agent is None: + return OkResponse() + await self.require_agent_access(agent=agent, ctx=ctx, write=True) + + gateway: Gateway | None = None + client_config: GatewayClientConfig | None = None + workspace_path: str | None = None + + if agent.board_id is None: + # Gateway-main agents are not tied to a board; resolve via agent.gateway_id. + gateway = await Gateway.objects.by_id(agent.gateway_id).first(self.session) + if gateway and gateway.url: + client_config = GatewayClientConfig(url=gateway.url, token=gateway.token) + try: + workspace_path = await OpenClawGatewayProvisioner().delete_agent_lifecycle( + agent=agent, + gateway=gateway, + ) + except OpenClawGatewayError as exc: + self.record_instruction_failure(self.session, agent, str(exc), "delete") + await self.session.commit() + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"Gateway cleanup failed: {exc}", + ) from exc + except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover + self.record_instruction_failure(self.session, agent, str(exc), "delete") + await self.session.commit() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Workspace cleanup failed: {exc}", + ) from exc + else: + board = await self.require_board(str(agent.board_id)) + gateway, client_config = await self.require_gateway(board) + try: + workspace_path = await OpenClawGatewayProvisioner().delete_agent_lifecycle( + agent=agent, + gateway=gateway, + ) + except OpenClawGatewayError as exc: + self.record_instruction_failure(self.session, agent, str(exc), "delete") + await self.session.commit() + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"Gateway cleanup failed: {exc}", + ) from exc + except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover + self.record_instruction_failure(self.session, agent, str(exc), "delete") + await self.session.commit() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Workspace cleanup failed: {exc}", + ) from exc + + record_activity( + self.session, + event_type="agent.delete.direct", + message=f"Deleted agent {agent.name}.", + agent_id=None, + ) + now = utcnow() + await crud.update_where( + self.session, + Task, + col(Task.assigned_agent_id) == agent.id, + col(Task.status) == "in_progress", + assigned_agent_id=None, + status="inbox", + in_progress_at=None, + updated_at=now, + commit=False, + ) + await crud.update_where( + self.session, + Task, + col(Task.assigned_agent_id) == agent.id, + col(Task.status) != "in_progress", + assigned_agent_id=None, + updated_at=now, + commit=False, + ) + await crud.update_where( + self.session, + ActivityEvent, + col(ActivityEvent.agent_id) == agent.id, + agent_id=None, + commit=False, + ) + await self.session.delete(agent) + await self.session.commit() + + try: + # Notify the gateway-main agent about cleanup for board-scoped deletes. + # Skip when deleting the gateway-main agent itself. + if gateway is None or client_config is None or agent.board_id is None: + raise ValueError("skip main agent cleanup notification") + main_session = GatewayAgentIdentity.session_key(gateway) + if main_session and workspace_path: + cleanup_message = ( + "Cleanup request for deleted agent.\n\n" + f"Agent name: {agent.name}\n" + f"Agent id: {agent.id}\n" + f"Workspace path: {workspace_path}\n\n" + "Actions:\n" + "1) Remove the workspace directory.\n" + "2) Reply NO_REPLY.\n" + ) + await ensure_session(main_session, config=client_config, label="Gateway Agent") + await send_message( + cleanup_message, + session_key=main_session, + config=client_config, + deliver=False, + ) + except (OSError, OpenClawGatewayError, ValueError): + pass + self.logger.info("agent.delete.success agent_id=%s", agent_id) + return OkResponse() diff --git a/backend/app/services/openclaw/session_service.py b/backend/app/services/openclaw/session_service.py index e0f39f5b..17fd6345 100644 --- a/backend/app/services/openclaw/session_service.py +++ b/backend/app/services/openclaw/session_service.py @@ -11,14 +11,6 @@ from uuid import UUID from fastapi import HTTPException, status from sqlmodel import col -from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig -from app.integrations.openclaw_gateway import ( - OpenClawGatewayError, - ensure_session, - get_chat_history, - openclaw_call, - send_message, -) from app.models.agents import Agent from app.models.boards import Board from app.models.gateways import Gateway @@ -30,6 +22,14 @@ from app.schemas.gateway_api import ( GatewaySessionsResponse, GatewaysStatusResponse, ) +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig +from app.services.openclaw.gateway_rpc import ( + OpenClawGatewayError, + ensure_session, + get_chat_history, + openclaw_call, + send_message, +) from app.services.openclaw.policies import OpenClawAuthorizationPolicy from app.services.organizations import require_board_access diff --git a/backend/app/services/openclaw/shared.py b/backend/app/services/openclaw/shared.py index 343452f4..d87ce474 100644 --- a/backend/app/services/openclaw/shared.py +++ b/backend/app/services/openclaw/shared.py @@ -8,8 +8,6 @@ from uuid import UUID, uuid4 from fastapi import HTTPException, status -from app.integrations.openclaw_gateway import GatewayConfig as _GatewayClientConfig -from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message from app.models.boards import Board from app.models.gateways import Gateway from app.services.openclaw.constants import ( @@ -17,6 +15,8 @@ from app.services.openclaw.constants import ( _GATEWAY_AGENT_SUFFIX, _GATEWAY_OPENCLAW_AGENT_PREFIX, ) +from app.services.openclaw.gateway_rpc import GatewayConfig as _GatewayClientConfig +from app.services.openclaw.gateway_rpc import OpenClawGatewayError, ensure_session, send_message if TYPE_CHECKING: from sqlmodel.ext.asyncio.session import AsyncSession diff --git a/backend/tests/test_agent_delete_main_agent.py b/backend/tests/test_agent_delete_main_agent.py index 9a3da74f..8dced0d7 100644 --- a/backend/tests/test_agent_delete_main_agent.py +++ b/backend/tests/test_agent_delete_main_agent.py @@ -9,7 +9,7 @@ from uuid import UUID, uuid4 import pytest -import app.services.openclaw.agent_service as agent_service +import app.services.openclaw.provisioning_db as agent_service @dataclass diff --git a/backend/tests/test_agent_provisioning_utils.py b/backend/tests/test_agent_provisioning_utils.py index cca862ce..7c75f386 100644 --- a/backend/tests/test_agent_provisioning_utils.py +++ b/backend/tests/test_agent_provisioning_utils.py @@ -8,7 +8,7 @@ from uuid import UUID, uuid4 import pytest import app.services.openclaw.provisioning as agent_provisioning -from app.services.openclaw.agent_service import AgentLifecycleService +from app.services.openclaw.provisioning_db import AgentLifecycleService from app.services.openclaw.shared import GatewayAgentIdentity diff --git a/backend/tests/test_lifecycle_services.py b/backend/tests/test_lifecycle_services.py index e05dbee0..f0d96418 100644 --- a/backend/tests/test_lifecycle_services.py +++ b/backend/tests/test_lifecycle_services.py @@ -13,8 +13,8 @@ from fastapi import HTTPException, status import app.services.openclaw.coordination_service as coordination_lifecycle import app.services.openclaw.onboarding_service as onboarding_lifecycle -from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig -from app.integrations.openclaw_gateway import OpenClawGatewayError +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig +from app.services.openclaw.gateway_rpc import OpenClawGatewayError from app.services.openclaw.shared import GatewayAgentIdentity diff --git a/backend/tests/test_openclaw_import_boundaries.py b/backend/tests/test_openclaw_import_boundaries.py index 0e8f3496..164fce24 100644 --- a/backend/tests/test_openclaw_import_boundaries.py +++ b/backend/tests/test_openclaw_import_boundaries.py @@ -27,6 +27,6 @@ def test_no_openclaw_package_barrel_imports() -> None: assert not violations, ( "Use concrete OpenClaw modules (for example " - "`from app.services.openclaw.agent_service import ...`) instead of package imports. " + "`from app.services.openclaw.provisioning_db import ...`) instead of package imports. " f"Violations: {', '.join(violations)}" ) diff --git a/docs/architecture/README.md b/docs/architecture/README.md index 8b3f3bde..f77dfb87 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -51,8 +51,7 @@ flowchart LR ### Gateway integration (optional) Mission Control can call into an OpenClaw Gateway over WebSockets. -- Client: `backend/app/integrations/openclaw_gateway.py` -- Known methods/events: `backend/app/integrations/openclaw_gateway_protocol.py` +- Client + protocol: `backend/app/services/openclaw/gateway_rpc.py` - Protocol doc: `docs/openclaw_gateway_ws.md` - Base gateway config (getting started): `docs/openclaw_gateway_base_config.md` diff --git a/docs/openclaw_gateway_base_config.md b/docs/openclaw_gateway_base_config.md index dfd20af0..7bdb22d2 100644 --- a/docs/openclaw_gateway_base_config.md +++ b/docs/openclaw_gateway_base_config.md @@ -151,7 +151,7 @@ In the Mission Control UI: 4. Use the "Check connection" action to confirm reachability. Implementation note (how Mission Control sends tokens): -- If you provide a token, Mission Control's backend will include it when connecting to the gateway (it attaches it to the URL query string and also sends it in the `connect` params). See `backend/app/integrations/openclaw_gateway.py`. +- If you provide a token, Mission Control's backend will include it when connecting to the gateway (it attaches it to the URL query string and also sends it in the `connect` params). See `backend/app/services/openclaw/gateway_rpc.py`. ### Workspace Root (Mission Control) vs Workspace (Gateway)