diff --git a/backend/app/api/agent.py b/backend/app/api/agent.py index 3aedbba8..e3c30af5 100644 --- a/backend/app/api/agent.py +++ b/backend/app/api/agent.py @@ -57,7 +57,7 @@ from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate from app.services.activity_log import record_activity from app.services.board_leads import LeadAgentOptions, LeadAgentRequest, ensure_board_lead_agent -from app.services.gateway_agents import gateway_agent_session_key, parse_gateway_agent_session_key +from app.services.gateway_agents import gateway_agent_session_key from app.services.task_dependencies import ( blocked_by_dependency_ids, dependency_status_by_id, @@ -172,25 +172,19 @@ async def _require_gateway_main( session: AsyncSession, agent: Agent, ) -> tuple[Gateway, GatewayClientConfig]: - session_key = (agent.openclaw_session_id or "").strip() - if not session_key: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Agent missing session key", - ) - gateway_id = parse_gateway_agent_session_key(session_key) - if gateway_id is None: + if agent.board_id is not None: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only the dedicated gateway agent may call this endpoint.", ) + gateway_id = agent.gateway_id gateway = await Gateway.objects.by_id(gateway_id).first(session) if gateway is None: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only the dedicated gateway agent may call this endpoint.", ) - if gateway_agent_session_key(gateway) != session_key: + if agent.openclaw_session_id != gateway_agent_session_key(gateway): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only the dedicated gateway agent may call this endpoint.", @@ -257,7 +251,6 @@ async def list_agents( statement = statement.where(Agent.board_id == agent_ctx.agent.board_id) elif board_id: statement = statement.where(Agent.board_id == board_id) - main_session_keys = await agents_api.get_gateway_main_session_keys(session) statement = statement.order_by(col(Agent.created_at).desc()) def _transform(items: Sequence[Any]) -> Sequence[Any]: @@ -265,7 +258,6 @@ async def list_agents( return [ agents_api.to_agent_read( agents_api.with_computed_status(agent), - main_session_keys, ) for agent in agents ] @@ -758,11 +750,6 @@ async def ask_user_via_gateway_main( detail="Gateway is not configured for this board", ) main_session_key = gateway_agent_session_key(gateway) - if not main_session_key: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Gateway agent session key is required", - ) config = GatewayClientConfig(url=gateway.url, token=gateway.token) correlation = payload.correlation_id.strip() if payload.correlation_id else "" @@ -818,7 +805,8 @@ async def ask_user_via_gateway_main( ) main_agent = await Agent.objects.filter_by( - openclaw_session_id=main_session_key, + gateway_id=gateway.id, + board_id=None, ).first(session) await session.commit() diff --git a/backend/app/api/agents.py b/backend/app/api/agents.py index 19d8bd82..814d3ac8 100644 --- a/backend/app/api/agents.py +++ b/backend/app/api/agents.py @@ -201,32 +201,20 @@ def _gateway_client_config(gateway: Gateway) -> GatewayClientConfig: return GatewayClientConfig(url=gateway.url, token=gateway.token) -async def _get_gateway_main_session_keys(session: AsyncSession) -> set[str]: - gateways = await Gateway.objects.all().all(session) - return {gateway_agent_session_key(gateway) for gateway in gateways} +def _is_gateway_main(agent: Agent) -> bool: + return agent.board_id is None -def _is_gateway_main(agent: Agent, main_session_keys: set[str]) -> bool: - return bool( - agent.openclaw_session_id and agent.openclaw_session_id in main_session_keys, - ) - - -def _to_agent_read(agent: Agent, main_session_keys: set[str]) -> AgentRead: +def _to_agent_read(agent: Agent) -> AgentRead: model = AgentRead.model_validate(agent, from_attributes=True) return model.model_copy( - update={"is_gateway_main": _is_gateway_main(agent, main_session_keys)}, + update={"is_gateway_main": _is_gateway_main(agent)}, ) -async def get_gateway_main_session_keys(session: AsyncSession) -> set[str]: - """Return gateway main-session keys used to compute `is_gateway_main`.""" - return await _get_gateway_main_session_keys(session) - - -def to_agent_read(agent: Agent, main_session_keys: set[str]) -> AgentRead: +def to_agent_read(agent: Agent) -> AgentRead: """Convert an `Agent` model into its API read representation.""" - return _to_agent_read(agent, main_session_keys) + return _to_agent_read(agent) def _coerce_agent_items(items: Sequence[Any]) -> list[Agent]: @@ -239,17 +227,10 @@ def _coerce_agent_items(items: Sequence[Any]) -> list[Agent]: return agents -async def _find_gateway_for_main_session( - session: AsyncSession, - session_key: str | None, -) -> Gateway | None: - if not session_key: +async def _main_agent_gateway(session: AsyncSession, agent: Agent) -> Gateway | None: + if agent.board_id is not None: return None - gateways = await Gateway.objects.all().all(session) - for gateway in gateways: - if gateway_agent_session_key(gateway) == session_key: - return gateway - return None + return await Gateway.objects.by_id(agent.gateway_id).first(session) async def _ensure_gateway_session( @@ -281,8 +262,8 @@ def with_computed_status(agent: Agent) -> Agent: return _with_computed_status(agent) -def _serialize_agent(agent: Agent, main_session_keys: set[str]) -> dict[str, object]: - return _to_agent_read(_with_computed_status(agent), main_session_keys).model_dump( +def _serialize_agent(agent: Agent) -> dict[str, object]: + return _to_agent_read(_with_computed_status(agent)).model_dump( mode="json", ) @@ -331,10 +312,7 @@ async def _require_agent_access( if agent.board_id is None: if not is_org_admin(ctx.member): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) - gateway = await _find_gateway_for_main_session( - session, - agent.openclaw_session_id, - ) + gateway = await _main_agent_gateway(session, agent) if gateway is None or gateway.organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) return @@ -593,10 +571,7 @@ async def _apply_agent_update_mutations( updates: dict[str, Any], make_main: bool | None, ) -> tuple[Gateway | None, Gateway | None]: - main_gateway = await _find_gateway_for_main_session( - session, - agent.openclaw_session_id, - ) + main_gateway = await _main_agent_gateway(session, agent) gateway_for_main: Gateway | None = None if make_main: @@ -604,20 +579,48 @@ async def _apply_agent_update_mutations( board_for_main = await _require_board(session, board_source) gateway_for_main, _ = await _require_gateway(session, board_for_main) updates["board_id"] = None + updates["gateway_id"] = gateway_for_main.id agent.is_board_lead = False agent.openclaw_session_id = gateway_agent_session_key(gateway_for_main) main_gateway = gateway_for_main elif make_main is not None: + if "board_id" not in updates or updates["board_id"] is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="board_id is required when converting a gateway-main agent to board scope", + ) + board = await _require_board(session, updates["board_id"]) + if board.gateway_id is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Board gateway_id is required", + ) + updates["gateway_id"] = board.gateway_id agent.openclaw_session_id = None - if not make_main and "board_id" in updates: - await _require_board(session, updates["board_id"]) + if make_main is None and "board_id" in updates: + board = await _require_board(session, updates["board_id"]) + if board.gateway_id is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Board gateway_id is required", + ) + updates["gateway_id"] = board.gateway_id for key, value in updates.items(): setattr(agent, key, value) if make_main is None and main_gateway is not None: agent.board_id = None + agent.gateway_id = main_gateway.id agent.is_board_lead = False + if make_main is False and agent.board_id is not None: + board = await _require_board(session, agent.board_id) + if board.gateway_id is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Board gateway_id is required", + ) + agent.gateway_id = board.gateway_id agent.updated_at = utcnow() if agent.heartbeat_config is None: agent.heartbeat_config = DEFAULT_HEARTBEAT_CONFIG.copy() @@ -812,6 +815,7 @@ async def _create_agent_from_heartbeat( data: dict[str, Any] = { "name": payload.name, "board_id": board.id, + "gateway_id": gateway.id, "heartbeat_config": DEFAULT_HEARTBEAT_CONFIG.copy(), } agent, raw_token, session_error = await _persist_new_agent( @@ -925,8 +929,7 @@ async def _commit_heartbeat( session.add(agent) await session.commit() await session.refresh(agent) - main_session_keys = await _get_gateway_main_session_keys(session) - return _to_agent_read(_with_computed_status(agent), main_session_keys) + return _to_agent_read(_with_computed_status(agent)) async def _send_wakeup_message( @@ -952,7 +955,6 @@ async def list_agents( ctx: OrganizationContext = ORG_ADMIN_DEP, ) -> LimitOffsetPage[AgentRead]: """List agents visible to the active organization admin.""" - main_session_keys = await _get_gateway_main_session_keys(session) board_ids = await list_accessible_board_ids(session, member=ctx.member, write=False) if board_id is not None and board_id not in set(board_ids): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) @@ -963,9 +965,11 @@ async def list_agents( gateways = await Gateway.objects.filter_by( organization_id=ctx.organization.id, ).all(session) - gateway_keys = [gateway_agent_session_key(gateway) for gateway in gateways] - if gateway_keys: - base_filters.append(col(Agent.openclaw_session_id).in_(gateway_keys)) + gateway_ids = [gateway.id for gateway in gateways] + if gateway_ids: + base_filters.append( + (col(Agent.gateway_id).in_(gateway_ids)) & (col(Agent.board_id).is_(None)), + ) if base_filters: if len(base_filters) == 1: statement = select(Agent).where(base_filters[0]) @@ -979,19 +983,18 @@ async def list_agents( gateway = await Gateway.objects.by_id(gateway_id).first(session) if gateway is None or gateway.organization_id != ctx.organization.id: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) - gateway_main_key = gateway_agent_session_key(gateway) gateway_board_ids = select(Board.id).where(col(Board.gateway_id) == gateway_id) statement = statement.where( or_( col(Agent.board_id).in_(gateway_board_ids), - col(Agent.openclaw_session_id) == gateway_main_key, + (col(Agent.gateway_id) == gateway_id) & (col(Agent.board_id).is_(None)), ), ) statement = statement.order_by(col(Agent.created_at).desc()) def _transform(items: Sequence[Any]) -> Sequence[Any]: agents = _coerce_agent_items(items) - return [_to_agent_read(_with_computed_status(agent), main_session_keys) for agent in agents] + return [_to_agent_read(_with_computed_status(agent)) for agent in agents] return await paginate(session, statement, transformer=_transform) @@ -1029,13 +1032,10 @@ async def stream_agents( agents = [agent for agent in agents if agent.board_id in allowed_ids] else: agents = [] - main_session_keys = ( - await _get_gateway_main_session_keys(stream_session) if agents else set() - ) for agent in agents: updated_at = agent.updated_at or agent.last_seen_at or utcnow() last_seen = max(updated_at, last_seen) - payload = {"agent": _serialize_agent(agent, main_session_keys)} + payload = {"agent": _serialize_agent(agent)} yield {"event": "agent", "data": json.dumps(payload)} await asyncio.sleep(2) @@ -1059,6 +1059,7 @@ async def create_agent( ) gateway, client_config = await _require_gateway(session, board) data = payload.model_dump() + data["gateway_id"] = gateway.id requested_name = (data.get("name") or "").strip() await _ensure_unique_agent_name( session, @@ -1089,8 +1090,7 @@ async def create_agent( request=provision_request, client_config=client_config, ) - main_session_keys = await _get_gateway_main_session_keys(session) - return _to_agent_read(_with_computed_status(agent), main_session_keys) + return _to_agent_read(_with_computed_status(agent)) @router.get("/{agent_id}", response_model=AgentRead) @@ -1104,8 +1104,7 @@ async def get_agent( if agent is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) await _require_agent_access(session, agent=agent, ctx=ctx, write=False) - main_session_keys = await _get_gateway_main_session_keys(session) - return _to_agent_read(_with_computed_status(agent), main_session_keys) + return _to_agent_read(_with_computed_status(agent)) @router.patch("/{agent_id}", response_model=AgentRead) @@ -1129,8 +1128,7 @@ async def update_agent( make_main=make_main, ) if not updates and not params.force and make_main is None: - main_session_keys = await _get_gateway_main_session_keys(session) - return _to_agent_read(_with_computed_status(agent), main_session_keys) + return _to_agent_read(_with_computed_status(agent)) main_gateway, gateway_for_main = await _apply_agent_update_mutations( session, agent=agent, @@ -1164,8 +1162,7 @@ async def update_agent( agent=agent, request=provision_request, ) - main_session_keys = await _get_gateway_main_session_keys(session) - return _to_agent_read(_with_computed_status(agent), main_session_keys) + return _to_agent_read(_with_computed_status(agent)) @router.post("/{agent_id}/heartbeat", response_model=AgentRead) diff --git a/backend/app/api/board_onboarding.py b/backend/app/api/board_onboarding.py index 71f81ec3..4ddfd752 100644 --- a/backend/app/api/board_onboarding.py +++ b/backend/app/api/board_onboarding.py @@ -339,8 +339,7 @@ async def agent_onboarding_update( gateway = await Gateway.objects.by_id(board.gateway_id).first(session) if ( gateway - and agent.openclaw_session_id - and agent.openclaw_session_id != gateway_agent_session_key(gateway) + and (agent.gateway_id != gateway.id or agent.board_id is not None) ): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) diff --git a/backend/app/api/gateway.py b/backend/app/api/gateway.py index dc42ef98..758dea53 100644 --- a/backend/app/api/gateway.py +++ b/backend/app/api/gateway.py @@ -6,6 +6,7 @@ from collections.abc import Iterable from typing import TYPE_CHECKING from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlmodel import col from app.api.deps import require_org_admin from app.core.auth import AuthContext, get_auth_context @@ -23,6 +24,7 @@ from app.integrations.openclaw_gateway_protocol import ( GATEWAY_METHODS, PROTOCOL_VERSION, ) +from app.models.agents import Agent from app.models.boards import Board from app.models.gateways import Gateway from app.schemas.common import OkResponse @@ -35,7 +37,6 @@ from app.schemas.gateway_api import ( GatewaySessionsResponse, GatewaysStatusResponse, ) -from app.services.gateway_agents import gateway_agent_session_key from app.services.organizations import OrganizationContext, require_board_access if TYPE_CHECKING: @@ -120,10 +121,16 @@ async def _resolve_gateway( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Gateway url is required", ) + main_agent = ( + await Agent.objects.filter_by(gateway_id=gateway.id) + .filter(col(Agent.board_id).is_(None)) + .first(session) + ) + main_session = main_agent.openclaw_session_id if main_agent else None return ( board, GatewayClientConfig(url=gateway.url, token=gateway.token), - gateway_agent_session_key(gateway), + main_session, ) @@ -186,7 +193,6 @@ async def gateways_status( gateway_url=config.url, sessions_count=len(sessions_list), sessions=sessions_list, - main_session_key=main_session, main_session=main_session_entry, main_session_error=main_session_error, ) @@ -241,7 +247,6 @@ async def list_gateway_sessions( return GatewaySessionsResponse( sessions=sessions_list, - main_session_key=main_session, main_session=main_session_entry, ) diff --git a/backend/app/api/gateways.py b/backend/app/api/gateways.py index 94c084a7..df2f38ab 100644 --- a/backend/app/api/gateways.py +++ b/backend/app/api/gateways.py @@ -45,7 +45,6 @@ from app.services.agent_provisioning import ( ) from app.services.gateway_agents import ( gateway_agent_session_key, - gateway_agent_session_key_for_id, gateway_openclaw_agent_id, ) from app.services.template_sync import GatewayTemplateSyncOptions @@ -134,68 +133,27 @@ async def _require_gateway( async def _find_main_agent( session: AsyncSession, gateway: Gateway, - previous_name: str | None = None, - previous_session_key: str | None = None, ) -> Agent | None: - preferred_session_key = gateway_agent_session_key(gateway) - if preferred_session_key: - agent = await Agent.objects.filter_by( - openclaw_session_id=preferred_session_key, - ).first( - session, - ) - if agent: - return agent - if gateway.main_session_key: - agent = await Agent.objects.filter_by( - openclaw_session_id=gateway.main_session_key, - ).first( - session, - ) - if agent: - return agent - if previous_session_key: - agent = await Agent.objects.filter_by( - openclaw_session_id=previous_session_key, - ).first( - session, - ) - if agent: - return agent - names = {_main_agent_name(gateway)} - if previous_name: - names.add(f"{previous_name} Main") - for name in names: - agent = await Agent.objects.filter_by(name=name).first(session) - if agent: - return agent - return None + return ( + await Agent.objects.filter_by(gateway_id=gateway.id) + .filter(col(Agent.board_id).is_(None)) + .first(session) + ) async def _upsert_main_agent_record( session: AsyncSession, gateway: Gateway, - *, - previous: tuple[str | None, str | None] | None = None, ) -> tuple[Agent, bool]: changed = False session_key = gateway_agent_session_key(gateway) - if gateway.main_session_key != session_key: - gateway.main_session_key = session_key - gateway.updated_at = utcnow() - session.add(gateway) - changed = True - agent = await _find_main_agent( - session, - gateway, - previous_name=previous[0] if previous else None, - previous_session_key=previous[1] if previous else None, - ) + agent = await _find_main_agent(session, gateway) if agent is None: agent = Agent( name=_main_agent_name(gateway), status="provisioning", board_id=None, + gateway_id=gateway.id, is_board_lead=False, openclaw_session_id=session_key, heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(), @@ -206,6 +164,9 @@ async def _upsert_main_agent_record( if agent.board_id is not None: agent.board_id = None changed = True + if agent.gateway_id != gateway.id: + agent.gateway_id = gateway.id + changed = True if agent.is_board_lead: agent.is_board_lead = False changed = True @@ -262,21 +223,15 @@ def _extract_agent_id_from_entry(item: object) -> str | None: return None -def _extract_config_agents_list(payload: object) -> list[object]: +def _extract_agents_list(payload: object) -> list[object]: + if isinstance(payload, list): + return [item for item in payload] if not isinstance(payload, dict): return [] - data = payload.get("config") or payload.get("parsed") or {} - if not isinstance(data, dict): + agents = payload.get("agents") or [] + if not isinstance(agents, list): return [] - agents = data.get("agents") or {} - if isinstance(agents, list): - return [item for item in agents] - if not isinstance(agents, dict): - return [] - agents_list = agents.get("list") or [] - if not isinstance(agents_list, list): - return [] - return [item for item in agents_list] + return [item for item in agents] async def _gateway_has_main_agent_entry(gateway: Gateway) -> bool: @@ -285,11 +240,11 @@ async def _gateway_has_main_agent_entry(gateway: Gateway) -> bool: config = GatewayClientConfig(url=gateway.url, token=gateway.token) target_id = gateway_openclaw_agent_id(gateway) try: - payload = await openclaw_call("config.get", config=config) + payload = await openclaw_call("agents.list", config=config) except OpenClawGatewayError: # Avoid treating transient gateway connectivity issues as a missing agent entry. return True - for item in _extract_config_agents_list(payload): + for item in _extract_agents_list(payload): if _extract_agent_id_from_entry(item) == target_id: return True return False @@ -376,14 +331,9 @@ async def _ensure_main_agent( gateway: Gateway, auth: AuthContext, *, - previous: tuple[str | None, str | None] | None = None, action: str = "provision", ) -> Agent: - agent, _ = await _upsert_main_agent_record( - session, - gateway, - previous=previous, - ) + agent, _ = await _upsert_main_agent_record(session, gateway) return await _provision_main_agent_record( session, gateway, @@ -464,7 +414,6 @@ async def create_gateway( gateway_id = uuid4() data["id"] = gateway_id data["organization_id"] = ctx.organization.id - data["main_session_key"] = gateway_agent_session_key_for_id(gateway_id) gateway = await crud.create(session, Gateway, **data) await _ensure_main_agent(session, gateway, auth, action="provision") return gateway @@ -500,15 +449,12 @@ async def update_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) - previous_name = gateway.name - previous_session_key = gateway.main_session_key updates = payload.model_dump(exclude_unset=True) await crud.patch(session, gateway, updates) await _ensure_main_agent( session, gateway, auth, - previous=(previous_name, previous_session_key), action="update", ) return gateway @@ -555,14 +501,14 @@ async def delete_gateway( gateway_id=gateway_id, organization_id=ctx.organization.id, ) - gateway_session_key = gateway_agent_session_key(gateway) main_agent = await _find_main_agent(session, gateway) if main_agent is not None: await _clear_agent_foreign_keys(session, agent_id=main_agent.id) await session.delete(main_agent) duplicate_main_agents = await Agent.objects.filter_by( - openclaw_session_id=gateway_session_key, + gateway_id=gateway.id, + board_id=None, ).all(session) for agent in duplicate_main_agents: if main_agent is not None and agent.id == main_agent.id: diff --git a/backend/app/integrations/openclaw_gateway_protocol.py b/backend/app/integrations/openclaw_gateway_protocol.py index f5c573ea..b72a1afe 100644 --- a/backend/app/integrations/openclaw_gateway_protocol.py +++ b/backend/app/integrations/openclaw_gateway_protocol.py @@ -38,6 +38,9 @@ GATEWAY_METHODS = [ "talk.mode", "models.list", "agents.list", + "agents.create", + "agents.update", + "agents.delete", "agents.files.list", "agents.files.get", "agents.files.set", diff --git a/backend/app/models/agents.py b/backend/app/models/agents.py index 20c86e85..1648e98f 100644 --- a/backend/app/models/agents.py +++ b/backend/app/models/agents.py @@ -22,6 +22,7 @@ class Agent(QueryModel, table=True): id: UUID = Field(default_factory=uuid4, primary_key=True) board_id: UUID | None = Field(default=None, foreign_key="boards.id", index=True) + gateway_id: UUID = Field(foreign_key="gateways.id", index=True) name: str = Field(index=True) status: str = Field(default="provisioning", index=True) openclaw_session_id: str | None = Field(default=None, index=True) diff --git a/backend/app/models/gateways.py b/backend/app/models/gateways.py index da464190..954f144f 100644 --- a/backend/app/models/gateways.py +++ b/backend/app/models/gateways.py @@ -23,7 +23,6 @@ class Gateway(QueryModel, table=True): name: str url: str token: str | None = Field(default=None) - main_session_key: str workspace_root: str created_at: datetime = Field(default_factory=utcnow) updated_at: datetime = Field(default_factory=utcnow) diff --git a/backend/app/schemas/agents.py b/backend/app/schemas/agents.py index e37909bc..eee2eb03 100644 --- a/backend/app/schemas/agents.py +++ b/backend/app/schemas/agents.py @@ -112,6 +112,7 @@ class AgentRead(AgentBase): """Public agent representation returned by the API.""" id: UUID + gateway_id: UUID is_board_lead: bool = False is_gateway_main: bool = False openclaw_session_id: str | None = None diff --git a/backend/app/schemas/gateway_api.py b/backend/app/schemas/gateway_api.py index 2a783e31..2ae97692 100644 --- a/backend/app/schemas/gateway_api.py +++ b/backend/app/schemas/gateway_api.py @@ -30,7 +30,6 @@ class GatewaysStatusResponse(SQLModel): gateway_url: str sessions_count: int | None = None sessions: list[object] | None = None - main_session_key: str | None = None main_session: object | None = None main_session_error: str | None = None error: str | None = None @@ -40,7 +39,6 @@ class GatewaySessionsResponse(SQLModel): """Gateway sessions list response payload.""" sessions: list[object] - main_session_key: str | None = None main_session: object | None = None diff --git a/backend/app/schemas/gateways.py b/backend/app/schemas/gateways.py index f3f576dc..233a44d5 100644 --- a/backend/app/schemas/gateways.py +++ b/backend/app/schemas/gateways.py @@ -62,7 +62,6 @@ class GatewayRead(GatewayBase): id: UUID organization_id: UUID token: str | None = None - main_session_key: str created_at: datetime updated_at: datetime diff --git a/backend/app/services/agent_provisioning.py b/backend/app/services/agent_provisioning.py index 7535964d..6abf4379 100644 --- a/backend/app/services/agent_provisioning.py +++ b/backend/app/services/agent_provisioning.py @@ -2,6 +2,7 @@ from __future__ import annotations +from abc import ABC, abstractmethod import hashlib import json import re @@ -19,7 +20,6 @@ from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_sessi from app.services.gateway_agents import ( gateway_agent_session_key, gateway_openclaw_agent_id, - parse_gateway_agent_session_key, ) if TYPE_CHECKING: @@ -146,22 +146,6 @@ def _slugify(value: str) -> str: return slug or uuid4().hex -def _agent_id_from_session_key(session_key: str | None) -> str | None: - value = (session_key or "").strip() - if not value: - return None - # Dedicated Mission Control gateway-agent session keys are not gateway config agent ids. - if parse_gateway_agent_session_key(value) is not None: - return None - if not value.startswith("agent:"): - return None - parts = value.split(":") - if len(parts) < _SESSION_KEY_PARTS_MIN: - return None - agent_id = parts[1].strip() - return agent_id or None - - def _clean_str(value: object) -> str | None: if isinstance(value, str) and value.strip(): return value.strip() @@ -436,70 +420,6 @@ def _session_key(agent: Agent) -> str: return f"agent:{_agent_key(agent)}:main" -async def _supported_gateway_files(config: GatewayClientConfig) -> set[str]: - try: - agents_payload = await openclaw_call("agents.list", config=config) - agents = [] - default_id = None - if isinstance(agents_payload, dict): - agents = list(agents_payload.get("agents") or []) - default_id = agents_payload.get("defaultId") or agents_payload.get( - "default_id", - ) - agent_id = default_id or (agents[0].get("id") if agents else None) - if not agent_id: - return set(DEFAULT_GATEWAY_FILES) - files_payload = await openclaw_call( - "agents.files.list", - {"agentId": agent_id}, - config=config, - ) - if isinstance(files_payload, dict): - files = files_payload.get("files") or [] - supported: set[str] = set() - for item in files: - if not isinstance(item, dict): - continue - name = item.get("name") - if isinstance(name, str) and name: - supported.add(name) - return supported or set(DEFAULT_GATEWAY_FILES) - except OpenClawGatewayError: - pass - return set(DEFAULT_GATEWAY_FILES) - - -async def _reset_session(session_key: str, config: GatewayClientConfig) -> None: - if not session_key: - return - await openclaw_call("sessions.reset", {"key": session_key}, config=config) - - -async def _gateway_agent_files_index( - agent_id: str, - config: GatewayClientConfig, -) -> dict[str, dict[str, Any]]: - try: - payload = await openclaw_call( - "agents.files.list", - {"agentId": agent_id}, - config=config, - ) - if isinstance(payload, dict): - files = payload.get("files") or [] - index: dict[str, dict[str, Any]] = {} - for item in files: - if not isinstance(item, dict): - continue - name = item.get("name") - if isinstance(name, str) and name: - index[name] = dict(item) - return index - except OpenClawGatewayError: - pass - return {} - - def _render_agent_files( context: dict[str, str], agent: Agent, @@ -548,67 +468,205 @@ def _render_agent_files( return rendered -async def _gateway_default_agent_id( - config: GatewayClientConfig, - *, - fallback_session_key: str | None = None, -) -> str | None: - try: - payload = await openclaw_call("agents.list", config=config) - except OpenClawGatewayError: - return _agent_id_from_session_key(fallback_session_key) +@dataclass(frozen=True, slots=True) +class GatewayAgentRegistration: + """Desired gateway runtime state for one agent.""" - agent_id = _extract_agent_id(payload) - if agent_id: - return agent_id - return _agent_id_from_session_key(fallback_session_key) + agent_id: str + name: str + workspace_path: str + heartbeat: dict[str, Any] -async def _patch_gateway_agent_list( - agent_id: str, - workspace_path: str, - heartbeat: dict[str, Any], - config: GatewayClientConfig, -) -> None: - cfg = await openclaw_call("config.get", config=config) - if not isinstance(cfg, dict): - msg = "config.get returned invalid payload" - raise OpenClawGatewayError(msg) - base_hash = cfg.get("hash") - data = cfg.get("config") or cfg.get("parsed") or {} - if not isinstance(data, dict): - msg = "config.get returned invalid config" - raise OpenClawGatewayError(msg) - agents = data.get("agents") or {} - lst = agents.get("list") or [] - if not isinstance(lst, list): - msg = "config agents.list is not a list" - raise OpenClawGatewayError(msg) +class GatewayControlPlane(ABC): + """Abstract gateway runtime interface used by agent lifecycle managers.""" - updated = False - new_list: list[dict[str, Any]] = [] - for entry in lst: - if isinstance(entry, dict) and entry.get("id") == agent_id: - new_entry = dict(entry) - new_entry["workspace"] = workspace_path - new_entry["heartbeat"] = heartbeat - new_list.append(new_entry) - updated = True + @abstractmethod + async def ensure_agent_session(self, session_key: str, *, label: str | None = None) -> None: + raise NotImplementedError + + @abstractmethod + async def reset_agent_session(self, session_key: str) -> None: + raise NotImplementedError + + @abstractmethod + async def delete_agent_session(self, session_key: str) -> None: + raise NotImplementedError + + @abstractmethod + async def upsert_agent(self, registration: GatewayAgentRegistration) -> None: + raise NotImplementedError + + @abstractmethod + async def delete_agent(self, agent_id: str, *, delete_files: bool = True) -> None: + raise NotImplementedError + + @abstractmethod + async def list_supported_files(self) -> set[str]: + raise NotImplementedError + + @abstractmethod + async def list_agent_files(self, agent_id: str) -> dict[str, dict[str, Any]]: + raise NotImplementedError + + @abstractmethod + async def set_agent_file(self, *, agent_id: str, name: str, content: str) -> None: + raise NotImplementedError + + @abstractmethod + async def patch_agent_heartbeats( + self, + entries: list[tuple[str, str, dict[str, Any]]], + ) -> None: + raise NotImplementedError + + +class OpenClawGatewayControlPlane(GatewayControlPlane): + """OpenClaw gateway RPC implementation of the lifecycle control-plane contract.""" + + def __init__(self, config: GatewayClientConfig) -> None: + self._config = config + + async def ensure_agent_session(self, session_key: str, *, label: str | None = None) -> None: + if not session_key: + return + await ensure_session(session_key, config=self._config, label=label) + + async def reset_agent_session(self, session_key: str) -> None: + if not session_key: + return + await openclaw_call("sessions.reset", {"key": session_key}, config=self._config) + + async def delete_agent_session(self, session_key: str) -> None: + if not session_key: + return + await openclaw_call("sessions.delete", {"key": session_key}, config=self._config) + + async def _agent_ids(self) -> set[str]: + payload = await openclaw_call("agents.list", config=self._config) + raw_agents: object = payload + if isinstance(payload, dict): + raw_agents = payload.get("agents") or [] + if not isinstance(raw_agents, list): + return set() + ids: set[str] = set() + for item in raw_agents: + agent_id = _extract_agent_id_from_item(item) + if agent_id: + ids.add(agent_id) + return ids + + async def upsert_agent(self, registration: GatewayAgentRegistration) -> None: + agent_ids = await self._agent_ids() + if registration.agent_id in agent_ids: + await openclaw_call( + "agents.update", + { + "agentId": registration.agent_id, + "name": registration.name, + "workspace": registration.workspace_path, + }, + config=self._config, + ) else: - new_list.append(entry) - if not updated: - new_list.append( - {"id": agent_id, "workspace": workspace_path, "heartbeat": heartbeat}, + # `agents.create` derives `agentId` from `name`, so create with the target id + # and then set the human-facing name in a follow-up update. + await openclaw_call( + "agents.create", + { + "name": registration.agent_id, + "workspace": registration.workspace_path, + }, + config=self._config, + ) + if registration.name != registration.agent_id: + await openclaw_call( + "agents.update", + { + "agentId": registration.agent_id, + "name": registration.name, + "workspace": registration.workspace_path, + }, + config=self._config, + ) + await self.patch_agent_heartbeats( + [(registration.agent_id, registration.workspace_path, registration.heartbeat)], ) - patch: dict[str, Any] = {"agents": {"list": new_list}} - channels_patch = _channel_heartbeat_visibility_patch(data) - if channels_patch is not None: - patch["channels"] = channels_patch - params = {"raw": json.dumps(patch)} - if base_hash: - params["baseHash"] = base_hash - await openclaw_call("config.patch", params, config=config) + async def delete_agent(self, agent_id: str, *, delete_files: bool = True) -> None: + await openclaw_call( + "agents.delete", + {"agentId": agent_id, "deleteFiles": delete_files}, + config=self._config, + ) + + async def list_supported_files(self) -> set[str]: + agents_payload = await openclaw_call("agents.list", config=self._config) + agent_id = _extract_agent_id(agents_payload) + if not agent_id: + return set(DEFAULT_GATEWAY_FILES) + files_payload = await openclaw_call( + "agents.files.list", + {"agentId": agent_id}, + config=self._config, + ) + if not isinstance(files_payload, dict): + return set(DEFAULT_GATEWAY_FILES) + files = files_payload.get("files") or [] + if not isinstance(files, list): + return set(DEFAULT_GATEWAY_FILES) + supported: set[str] = set() + for item in files: + if not isinstance(item, dict): + continue + name = item.get("name") + if isinstance(name, str) and name: + supported.add(name) + return supported or set(DEFAULT_GATEWAY_FILES) + + async def list_agent_files(self, agent_id: str) -> dict[str, dict[str, Any]]: + payload = await openclaw_call( + "agents.files.list", + {"agentId": agent_id}, + config=self._config, + ) + if not isinstance(payload, dict): + return {} + files = payload.get("files") or [] + if not isinstance(files, list): + return {} + index: dict[str, dict[str, Any]] = {} + for item in files: + if not isinstance(item, dict): + continue + name = item.get("name") + if isinstance(name, str) and name: + index[name] = dict(item) + return index + + async def set_agent_file(self, *, agent_id: str, name: str, content: str) -> None: + await openclaw_call( + "agents.files.set", + {"agentId": agent_id, "name": name, "content": content}, + config=self._config, + ) + + async def patch_agent_heartbeats( + self, + entries: list[tuple[str, str, dict[str, Any]]], + ) -> None: + base_hash, raw_list, config_data = await _gateway_config_agent_list(self._config) + entry_by_id = _heartbeat_entry_map(entries) + new_list = _updated_agent_list(raw_list, entry_by_id) + + patch: dict[str, Any] = {"agents": {"list": new_list}} + channels_patch = _channel_heartbeat_visibility_patch(config_data) + if channels_patch is not None: + patch["channels"] = channels_patch + params = {"raw": json.dumps(patch)} + if base_hash: + params["baseHash"] = base_hash + await openclaw_call("config.patch", params, config=self._config) async def _gateway_config_agent_list( @@ -673,6 +731,177 @@ def _updated_agent_list( return new_list +class BaseAgentLifecycleManager(ABC): + """Base class for scalable board/main agent lifecycle managers.""" + + def __init__(self, gateway: Gateway, control_plane: GatewayControlPlane) -> None: + self._gateway = gateway + self._control_plane = control_plane + + @abstractmethod + def _agent_id(self, agent: Agent) -> str: + raise NotImplementedError + + @abstractmethod + def _build_context( + self, + *, + agent: Agent, + auth_token: str, + user: User | None, + board: Board | None, + ) -> dict[str, str]: + raise NotImplementedError + + def _template_overrides(self) -> dict[str, str] | None: + return None + + async def _set_agent_files( + self, + *, + agent_id: str, + rendered: dict[str, str], + existing_files: dict[str, dict[str, Any]], + ) -> None: + for name, content in rendered.items(): + if content == "": + continue + if name in PRESERVE_AGENT_EDITABLE_FILES: + entry = existing_files.get(name) + if entry and not bool(entry.get("missing")): + continue + try: + await self._control_plane.set_agent_file( + agent_id=agent_id, + name=name, + content=content, + ) + except OpenClawGatewayError as exc: + if "unsupported file" in str(exc).lower(): + continue + raise + + async def provision( + self, + *, + agent: Agent, + session_key: str, + auth_token: str, + user: User | None, + options: ProvisionOptions, + board: Board | None = None, + session_label: str | None = None, + ) -> None: + if not self._gateway.workspace_root: + msg = "gateway_workspace_root is required" + raise ValueError(msg) + if not agent.openclaw_session_id: + agent.openclaw_session_id = session_key + await self._control_plane.ensure_agent_session( + session_key, + label=session_label or agent.name, + ) + + agent_id = self._agent_id(agent) + workspace_path = _workspace_path(agent, self._gateway.workspace_root) + heartbeat = _heartbeat_config(agent) + await self._control_plane.upsert_agent( + GatewayAgentRegistration( + agent_id=agent_id, + name=agent.name, + workspace_path=workspace_path, + heartbeat=heartbeat, + ), + ) + + context = self._build_context( + agent=agent, + auth_token=auth_token, + user=user, + board=board, + ) + supported = await self._control_plane.list_supported_files() + supported.update({"USER.md", "SELF.md", "AUTONOMY.md"}) + existing_files = await self._control_plane.list_agent_files(agent_id) + include_bootstrap = _should_include_bootstrap( + action=options.action, + force_bootstrap=options.force_bootstrap, + existing_files=existing_files, + ) + rendered = _render_agent_files( + context, + agent, + supported, + include_bootstrap=include_bootstrap, + template_overrides=self._template_overrides(), + ) + + for name in PRESERVE_AGENT_EDITABLE_FILES: + content = rendered.get(name) + if not content: + continue + with suppress(OSError): + _ensure_workspace_file(workspace_path, name, content, overwrite=False) + + await self._set_agent_files( + agent_id=agent_id, + rendered=rendered, + existing_files=existing_files, + ) + if options.reset_session: + await self._control_plane.reset_agent_session(session_key) + + +class BoardAgentLifecycleManager(BaseAgentLifecycleManager): + """Provisioning manager for board-scoped agents.""" + + def _agent_id(self, agent: Agent) -> str: + return _agent_key(agent) + + def _build_context( + self, + *, + agent: Agent, + auth_token: str, + user: User | None, + board: Board | None, + ) -> dict[str, str]: + if board is None: + msg = "board is required for board-scoped agent provisioning" + raise ValueError(msg) + return _build_context(agent, board, self._gateway, auth_token, user) + + +class GatewayMainAgentLifecycleManager(BaseAgentLifecycleManager): + """Provisioning manager for organization gateway-main agents.""" + + def _agent_id(self, agent: Agent) -> str: + return gateway_openclaw_agent_id(self._gateway) + + def _build_context( + self, + *, + agent: Agent, + auth_token: str, + user: User | None, + board: Board | None, + ) -> dict[str, str]: + _ = board + return _build_main_context(agent, self._gateway, auth_token, user) + + def _template_overrides(self) -> dict[str, str] | None: + return MAIN_TEMPLATE_MAP + + +def _control_plane_for_gateway(gateway: Gateway) -> OpenClawGatewayControlPlane: + if not gateway.url: + msg = "Gateway url is required" + raise OpenClawGatewayError(msg) + return OpenClawGatewayControlPlane( + GatewayClientConfig(url=gateway.url, token=gateway.token), + ) + + async def patch_gateway_agent_heartbeats( gateway: Gateway, *, @@ -682,22 +911,8 @@ async def patch_gateway_agent_heartbeats( Each entry is (agent_id, workspace_path, heartbeat_dict). """ - if not gateway.url: - msg = "Gateway url is required" - raise OpenClawGatewayError(msg) - config = GatewayClientConfig(url=gateway.url, token=gateway.token) - base_hash, raw_list, config_data = await _gateway_config_agent_list(config) - entry_by_id = _heartbeat_entry_map(entries) - new_list = _updated_agent_list(raw_list, entry_by_id) - - patch: dict[str, Any] = {"agents": {"list": new_list}} - channels_patch = _channel_heartbeat_visibility_patch(config_data) - if channels_patch is not None: - patch["channels"] = channels_patch - params = {"raw": json.dumps(patch)} - if base_hash: - params["baseHash"] = base_hash - await openclaw_call("config.patch", params, config=config) + control_plane = _control_plane_for_gateway(gateway) + await control_plane.patch_agent_heartbeats(entries) async def sync_gateway_agent_heartbeats(gateway: Gateway, agents: list[Agent]) -> None: @@ -716,57 +931,6 @@ async def sync_gateway_agent_heartbeats(gateway: Gateway, agents: list[Agent]) - await patch_gateway_agent_heartbeats(gateway, entries=entries) -async def _remove_gateway_agent_list( - agent_id: str, - config: GatewayClientConfig, -) -> None: - cfg = await openclaw_call("config.get", config=config) - if not isinstance(cfg, dict): - msg = "config.get returned invalid payload" - raise OpenClawGatewayError(msg) - base_hash = cfg.get("hash") - data = cfg.get("config") or cfg.get("parsed") or {} - if not isinstance(data, dict): - msg = "config.get returned invalid config" - raise OpenClawGatewayError(msg) - agents = data.get("agents") or {} - lst = agents.get("list") or [] - if not isinstance(lst, list): - msg = "config agents.list is not a list" - raise OpenClawGatewayError(msg) - - new_list = [ - entry for entry in lst if not (isinstance(entry, dict) and entry.get("id") == agent_id) - ] - if len(new_list) == len(lst): - return - patch = {"agents": {"list": new_list}} - params = {"raw": json.dumps(patch)} - if base_hash: - params["baseHash"] = base_hash - await openclaw_call("config.patch", params, config=config) - - -async def _get_gateway_agent_entry( - agent_id: str, - config: GatewayClientConfig, -) -> dict[str, Any] | None: - cfg = await openclaw_call("config.get", config=config) - if not isinstance(cfg, dict): - return None - data = cfg.get("config") or cfg.get("parsed") or {} - if not isinstance(data, dict): - return None - agents = data.get("agents") or {} - lst = agents.get("list") or [] - if not isinstance(lst, list): - return None - for entry in lst: - if isinstance(entry, dict) and entry.get("id") == agent_id: - return entry - return None - - def _should_include_bootstrap( *, action: str, @@ -781,32 +945,6 @@ def _should_include_bootstrap( return not bool(entry and entry.get("missing")) -async def _set_agent_files( - *, - agent_id: str, - rendered: dict[str, str], - existing_files: dict[str, dict[str, Any]], - client_config: GatewayClientConfig, -) -> None: - for name, content in rendered.items(): - if content == "": - continue - if name in PRESERVE_AGENT_EDITABLE_FILES: - entry = existing_files.get(name) - if entry and not bool(entry.get("missing")): - continue - try: - await openclaw_call( - "agents.files.set", - {"agentId": agent_id, "name": name, "content": content}, - config=client_config, - ) - except OpenClawGatewayError as exc: - if "unsupported file" in str(exc).lower(): - continue - raise - - async def provision_agent( agent: Agent, request: AgentProvisionRequest, @@ -815,57 +953,17 @@ async def provision_agent( gateway = request.gateway if not gateway.url: return - if not gateway.workspace_root: - msg = "gateway_workspace_root is required" - raise ValueError(msg) - client_config = GatewayClientConfig(url=gateway.url, token=gateway.token) session_key = _session_key(agent) - await ensure_session(session_key, config=client_config, label=agent.name) - - agent_id = _agent_key(agent) - workspace_path = _workspace_path(agent, gateway.workspace_root) - heartbeat = _heartbeat_config(agent) - await _patch_gateway_agent_list(agent_id, workspace_path, heartbeat, client_config) - - context = _build_context( - agent, - request.board, - gateway, - request.auth_token, - request.user, + control_plane = _control_plane_for_gateway(gateway) + manager = BoardAgentLifecycleManager(gateway, control_plane) + await manager.provision( + agent=agent, + board=request.board, + session_key=session_key, + auth_token=request.auth_token, + user=request.user, + options=request.options, ) - supported = set(await _supported_gateway_files(client_config)) - supported.update({"USER.md", "SELF.md", "AUTONOMY.md"}) - existing_files = await _gateway_agent_files_index(agent_id, client_config) - include_bootstrap = _should_include_bootstrap( - action=request.options.action, - force_bootstrap=request.options.force_bootstrap, - existing_files=existing_files, - ) - - rendered = _render_agent_files( - context, - agent, - supported, - include_bootstrap=include_bootstrap, - ) - - # Ensure editable template files exist locally (best-effort) without overwriting. - for name in PRESERVE_AGENT_EDITABLE_FILES: - content = rendered.get(name) - if not content: - continue - with suppress(OSError): - # Local workspace may not be writable/available; fall back to gateway API. - _ensure_workspace_file(workspace_path, name, content, overwrite=False) - await _set_agent_files( - agent_id=agent_id, - rendered=rendered, - existing_files=existing_files, - client_config=client_config, - ) - if request.options.reset_session: - await _reset_session(session_key, client_config) async def provision_main_agent( @@ -876,52 +974,21 @@ async def provision_main_agent( gateway = request.gateway if not gateway.url: return - session_key = (request.session_key or gateway.main_session_key or "").strip() + session_key = (request.session_key or gateway_agent_session_key(gateway) or "").strip() if not session_key: msg = "gateway main agent session_key is required" raise ValueError(msg) - client_config = GatewayClientConfig(url=gateway.url, token=gateway.token) - await ensure_session( - session_key, - config=client_config, - label=agent.name or "Gateway Agent", + control_plane = _control_plane_for_gateway(gateway) + manager = GatewayMainAgentLifecycleManager(gateway, control_plane) + await manager.provision( + agent=agent, + session_key=session_key, + auth_token=request.auth_token, + user=request.user, + options=request.options, + session_label=agent.name or "Gateway Agent", ) - # Keep gateway default agent intact and use a dedicated OpenClaw agent id for Mission Control. - if not gateway.workspace_root: - msg = "gateway_workspace_root is required" - raise ValueError(msg) - agent_id = gateway_openclaw_agent_id(gateway) - workspace_path = _workspace_path(agent, gateway.workspace_root) - heartbeat = _heartbeat_config(agent) - await _patch_gateway_agent_list(agent_id, workspace_path, heartbeat, client_config) - - context = _build_main_context(agent, gateway, request.auth_token, request.user) - supported = set(await _supported_gateway_files(client_config)) - supported.update({"USER.md", "SELF.md", "AUTONOMY.md"}) - existing_files = await _gateway_agent_files_index(agent_id, client_config) - include_bootstrap = _should_include_bootstrap( - action=request.options.action, - force_bootstrap=request.options.force_bootstrap, - existing_files=existing_files, - ) - - rendered = _render_agent_files( - context, - agent, - supported, - include_bootstrap=include_bootstrap, - template_overrides=MAIN_TEMPLATE_MAP, - ) - await _set_agent_files( - agent_id=agent_id, - rendered=rendered, - existing_files=existing_files, - client_config=client_config, - ) - if request.options.reset_session: - await _reset_session(session_key, client_config) - async def cleanup_agent( agent: Agent, @@ -933,16 +1000,11 @@ async def cleanup_agent( if not gateway.workspace_root: msg = "gateway_workspace_root is required" raise ValueError(msg) - client_config = GatewayClientConfig(url=gateway.url, token=gateway.token) - + control_plane = _control_plane_for_gateway(gateway) agent_id = _agent_key(agent) - entry = await _get_gateway_agent_entry(agent_id, client_config) - await _remove_gateway_agent_list(agent_id, client_config) + await control_plane.delete_agent(agent_id, delete_files=True) session_key = _session_key(agent) - await openclaw_call("sessions.delete", {"key": session_key}, config=client_config) - - workspace_path = entry.get("workspace") if entry else None - if not workspace_path: - workspace_path = _workspace_path(agent, gateway.workspace_root) - return workspace_path + with suppress(OpenClawGatewayError): + await control_plane.delete_agent_session(session_key) + return None diff --git a/backend/app/services/board_leads.py b/backend/app/services/board_leads.py index 41f6d803..2f189bad 100644 --- a/backend/app/services/board_leads.py +++ b/backend/app/services/board_leads.py @@ -78,6 +78,9 @@ async def ensure_board_lead_agent( if existing.name != desired_name: existing.name = desired_name changed = True + if existing.gateway_id != request.gateway.id: + existing.gateway_id = request.gateway.id + changed = True desired_session_key = lead_session_key(board) if not existing.openclaw_session_id: existing.openclaw_session_id = desired_session_key @@ -107,6 +110,7 @@ async def ensure_board_lead_agent( name=config_options.agent_name or lead_agent_name(board), status="provisioning", board_id=board.id, + gateway_id=request.gateway.id, is_board_lead=True, heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(), identity_profile=merged_identity_profile, diff --git a/backend/app/services/board_snapshot.py b/backend/app/services/board_snapshot.py index 5516b12b..4df19252 100644 --- a/backend/app/services/board_snapshot.py +++ b/backend/app/services/board_snapshot.py @@ -12,14 +12,12 @@ from app.core.time import utcnow from app.models.agents import Agent from app.models.approvals import Approval from app.models.board_memory import BoardMemory -from app.models.gateways import Gateway from app.models.tasks import Task from app.schemas.agents import AgentRead from app.schemas.approvals import ApprovalRead from app.schemas.board_memory import BoardMemoryRead from app.schemas.boards import BoardRead from app.schemas.view_models import BoardSnapshot, TaskCardRead -from app.services.gateway_agents import gateway_agent_session_key from app.services.task_dependencies import ( blocked_by_dependency_ids, dependency_ids_by_task_id, @@ -47,17 +45,10 @@ def _computed_agent_status(agent: Agent) -> str: return agent.status -async def _gateway_main_session_keys(session: AsyncSession) -> set[str]: - gateways = await Gateway.objects.all().all(session) - return {gateway_agent_session_key(gateway) for gateway in gateways} - - -def _agent_to_read(agent: Agent, main_session_keys: set[str]) -> AgentRead: +def _agent_to_read(agent: Agent) -> AgentRead: model = AgentRead.model_validate(agent, from_attributes=True) computed_status = _computed_agent_status(agent) - is_gateway_main = bool( - agent.openclaw_session_id and agent.openclaw_session_id in main_session_keys, - ) + is_gateway_main = agent.gateway_id is not None and agent.board_id is None return model.model_copy( update={ "status": computed_status, @@ -129,13 +120,12 @@ async def build_board_snapshot(session: AsyncSession, board: Board) -> BoardSnap dependency_ids=list({*all_dependency_ids}), ) - main_session_keys = await _gateway_main_session_keys(session) agents = ( await Agent.objects.filter_by(board_id=board.id) .order_by(col(Agent.created_at).desc()) .all(session) ) - agent_reads = [_agent_to_read(agent, main_session_keys) for agent in agents] + agent_reads = [_agent_to_read(agent) for agent in agents] agent_name_by_id = {agent.id: agent.name for agent in agents} pending_approvals_count = int( diff --git a/backend/app/services/gateway_agents.py b/backend/app/services/gateway_agents.py index 2b74d165..0b9b082e 100644 --- a/backend/app/services/gateway_agents.py +++ b/backend/app/services/gateway_agents.py @@ -29,17 +29,3 @@ def gateway_openclaw_agent_id_for_id(gateway_id: UUID) -> str: def gateway_openclaw_agent_id(gateway: Gateway) -> str: """Return the dedicated OpenClaw config `agentId` for a gateway agent.""" return gateway_openclaw_agent_id_for_id(gateway.id) - - -def parse_gateway_agent_session_key(session_key: str | None) -> UUID | None: - """Parse a gateway id from a dedicated gateway-agent session key.""" - value = (session_key or "").strip() - if not (value.startswith(_GATEWAY_AGENT_PREFIX) and value.endswith(_GATEWAY_AGENT_SUFFIX)): - return None - gateway_id = value[len(_GATEWAY_AGENT_PREFIX) : -len(_GATEWAY_AGENT_SUFFIX)] - if not gateway_id: - return None - try: - return UUID(gateway_id) - except ValueError: - return None diff --git a/backend/app/services/template_sync.py b/backend/app/services/template_sync.py index 6e04c387..d6148f0d 100644 --- a/backend/app/services/template_sync.py +++ b/backend/app/services/template_sync.py @@ -34,7 +34,6 @@ from app.services.agent_provisioning import ( from app.services.gateway_agents import ( gateway_agent_session_key, gateway_openclaw_agent_id, - parse_gateway_agent_session_key, ) _TOOLS_KV_RE = re.compile(r"^(?P[A-Z0-9_]+)=(?P.*)$") @@ -179,54 +178,6 @@ async def _with_gateway_retry( return await backoff.run(fn) -def _agent_id_from_session_key(session_key: str | None) -> str | None: - value = (session_key or "").strip() - if not value: - return None - # Dedicated Mission Control gateway-agent session keys are not gateway config agent ids. - if parse_gateway_agent_session_key(value) is not None: - return None - if not value.startswith("agent:"): - return None - parts = value.split(":") - if len(parts) < SESSION_KEY_PARTS_MIN: - return None - agent_id = parts[1].strip() - return agent_id or None - - -def _extract_agent_id_from_list(items: object) -> str | None: - if not isinstance(items, list): - return None - for item in items: - if isinstance(item, str) and item.strip(): - return item.strip() - if not isinstance(item, dict): - continue - for key in ("id", "agentId", "agent_id"): - raw = item.get(key) - if isinstance(raw, str) and raw.strip(): - return raw.strip() - return None - - -def _extract_agent_id(payload: object) -> str | None: - """Extract a default gateway agent id from common list payload shapes.""" - if isinstance(payload, list): - return _extract_agent_id_from_list(payload) - if not isinstance(payload, dict): - return None - for key in ("defaultId", "default_id", "defaultAgentId", "default_agent_id"): - raw = payload.get(key) - if isinstance(raw, str) and raw.strip(): - return raw.strip() - for key in ("agents", "items", "list", "data"): - agent_id = _extract_agent_id_from_list(payload.get(key)) - if agent_id: - return agent_id - return None - - def _gateway_agent_id(agent: Agent) -> str: session_key = agent.openclaw_session_id or "" if session_key.startswith("agent:"): @@ -304,27 +255,6 @@ async def _get_existing_auth_token( return token or None -async def _gateway_default_agent_id( - config: GatewayClientConfig, - *, - fallback_session_key: str | None = None, - backoff: _GatewayBackoff | None = None, -) -> str | None: - try: - - async def _do_list() -> object: - return await openclaw_call("agents.list", config=config) - - payload = await (backoff.run(_do_list) if backoff else _do_list()) - agent_id = _extract_agent_id(payload) - if agent_id: - return agent_id - except OpenClawGatewayError: - pass - # Avoid falling back to dedicated gateway session keys, which are not agent ids. - return _agent_id_from_session_key(fallback_session_key) - - async def _paused_board_ids(session: AsyncSession, board_ids: list[UUID]) -> set[UUID]: if not board_ids: return set() @@ -532,7 +462,8 @@ async def _sync_main_agent( main_session_key = gateway_agent_session_key(ctx.gateway) main_agent = ( await Agent.objects.all() - .filter(col(Agent.openclaw_session_id) == main_session_key) + .filter(col(Agent.gateway_id) == ctx.gateway.id) + .filter(col(Agent.board_id).is_(None)) .first(ctx.session) ) if main_agent is None: diff --git a/backend/tests/test_agent_provisioning_utils.py b/backend/tests/test_agent_provisioning_utils.py index 6c9150c5..2b8b0644 100644 --- a/backend/tests/test_agent_provisioning_utils.py +++ b/backend/tests/test_agent_provisioning_utils.py @@ -27,19 +27,6 @@ def test_slugify_falls_back_to_uuid_hex(monkeypatch): assert agent_provisioning._slugify("!!!") == "deadbeef" -def test_agent_id_from_session_key_parses_agent_prefix(): - assert agent_provisioning._agent_id_from_session_key(None) is None - assert agent_provisioning._agent_id_from_session_key("") is None - assert agent_provisioning._agent_id_from_session_key("not-agent") is None - assert agent_provisioning._agent_id_from_session_key("agent:") is None - assert agent_provisioning._agent_id_from_session_key("agent:riya:main") == "riya" - - -def test_agent_id_from_session_key_ignores_gateway_main_session_key(): - session_key = gateway_agent_session_key_for_id(uuid4()) - assert agent_provisioning._agent_id_from_session_key(session_key) is None - - def test_extract_agent_id_supports_lists_and_dicts(): assert agent_provisioning._extract_agent_id(["", " ", "abc"]) == "abc" assert agent_provisioning._extract_agent_id([{"agent_id": "xyz"}]) == "xyz" @@ -89,7 +76,6 @@ class _GatewayStub: url: str token: str | None workspace_root: str - main_session_key: str @pytest.mark.asyncio @@ -102,41 +88,56 @@ async def test_provision_main_agent_uses_dedicated_openclaw_agent_id(monkeypatch url="ws://gateway.example/ws", token=None, workspace_root="/tmp/openclaw", - main_session_key=session_key, ) agent = _AgentStub(name="Acme Gateway Agent", openclaw_session_id=session_key) captured: dict[str, object] = {} - async def _fake_ensure_session(*args, **kwargs): + async def _fake_ensure_agent_session(self, session_key, *, label=None): return None - async def _fake_patch_gateway_agent_list(agent_id, workspace_path, heartbeat, config): - captured["patched_agent_id"] = agent_id - captured["workspace_path"] = workspace_path + async def _fake_upsert_agent(self, registration): + captured["patched_agent_id"] = registration.agent_id + captured["workspace_path"] = registration.workspace_path - async def _fake_supported_gateway_files(config): + async def _fake_list_supported_files(self): return set() - async def _fake_gateway_agent_files_index(agent_id, config): + async def _fake_list_agent_files(self, agent_id): captured["files_index_agent_id"] = agent_id return {} def _fake_render_agent_files(*args, **kwargs): return {} - async def _fake_set_agent_files(*args, **kwargs): + async def _fake_set_agent_files(self, **kwargs): return None - monkeypatch.setattr(agent_provisioning, "ensure_session", _fake_ensure_session) - monkeypatch.setattr(agent_provisioning, "_patch_gateway_agent_list", _fake_patch_gateway_agent_list) - monkeypatch.setattr(agent_provisioning, "_supported_gateway_files", _fake_supported_gateway_files) monkeypatch.setattr( - agent_provisioning, - "_gateway_agent_files_index", - _fake_gateway_agent_files_index, + agent_provisioning.OpenClawGatewayControlPlane, + "ensure_agent_session", + _fake_ensure_agent_session, + ) + monkeypatch.setattr( + agent_provisioning.OpenClawGatewayControlPlane, + "upsert_agent", + _fake_upsert_agent, + ) + monkeypatch.setattr( + agent_provisioning.OpenClawGatewayControlPlane, + "list_supported_files", + _fake_list_supported_files, + ) + monkeypatch.setattr( + agent_provisioning.OpenClawGatewayControlPlane, + "list_agent_files", + _fake_list_agent_files, ) monkeypatch.setattr(agent_provisioning, "_render_agent_files", _fake_render_agent_files) - monkeypatch.setattr(agent_provisioning, "_set_agent_files", _fake_set_agent_files) + monkeypatch.setattr( + agent_provisioning.BaseAgentLifecycleManager, + "_set_agent_files", + _fake_set_agent_files, + ) await agent_provisioning.provision_main_agent( agent, diff --git a/backend/tests/test_mentions.py b/backend/tests/test_mentions.py index e791d044..c5268f18 100644 --- a/backend/tests/test_mentions.py +++ b/backend/tests/test_mentions.py @@ -1,36 +1,42 @@ # ruff: noqa +from uuid import uuid4 + from app.models.agents import Agent from app.services.mentions import extract_mentions, matches_agent_mention +def _agent(name: str, *, is_board_lead: bool = False) -> Agent: + return Agent(name=name, gateway_id=uuid4(), is_board_lead=is_board_lead) + + def test_extract_mentions_parses_tokens(): assert extract_mentions("hi @Alex and @bob-2") == {"alex", "bob-2"} def test_matches_agent_mention_matches_first_name(): - agent = Agent(name="Alice Cooper") + agent = _agent("Alice Cooper") assert matches_agent_mention(agent, {"alice"}) is True assert matches_agent_mention(agent, {"cooper"}) is False def test_matches_agent_mention_no_mentions_is_false(): - agent = Agent(name="Alice") + agent = _agent("Alice") assert matches_agent_mention(agent, set()) is False def test_matches_agent_mention_empty_agent_name_is_false(): - agent = Agent(name=" ") + agent = _agent(" ") assert matches_agent_mention(agent, {"alice"}) is False def test_matches_agent_mention_matches_full_normalized_name(): - agent = Agent(name="Alice Cooper") + agent = _agent("Alice Cooper") assert matches_agent_mention(agent, {"alice cooper"}) is True def test_matches_agent_mention_supports_reserved_lead_shortcut(): - lead = Agent(name="Riya", is_board_lead=True) - other = Agent(name="Lead", is_board_lead=False) + lead = _agent("Riya", is_board_lead=True) + other = _agent("Lead", is_board_lead=False) assert matches_agent_mention(lead, {"lead"}) is True assert matches_agent_mention(other, {"lead"}) is False diff --git a/frontend/src/api/generated/model/agentRead.ts b/frontend/src/api/generated/model/agentRead.ts index 4ebba7bd..e7da99ff 100644 --- a/frontend/src/api/generated/model/agentRead.ts +++ b/frontend/src/api/generated/model/agentRead.ts @@ -20,6 +20,7 @@ export interface AgentRead { identity_template?: string | null; soul_template?: string | null; id: string; + gateway_id: string; is_board_lead?: boolean; is_gateway_main?: boolean; openclaw_session_id?: string | null; diff --git a/frontend/src/api/generated/model/gatewayRead.ts b/frontend/src/api/generated/model/gatewayRead.ts index b408b0d9..2c967ea2 100644 --- a/frontend/src/api/generated/model/gatewayRead.ts +++ b/frontend/src/api/generated/model/gatewayRead.ts @@ -15,7 +15,6 @@ export interface GatewayRead { id: string; organization_id: string; token?: string | null; - main_session_key: string; created_at: string; updated_at: string; } diff --git a/frontend/src/api/generated/model/gatewaySessionsResponse.ts b/frontend/src/api/generated/model/gatewaySessionsResponse.ts index 4b5164c1..2a3f4d82 100644 --- a/frontend/src/api/generated/model/gatewaySessionsResponse.ts +++ b/frontend/src/api/generated/model/gatewaySessionsResponse.ts @@ -10,6 +10,5 @@ */ export interface GatewaySessionsResponse { sessions: unknown[]; - main_session_key?: string | null; main_session?: unknown | null; } diff --git a/frontend/src/api/generated/model/gatewaysStatusResponse.ts b/frontend/src/api/generated/model/gatewaysStatusResponse.ts index ac728bfd..9b0eb394 100644 --- a/frontend/src/api/generated/model/gatewaysStatusResponse.ts +++ b/frontend/src/api/generated/model/gatewaysStatusResponse.ts @@ -13,7 +13,6 @@ export interface GatewaysStatusResponse { gateway_url: string; sessions_count?: number | null; sessions?: unknown[] | null; - main_session_key?: string | null; main_session?: unknown | null; main_session_error?: string | null; error?: string | null; diff --git a/frontend/src/app/gateways/[gatewayId]/edit/page.tsx b/frontend/src/app/gateways/[gatewayId]/edit/page.tsx index 6ae1cdef..6b4e2120 100644 --- a/frontend/src/app/gateways/[gatewayId]/edit/page.tsx +++ b/frontend/src/app/gateways/[gatewayId]/edit/page.tsx @@ -82,7 +82,6 @@ export default function EditGatewayPage() { const resolvedName = name ?? loadedGateway?.name ?? ""; const resolvedGatewayUrl = gatewayUrl ?? loadedGateway?.url ?? ""; const resolvedGatewayToken = gatewayToken ?? loadedGateway?.token ?? ""; - const resolvedMainSessionKey = loadedGateway?.main_session_key ?? null; const resolvedWorkspaceRoot = workspaceRoot ?? loadedGateway?.workspace_root ?? DEFAULT_WORKSPACE_ROOT; @@ -165,7 +164,6 @@ export default function EditGatewayPage() { name={resolvedName} gatewayUrl={resolvedGatewayUrl} gatewayToken={resolvedGatewayToken} - mainSessionKey={resolvedMainSessionKey} workspaceRoot={resolvedWorkspaceRoot} gatewayUrlError={gatewayUrlError} gatewayCheckStatus={gatewayCheckStatus} diff --git a/frontend/src/app/gateways/[gatewayId]/page.tsx b/frontend/src/app/gateways/[gatewayId]/page.tsx index d035509f..dcc7701b 100644 --- a/frontend/src/app/gateways/[gatewayId]/page.tsx +++ b/frontend/src/app/gateways/[gatewayId]/page.tsx @@ -178,14 +178,6 @@ export default function GatewayDetailPage() { Runtime

-
-

- Main session key -

-

- {gateway.main_session_key} -

-

Workspace root diff --git a/frontend/src/app/gateways/new/page.tsx b/frontend/src/app/gateways/new/page.tsx index aa9b0aea..f72db43e 100644 --- a/frontend/src/app/gateways/new/page.tsx +++ b/frontend/src/app/gateways/new/page.tsx @@ -125,7 +125,6 @@ export default function NewGatewayPage() { name={name} gatewayUrl={gatewayUrl} gatewayToken={gatewayToken} - mainSessionKey={null} workspaceRoot={workspaceRoot} gatewayUrlError={gatewayUrlError} gatewayCheckStatus={gatewayCheckStatus} diff --git a/frontend/src/app/gateways/page.tsx b/frontend/src/app/gateways/page.tsx index 3531ce61..ca8513e5 100644 --- a/frontend/src/app/gateways/page.tsx +++ b/frontend/src/app/gateways/page.tsx @@ -134,15 +134,6 @@ export default function GatewaysPage() { ), }, - { - accessorKey: "main_session_key", - header: "Main session", - cell: ({ row }) => ( - - {truncate(row.original.main_session_key, 24)} - - ), - }, { accessorKey: "workspace_root", header: "Workspace root", diff --git a/frontend/src/components/gateways/GatewayForm.tsx b/frontend/src/components/gateways/GatewayForm.tsx index 1d5e3fdf..f5068faf 100644 --- a/frontend/src/components/gateways/GatewayForm.tsx +++ b/frontend/src/components/gateways/GatewayForm.tsx @@ -9,7 +9,6 @@ type GatewayFormProps = { name: string; gatewayUrl: string; gatewayToken: string; - mainSessionKey: string | null; workspaceRoot: string; gatewayUrlError: string | null; gatewayCheckStatus: GatewayCheckStatus; @@ -34,7 +33,6 @@ export function GatewayForm({ name, gatewayUrl, gatewayToken, - mainSessionKey, workspaceRoot, gatewayUrlError, gatewayCheckStatus, @@ -130,28 +128,16 @@ export function GatewayForm({

-
-
- - -
-
- - onWorkspaceRootChange(event.target.value)} - placeholder={workspaceRootPlaceholder} - disabled={isLoading} - /> -
+
+ + onWorkspaceRootChange(event.target.value)} + placeholder={workspaceRootPlaceholder} + disabled={isLoading} + />
{errorMessage ? (