diff --git a/.gitignore b/.gitignore index addb7216..e9bd9489 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,5 @@ node_modules/ backend/~/ backend/coverage.* backend/.coverage -frontend/coverage \ No newline at end of file +frontend/coverage +backend/app/services/openclaw/.device-keys \ No newline at end of file diff --git a/backend/app/models/agents.py b/backend/app/models/agents.py index 1648e98f..9c97e970 100644 --- a/backend/app/models/agents.py +++ b/backend/app/models/agents.py @@ -43,6 +43,11 @@ class Agent(QueryModel, table=True): delete_requested_at: datetime | None = Field(default=None) delete_confirm_token_hash: str | None = Field(default=None, index=True) last_seen_at: datetime | None = Field(default=None) + lifecycle_generation: int = Field(default=0) + wake_attempts: int = Field(default=0) + last_wake_sent_at: datetime | None = Field(default=None) + checkin_deadline_at: datetime | None = Field(default=None) + last_provision_error: str | None = Field(default=None, sa_column=Column(Text)) is_board_lead: bool = Field(default=False, index=True) created_at: datetime = Field(default_factory=utcnow) updated_at: datetime = Field(default_factory=utcnow) diff --git a/backend/app/services/openclaw/admin_service.py b/backend/app/services/openclaw/admin_service.py index ee7235f5..5a58e18f 100644 --- a/backend/app/services/openclaw/admin_service.py +++ b/backend/app/services/openclaw/admin_service.py @@ -21,24 +21,18 @@ from app.models.gateways import Gateway from app.models.tasks import Task from app.schemas.gateways import GatewayTemplatesSyncResult from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG -from app.services.openclaw.db_agent_state import ( - mark_provision_complete, - mark_provision_requested, - mint_agent_token, -) from app.services.openclaw.db_service import OpenClawDBService from app.services.openclaw.error_messages import normalize_gateway_error_message from app.services.openclaw.gateway_compat import check_gateway_version_compatibility from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig from app.services.openclaw.gateway_rpc import OpenClawGatewayError, openclaw_call -from app.services.openclaw.provisioning import OpenClawGatewayProvisioner +from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator from app.services.openclaw.provisioning_db import ( GatewayTemplateSyncOptions, OpenClawProvisioningService, ) from app.services.openclaw.session_service import GatewayTemplateSyncQuery from app.services.openclaw.shared import GatewayAgentIdentity -from app.services.organizations import get_org_owner_user if TYPE_CHECKING: from sqlmodel.ext.asyncio.session import AsyncSession @@ -222,69 +216,38 @@ class GatewayAdminLifecycleService(OpenClawDBService): action: str, notify: bool, ) -> Agent: - template_user = user or await get_org_owner_user( - self.session, - organization_id=gateway.organization_id, - ) - if template_user is None: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, - detail="Organization owner not found (required for gateway agent USER.md rendering).", - ) - raw_token = mint_agent_token(agent) - mark_provision_requested( - agent, - action=action, - status="updating" if action == "update" else "provisioning", - ) - await self.add_commit_refresh(agent) - if not gateway.url: - return agent - + orchestrator = AgentLifecycleOrchestrator(self.session) try: - await OpenClawGatewayProvisioner().apply_agent_lifecycle( - agent=agent, + provisioned = await orchestrator.run_lifecycle( gateway=gateway, + agent_id=agent.id, board=None, - auth_token=raw_token, - user=template_user, + user=user, action=action, + auth_token=None, + force_bootstrap=False, + reset_session=False, wake=notify, deliver_wakeup=True, + wakeup_verb=None, + clear_confirm_token=False, + raise_gateway_errors=True, ) - except OpenClawGatewayError as exc: + except HTTPException: self.logger.error( - "gateway.main_agent.provision_failed_gateway gateway_id=%s agent_id=%s error=%s", + "gateway.main_agent.provision_failed gateway_id=%s agent_id=%s action=%s", gateway.id, agent.id, - str(exc), + action, ) - raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, - detail=f"Gateway {action} failed: {exc}", - ) from exc - except (OSError, RuntimeError, ValueError) as exc: - self.logger.error( - "gateway.main_agent.provision_failed gateway_id=%s agent_id=%s error=%s", - gateway.id, - agent.id, - str(exc), - ) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Unexpected error {action}ing gateway provisioning.", - ) from exc - - mark_provision_complete(agent, status="online") - await self.add_commit_refresh(agent) - + raise self.logger.info( "gateway.main_agent.provision_success gateway_id=%s agent_id=%s action=%s", gateway.id, - agent.id, + provisioned.id, action, ) - return agent + return provisioned async def ensure_main_agent( self, diff --git a/backend/app/services/openclaw/constants.py b/backend/app/services/openclaw/constants.py index e7c3e433..48816537 100644 --- a/backend/app/services/openclaw/constants.py +++ b/backend/app/services/openclaw/constants.py @@ -18,6 +18,11 @@ DEFAULT_HEARTBEAT_CONFIG: dict[str, Any] = { } OFFLINE_AFTER = timedelta(minutes=10) +# Provisioning convergence policy: +# - require first heartbeat/check-in within 30s of wake +# - allow up to 3 wake attempts before giving up +CHECKIN_DEADLINE_AFTER_WAKE = timedelta(seconds=30) +MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN = 3 AGENT_SESSION_PREFIX = "agent" DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY: dict[str, bool] = { diff --git a/backend/app/services/openclaw/lifecycle_orchestrator.py b/backend/app/services/openclaw/lifecycle_orchestrator.py new file mode 100644 index 00000000..64bc9466 --- /dev/null +++ b/backend/app/services/openclaw/lifecycle_orchestrator.py @@ -0,0 +1,167 @@ +"""Unified agent lifecycle orchestration. + +This module centralizes DB-backed lifecycle transitions so call sites do not +duplicate provisioning/wake/state logic. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING +from uuid import UUID + +from fastapi import HTTPException, status +from sqlmodel import col, select + +from app.core.time import utcnow +from app.models.agents import Agent +from app.models.boards import Board +from app.models.gateways import Gateway +from app.services.openclaw.constants import CHECKIN_DEADLINE_AFTER_WAKE +from app.services.openclaw.db_agent_state import ( + mark_provision_complete, + mark_provision_requested, + mint_agent_token, +) +from app.services.openclaw.db_service import OpenClawDBService +from app.services.openclaw.gateway_rpc import OpenClawGatewayError +from app.services.openclaw.lifecycle_queue import ( + QueuedAgentLifecycleReconcile, + enqueue_lifecycle_reconcile, +) +from app.services.openclaw.provisioning import OpenClawGatewayProvisioner +from app.services.organizations import get_org_owner_user + +if TYPE_CHECKING: + from sqlmodel.ext.asyncio.session import AsyncSession + + from app.models.users import User + + +class AgentLifecycleOrchestrator(OpenClawDBService): + """Single lifecycle writer for agent provision/update transitions.""" + + def __init__(self, session: AsyncSession) -> None: + super().__init__(session) + + async def _lock_agent(self, *, agent_id: UUID) -> Agent: + statement = select(Agent).where(col(Agent.id) == agent_id).with_for_update() + agent = (await self.session.exec(statement)).first() + if agent is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent not found") + return agent + + async def run_lifecycle( + self, + *, + gateway: Gateway, + agent_id: UUID, + board: Board | None, + user: User | None, + action: str, + auth_token: str | None = None, + force_bootstrap: bool = False, + reset_session: bool = False, + wake: bool = True, + deliver_wakeup: bool = True, + wakeup_verb: str | None = None, + clear_confirm_token: bool = False, + raise_gateway_errors: bool = True, + ) -> Agent: + """Provision or update any agent under a per-agent lock.""" + + locked = await self._lock_agent(agent_id=agent_id) + template_user = user + if board is None and template_user is None: + template_user = await get_org_owner_user( + self.session, + organization_id=gateway.organization_id, + ) + if template_user is None: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, + detail=( + "Organization owner not found " + "(required for gateway agent USER.md rendering)." + ), + ) + + raw_token = auth_token or mint_agent_token(locked) + mark_provision_requested( + locked, + action=action, + status="updating" if action == "update" else "provisioning", + ) + locked.lifecycle_generation += 1 + locked.last_provision_error = None + locked.checkin_deadline_at = utcnow() + CHECKIN_DEADLINE_AFTER_WAKE if wake else None + if wake: + locked.wake_attempts += 1 + locked.last_wake_sent_at = utcnow() + self.session.add(locked) + await self.session.flush() + + if not gateway.url: + await self.session.commit() + await self.session.refresh(locked) + return locked + + try: + await OpenClawGatewayProvisioner().apply_agent_lifecycle( + agent=locked, + gateway=gateway, + board=board, + auth_token=raw_token, + user=template_user, + action=action, + force_bootstrap=force_bootstrap, + reset_session=reset_session, + wake=wake, + deliver_wakeup=deliver_wakeup, + wakeup_verb=wakeup_verb, + ) + except OpenClawGatewayError as exc: + locked.last_provision_error = str(exc) + locked.updated_at = utcnow() + self.session.add(locked) + await self.session.commit() + await self.session.refresh(locked) + if raise_gateway_errors: + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"Gateway {action} failed: {exc}", + ) from exc + return locked + except (OSError, RuntimeError, ValueError) as exc: + locked.last_provision_error = str(exc) + locked.updated_at = utcnow() + self.session.add(locked) + await self.session.commit() + await self.session.refresh(locked) + if raise_gateway_errors: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Unexpected error {action}ing gateway provisioning.", + ) from exc + return locked + + mark_provision_complete( + locked, + status="online", + clear_confirm_token=clear_confirm_token, + ) + locked.last_provision_error = None + locked.checkin_deadline_at = utcnow() + CHECKIN_DEADLINE_AFTER_WAKE if wake else None + self.session.add(locked) + await self.session.commit() + await self.session.refresh(locked) + if wake and locked.checkin_deadline_at is not None: + enqueue_lifecycle_reconcile( + QueuedAgentLifecycleReconcile( + agent_id=locked.id, + gateway_id=locked.gateway_id, + board_id=locked.board_id, + generation=locked.lifecycle_generation, + checkin_deadline_at=locked.checkin_deadline_at, + ) + ) + return locked diff --git a/backend/app/services/openclaw/lifecycle_queue.py b/backend/app/services/openclaw/lifecycle_queue.py new file mode 100644 index 00000000..3a7f2b54 --- /dev/null +++ b/backend/app/services/openclaw/lifecycle_queue.py @@ -0,0 +1,122 @@ +"""Queue payload helpers for stuck-agent lifecycle reconciliation.""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from typing import Any +from uuid import UUID + +from app.core.config import settings +from app.core.logging import get_logger +from app.core.time import utcnow +from app.services.queue import QueuedTask, enqueue_task_with_delay +from app.services.queue import requeue_if_failed as generic_requeue_if_failed + +logger = get_logger(__name__) +TASK_TYPE = "agent_lifecycle_reconcile" + + +@dataclass(frozen=True) +class QueuedAgentLifecycleReconcile: + """Queued payload metadata for lifecycle reconciliation checks.""" + + agent_id: UUID + gateway_id: UUID + board_id: UUID | None + generation: int + checkin_deadline_at: datetime + attempts: int = 0 + + +def _task_from_payload(payload: QueuedAgentLifecycleReconcile) -> QueuedTask: + return QueuedTask( + task_type=TASK_TYPE, + payload={ + "agent_id": str(payload.agent_id), + "gateway_id": str(payload.gateway_id), + "board_id": str(payload.board_id) if payload.board_id is not None else None, + "generation": payload.generation, + "checkin_deadline_at": payload.checkin_deadline_at.isoformat(), + }, + created_at=utcnow(), + attempts=payload.attempts, + ) + + +def decode_lifecycle_task(task: QueuedTask) -> QueuedAgentLifecycleReconcile: + if task.task_type not in {TASK_TYPE, "legacy"}: + raise ValueError(f"Unexpected task_type={task.task_type!r}; expected {TASK_TYPE!r}") + payload: dict[str, Any] = task.payload + raw_board_id = payload.get("board_id") + board_id = UUID(raw_board_id) if isinstance(raw_board_id, str) and raw_board_id else None + raw_deadline = payload.get("checkin_deadline_at") + if not isinstance(raw_deadline, str): + raise ValueError("checkin_deadline_at is required") + return QueuedAgentLifecycleReconcile( + agent_id=UUID(str(payload["agent_id"])), + gateway_id=UUID(str(payload["gateway_id"])), + board_id=board_id, + generation=int(payload["generation"]), + checkin_deadline_at=datetime.fromisoformat(raw_deadline), + attempts=int(payload.get("attempts", task.attempts)), + ) + + +def enqueue_lifecycle_reconcile(payload: QueuedAgentLifecycleReconcile) -> bool: + """Enqueue a delayed reconcile check keyed to the expected check-in deadline.""" + now = utcnow() + delay_seconds = max(0.0, (payload.checkin_deadline_at - now).total_seconds()) + queued = _task_from_payload(payload) + ok = enqueue_task_with_delay( + queued, + settings.rq_queue_name, + delay_seconds=delay_seconds, + redis_url=settings.rq_redis_url, + ) + if ok: + logger.info( + "lifecycle.queue.enqueued", + extra={ + "agent_id": str(payload.agent_id), + "generation": payload.generation, + "delay_seconds": delay_seconds, + "attempt": payload.attempts, + }, + ) + return ok + + +def defer_lifecycle_reconcile( + task: QueuedTask, + *, + delay_seconds: float, +) -> bool: + """Defer a reconcile task without incrementing retry attempts.""" + payload = decode_lifecycle_task(task) + deferred = QueuedAgentLifecycleReconcile( + agent_id=payload.agent_id, + gateway_id=payload.gateway_id, + board_id=payload.board_id, + generation=payload.generation, + checkin_deadline_at=payload.checkin_deadline_at, + attempts=task.attempts, + ) + queued = _task_from_payload(deferred) + return enqueue_task_with_delay( + queued, + settings.rq_queue_name, + delay_seconds=max(0.0, delay_seconds), + redis_url=settings.rq_redis_url, + ) + + +def requeue_lifecycle_queue_task(task: QueuedTask, *, delay_seconds: float = 0) -> bool: + """Requeue a failed lifecycle task with capped retries.""" + return generic_requeue_if_failed( + task, + settings.rq_queue_name, + max_retries=settings.rq_dispatch_max_retries, + redis_url=settings.rq_redis_url, + delay_seconds=max(0.0, delay_seconds), + ) diff --git a/backend/app/services/openclaw/lifecycle_reconcile.py b/backend/app/services/openclaw/lifecycle_reconcile.py new file mode 100644 index 00000000..6d1aebeb --- /dev/null +++ b/backend/app/services/openclaw/lifecycle_reconcile.py @@ -0,0 +1,140 @@ +"""Worker handlers for lifecycle reconciliation tasks.""" + +from __future__ import annotations + +import asyncio + +from app.core.logging import get_logger +from app.core.time import utcnow +from app.db.session import async_session_maker +from app.models.agents import Agent +from app.models.boards import Board +from app.models.gateways import Gateway +from app.services.openclaw.constants import MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN +from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator +from app.services.openclaw.lifecycle_queue import decode_lifecycle_task, defer_lifecycle_reconcile +from app.services.queue import QueuedTask + +logger = get_logger(__name__) +_RECONCILE_TIMEOUT_SECONDS = 60.0 + + +def _has_checked_in_since_wake(agent: Agent) -> bool: + if agent.last_seen_at is None: + return False + if agent.last_wake_sent_at is None: + return True + return agent.last_seen_at >= agent.last_wake_sent_at + + +async def process_lifecycle_queue_task(task: QueuedTask) -> None: + """Re-run lifecycle provisioning when an agent misses post-provision check-in.""" + payload = decode_lifecycle_task(task) + now = utcnow() + + async with async_session_maker() as session: + agent = await Agent.objects.by_id(payload.agent_id).first(session) + if agent is None: + logger.info( + "lifecycle.reconcile.skip_missing_agent", + extra={"agent_id": str(payload.agent_id)}, + ) + return + + # Ignore stale queue messages after a newer lifecycle generation. + if agent.lifecycle_generation != payload.generation: + logger.info( + "lifecycle.reconcile.skip_stale_generation", + extra={ + "agent_id": str(agent.id), + "queued_generation": payload.generation, + "current_generation": agent.lifecycle_generation, + }, + ) + return + + if _has_checked_in_since_wake(agent): + logger.info( + "lifecycle.reconcile.skip_not_stuck", + extra={"agent_id": str(agent.id), "status": agent.status}, + ) + return + + deadline = agent.checkin_deadline_at or payload.checkin_deadline_at + if agent.status == "deleting": + logger.info( + "lifecycle.reconcile.skip_deleting", + extra={"agent_id": str(agent.id)}, + ) + return + + if now < deadline: + delay = max(0.0, (deadline - now).total_seconds()) + if not defer_lifecycle_reconcile(task, delay_seconds=delay): + msg = "Failed to defer lifecycle reconcile task" + raise RuntimeError(msg) + logger.info( + "lifecycle.reconcile.deferred", + extra={"agent_id": str(agent.id), "delay_seconds": delay}, + ) + return + + if agent.wake_attempts >= MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN: + agent.status = "offline" + agent.checkin_deadline_at = None + agent.last_provision_error = ( + "Agent did not check in after wake; max wake attempts reached" + ) + agent.updated_at = utcnow() + session.add(agent) + await session.commit() + logger.warning( + "lifecycle.reconcile.max_attempts_reached", + extra={ + "agent_id": str(agent.id), + "wake_attempts": agent.wake_attempts, + "max_attempts": MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN, + }, + ) + return + + gateway = await Gateway.objects.by_id(agent.gateway_id).first(session) + if gateway is None: + logger.warning( + "lifecycle.reconcile.skip_missing_gateway", + extra={"agent_id": str(agent.id), "gateway_id": str(agent.gateway_id)}, + ) + return + board: Board | None = None + if agent.board_id is not None: + board = await Board.objects.by_id(agent.board_id).first(session) + if board is None: + logger.warning( + "lifecycle.reconcile.skip_missing_board", + extra={"agent_id": str(agent.id), "board_id": str(agent.board_id)}, + ) + return + + orchestrator = AgentLifecycleOrchestrator(session) + await asyncio.wait_for( + orchestrator.run_lifecycle( + gateway=gateway, + agent_id=agent.id, + board=board, + user=None, + action="update", + auth_token=None, + force_bootstrap=False, + reset_session=True, + wake=True, + deliver_wakeup=True, + wakeup_verb="updated", + clear_confirm_token=True, + raise_gateway_errors=True, + ), + timeout=_RECONCILE_TIMEOUT_SECONDS, + ) + logger.info( + "lifecycle.reconcile.retriggered", + extra={"agent_id": str(agent.id), "generation": payload.generation}, + ) diff --git a/backend/app/services/openclaw/provisioning_db.py b/backend/app/services/openclaw/provisioning_db.py index 90c50da7..15954a56 100644 --- a/backend/app/services/openclaw/provisioning_db.py +++ b/backend/app/services/openclaw/provisioning_db.py @@ -52,8 +52,6 @@ from app.services.openclaw.constants import ( OFFLINE_AFTER, ) from app.services.openclaw.db_agent_state import ( - mark_provision_complete, - mark_provision_requested, mint_agent_token, ) from app.services.openclaw.db_service import OpenClawDBService @@ -74,6 +72,7 @@ from app.services.openclaw.internal.session_keys import ( board_agent_session_key, board_lead_session_key, ) +from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator from app.services.openclaw.policies import OpenClawAuthorizationPolicy from app.services.openclaw.provisioning import ( OpenClawGatewayControlPlane, @@ -143,7 +142,6 @@ class OpenClawProvisioningService(OpenClawDBService): def __init__(self, session: AsyncSession) -> None: super().__init__(session) - self._gateway = OpenClawGatewayProvisioner() @staticmethod def lead_session_key(board: Board) -> str: @@ -213,25 +211,25 @@ class OpenClawProvisioningService(OpenClawDBService): openclaw_session_id=self.lead_session_key(board), ) raw_token = mint_agent_token(agent) - mark_provision_requested(agent, action=config_options.action, status="provisioning") await self.add_commit_refresh(agent) # Strict behavior: provisioning errors surface to the caller. The DB row exists # so a later retry can succeed with the same deterministic identity/session key. - await self._gateway.apply_agent_lifecycle( - agent=agent, + agent = await AgentLifecycleOrchestrator(self.session).run_lifecycle( gateway=request.gateway, + agent_id=agent.id, board=board, - auth_token=raw_token, user=request.user, action=config_options.action, + auth_token=raw_token, + force_bootstrap=False, + reset_session=False, wake=True, deliver_wakeup=True, + wakeup_verb=None, + clear_confirm_token=False, + raise_gateway_errors=True, ) - - mark_provision_complete(agent, status="online") - await self.add_commit_refresh(agent) - return agent, True async def sync_gateway_templates( @@ -298,7 +296,6 @@ class OpenClawProvisioningService(OpenClawDBService): control_plane=control_plane, backoff=GatewayBackoff(timeout_s=10 * 60, timeout_context="template sync"), options=options, - provisioner=self._gateway, ) if not await _ping_gateway(ctx, result): return result @@ -352,7 +349,6 @@ class _SyncContext: control_plane: OpenClawGatewayControlPlane backoff: GatewayBackoff options: GatewayTemplateSyncOptions - provisioner: OpenClawGatewayProvisioner def _parse_tools_md(content: str) -> dict[str, str]: @@ -584,18 +580,26 @@ async def _sync_one_agent( try: async def _do_provision() -> bool: - await ctx.provisioner.apply_agent_lifecycle( - agent=agent, - gateway=ctx.gateway, - board=board, - auth_token=auth_token, - user=ctx.options.user, - action="update", - force_bootstrap=ctx.options.force_bootstrap, - overwrite=ctx.options.overwrite, - reset_session=ctx.options.reset_sessions, - wake=False, - ) + try: + await AgentLifecycleOrchestrator(ctx.session).run_lifecycle( + gateway=ctx.gateway, + agent_id=agent.id, + board=board, + user=ctx.options.user, + action="update", + auth_token=auth_token, + force_bootstrap=ctx.options.force_bootstrap, + reset_session=ctx.options.reset_sessions, + wake=False, + deliver_wakeup=False, + wakeup_verb="updated", + clear_confirm_token=False, + raise_gateway_errors=True, + ) + except HTTPException as exc: + if exc.status_code == status.HTTP_502_BAD_GATEWAY: + raise OpenClawGatewayError(str(exc.detail)) from exc + raise return True await ctx.backoff.run(_do_provision) @@ -613,6 +617,15 @@ async def _sync_one_agent( message=f"Failed to sync templates: {exc}", ) return False + except HTTPException as exc: + result.agents_skipped += 1 + _append_sync_error( + result, + agent=agent, + board=board, + message=f"Failed to sync templates: {exc.detail}", + ) + return False else: return False @@ -655,18 +668,26 @@ async def _sync_main_agent( try: async def _do_provision_main() -> bool: - await ctx.provisioner.apply_agent_lifecycle( - agent=main_agent, - gateway=ctx.gateway, - board=None, - auth_token=token, - user=ctx.options.user, - action="update", - force_bootstrap=ctx.options.force_bootstrap, - overwrite=ctx.options.overwrite, - reset_session=ctx.options.reset_sessions, - wake=False, - ) + try: + await AgentLifecycleOrchestrator(ctx.session).run_lifecycle( + gateway=ctx.gateway, + agent_id=main_agent.id, + board=None, + user=ctx.options.user, + action="update", + auth_token=token, + force_bootstrap=ctx.options.force_bootstrap, + reset_session=ctx.options.reset_sessions, + wake=False, + deliver_wakeup=False, + wakeup_verb="updated", + clear_confirm_token=False, + raise_gateway_errors=True, + ) + except HTTPException as exc: + if exc.status_code == status.HTTP_502_BAD_GATEWAY: + raise OpenClawGatewayError(str(exc.detail)) from exc + raise return True await ctx.backoff.run(_do_provision_main) @@ -679,6 +700,12 @@ async def _sync_main_agent( agent=main_agent, message=f"Failed to sync gateway agent templates: {exc}", ) + except HTTPException as exc: + _append_sync_error( + result, + agent=main_agent, + message=f"Failed to sync gateway agent templates: {exc.detail}", + ) else: result.main_updated = True return stop_sync @@ -1038,7 +1065,6 @@ class AgentLifecycleService(OpenClawDBService): ) -> tuple[Agent, str]: agent = Agent.model_validate(data) raw_token = mint_agent_token(agent) - mark_provision_requested(agent, action="provision", status="provisioning") agent.openclaw_session_id = self.resolve_session_key(agent) await self.add_commit_refresh(agent) return agent, raw_token @@ -1068,92 +1094,63 @@ class AgentLifecycleService(OpenClawDBService): status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="board is required for non-main agent provisioning", ) - template_user = user - if target.is_main_agent and template_user is None: - template_user = await get_org_owner_user( - self.session, - organization_id=target.gateway.organization_id, - ) - if template_user is None: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, - detail=( - "User context is required to provision the gateway main agent " - "(org owner not found)." - ), - ) - await OpenClawGatewayProvisioner().apply_agent_lifecycle( - agent=agent, + provisioned = await AgentLifecycleOrchestrator(self.session).run_lifecycle( gateway=target.gateway, + agent_id=agent.id, board=target.board if not target.is_main_agent else None, - auth_token=auth_token, - user=template_user, + user=user, action=action, + auth_token=auth_token, force_bootstrap=force_bootstrap, reset_session=True, wake=True, deliver_wakeup=True, wakeup_verb=wakeup_verb, + clear_confirm_token=True, + raise_gateway_errors=raise_gateway_errors, ) - mark_provision_complete(agent, status="online", clear_confirm_token=True) - self.session.add(agent) - await self.session.commit() record_activity( self.session, event_type=f"agent.{action}.direct", - message=f"{action.capitalize()}d directly for {agent.name}.", - agent_id=agent.id, + message=f"{action.capitalize()}d directly for {provisioned.name}.", + agent_id=provisioned.id, ) record_activity( self.session, event_type="agent.wakeup.sent", - message=f"Wakeup message sent to {agent.name}.", - agent_id=agent.id, + message=f"Wakeup message sent to {provisioned.name}.", + agent_id=provisioned.id, ) await self.session.commit() self.logger.info( "agent.provision.success action=%s agent_id=%s", action, - agent.id, + provisioned.id, ) - except OpenClawGatewayError as exc: + except HTTPException as exc: self.record_instruction_failure( self.session, agent, - str(exc), + str(exc.detail), action, ) await self.session.commit() - self.logger.error( - "agent.provision.gateway_error action=%s agent_id=%s error=%s", - action, - agent.id, - str(exc), - ) + if exc.status_code == status.HTTP_502_BAD_GATEWAY: + self.logger.error( + "agent.provision.gateway_error action=%s agent_id=%s error=%s", + action, + agent.id, + str(exc.detail), + ) + else: + self.logger.critical( + "agent.provision.runtime_error action=%s agent_id=%s error=%s", + action, + agent.id, + str(exc.detail), + ) if raise_gateway_errors: - raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, - detail=f"Gateway {action} failed: {exc}", - ) from exc - except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover - self.record_instruction_failure( - self.session, - agent, - str(exc), - action, - ) - await self.session.commit() - self.logger.critical( - "agent.provision.runtime_error action=%s agent_id=%s error=%s", - action, - agent.id, - str(exc), - ) - if raise_gateway_errors: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Unexpected error {action}ing agent provisioning.", - ) from exc + raise async def provision_new_agent( self, @@ -1315,7 +1312,6 @@ class AgentLifecycleService(OpenClawDBService): @staticmethod def mark_agent_update_pending(agent: Agent) -> str: raw_token = mint_agent_token(agent) - mark_provision_requested(agent, action="update", status="updating") return raw_token async def provision_updated_agent( @@ -1390,7 +1386,6 @@ class AgentLifecycleService(OpenClawDBService): return raw_token = mint_agent_token(agent) - mark_provision_requested(agent, action="provision", status="provisioning") await self.add_commit_refresh(agent) board = await self.require_board( str(agent.board_id) if agent.board_id else None, @@ -1436,6 +1431,10 @@ class AgentLifecycleService(OpenClawDBService): elif agent.status == "provisioning": agent.status = "online" agent.last_seen_at = utcnow() + # Successful check-in ends the current wake escalation cycle. + agent.wake_attempts = 0 + agent.checkin_deadline_at = None + agent.last_provision_error = None agent.updated_at = utcnow() self.record_heartbeat(self.session, agent) self.session.add(agent) diff --git a/backend/app/services/queue.py b/backend/app/services/queue.py index 04529763..e0d6a1a1 100644 --- a/backend/app/services/queue.py +++ b/backend/app/services/queue.py @@ -150,6 +150,32 @@ def enqueue_task( return False +def enqueue_task_with_delay( + task: QueuedTask, + queue_name: str, + *, + delay_seconds: float, + redis_url: str | None = None, +) -> bool: + """Enqueue a task immediately or schedule it for delayed delivery.""" + delay = max(0.0, float(delay_seconds)) + if delay == 0: + return enqueue_task(task, queue_name, redis_url=redis_url) + try: + return _schedule_for_later(task, queue_name, delay, redis_url=redis_url) + except Exception as exc: + logger.warning( + "rq.queue.schedule_failed", + extra={ + "task_type": task.task_type, + "queue_name": queue_name, + "delay_seconds": delay, + "error": str(exc), + }, + ) + return False + + def _coerce_datetime(raw: object | None) -> datetime: if raw is None: return datetime.now(UTC) diff --git a/backend/app/services/queue_worker.py b/backend/app/services/queue_worker.py index c8761f7d..61eb0332 100644 --- a/backend/app/services/queue_worker.py +++ b/backend/app/services/queue_worker.py @@ -9,6 +9,11 @@ from dataclasses import dataclass from app.core.config import settings from app.core.logging import get_logger +from app.services.openclaw.lifecycle_queue import TASK_TYPE as LIFECYCLE_RECONCILE_TASK_TYPE +from app.services.openclaw.lifecycle_queue import ( + requeue_lifecycle_queue_task, +) +from app.services.openclaw.lifecycle_reconcile import process_lifecycle_queue_task from app.services.queue import QueuedTask, dequeue_task from app.services.webhooks.dispatch import ( process_webhook_queue_task, @@ -17,6 +22,7 @@ from app.services.webhooks.dispatch import ( from app.services.webhooks.queue import TASK_TYPE as WEBHOOK_TASK_TYPE logger = get_logger(__name__) +_WORKER_BLOCK_TIMEOUT_SECONDS = 5.0 @dataclass(frozen=True) @@ -27,6 +33,14 @@ class _TaskHandler: _TASK_HANDLERS: dict[str, _TaskHandler] = { + LIFECYCLE_RECONCILE_TASK_TYPE: _TaskHandler( + handler=process_lifecycle_queue_task, + attempts_to_delay=lambda attempts: min( + settings.rq_dispatch_retry_base_seconds * (2 ** max(0, attempts)), + settings.rq_dispatch_retry_max_seconds, + ), + requeue=lambda task, delay: requeue_lifecycle_queue_task(task, delay_seconds=delay), + ), WEBHOOK_TASK_TYPE: _TaskHandler( handler=process_webhook_queue_task, attempts_to_delay=lambda attempts: min( @@ -115,7 +129,8 @@ async def _run_worker_loop() -> None: try: await flush_queue( block=True, - block_timeout=0, + # Keep a finite timeout so scheduled tasks are periodically drained. + block_timeout=_WORKER_BLOCK_TIMEOUT_SECONDS, ) except Exception: logger.exception( diff --git a/backend/migrations/versions/e3a1b2c4d5f6_add_agent_lifecycle_metadata_columns.py b/backend/migrations/versions/e3a1b2c4d5f6_add_agent_lifecycle_metadata_columns.py new file mode 100644 index 00000000..a179fcbb --- /dev/null +++ b/backend/migrations/versions/e3a1b2c4d5f6_add_agent_lifecycle_metadata_columns.py @@ -0,0 +1,45 @@ +"""Add agent lifecycle metadata columns. + +Revision ID: e3a1b2c4d5f6 +Revises: b497b348ebb4 +Create Date: 2026-02-24 00:00:00.000000 +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + + +# revision identifiers, used by Alembic. +revision = "e3a1b2c4d5f6" +down_revision = "b497b348ebb4" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + """Add lifecycle generation, wake tracking, and failure metadata.""" + op.add_column( + "agents", + sa.Column("lifecycle_generation", sa.Integer(), nullable=False, server_default="0"), + ) + op.add_column( + "agents", + sa.Column("wake_attempts", sa.Integer(), nullable=False, server_default="0"), + ) + op.add_column("agents", sa.Column("last_wake_sent_at", sa.DateTime(), nullable=True)) + op.add_column("agents", sa.Column("checkin_deadline_at", sa.DateTime(), nullable=True)) + op.add_column("agents", sa.Column("last_provision_error", sa.Text(), nullable=True)) + op.alter_column("agents", "lifecycle_generation", server_default=None) + op.alter_column("agents", "wake_attempts", server_default=None) + + +def downgrade() -> None: + """Remove lifecycle generation, wake tracking, and failure metadata.""" + op.drop_column("agents", "last_provision_error") + op.drop_column("agents", "checkin_deadline_at") + op.drop_column("agents", "last_wake_sent_at") + op.drop_column("agents", "wake_attempts") + op.drop_column("agents", "lifecycle_generation") + diff --git a/backend/pyproject.toml b/backend/pyproject.toml index fcd9af8a..46261a6b 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -14,7 +14,7 @@ requires-python = ">=3.12" dependencies = [ "alembic==1.18.3", "clerk-backend-api==4.2.0", - "fastapi==0.128.6", + "fastapi==0.131.0", "fastapi-pagination==0.15.10", "jinja2==3.1.6", "psycopg[binary]==3.3.2", diff --git a/backend/templates/BOARD_BOOTSTRAP.md.j2 b/backend/templates/BOARD_BOOTSTRAP.md.j2 index ca847b26..63f2fd49 100644 --- a/backend/templates/BOARD_BOOTSTRAP.md.j2 +++ b/backend/templates/BOARD_BOOTSTRAP.md.j2 @@ -35,12 +35,19 @@ curl -fsS "{{ base_url }}/healthz" >/dev/null 5) Ensure today's daily file exists: `memory/YYYY-MM-DD.md`. {% if is_lead %} -6) Initialize current delivery status in `MEMORY.md`: +6) Immediately check in to Mission Control (do this before any task orchestration): + +```bash +curl -s -X POST "{{ base_url }}/api/v1/agent/heartbeat" \ + -H "X-Agent-Token: {{ auth_token }}" +``` + +7) Initialize current delivery status in `MEMORY.md`: - set objective if missing - set state to `Working` (or `Waiting` if external dependency exists) - set one concrete next step -7) Add one line to `MEMORY.md` noting bootstrap completion date. +8) Add one line to `MEMORY.md` noting bootstrap completion date. {% else %} 6) If any fields are blank, leave them blank. Do not invent values. diff --git a/backend/templates/BOARD_HEARTBEAT.md.j2 b/backend/templates/BOARD_HEARTBEAT.md.j2 index 3a56483e..2c6ab234 100644 --- a/backend/templates/BOARD_HEARTBEAT.md.j2 +++ b/backend/templates/BOARD_HEARTBEAT.md.j2 @@ -35,12 +35,20 @@ jq -r ' ## Schedule - If a heartbeat schedule is configured, send a lightweight check-in only. +- On first cycle after wake/bootstrap, run heartbeat check-in immediately (do not wait for cadence). - Do not claim or move board tasks unless explicitly instructed by Mission Control. - If you have any pending `LEAD REQUEST: ASK USER` messages in OpenClaw chat, handle them promptly (see AGENTS.md). ## Heartbeat checklist -1) Check in: +1) Check in immediately: - Use the `agent-main` heartbeat endpoint (`POST /api/v1/agent/heartbeat`). +- Startup check-in example: + +```bash +curl -s -X POST "{{ base_url }}/api/v1/agent/heartbeat" \ + -H "X-Agent-Token: {{ auth_token }}" +``` + - If check-in fails due to 5xx/network, stop and retry next heartbeat. - During that failure window, do **not** write memory updates (`MEMORY.md`, daily memory files). @@ -117,6 +125,7 @@ jq -r ' ## Schedule - Heartbeat cadence is controlled by gateway heartbeat config. +- On first cycle after wake/bootstrap, run heartbeat check-in immediately (do not wait for cadence). - Keep cadence conservative unless there is a clear latency need. ## Non-Negotiable Rules diff --git a/backend/tests/test_gateway_version_compat.py b/backend/tests/test_gateway_version_compat.py index a33a84ea..5e6039d1 100644 --- a/backend/tests/test_gateway_version_compat.py +++ b/backend/tests/test_gateway_version_compat.py @@ -47,7 +47,9 @@ def test_extract_config_last_touched_version_reads_config_meta_last_touched_vers assert gateway_compat.extract_config_last_touched_version(payload) == "2026.2.9" -def test_extract_config_last_touched_version_returns_none_without_config_meta_last_touched_version() -> None: +def test_extract_config_last_touched_version_returns_none_without_config_meta_last_touched_version() -> ( + None +): payload = { "config": {"wizard": {"lastRunVersion": "2026.2.9"}}, } diff --git a/backend/tests/test_lifecycle_reconcile_queue.py b/backend/tests/test_lifecycle_reconcile_queue.py new file mode 100644 index 00000000..ab181cbc --- /dev/null +++ b/backend/tests/test_lifecycle_reconcile_queue.py @@ -0,0 +1,126 @@ +# ruff: noqa: INP001 +"""Queue payload helpers for lifecycle reconcile tasks.""" + +from __future__ import annotations + +from datetime import timedelta +from uuid import uuid4 + +import pytest + +from app.core.time import utcnow +from app.services.openclaw.lifecycle_queue import ( + QueuedAgentLifecycleReconcile, + decode_lifecycle_task, + defer_lifecycle_reconcile, + enqueue_lifecycle_reconcile, +) +from app.services.queue import QueuedTask + + +def test_enqueue_lifecycle_reconcile_uses_delayed_enqueue( + monkeypatch: pytest.MonkeyPatch, +) -> None: + captured: dict[str, object] = {} + + def _fake_enqueue_with_delay( + task: QueuedTask, + queue_name: str, + *, + delay_seconds: float, + redis_url: str | None = None, + ) -> bool: + captured["task"] = task + captured["queue_name"] = queue_name + captured["delay_seconds"] = delay_seconds + captured["redis_url"] = redis_url + return True + + monkeypatch.setattr( + "app.services.openclaw.lifecycle_queue.enqueue_task_with_delay", + _fake_enqueue_with_delay, + ) + + payload = QueuedAgentLifecycleReconcile( + agent_id=uuid4(), + gateway_id=uuid4(), + board_id=uuid4(), + generation=7, + checkin_deadline_at=utcnow() + timedelta(seconds=30), + attempts=0, + ) + + assert enqueue_lifecycle_reconcile(payload) is True + task = captured["task"] + assert isinstance(task, QueuedTask) + assert task.task_type == "agent_lifecycle_reconcile" + assert float(captured["delay_seconds"]) > 0 + + +def test_defer_lifecycle_reconcile_keeps_attempt_count( + monkeypatch: pytest.MonkeyPatch, +) -> None: + captured: dict[str, object] = {} + + def _fake_enqueue_with_delay( + task: QueuedTask, + queue_name: str, + *, + delay_seconds: float, + redis_url: str | None = None, + ) -> bool: + captured["task"] = task + captured["queue_name"] = queue_name + captured["delay_seconds"] = delay_seconds + captured["redis_url"] = redis_url + return True + + monkeypatch.setattr( + "app.services.openclaw.lifecycle_queue.enqueue_task_with_delay", + _fake_enqueue_with_delay, + ) + deadline = utcnow() + timedelta(minutes=1) + task = QueuedTask( + task_type="agent_lifecycle_reconcile", + payload={ + "agent_id": str(uuid4()), + "gateway_id": str(uuid4()), + "board_id": None, + "generation": 3, + "checkin_deadline_at": deadline.isoformat(), + }, + created_at=utcnow(), + attempts=2, + ) + assert defer_lifecycle_reconcile(task, delay_seconds=12) is True + deferred_task = captured["task"] + assert isinstance(deferred_task, QueuedTask) + assert deferred_task.attempts == 2 + assert float(captured["delay_seconds"]) == 12 + + +def test_decode_lifecycle_task_roundtrip() -> None: + deadline = utcnow() + timedelta(minutes=3) + agent_id = uuid4() + gateway_id = uuid4() + board_id = uuid4() + task = QueuedTask( + task_type="agent_lifecycle_reconcile", + payload={ + "agent_id": str(agent_id), + "gateway_id": str(gateway_id), + "board_id": str(board_id), + "generation": 5, + "checkin_deadline_at": deadline.isoformat(), + }, + created_at=utcnow(), + attempts=1, + ) + + decoded = decode_lifecycle_task(task) + assert decoded.agent_id == agent_id + assert decoded.gateway_id == gateway_id + assert decoded.board_id == board_id + assert decoded.generation == 5 + assert decoded.checkin_deadline_at == deadline + assert decoded.attempts == 1 diff --git a/backend/tests/test_lifecycle_reconcile_state.py b/backend/tests/test_lifecycle_reconcile_state.py new file mode 100644 index 00000000..b1c36cba --- /dev/null +++ b/backend/tests/test_lifecycle_reconcile_state.py @@ -0,0 +1,53 @@ +# ruff: noqa: INP001 +"""Lifecycle reconcile state helpers.""" + +from __future__ import annotations + +from datetime import timedelta +from uuid import uuid4 + +from app.core.time import utcnow +from app.models.agents import Agent +from app.services.openclaw.constants import ( + CHECKIN_DEADLINE_AFTER_WAKE, + MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN, +) +from app.services.openclaw.lifecycle_reconcile import _has_checked_in_since_wake + + +def _agent(*, last_seen_offset_s: int | None, last_wake_offset_s: int | None) -> Agent: + now = utcnow() + return Agent( + name="reconcile-test", + gateway_id=uuid4(), + last_seen_at=( + (now + timedelta(seconds=last_seen_offset_s)) + if last_seen_offset_s is not None + else None + ), + last_wake_sent_at=( + (now + timedelta(seconds=last_wake_offset_s)) + if last_wake_offset_s is not None + else None + ), + ) + + +def test_checked_in_since_wake_when_last_seen_after_wake() -> None: + agent = _agent(last_seen_offset_s=5, last_wake_offset_s=0) + assert _has_checked_in_since_wake(agent) is True + + +def test_not_checked_in_since_wake_when_last_seen_before_wake() -> None: + agent = _agent(last_seen_offset_s=-5, last_wake_offset_s=0) + assert _has_checked_in_since_wake(agent) is False + + +def test_not_checked_in_since_wake_when_missing_last_seen() -> None: + agent = _agent(last_seen_offset_s=None, last_wake_offset_s=0) + assert _has_checked_in_since_wake(agent) is False + + +def test_lifecycle_convergence_policy_constants() -> None: + assert CHECKIN_DEADLINE_AFTER_WAKE == timedelta(seconds=30) + assert MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN == 3 diff --git a/backend/tests/test_queue_worker_lifecycle_handler.py b/backend/tests/test_queue_worker_lifecycle_handler.py new file mode 100644 index 00000000..8d4b8740 --- /dev/null +++ b/backend/tests/test_queue_worker_lifecycle_handler.py @@ -0,0 +1,11 @@ +# ruff: noqa: INP001 +"""Queue worker registration tests for lifecycle reconcile tasks.""" + +from __future__ import annotations + +from app.services.openclaw.lifecycle_queue import TASK_TYPE as LIFECYCLE_TASK_TYPE +from app.services.queue_worker import _TASK_HANDLERS + + +def test_worker_registers_lifecycle_reconcile_handler() -> None: + assert LIFECYCLE_TASK_TYPE in _TASK_HANDLERS diff --git a/backend/uv.lock b/backend/uv.lock index 8b82574f..8136c6ad 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -329,7 +329,7 @@ wheels = [ [[package]] name = "fastapi" -version = "0.128.6" +version = "0.131.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "annotated-doc" }, @@ -338,9 +338,9 @@ dependencies = [ { name = "typing-extensions" }, { name = "typing-inspection" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/83/d1/195005b5e45b443e305136df47ee7df4493d782e0c039dd0d97065580324/fastapi-0.128.6.tar.gz", hash = "sha256:0cb3946557e792d731b26a42b04912f16367e3c3135ea8290f620e234f2b604f", size = 374757, upload-time = "2026-02-09T17:27:03.541Z" } +sdist = { url = "https://files.pythonhosted.org/packages/91/32/158cbf685b7d5a26f87131069da286bf10fc9fbf7fc968d169d48a45d689/fastapi-0.131.0.tar.gz", hash = "sha256:6531155e52bee2899a932c746c9a8250f210e3c3303a5f7b9f8a808bfe0548ff", size = 369612, upload-time = "2026-02-22T16:38:11.252Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/24/58/a2c4f6b240eeb148fb88cdac48f50a194aba760c1ca4988c6031c66a20ee/fastapi-0.128.6-py3-none-any.whl", hash = "sha256:bb1c1ef87d6086a7132d0ab60869d6f1ee67283b20fbf84ec0003bd335099509", size = 103674, upload-time = "2026-02-09T17:27:02.355Z" }, + { url = "https://files.pythonhosted.org/packages/ff/94/b58ec24c321acc2ad1327f69b033cadc005e0f26df9a73828c9e9c7db7ce/fastapi-0.131.0-py3-none-any.whl", hash = "sha256:ed0e53decccf4459de78837ce1b867cd04fa9ce4579497b842579755d20b405a", size = 103854, upload-time = "2026-02-22T16:38:09.814Z" }, ] [[package]] @@ -743,7 +743,7 @@ requires-dist = [ { name = "clerk-backend-api", specifier = "==4.2.0" }, { name = "coverage", extras = ["toml"], marker = "extra == 'dev'", specifier = "==7.13.4" }, { name = "cryptography", specifier = "==45.0.7" }, - { name = "fastapi", specifier = "==0.128.6" }, + { name = "fastapi", specifier = "==0.131.0" }, { name = "fastapi-pagination", specifier = "==0.15.10" }, { name = "flake8", marker = "extra == 'dev'", specifier = "==7.3.0" }, { name = "httpx", marker = "extra == 'dev'", specifier = "==0.28.1" }, diff --git a/docs/README.md b/docs/README.md index 378e12b2..99fc18e8 100644 --- a/docs/README.md +++ b/docs/README.md @@ -10,6 +10,7 @@ This folder is the starting point for Mission Control documentation. - [Deployment](./deployment/README.md) - [Production notes](./production/README.md) - [Troubleshooting](./troubleshooting/README.md) +- [Gateway agent provisioning and check-in troubleshooting](./troubleshooting/gateway-agent-provisioning.md) - [Gateway WebSocket protocol](./openclaw_gateway_ws.md) - [OpenClaw baseline configuration](./openclaw_baseline_config.md) diff --git a/docs/troubleshooting/README.md b/docs/troubleshooting/README.md index 1b897489..ad2526ee 100644 --- a/docs/troubleshooting/README.md +++ b/docs/troubleshooting/README.md @@ -1,3 +1,3 @@ # Troubleshooting -Placeholder. +- [Gateway agent provisioning and check-in](./gateway-agent-provisioning.md) diff --git a/docs/troubleshooting/gateway-agent-provisioning.md b/docs/troubleshooting/gateway-agent-provisioning.md new file mode 100644 index 00000000..212cb5de --- /dev/null +++ b/docs/troubleshooting/gateway-agent-provisioning.md @@ -0,0 +1,107 @@ +# Gateway Agent Provisioning and Check-In Troubleshooting + +This guide explains how agent provisioning converges to a healthy state, and how to debug when an agent appears stuck. + +## Fast Convergence Policy + +Mission Control now uses a fast convergence policy for wake/check-in: + +- Check-in deadline after each wake: **30 seconds** +- Maximum wake attempts without check-in: **3** +- If no check-in after the third attempt: agent is marked **offline** and provisioning escalation stops + +This applies to both gateway-main and board agents. + +## Expected Lifecycle + +1. Mission Control provisions/updates the agent and sends wake. +2. A delayed reconcile task is queued for the check-in deadline. +3. Agent should call heartbeat quickly after startup/bootstrap. +4. If heartbeat arrives: + - `last_seen_at` is updated + - wake escalation state is reset (`wake_attempts=0`, check-in deadline cleared) +5. If heartbeat does not arrive by deadline: + - reconcile re-runs lifecycle (wake again) + - up to 3 total wake attempts +6. If still no heartbeat after 3 attempts: + - agent status becomes `offline` + - `last_provision_error` is set + +## Startup Check-In Behavior + +Templates now explicitly require immediate first-cycle check-in: + +- Main agent heartbeat instructions require immediate check-in after wake/bootstrap. +- Board lead bootstrap requires heartbeat check-in before orchestration. +- Board worker bootstrap already included immediate check-in. + +If a gateway still has older templates, run template sync and reprovision/wake. + +## What You Should See in Logs + +Healthy flow usually includes: + +- `lifecycle.queue.enqueued` +- `queue.worker.success` (for lifecycle tasks) +- `lifecycle.reconcile.skip_not_stuck` (after heartbeat lands) + +If agent is not checking in: + +- `lifecycle.reconcile.deferred` (before deadline) +- `lifecycle.reconcile.retriggered` (retry wake) +- `lifecycle.reconcile.max_attempts_reached` (final fail-safe at attempt 3) + +If you do not see lifecycle events at all, verify queue worker health first. + +## Common Failure Modes + +### Wake was sent, but no check-in arrived + +Possible causes: + +- Agent process never started or crashed during bootstrap +- Agent ignored startup instructions due to stale templates +- Heartbeat call failed (network/auth/base URL mismatch) + +Actions: + +1. Confirm current templates were synced to gateway. +2. Re-run provisioning/update to trigger a fresh wake. +3. Verify agent can reach Mission Control API and send heartbeat with `X-Agent-Token`. + +### Agent stays provisioning/updating with no retries + +Possible causes: + +- Queue worker not running +- Queue/Redis mismatch between API process and worker process + +Actions: + +1. Verify worker process is running continuously. +2. Verify `rq_redis_url` and `rq_queue_name` are identical for API and worker. +3. Check worker logs for dequeue/handler errors. + +### Agent ended offline quickly + +This is expected when no check-in is received after 3 wake attempts. The system fails fast by design. + +Actions: + +1. Fix check-in path first (startup, network, token, API reachability). +2. Re-run provisioning/update to start a new attempt cycle. + +## Operator Recovery Checklist + +1. Ensure queue worker is running. +2. Sync templates for the gateway. +3. Trigger agent update/provision from Mission Control. +4. Watch logs for: + - `lifecycle.queue.enqueued` + - `lifecycle.reconcile.retriggered` (if needed) + - heartbeat activity / `skip_not_stuck` +5. If still failing, capture: + - gateway logs around bootstrap + - worker logs around lifecycle events + - agent `last_provision_error`, `wake_attempts`, `last_seen_at` +