feat: implement unified agent lifecycle orchestration and metadata tracking

This commit is contained in:
Abhimanyu Saharan
2026-02-25 00:34:04 +05:30
parent 893e06f579
commit 0795f78eff
18 changed files with 957 additions and 156 deletions

View File

@@ -43,6 +43,11 @@ class Agent(QueryModel, table=True):
delete_requested_at: datetime | None = Field(default=None)
delete_confirm_token_hash: str | None = Field(default=None, index=True)
last_seen_at: datetime | None = Field(default=None)
lifecycle_generation: int = Field(default=0)
wake_attempts: int = Field(default=0)
last_wake_sent_at: datetime | None = Field(default=None)
checkin_deadline_at: datetime | None = Field(default=None)
last_provision_error: str | None = Field(default=None, sa_column=Column(Text))
is_board_lead: bool = Field(default=False, index=True)
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)

View File

@@ -21,24 +21,18 @@ from app.models.gateways import Gateway
from app.models.tasks import Task
from app.schemas.gateways import GatewayTemplatesSyncResult
from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG
from app.services.openclaw.db_agent_state import (
mark_provision_complete,
mark_provision_requested,
mint_agent_token,
)
from app.services.openclaw.db_service import OpenClawDBService
from app.services.openclaw.error_messages import normalize_gateway_error_message
from app.services.openclaw.gateway_compat import check_gateway_version_compatibility
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
from app.services.openclaw.gateway_rpc import OpenClawGatewayError, openclaw_call
from app.services.openclaw.provisioning import OpenClawGatewayProvisioner
from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator
from app.services.openclaw.provisioning_db import (
GatewayTemplateSyncOptions,
OpenClawProvisioningService,
)
from app.services.openclaw.session_service import GatewayTemplateSyncQuery
from app.services.openclaw.shared import GatewayAgentIdentity
from app.services.organizations import get_org_owner_user
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -222,69 +216,38 @@ class GatewayAdminLifecycleService(OpenClawDBService):
action: str,
notify: bool,
) -> Agent:
template_user = user or await get_org_owner_user(
self.session,
organization_id=gateway.organization_id,
)
if template_user is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Organization owner not found (required for gateway agent USER.md rendering).",
)
raw_token = mint_agent_token(agent)
mark_provision_requested(
agent,
action=action,
status="updating" if action == "update" else "provisioning",
)
await self.add_commit_refresh(agent)
if not gateway.url:
return agent
orchestrator = AgentLifecycleOrchestrator(self.session)
try:
await OpenClawGatewayProvisioner().apply_agent_lifecycle(
agent=agent,
provisioned = await orchestrator.run_lifecycle(
gateway=gateway,
agent_id=agent.id,
board=None,
auth_token=raw_token,
user=template_user,
user=user,
action=action,
auth_token=None,
force_bootstrap=False,
reset_session=False,
wake=notify,
deliver_wakeup=True,
wakeup_verb=None,
clear_confirm_token=False,
raise_gateway_errors=True,
)
except OpenClawGatewayError as exc:
except HTTPException:
self.logger.error(
"gateway.main_agent.provision_failed_gateway gateway_id=%s agent_id=%s error=%s",
"gateway.main_agent.provision_failed gateway_id=%s agent_id=%s action=%s",
gateway.id,
agent.id,
str(exc),
action,
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway {action} failed: {exc}",
) from exc
except (OSError, RuntimeError, ValueError) as exc:
self.logger.error(
"gateway.main_agent.provision_failed gateway_id=%s agent_id=%s error=%s",
gateway.id,
agent.id,
str(exc),
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Unexpected error {action}ing gateway provisioning.",
) from exc
mark_provision_complete(agent, status="online")
await self.add_commit_refresh(agent)
raise
self.logger.info(
"gateway.main_agent.provision_success gateway_id=%s agent_id=%s action=%s",
gateway.id,
agent.id,
provisioned.id,
action,
)
return agent
return provisioned
async def ensure_main_agent(
self,

View File

@@ -18,6 +18,11 @@ DEFAULT_HEARTBEAT_CONFIG: dict[str, Any] = {
}
OFFLINE_AFTER = timedelta(minutes=10)
# Provisioning convergence policy:
# - require first heartbeat/check-in within 30s of wake
# - allow up to 3 wake attempts before giving up
CHECKIN_DEADLINE_AFTER_WAKE = timedelta(seconds=30)
MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN = 3
AGENT_SESSION_PREFIX = "agent"
DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY: dict[str, bool] = {

View File

@@ -0,0 +1,167 @@
"""Unified agent lifecycle orchestration.
This module centralizes DB-backed lifecycle transitions so call sites do not
duplicate provisioning/wake/state logic.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from fastapi import HTTPException, status
from sqlmodel import col, select
from app.core.time import utcnow
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.services.openclaw.constants import CHECKIN_DEADLINE_AFTER_WAKE
from app.services.openclaw.db_agent_state import (
mark_provision_complete,
mark_provision_requested,
mint_agent_token,
)
from app.services.openclaw.db_service import OpenClawDBService
from app.services.openclaw.gateway_rpc import OpenClawGatewayError
from app.services.openclaw.lifecycle_queue import (
QueuedAgentLifecycleReconcile,
enqueue_lifecycle_reconcile,
)
from app.services.openclaw.provisioning import OpenClawGatewayProvisioner
from app.services.organizations import get_org_owner_user
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.users import User
class AgentLifecycleOrchestrator(OpenClawDBService):
"""Single lifecycle writer for agent provision/update transitions."""
def __init__(self, session: AsyncSession) -> None:
super().__init__(session)
async def _lock_agent(self, *, agent_id: UUID) -> Agent:
statement = select(Agent).where(col(Agent.id) == agent_id).with_for_update()
agent = (await self.session.exec(statement)).first()
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent not found")
return agent
async def run_lifecycle(
self,
*,
gateway: Gateway,
agent_id: UUID,
board: Board | None,
user: User | None,
action: str,
auth_token: str | None = None,
force_bootstrap: bool = False,
reset_session: bool = False,
wake: bool = True,
deliver_wakeup: bool = True,
wakeup_verb: str | None = None,
clear_confirm_token: bool = False,
raise_gateway_errors: bool = True,
) -> Agent:
"""Provision or update any agent under a per-agent lock."""
locked = await self._lock_agent(agent_id=agent_id)
template_user = user
if board is None and template_user is None:
template_user = await get_org_owner_user(
self.session,
organization_id=gateway.organization_id,
)
if template_user is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail=(
"Organization owner not found "
"(required for gateway agent USER.md rendering)."
),
)
raw_token = auth_token or mint_agent_token(locked)
mark_provision_requested(
locked,
action=action,
status="updating" if action == "update" else "provisioning",
)
locked.lifecycle_generation += 1
locked.last_provision_error = None
locked.checkin_deadline_at = utcnow() + CHECKIN_DEADLINE_AFTER_WAKE if wake else None
if wake:
locked.wake_attempts += 1
locked.last_wake_sent_at = utcnow()
self.session.add(locked)
await self.session.flush()
if not gateway.url:
await self.session.commit()
await self.session.refresh(locked)
return locked
try:
await OpenClawGatewayProvisioner().apply_agent_lifecycle(
agent=locked,
gateway=gateway,
board=board,
auth_token=raw_token,
user=template_user,
action=action,
force_bootstrap=force_bootstrap,
reset_session=reset_session,
wake=wake,
deliver_wakeup=deliver_wakeup,
wakeup_verb=wakeup_verb,
)
except OpenClawGatewayError as exc:
locked.last_provision_error = str(exc)
locked.updated_at = utcnow()
self.session.add(locked)
await self.session.commit()
await self.session.refresh(locked)
if raise_gateway_errors:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway {action} failed: {exc}",
) from exc
return locked
except (OSError, RuntimeError, ValueError) as exc:
locked.last_provision_error = str(exc)
locked.updated_at = utcnow()
self.session.add(locked)
await self.session.commit()
await self.session.refresh(locked)
if raise_gateway_errors:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Unexpected error {action}ing gateway provisioning.",
) from exc
return locked
mark_provision_complete(
locked,
status="online",
clear_confirm_token=clear_confirm_token,
)
locked.last_provision_error = None
locked.checkin_deadline_at = utcnow() + CHECKIN_DEADLINE_AFTER_WAKE if wake else None
self.session.add(locked)
await self.session.commit()
await self.session.refresh(locked)
if wake and locked.checkin_deadline_at is not None:
enqueue_lifecycle_reconcile(
QueuedAgentLifecycleReconcile(
agent_id=locked.id,
gateway_id=locked.gateway_id,
board_id=locked.board_id,
generation=locked.lifecycle_generation,
checkin_deadline_at=locked.checkin_deadline_at,
)
)
return locked

View File

@@ -0,0 +1,122 @@
"""Queue payload helpers for stuck-agent lifecycle reconciliation."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from uuid import UUID
from app.core.config import settings
from app.core.logging import get_logger
from app.core.time import utcnow
from app.services.queue import QueuedTask, enqueue_task_with_delay
from app.services.queue import requeue_if_failed as generic_requeue_if_failed
logger = get_logger(__name__)
TASK_TYPE = "agent_lifecycle_reconcile"
@dataclass(frozen=True)
class QueuedAgentLifecycleReconcile:
"""Queued payload metadata for lifecycle reconciliation checks."""
agent_id: UUID
gateway_id: UUID
board_id: UUID | None
generation: int
checkin_deadline_at: datetime
attempts: int = 0
def _task_from_payload(payload: QueuedAgentLifecycleReconcile) -> QueuedTask:
return QueuedTask(
task_type=TASK_TYPE,
payload={
"agent_id": str(payload.agent_id),
"gateway_id": str(payload.gateway_id),
"board_id": str(payload.board_id) if payload.board_id is not None else None,
"generation": payload.generation,
"checkin_deadline_at": payload.checkin_deadline_at.isoformat(),
},
created_at=utcnow(),
attempts=payload.attempts,
)
def decode_lifecycle_task(task: QueuedTask) -> QueuedAgentLifecycleReconcile:
if task.task_type not in {TASK_TYPE, "legacy"}:
raise ValueError(f"Unexpected task_type={task.task_type!r}; expected {TASK_TYPE!r}")
payload: dict[str, Any] = task.payload
raw_board_id = payload.get("board_id")
board_id = UUID(raw_board_id) if isinstance(raw_board_id, str) and raw_board_id else None
raw_deadline = payload.get("checkin_deadline_at")
if not isinstance(raw_deadline, str):
raise ValueError("checkin_deadline_at is required")
return QueuedAgentLifecycleReconcile(
agent_id=UUID(str(payload["agent_id"])),
gateway_id=UUID(str(payload["gateway_id"])),
board_id=board_id,
generation=int(payload["generation"]),
checkin_deadline_at=datetime.fromisoformat(raw_deadline),
attempts=int(payload.get("attempts", task.attempts)),
)
def enqueue_lifecycle_reconcile(payload: QueuedAgentLifecycleReconcile) -> bool:
"""Enqueue a delayed reconcile check keyed to the expected check-in deadline."""
now = utcnow()
delay_seconds = max(0.0, (payload.checkin_deadline_at - now).total_seconds())
queued = _task_from_payload(payload)
ok = enqueue_task_with_delay(
queued,
settings.rq_queue_name,
delay_seconds=delay_seconds,
redis_url=settings.rq_redis_url,
)
if ok:
logger.info(
"lifecycle.queue.enqueued",
extra={
"agent_id": str(payload.agent_id),
"generation": payload.generation,
"delay_seconds": delay_seconds,
"attempt": payload.attempts,
},
)
return ok
def defer_lifecycle_reconcile(
task: QueuedTask,
*,
delay_seconds: float,
) -> bool:
"""Defer a reconcile task without incrementing retry attempts."""
payload = decode_lifecycle_task(task)
deferred = QueuedAgentLifecycleReconcile(
agent_id=payload.agent_id,
gateway_id=payload.gateway_id,
board_id=payload.board_id,
generation=payload.generation,
checkin_deadline_at=payload.checkin_deadline_at,
attempts=task.attempts,
)
queued = _task_from_payload(deferred)
return enqueue_task_with_delay(
queued,
settings.rq_queue_name,
delay_seconds=max(0.0, delay_seconds),
redis_url=settings.rq_redis_url,
)
def requeue_lifecycle_queue_task(task: QueuedTask, *, delay_seconds: float = 0) -> bool:
"""Requeue a failed lifecycle task with capped retries."""
return generic_requeue_if_failed(
task,
settings.rq_queue_name,
max_retries=settings.rq_dispatch_max_retries,
redis_url=settings.rq_redis_url,
delay_seconds=max(0.0, delay_seconds),
)

View File

@@ -0,0 +1,140 @@
"""Worker handlers for lifecycle reconciliation tasks."""
from __future__ import annotations
import asyncio
from app.core.logging import get_logger
from app.core.time import utcnow
from app.db.session import async_session_maker
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.services.openclaw.constants import MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN
from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator
from app.services.openclaw.lifecycle_queue import decode_lifecycle_task, defer_lifecycle_reconcile
from app.services.queue import QueuedTask
logger = get_logger(__name__)
_RECONCILE_TIMEOUT_SECONDS = 60.0
def _has_checked_in_since_wake(agent: Agent) -> bool:
if agent.last_seen_at is None:
return False
if agent.last_wake_sent_at is None:
return True
return agent.last_seen_at >= agent.last_wake_sent_at
async def process_lifecycle_queue_task(task: QueuedTask) -> None:
"""Re-run lifecycle provisioning when an agent misses post-provision check-in."""
payload = decode_lifecycle_task(task)
now = utcnow()
async with async_session_maker() as session:
agent = await Agent.objects.by_id(payload.agent_id).first(session)
if agent is None:
logger.info(
"lifecycle.reconcile.skip_missing_agent",
extra={"agent_id": str(payload.agent_id)},
)
return
# Ignore stale queue messages after a newer lifecycle generation.
if agent.lifecycle_generation != payload.generation:
logger.info(
"lifecycle.reconcile.skip_stale_generation",
extra={
"agent_id": str(agent.id),
"queued_generation": payload.generation,
"current_generation": agent.lifecycle_generation,
},
)
return
if _has_checked_in_since_wake(agent):
logger.info(
"lifecycle.reconcile.skip_not_stuck",
extra={"agent_id": str(agent.id), "status": agent.status},
)
return
deadline = agent.checkin_deadline_at or payload.checkin_deadline_at
if agent.status == "deleting":
logger.info(
"lifecycle.reconcile.skip_deleting",
extra={"agent_id": str(agent.id)},
)
return
if now < deadline:
delay = max(0.0, (deadline - now).total_seconds())
if not defer_lifecycle_reconcile(task, delay_seconds=delay):
msg = "Failed to defer lifecycle reconcile task"
raise RuntimeError(msg)
logger.info(
"lifecycle.reconcile.deferred",
extra={"agent_id": str(agent.id), "delay_seconds": delay},
)
return
if agent.wake_attempts >= MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN:
agent.status = "offline"
agent.checkin_deadline_at = None
agent.last_provision_error = (
"Agent did not check in after wake; max wake attempts reached"
)
agent.updated_at = utcnow()
session.add(agent)
await session.commit()
logger.warning(
"lifecycle.reconcile.max_attempts_reached",
extra={
"agent_id": str(agent.id),
"wake_attempts": agent.wake_attempts,
"max_attempts": MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN,
},
)
return
gateway = await Gateway.objects.by_id(agent.gateway_id).first(session)
if gateway is None:
logger.warning(
"lifecycle.reconcile.skip_missing_gateway",
extra={"agent_id": str(agent.id), "gateway_id": str(agent.gateway_id)},
)
return
board: Board | None = None
if agent.board_id is not None:
board = await Board.objects.by_id(agent.board_id).first(session)
if board is None:
logger.warning(
"lifecycle.reconcile.skip_missing_board",
extra={"agent_id": str(agent.id), "board_id": str(agent.board_id)},
)
return
orchestrator = AgentLifecycleOrchestrator(session)
await asyncio.wait_for(
orchestrator.run_lifecycle(
gateway=gateway,
agent_id=agent.id,
board=board,
user=None,
action="update",
auth_token=None,
force_bootstrap=False,
reset_session=True,
wake=True,
deliver_wakeup=True,
wakeup_verb="updated",
clear_confirm_token=True,
raise_gateway_errors=True,
),
timeout=_RECONCILE_TIMEOUT_SECONDS,
)
logger.info(
"lifecycle.reconcile.retriggered",
extra={"agent_id": str(agent.id), "generation": payload.generation},
)

View File

@@ -52,8 +52,6 @@ from app.services.openclaw.constants import (
OFFLINE_AFTER,
)
from app.services.openclaw.db_agent_state import (
mark_provision_complete,
mark_provision_requested,
mint_agent_token,
)
from app.services.openclaw.db_service import OpenClawDBService
@@ -74,6 +72,7 @@ from app.services.openclaw.internal.session_keys import (
board_agent_session_key,
board_lead_session_key,
)
from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator
from app.services.openclaw.policies import OpenClawAuthorizationPolicy
from app.services.openclaw.provisioning import (
OpenClawGatewayControlPlane,
@@ -143,7 +142,6 @@ class OpenClawProvisioningService(OpenClawDBService):
def __init__(self, session: AsyncSession) -> None:
super().__init__(session)
self._gateway = OpenClawGatewayProvisioner()
@staticmethod
def lead_session_key(board: Board) -> str:
@@ -213,25 +211,25 @@ class OpenClawProvisioningService(OpenClawDBService):
openclaw_session_id=self.lead_session_key(board),
)
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action=config_options.action, status="provisioning")
await self.add_commit_refresh(agent)
# Strict behavior: provisioning errors surface to the caller. The DB row exists
# so a later retry can succeed with the same deterministic identity/session key.
await self._gateway.apply_agent_lifecycle(
agent=agent,
agent = await AgentLifecycleOrchestrator(self.session).run_lifecycle(
gateway=request.gateway,
agent_id=agent.id,
board=board,
auth_token=raw_token,
user=request.user,
action=config_options.action,
auth_token=raw_token,
force_bootstrap=False,
reset_session=False,
wake=True,
deliver_wakeup=True,
wakeup_verb=None,
clear_confirm_token=False,
raise_gateway_errors=True,
)
mark_provision_complete(agent, status="online")
await self.add_commit_refresh(agent)
return agent, True
async def sync_gateway_templates(
@@ -298,7 +296,6 @@ class OpenClawProvisioningService(OpenClawDBService):
control_plane=control_plane,
backoff=GatewayBackoff(timeout_s=10 * 60, timeout_context="template sync"),
options=options,
provisioner=self._gateway,
)
if not await _ping_gateway(ctx, result):
return result
@@ -352,7 +349,6 @@ class _SyncContext:
control_plane: OpenClawGatewayControlPlane
backoff: GatewayBackoff
options: GatewayTemplateSyncOptions
provisioner: OpenClawGatewayProvisioner
def _parse_tools_md(content: str) -> dict[str, str]:
@@ -584,18 +580,26 @@ async def _sync_one_agent(
try:
async def _do_provision() -> bool:
await ctx.provisioner.apply_agent_lifecycle(
agent=agent,
gateway=ctx.gateway,
board=board,
auth_token=auth_token,
user=ctx.options.user,
action="update",
force_bootstrap=ctx.options.force_bootstrap,
overwrite=ctx.options.overwrite,
reset_session=ctx.options.reset_sessions,
wake=False,
)
try:
await AgentLifecycleOrchestrator(ctx.session).run_lifecycle(
gateway=ctx.gateway,
agent_id=agent.id,
board=board,
user=ctx.options.user,
action="update",
auth_token=auth_token,
force_bootstrap=ctx.options.force_bootstrap,
reset_session=ctx.options.reset_sessions,
wake=False,
deliver_wakeup=False,
wakeup_verb="updated",
clear_confirm_token=False,
raise_gateway_errors=True,
)
except HTTPException as exc:
if exc.status_code == status.HTTP_502_BAD_GATEWAY:
raise OpenClawGatewayError(str(exc.detail)) from exc
raise
return True
await ctx.backoff.run(_do_provision)
@@ -613,6 +617,15 @@ async def _sync_one_agent(
message=f"Failed to sync templates: {exc}",
)
return False
except HTTPException as exc:
result.agents_skipped += 1
_append_sync_error(
result,
agent=agent,
board=board,
message=f"Failed to sync templates: {exc.detail}",
)
return False
else:
return False
@@ -655,18 +668,26 @@ async def _sync_main_agent(
try:
async def _do_provision_main() -> bool:
await ctx.provisioner.apply_agent_lifecycle(
agent=main_agent,
gateway=ctx.gateway,
board=None,
auth_token=token,
user=ctx.options.user,
action="update",
force_bootstrap=ctx.options.force_bootstrap,
overwrite=ctx.options.overwrite,
reset_session=ctx.options.reset_sessions,
wake=False,
)
try:
await AgentLifecycleOrchestrator(ctx.session).run_lifecycle(
gateway=ctx.gateway,
agent_id=main_agent.id,
board=None,
user=ctx.options.user,
action="update",
auth_token=token,
force_bootstrap=ctx.options.force_bootstrap,
reset_session=ctx.options.reset_sessions,
wake=False,
deliver_wakeup=False,
wakeup_verb="updated",
clear_confirm_token=False,
raise_gateway_errors=True,
)
except HTTPException as exc:
if exc.status_code == status.HTTP_502_BAD_GATEWAY:
raise OpenClawGatewayError(str(exc.detail)) from exc
raise
return True
await ctx.backoff.run(_do_provision_main)
@@ -679,6 +700,12 @@ async def _sync_main_agent(
agent=main_agent,
message=f"Failed to sync gateway agent templates: {exc}",
)
except HTTPException as exc:
_append_sync_error(
result,
agent=main_agent,
message=f"Failed to sync gateway agent templates: {exc.detail}",
)
else:
result.main_updated = True
return stop_sync
@@ -1038,7 +1065,6 @@ class AgentLifecycleService(OpenClawDBService):
) -> tuple[Agent, str]:
agent = Agent.model_validate(data)
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action="provision", status="provisioning")
agent.openclaw_session_id = self.resolve_session_key(agent)
await self.add_commit_refresh(agent)
return agent, raw_token
@@ -1068,92 +1094,63 @@ class AgentLifecycleService(OpenClawDBService):
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="board is required for non-main agent provisioning",
)
template_user = user
if target.is_main_agent and template_user is None:
template_user = await get_org_owner_user(
self.session,
organization_id=target.gateway.organization_id,
)
if template_user is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail=(
"User context is required to provision the gateway main agent "
"(org owner not found)."
),
)
await OpenClawGatewayProvisioner().apply_agent_lifecycle(
agent=agent,
provisioned = await AgentLifecycleOrchestrator(self.session).run_lifecycle(
gateway=target.gateway,
agent_id=agent.id,
board=target.board if not target.is_main_agent else None,
auth_token=auth_token,
user=template_user,
user=user,
action=action,
auth_token=auth_token,
force_bootstrap=force_bootstrap,
reset_session=True,
wake=True,
deliver_wakeup=True,
wakeup_verb=wakeup_verb,
clear_confirm_token=True,
raise_gateway_errors=raise_gateway_errors,
)
mark_provision_complete(agent, status="online", clear_confirm_token=True)
self.session.add(agent)
await self.session.commit()
record_activity(
self.session,
event_type=f"agent.{action}.direct",
message=f"{action.capitalize()}d directly for {agent.name}.",
agent_id=agent.id,
message=f"{action.capitalize()}d directly for {provisioned.name}.",
agent_id=provisioned.id,
)
record_activity(
self.session,
event_type="agent.wakeup.sent",
message=f"Wakeup message sent to {agent.name}.",
agent_id=agent.id,
message=f"Wakeup message sent to {provisioned.name}.",
agent_id=provisioned.id,
)
await self.session.commit()
self.logger.info(
"agent.provision.success action=%s agent_id=%s",
action,
agent.id,
provisioned.id,
)
except OpenClawGatewayError as exc:
except HTTPException as exc:
self.record_instruction_failure(
self.session,
agent,
str(exc),
str(exc.detail),
action,
)
await self.session.commit()
self.logger.error(
"agent.provision.gateway_error action=%s agent_id=%s error=%s",
action,
agent.id,
str(exc),
)
if exc.status_code == status.HTTP_502_BAD_GATEWAY:
self.logger.error(
"agent.provision.gateway_error action=%s agent_id=%s error=%s",
action,
agent.id,
str(exc.detail),
)
else:
self.logger.critical(
"agent.provision.runtime_error action=%s agent_id=%s error=%s",
action,
agent.id,
str(exc.detail),
)
if raise_gateway_errors:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway {action} failed: {exc}",
) from exc
except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover
self.record_instruction_failure(
self.session,
agent,
str(exc),
action,
)
await self.session.commit()
self.logger.critical(
"agent.provision.runtime_error action=%s agent_id=%s error=%s",
action,
agent.id,
str(exc),
)
if raise_gateway_errors:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Unexpected error {action}ing agent provisioning.",
) from exc
raise
async def provision_new_agent(
self,
@@ -1315,7 +1312,6 @@ class AgentLifecycleService(OpenClawDBService):
@staticmethod
def mark_agent_update_pending(agent: Agent) -> str:
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action="update", status="updating")
return raw_token
async def provision_updated_agent(
@@ -1390,7 +1386,6 @@ class AgentLifecycleService(OpenClawDBService):
return
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action="provision", status="provisioning")
await self.add_commit_refresh(agent)
board = await self.require_board(
str(agent.board_id) if agent.board_id else None,
@@ -1436,6 +1431,10 @@ class AgentLifecycleService(OpenClawDBService):
elif agent.status == "provisioning":
agent.status = "online"
agent.last_seen_at = utcnow()
# Successful check-in ends the current wake escalation cycle.
agent.wake_attempts = 0
agent.checkin_deadline_at = None
agent.last_provision_error = None
agent.updated_at = utcnow()
self.record_heartbeat(self.session, agent)
self.session.add(agent)

View File

@@ -150,6 +150,32 @@ def enqueue_task(
return False
def enqueue_task_with_delay(
task: QueuedTask,
queue_name: str,
*,
delay_seconds: float,
redis_url: str | None = None,
) -> bool:
"""Enqueue a task immediately or schedule it for delayed delivery."""
delay = max(0.0, float(delay_seconds))
if delay == 0:
return enqueue_task(task, queue_name, redis_url=redis_url)
try:
return _schedule_for_later(task, queue_name, delay, redis_url=redis_url)
except Exception as exc:
logger.warning(
"rq.queue.schedule_failed",
extra={
"task_type": task.task_type,
"queue_name": queue_name,
"delay_seconds": delay,
"error": str(exc),
},
)
return False
def _coerce_datetime(raw: object | None) -> datetime:
if raw is None:
return datetime.now(UTC)

View File

@@ -9,6 +9,11 @@ from dataclasses import dataclass
from app.core.config import settings
from app.core.logging import get_logger
from app.services.openclaw.lifecycle_queue import TASK_TYPE as LIFECYCLE_RECONCILE_TASK_TYPE
from app.services.openclaw.lifecycle_queue import (
requeue_lifecycle_queue_task,
)
from app.services.openclaw.lifecycle_reconcile import process_lifecycle_queue_task
from app.services.queue import QueuedTask, dequeue_task
from app.services.webhooks.dispatch import (
process_webhook_queue_task,
@@ -17,6 +22,7 @@ from app.services.webhooks.dispatch import (
from app.services.webhooks.queue import TASK_TYPE as WEBHOOK_TASK_TYPE
logger = get_logger(__name__)
_WORKER_BLOCK_TIMEOUT_SECONDS = 5.0
@dataclass(frozen=True)
@@ -27,6 +33,14 @@ class _TaskHandler:
_TASK_HANDLERS: dict[str, _TaskHandler] = {
LIFECYCLE_RECONCILE_TASK_TYPE: _TaskHandler(
handler=process_lifecycle_queue_task,
attempts_to_delay=lambda attempts: min(
settings.rq_dispatch_retry_base_seconds * (2 ** max(0, attempts)),
settings.rq_dispatch_retry_max_seconds,
),
requeue=lambda task, delay: requeue_lifecycle_queue_task(task, delay_seconds=delay),
),
WEBHOOK_TASK_TYPE: _TaskHandler(
handler=process_webhook_queue_task,
attempts_to_delay=lambda attempts: min(
@@ -115,7 +129,8 @@ async def _run_worker_loop() -> None:
try:
await flush_queue(
block=True,
block_timeout=0,
# Keep a finite timeout so scheduled tasks are periodically drained.
block_timeout=_WORKER_BLOCK_TIMEOUT_SECONDS,
)
except Exception:
logger.exception(

View File

@@ -0,0 +1,45 @@
"""Add agent lifecycle metadata columns.
Revision ID: e3a1b2c4d5f6
Revises: b497b348ebb4
Create Date: 2026-02-24 00:00:00.000000
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "e3a1b2c4d5f6"
down_revision = "b497b348ebb4"
branch_labels = None
depends_on = None
def upgrade() -> None:
"""Add lifecycle generation, wake tracking, and failure metadata."""
op.add_column(
"agents",
sa.Column("lifecycle_generation", sa.Integer(), nullable=False, server_default="0"),
)
op.add_column(
"agents",
sa.Column("wake_attempts", sa.Integer(), nullable=False, server_default="0"),
)
op.add_column("agents", sa.Column("last_wake_sent_at", sa.DateTime(), nullable=True))
op.add_column("agents", sa.Column("checkin_deadline_at", sa.DateTime(), nullable=True))
op.add_column("agents", sa.Column("last_provision_error", sa.Text(), nullable=True))
op.alter_column("agents", "lifecycle_generation", server_default=None)
op.alter_column("agents", "wake_attempts", server_default=None)
def downgrade() -> None:
"""Remove lifecycle generation, wake tracking, and failure metadata."""
op.drop_column("agents", "last_provision_error")
op.drop_column("agents", "checkin_deadline_at")
op.drop_column("agents", "last_wake_sent_at")
op.drop_column("agents", "wake_attempts")
op.drop_column("agents", "lifecycle_generation")

View File

@@ -35,12 +35,19 @@ curl -fsS "{{ base_url }}/healthz" >/dev/null
5) Ensure today's daily file exists: `memory/YYYY-MM-DD.md`.
{% if is_lead %}
6) Initialize current delivery status in `MEMORY.md`:
6) Immediately check in to Mission Control (do this before any task orchestration):
```bash
curl -s -X POST "{{ base_url }}/api/v1/agent/heartbeat" \
-H "X-Agent-Token: {{ auth_token }}"
```
7) Initialize current delivery status in `MEMORY.md`:
- set objective if missing
- set state to `Working` (or `Waiting` if external dependency exists)
- set one concrete next step
7) Add one line to `MEMORY.md` noting bootstrap completion date.
8) Add one line to `MEMORY.md` noting bootstrap completion date.
{% else %}
6) If any fields are blank, leave them blank. Do not invent values.

View File

@@ -35,12 +35,20 @@ jq -r '
## Schedule
- If a heartbeat schedule is configured, send a lightweight check-in only.
- On first cycle after wake/bootstrap, run heartbeat check-in immediately (do not wait for cadence).
- Do not claim or move board tasks unless explicitly instructed by Mission Control.
- If you have any pending `LEAD REQUEST: ASK USER` messages in OpenClaw chat, handle them promptly (see AGENTS.md).
## Heartbeat checklist
1) Check in:
1) Check in immediately:
- Use the `agent-main` heartbeat endpoint (`POST /api/v1/agent/heartbeat`).
- Startup check-in example:
```bash
curl -s -X POST "{{ base_url }}/api/v1/agent/heartbeat" \
-H "X-Agent-Token: {{ auth_token }}"
```
- If check-in fails due to 5xx/network, stop and retry next heartbeat.
- During that failure window, do **not** write memory updates (`MEMORY.md`, daily memory files).
@@ -117,6 +125,7 @@ jq -r '
## Schedule
- Heartbeat cadence is controlled by gateway heartbeat config.
- On first cycle after wake/bootstrap, run heartbeat check-in immediately (do not wait for cadence).
- Keep cadence conservative unless there is a clear latency need.
## Non-Negotiable Rules

View File

@@ -0,0 +1,126 @@
# ruff: noqa: INP001
"""Queue payload helpers for lifecycle reconcile tasks."""
from __future__ import annotations
from datetime import timedelta
from uuid import uuid4
import pytest
from app.core.time import utcnow
from app.services.openclaw.lifecycle_queue import (
QueuedAgentLifecycleReconcile,
decode_lifecycle_task,
defer_lifecycle_reconcile,
enqueue_lifecycle_reconcile,
)
from app.services.queue import QueuedTask
def test_enqueue_lifecycle_reconcile_uses_delayed_enqueue(
monkeypatch: pytest.MonkeyPatch,
) -> None:
captured: dict[str, object] = {}
def _fake_enqueue_with_delay(
task: QueuedTask,
queue_name: str,
*,
delay_seconds: float,
redis_url: str | None = None,
) -> bool:
captured["task"] = task
captured["queue_name"] = queue_name
captured["delay_seconds"] = delay_seconds
captured["redis_url"] = redis_url
return True
monkeypatch.setattr(
"app.services.openclaw.lifecycle_queue.enqueue_task_with_delay",
_fake_enqueue_with_delay,
)
payload = QueuedAgentLifecycleReconcile(
agent_id=uuid4(),
gateway_id=uuid4(),
board_id=uuid4(),
generation=7,
checkin_deadline_at=utcnow() + timedelta(seconds=30),
attempts=0,
)
assert enqueue_lifecycle_reconcile(payload) is True
task = captured["task"]
assert isinstance(task, QueuedTask)
assert task.task_type == "agent_lifecycle_reconcile"
assert float(captured["delay_seconds"]) > 0
def test_defer_lifecycle_reconcile_keeps_attempt_count(
monkeypatch: pytest.MonkeyPatch,
) -> None:
captured: dict[str, object] = {}
def _fake_enqueue_with_delay(
task: QueuedTask,
queue_name: str,
*,
delay_seconds: float,
redis_url: str | None = None,
) -> bool:
captured["task"] = task
captured["queue_name"] = queue_name
captured["delay_seconds"] = delay_seconds
captured["redis_url"] = redis_url
return True
monkeypatch.setattr(
"app.services.openclaw.lifecycle_queue.enqueue_task_with_delay",
_fake_enqueue_with_delay,
)
deadline = utcnow() + timedelta(minutes=1)
task = QueuedTask(
task_type="agent_lifecycle_reconcile",
payload={
"agent_id": str(uuid4()),
"gateway_id": str(uuid4()),
"board_id": None,
"generation": 3,
"checkin_deadline_at": deadline.isoformat(),
},
created_at=utcnow(),
attempts=2,
)
assert defer_lifecycle_reconcile(task, delay_seconds=12) is True
deferred_task = captured["task"]
assert isinstance(deferred_task, QueuedTask)
assert deferred_task.attempts == 2
assert float(captured["delay_seconds"]) == 12
def test_decode_lifecycle_task_roundtrip() -> None:
deadline = utcnow() + timedelta(minutes=3)
agent_id = uuid4()
gateway_id = uuid4()
board_id = uuid4()
task = QueuedTask(
task_type="agent_lifecycle_reconcile",
payload={
"agent_id": str(agent_id),
"gateway_id": str(gateway_id),
"board_id": str(board_id),
"generation": 5,
"checkin_deadline_at": deadline.isoformat(),
},
created_at=utcnow(),
attempts=1,
)
decoded = decode_lifecycle_task(task)
assert decoded.agent_id == agent_id
assert decoded.gateway_id == gateway_id
assert decoded.board_id == board_id
assert decoded.generation == 5
assert decoded.checkin_deadline_at == deadline
assert decoded.attempts == 1

View File

@@ -0,0 +1,53 @@
# ruff: noqa: INP001
"""Lifecycle reconcile state helpers."""
from __future__ import annotations
from datetime import timedelta
from uuid import uuid4
from app.core.time import utcnow
from app.models.agents import Agent
from app.services.openclaw.constants import (
CHECKIN_DEADLINE_AFTER_WAKE,
MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN,
)
from app.services.openclaw.lifecycle_reconcile import _has_checked_in_since_wake
def _agent(*, last_seen_offset_s: int | None, last_wake_offset_s: int | None) -> Agent:
now = utcnow()
return Agent(
name="reconcile-test",
gateway_id=uuid4(),
last_seen_at=(
(now + timedelta(seconds=last_seen_offset_s))
if last_seen_offset_s is not None
else None
),
last_wake_sent_at=(
(now + timedelta(seconds=last_wake_offset_s))
if last_wake_offset_s is not None
else None
),
)
def test_checked_in_since_wake_when_last_seen_after_wake() -> None:
agent = _agent(last_seen_offset_s=5, last_wake_offset_s=0)
assert _has_checked_in_since_wake(agent) is True
def test_not_checked_in_since_wake_when_last_seen_before_wake() -> None:
agent = _agent(last_seen_offset_s=-5, last_wake_offset_s=0)
assert _has_checked_in_since_wake(agent) is False
def test_not_checked_in_since_wake_when_missing_last_seen() -> None:
agent = _agent(last_seen_offset_s=None, last_wake_offset_s=0)
assert _has_checked_in_since_wake(agent) is False
def test_lifecycle_convergence_policy_constants() -> None:
assert CHECKIN_DEADLINE_AFTER_WAKE == timedelta(seconds=30)
assert MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN == 3

View File

@@ -0,0 +1,11 @@
# ruff: noqa: INP001
"""Queue worker registration tests for lifecycle reconcile tasks."""
from __future__ import annotations
from app.services.openclaw.lifecycle_queue import TASK_TYPE as LIFECYCLE_TASK_TYPE
from app.services.queue_worker import _TASK_HANDLERS
def test_worker_registers_lifecycle_reconcile_handler() -> None:
assert LIFECYCLE_TASK_TYPE in _TASK_HANDLERS

View File

@@ -10,6 +10,7 @@ This folder is the starting point for Mission Control documentation.
- [Deployment](./deployment/README.md)
- [Production notes](./production/README.md)
- [Troubleshooting](./troubleshooting/README.md)
- [Gateway agent provisioning and check-in troubleshooting](./troubleshooting/gateway-agent-provisioning.md)
- [Gateway WebSocket protocol](./openclaw_gateway_ws.md)
- [OpenClaw baseline configuration](./openclaw_baseline_config.md)

View File

@@ -1,3 +1,3 @@
# Troubleshooting
Placeholder.
- [Gateway agent provisioning and check-in](./gateway-agent-provisioning.md)

View File

@@ -0,0 +1,107 @@
# Gateway Agent Provisioning and Check-In Troubleshooting
This guide explains how agent provisioning converges to a healthy state, and how to debug when an agent appears stuck.
## Fast Convergence Policy
Mission Control now uses a fast convergence policy for wake/check-in:
- Check-in deadline after each wake: **30 seconds**
- Maximum wake attempts without check-in: **3**
- If no check-in after the third attempt: agent is marked **offline** and provisioning escalation stops
This applies to both gateway-main and board agents.
## Expected Lifecycle
1. Mission Control provisions/updates the agent and sends wake.
2. A delayed reconcile task is queued for the check-in deadline.
3. Agent should call heartbeat quickly after startup/bootstrap.
4. If heartbeat arrives:
- `last_seen_at` is updated
- wake escalation state is reset (`wake_attempts=0`, check-in deadline cleared)
5. If heartbeat does not arrive by deadline:
- reconcile re-runs lifecycle (wake again)
- up to 3 total wake attempts
6. If still no heartbeat after 3 attempts:
- agent status becomes `offline`
- `last_provision_error` is set
## Startup Check-In Behavior
Templates now explicitly require immediate first-cycle check-in:
- Main agent heartbeat instructions require immediate check-in after wake/bootstrap.
- Board lead bootstrap requires heartbeat check-in before orchestration.
- Board worker bootstrap already included immediate check-in.
If a gateway still has older templates, run template sync and reprovision/wake.
## What You Should See in Logs
Healthy flow usually includes:
- `lifecycle.queue.enqueued`
- `queue.worker.success` (for lifecycle tasks)
- `lifecycle.reconcile.skip_not_stuck` (after heartbeat lands)
If agent is not checking in:
- `lifecycle.reconcile.deferred` (before deadline)
- `lifecycle.reconcile.retriggered` (retry wake)
- `lifecycle.reconcile.max_attempts_reached` (final fail-safe at attempt 3)
If you do not see lifecycle events at all, verify queue worker health first.
## Common Failure Modes
### Wake was sent, but no check-in arrived
Possible causes:
- Agent process never started or crashed during bootstrap
- Agent ignored startup instructions due to stale templates
- Heartbeat call failed (network/auth/base URL mismatch)
Actions:
1. Confirm current templates were synced to gateway.
2. Re-run provisioning/update to trigger a fresh wake.
3. Verify agent can reach Mission Control API and send heartbeat with `X-Agent-Token`.
### Agent stays provisioning/updating with no retries
Possible causes:
- Queue worker not running
- Queue/Redis mismatch between API process and worker process
Actions:
1. Verify worker process is running continuously.
2. Verify `rq_redis_url` and `rq_queue_name` are identical for API and worker.
3. Check worker logs for dequeue/handler errors.
### Agent ended offline quickly
This is expected when no check-in is received after 3 wake attempts. The system fails fast by design.
Actions:
1. Fix check-in path first (startup, network, token, API reachability).
2. Re-run provisioning/update to start a new attempt cycle.
## Operator Recovery Checklist
1. Ensure queue worker is running.
2. Sync templates for the gateway.
3. Trigger agent update/provision from Mission Control.
4. Watch logs for:
- `lifecycle.queue.enqueued`
- `lifecycle.reconcile.retriggered` (if needed)
- heartbeat activity / `skip_not_stuck`
5. If still failing, capture:
- gateway logs around bootstrap
- worker logs around lifecycle events
- agent `last_provision_error`, `wake_attempts`, `last_seen_at`