Merge branch 'master' into docs/backend-doc-pass

This commit is contained in:
Abhimanyu Saharan
2026-02-25 03:32:14 +05:30
committed by GitHub
344 changed files with 36956 additions and 3626 deletions

View File

@@ -196,10 +196,11 @@ async def pending_approval_conflicts_by_task(
legacy_statement = legacy_statement.where(col(Approval.id) != exclude_approval_id)
legacy_rows = list(await session.exec(legacy_statement))
for legacy_task_id, approval_id, _created_at in legacy_rows:
if legacy_task_id is None:
for legacy_task_id_opt, approval_id, _created_at in legacy_rows:
if legacy_task_id_opt is None:
continue
conflicts.setdefault(legacy_task_id, approval_id)
# mypy: SQL rows can include NULL task_id; guard before using as dict[UUID, UUID] key.
conflicts.setdefault(legacy_task_id_opt, approval_id)
return conflicts

View File

@@ -18,8 +18,12 @@ from app.models.approval_task_links import ApprovalTaskLink
from app.models.approvals import Approval
from app.models.board_memory import BoardMemory
from app.models.board_onboarding import BoardOnboardingSession
from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook
from app.models.organization_board_access import OrganizationBoardAccess
from app.models.organization_invite_board_access import OrganizationInviteBoardAccess
from app.models.tag_assignments import TagAssignment
from app.models.task_custom_fields import BoardTaskCustomField, TaskCustomFieldValue
from app.models.task_dependencies import TaskDependency
from app.models.task_fingerprints import TaskFingerprint
from app.models.tasks import Task
@@ -34,6 +38,17 @@ if TYPE_CHECKING:
from app.models.boards import Board
def _is_missing_gateway_agent_error(exc: OpenClawGatewayError) -> bool:
    """Return True when the gateway error means the agent is already gone.

    Used to make delete-time cleanup idempotent: a missing agent on the
    gateway side is not a failure worth surfacing to the caller.
    """
    text = str(exc).lower()
    if not text:
        return False
    explicit_markers = ("unknown agent", "no such agent", "agent does not exist")
    for marker in explicit_markers:
        if marker in text:
            return True
    # Looser fallback: both words must appear somewhere in the message.
    return "agent" in text and "not found" in text
async def delete_board(session: AsyncSession, *, board: Board) -> OkResponse:
"""Delete a board and all dependent records, cleaning gateway state when configured."""
agents = await Agent.objects.filter_by(board_id=board.id).all(session)
@@ -43,17 +58,19 @@ async def delete_board(session: AsyncSession, *, board: Board) -> OkResponse:
gateway = await require_gateway_for_board(session, board, require_workspace_root=True)
# Ensure URL is present (required for gateway cleanup calls).
gateway_client_config(gateway)
try:
for agent in agents:
for agent in agents:
try:
await OpenClawGatewayProvisioner().delete_agent_lifecycle(
agent=agent,
gateway=gateway,
)
except OpenClawGatewayError as exc:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway cleanup failed: {exc}",
) from exc
except OpenClawGatewayError as exc:
if _is_missing_gateway_agent_error(exc):
continue
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway cleanup failed: {exc}",
) from exc
if task_ids:
await crud.delete_where(
@@ -62,6 +79,20 @@ async def delete_board(session: AsyncSession, *, board: Board) -> OkResponse:
col(ActivityEvent.task_id).in_(task_ids),
commit=False,
)
await crud.delete_where(
session,
TagAssignment,
col(TagAssignment.task_id).in_(task_ids),
commit=False,
)
await crud.delete_where(
session,
TaskCustomFieldValue,
col(TaskCustomFieldValue.task_id).in_(task_ids),
commit=False,
)
# Keep teardown ordered around FK/reference chains so dependent rows are gone
# before deleting their parent task/agent/board records.
await crud.delete_where(
session,
TaskDependency,
@@ -84,6 +115,12 @@ async def delete_board(session: AsyncSession, *, board: Board) -> OkResponse:
await crud.delete_where(session, Approval, col(Approval.board_id) == board.id)
await crud.delete_where(session, BoardMemory, col(BoardMemory.board_id) == board.id)
await crud.delete_where(
session,
BoardWebhookPayload,
col(BoardWebhookPayload.board_id) == board.id,
)
await crud.delete_where(session, BoardWebhook, col(BoardWebhook.board_id) == board.id)
await crud.delete_where(
session,
BoardOnboardingSession,
@@ -99,6 +136,11 @@ async def delete_board(session: AsyncSession, *, board: Board) -> OkResponse:
OrganizationInviteBoardAccess,
col(OrganizationInviteBoardAccess.board_id) == board.id,
)
await crud.delete_where(
session,
BoardTaskCustomField,
col(BoardTaskCustomField.board_id) == board.id,
)
# Tasks reference agents and have dependent records.
# Delete tasks before agents.

View File

@@ -36,10 +36,21 @@ def _memory_to_read(memory: BoardMemory) -> BoardMemoryRead:
return BoardMemoryRead.model_validate(memory, from_attributes=True)
def _approval_to_read(approval: Approval, *, task_ids: list[UUID]) -> ApprovalRead:
def _approval_to_read(
approval: Approval,
*,
task_ids: list[UUID],
task_titles: list[str],
) -> ApprovalRead:
model = ApprovalRead.model_validate(approval, from_attributes=True)
primary_task_id = task_ids[0] if task_ids else None
return model.model_copy(update={"task_id": primary_task_id, "task_ids": task_ids})
return model.model_copy(
update={
"task_id": primary_task_id,
"task_ids": task_ids,
"task_titles": task_titles,
},
)
def _task_to_card(
@@ -137,13 +148,23 @@ async def build_board_snapshot(session: AsyncSession, board: Board) -> BoardSnap
session,
approval_ids=approval_ids,
)
task_title_by_id = {task.id: task.title for task in tasks}
# Hydrate each approval with linked task metadata, falling back to legacy
# single-task fields so older rows still render complete approval cards.
approval_reads = [
_approval_to_read(
approval,
task_ids=task_ids_by_approval.get(
approval.id,
[approval.task_id] if approval.task_id is not None else [],
task_ids=(
linked_task_ids := task_ids_by_approval.get(
approval.id,
[approval.task_id] if approval.task_id is not None else [],
)
),
task_titles=[
task_title_by_id[task_id]
for task_id in linked_task_ids
if task_id in task_title_by_id
],
)
for approval in approvals
]

View File

@@ -5,16 +5,16 @@ from __future__ import annotations
import hashlib
from typing import Mapping
CONFIDENCE_THRESHOLD = 80
CONFIDENCE_THRESHOLD = 80.0
MIN_PLANNING_SIGNALS = 2
def compute_confidence(rubric_scores: Mapping[str, int]) -> int:
def compute_confidence(rubric_scores: Mapping[str, int]) -> float:
"""Compute aggregate confidence from rubric score components."""
return int(sum(rubric_scores.values()))
return float(sum(rubric_scores.values()))
def approval_required(*, confidence: int, is_external: bool, is_risky: bool) -> bool:
def approval_required(*, confidence: float, is_external: bool, is_risky: bool) -> bool:
"""Return whether an action must go through explicit approval."""
return is_external or is_risky or confidence < CONFIDENCE_THRESHOLD

View File

@@ -16,26 +16,23 @@ from app.db import crud
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.approvals import Approval
from app.models.board_webhooks import BoardWebhook
from app.models.gateways import Gateway
from app.models.tasks import Task
from app.schemas.gateways import GatewayTemplatesSyncResult
from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG
from app.services.openclaw.db_agent_state import (
mark_provision_complete,
mark_provision_requested,
mint_agent_token,
)
from app.services.openclaw.db_service import OpenClawDBService
from app.services.openclaw.error_messages import normalize_gateway_error_message
from app.services.openclaw.gateway_compat import check_gateway_version_compatibility
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
from app.services.openclaw.gateway_rpc import OpenClawGatewayError, openclaw_call
from app.services.openclaw.provisioning import OpenClawGatewayProvisioner
from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator
from app.services.openclaw.provisioning_db import (
GatewayTemplateSyncOptions,
OpenClawProvisioningService,
)
from app.services.openclaw.session_service import GatewayTemplateSyncQuery
from app.services.openclaw.shared import GatewayAgentIdentity
from app.services.organizations import get_org_owner_user
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -165,7 +162,12 @@ class GatewayAdminLifecycleService(OpenClawDBService):
async def gateway_has_main_agent_entry(self, gateway: Gateway) -> bool:
if not gateway.url:
return False
config = GatewayClientConfig(url=gateway.url, token=gateway.token)
config = GatewayClientConfig(
url=gateway.url,
token=gateway.token,
allow_insecure_tls=gateway.allow_insecure_tls,
disable_device_pairing=gateway.disable_device_pairing,
)
target_id = GatewayAgentIdentity.openclaw_agent_id(gateway)
try:
await openclaw_call("agents.files.list", {"agentId": target_id}, config=config)
@@ -176,6 +178,35 @@ class GatewayAdminLifecycleService(OpenClawDBService):
return True
return True
async def assert_gateway_runtime_compatible(
self,
*,
url: str,
token: str | None,
allow_insecure_tls: bool = False,
disable_device_pairing: bool = False,
) -> None:
"""Validate that a gateway runtime meets minimum supported version."""
config = GatewayClientConfig(
url=url,
token=token,
allow_insecure_tls=allow_insecure_tls,
disable_device_pairing=disable_device_pairing,
)
try:
result = await check_gateway_version_compatibility(config)
except OpenClawGatewayError as exc:
detail = normalize_gateway_error_message(str(exc))
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway compatibility check failed: {detail}",
) from exc
if not result.compatible:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail=result.message or "Gateway runtime version is not supported.",
)
async def provision_main_agent_record(
self,
gateway: Gateway,
@@ -185,69 +216,38 @@ class GatewayAdminLifecycleService(OpenClawDBService):
action: str,
notify: bool,
) -> Agent:
template_user = user or await get_org_owner_user(
self.session,
organization_id=gateway.organization_id,
)
if template_user is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Organization owner not found (required for gateway agent USER.md rendering).",
)
raw_token = mint_agent_token(agent)
mark_provision_requested(
agent,
action=action,
status="updating" if action == "update" else "provisioning",
)
await self.add_commit_refresh(agent)
if not gateway.url:
return agent
orchestrator = AgentLifecycleOrchestrator(self.session)
try:
await OpenClawGatewayProvisioner().apply_agent_lifecycle(
agent=agent,
provisioned = await orchestrator.run_lifecycle(
gateway=gateway,
agent_id=agent.id,
board=None,
auth_token=raw_token,
user=template_user,
user=user,
action=action,
auth_token=None,
force_bootstrap=False,
reset_session=False,
wake=notify,
deliver_wakeup=True,
wakeup_verb=None,
clear_confirm_token=False,
raise_gateway_errors=True,
)
except OpenClawGatewayError as exc:
except HTTPException:
self.logger.error(
"gateway.main_agent.provision_failed_gateway gateway_id=%s agent_id=%s error=%s",
"gateway.main_agent.provision_failed gateway_id=%s agent_id=%s action=%s",
gateway.id,
agent.id,
str(exc),
action,
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway {action} failed: {exc}",
) from exc
except (OSError, RuntimeError, ValueError) as exc:
self.logger.error(
"gateway.main_agent.provision_failed gateway_id=%s agent_id=%s error=%s",
gateway.id,
agent.id,
str(exc),
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Unexpected error {action}ing gateway provisioning.",
) from exc
mark_provision_complete(agent, status="online")
await self.add_commit_refresh(agent)
raise
self.logger.info(
"gateway.main_agent.provision_success gateway_id=%s agent_id=%s action=%s",
gateway.id,
agent.id,
provisioned.id,
action,
)
return agent
return provisioned
async def ensure_main_agent(
self,
@@ -323,6 +323,14 @@ class GatewayAdminLifecycleService(OpenClawDBService):
agent_id=None,
commit=False,
)
await crud.update_where(
self.session,
BoardWebhook,
col(BoardWebhook.agent_id) == agent_id,
agent_id=None,
updated_at=now,
commit=False,
)
async def sync_templates(
self,
@@ -343,9 +351,11 @@ class GatewayAdminLifecycleService(OpenClawDBService):
GatewayTemplateSyncOptions(
user=auth.user,
include_main=query.include_main,
lead_only=query.lead_only,
reset_sessions=query.reset_sessions,
rotate_tokens=query.rotate_tokens,
force_bootstrap=query.force_bootstrap,
overwrite=query.overwrite,
board_id=query.board_id,
),
)

View File

@@ -13,11 +13,16 @@ _GATEWAY_AGENT_SUFFIX = ":main"
DEFAULT_HEARTBEAT_CONFIG: dict[str, Any] = {
"every": "10m",
"target": "none",
"target": "last",
"includeReasoning": False,
}
OFFLINE_AFTER = timedelta(minutes=10)
# Provisioning convergence policy:
# - require first heartbeat/check-in within 30s of wake
# - allow up to 3 wake attempts before giving up
CHECKIN_DEADLINE_AFTER_WAKE = timedelta(seconds=30)
MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN = 3
AGENT_SESSION_PREFIX = "agent"
DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY: dict[str, bool] = {
@@ -55,41 +60,65 @@ DEFAULT_GATEWAY_FILES = frozenset(
{
"AGENTS.md",
"SOUL.md",
"TASK_SOUL.md",
"SELF.md",
"AUTONOMY.md",
"TOOLS.md",
"IDENTITY.md",
"USER.md",
"HEARTBEAT.md",
"BOOT.md",
"BOOTSTRAP.md",
"MEMORY.md",
},
)
# Lead-only workspace contract. Used for board leads to allow an iterative rollout
# without changing worker templates.
LEAD_GATEWAY_FILES = frozenset(
{
"AGENTS.md",
"BOOTSTRAP.md",
"IDENTITY.md",
"SOUL.md",
"USER.md",
"MEMORY.md",
"TOOLS.md",
"HEARTBEAT.md",
},
)
# These files are intended to evolve within the agent workspace.
# Provision them if missing, but avoid overwriting existing content during updates.
#
# Examples:
# - SELF.md: evolving identity/preferences
# - USER.md: human-provided context + lead intake notes
# - MEMORY.md: curated long-term memory (consolidated)
PRESERVE_AGENT_EDITABLE_FILES = frozenset({"SELF.md", "USER.md", "MEMORY.md", "TASK_SOUL.md"})
PRESERVE_AGENT_EDITABLE_FILES = frozenset({"USER.md", "MEMORY.md"})
HEARTBEAT_LEAD_TEMPLATE = "HEARTBEAT_LEAD.md"
HEARTBEAT_AGENT_TEMPLATE = "HEARTBEAT_AGENT.md"
HEARTBEAT_LEAD_TEMPLATE = "BOARD_HEARTBEAT.md.j2"
HEARTBEAT_AGENT_TEMPLATE = "BOARD_HEARTBEAT.md.j2"
SESSION_KEY_PARTS_MIN = 2
_SESSION_KEY_PARTS_MIN = SESSION_KEY_PARTS_MIN
MAIN_TEMPLATE_MAP = {
"AGENTS.md": "MAIN_AGENTS.md",
"HEARTBEAT.md": "MAIN_HEARTBEAT.md",
"USER.md": "MAIN_USER.md",
"BOOT.md": "MAIN_BOOT.md",
"TOOLS.md": "MAIN_TOOLS.md",
"AGENTS.md": "BOARD_AGENTS.md.j2",
"IDENTITY.md": "BOARD_IDENTITY.md.j2",
"SOUL.md": "BOARD_SOUL.md.j2",
"MEMORY.md": "BOARD_MEMORY.md.j2",
"HEARTBEAT.md": "BOARD_HEARTBEAT.md.j2",
"USER.md": "BOARD_USER.md.j2",
"TOOLS.md": "BOARD_TOOLS.md.j2",
}
BOARD_SHARED_TEMPLATE_MAP = {
"AGENTS.md": "BOARD_AGENTS.md.j2",
"BOOTSTRAP.md": "BOARD_BOOTSTRAP.md.j2",
"IDENTITY.md": "BOARD_IDENTITY.md.j2",
"SOUL.md": "BOARD_SOUL.md.j2",
"MEMORY.md": "BOARD_MEMORY.md.j2",
"HEARTBEAT.md": "BOARD_HEARTBEAT.md.j2",
"USER.md": "BOARD_USER.md.j2",
"TOOLS.md": "BOARD_TOOLS.md.j2",
}
LEAD_TEMPLATE_MAP: dict[str, str] = {}
_TOOLS_KV_RE = re.compile(r"^(?P<key>[A-Z0-9_]+)=(?P<value>.*)$")
_NON_TRANSIENT_GATEWAY_ERROR_MARKERS = ("unsupported file",)
_TRANSIENT_GATEWAY_ERROR_MARKERS = (

View File

@@ -184,7 +184,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
target = await self._board_agent_or_404(board=board, agent_id=target_agent_id)
if not target.openclaw_session_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Target agent has no session key",
)
_gateway, config = await GatewayDispatchService(
@@ -335,7 +335,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
normalized_content = content.strip()
if not normalized_content:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="content is required",
)
@@ -541,7 +541,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
)
if not lead.openclaw_session_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Lead agent has no session key",
)
await self._dispatch_gateway_message(

View File

@@ -0,0 +1,167 @@
"""OpenClaw-compatible device identity and connect-signature helpers."""
from __future__ import annotations
import hashlib
import json
import os
from dataclasses import dataclass
from pathlib import Path
from time import time
from typing import Any, cast
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
DEFAULT_DEVICE_IDENTITY_PATH = Path.home() / ".openclaw" / "identity" / "device.json"
@dataclass(frozen=True)
class DeviceIdentity:
    """Persisted gateway device identity used for connect signatures."""

    # Stable identifier; derived as the SHA-256 hex digest of the raw
    # Ed25519 public key bytes (see _derive_device_id).
    device_id: str
    # PEM-encoded Ed25519 public key (SubjectPublicKeyInfo format).
    public_key_pem: str
    # PEM-encoded, unencrypted Ed25519 private key (PKCS8 format).
    private_key_pem: str
def _identity_path() -> Path:
    """Resolve the on-disk location of the device identity file.

    A non-empty OPENCLAW_GATEWAY_DEVICE_IDENTITY_PATH env var overrides the
    default location under the user's home directory.
    """
    override = os.getenv("OPENCLAW_GATEWAY_DEVICE_IDENTITY_PATH", "").strip()
    if not override:
        return DEFAULT_DEVICE_IDENTITY_PATH
    return Path(override).expanduser().resolve()
def _base64url_encode(raw: bytes) -> str:
    """Encode bytes as unpadded URL-safe base64 text."""
    import base64

    encoded = base64.urlsafe_b64encode(raw).decode("utf-8")
    return encoded.rstrip("=")
def _derive_public_key_raw(public_key_pem: str) -> bytes:
    """Return the raw Ed25519 public key bytes decoded from *public_key_pem*.

    Raises ValueError when the PEM does not contain an Ed25519 key.
    """
    key = serialization.load_pem_public_key(public_key_pem.encode("utf-8"))
    if not isinstance(key, Ed25519PublicKey):
        raise ValueError("device identity public key is not Ed25519")
    return key.public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw,
    )
def _derive_device_id(public_key_pem: str) -> str:
    """Device id = SHA-256 hex digest of the raw public key bytes."""
    raw_key = _derive_public_key_raw(public_key_pem)
    return hashlib.sha256(raw_key).hexdigest()
def _write_identity(path: Path, identity: DeviceIdentity) -> None:
    """Persist *identity* as pretty-printed JSON at *path*, owner-readable only.

    The file contains the Ed25519 private key, so it is created with mode
    0o600 via ``os.open`` instead of being written with default permissions
    and chmod-ed afterwards — the latter leaves a window where other local
    users could read the key.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "version": 1,
        "deviceId": identity.device_id,
        "publicKeyPem": identity.public_key_pem,
        "privateKeyPem": identity.private_key_pem,
        "createdAtMs": int(time() * 1000),
    }
    # O_TRUNC keeps the original overwrite semantics; the 0o600 mode only
    # applies when the file is newly created.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(f"{json.dumps(payload, indent=2)}\n")
    try:
        # Re-assert permissions for files that already existed with a wider mode.
        path.chmod(0o600)
    except OSError:
        # Best effort on platforms/filesystems that ignore chmod.
        pass
def _generate_identity() -> DeviceIdentity:
    """Create a brand-new Ed25519 device identity with a derived device id."""
    key = Ed25519PrivateKey.generate()
    private_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode("utf-8")
    public_pem = (
        key.public_key()
        .public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        .decode("utf-8")
    )
    return DeviceIdentity(
        device_id=_derive_device_id(public_pem),
        public_key_pem=public_pem,
        private_key_pem=private_pem,
    )
def load_or_create_device_identity() -> DeviceIdentity:
    """Load persisted device identity or create a new one when missing/invalid.

    Self-healing: if the stored deviceId does not match the id derived from
    the stored public key, the derived id wins and the file is rewritten.
    Any read/parse failure falls through to generating a fresh identity.
    """
    path = _identity_path()
    try:
        if path.exists():
            raw = cast(dict[str, Any], json.loads(path.read_text(encoding="utf-8")))
            stored_id = str(raw.get("deviceId") or "").strip()
            pub_pem = str(raw.get("publicKeyPem") or "").strip()
            priv_pem = str(raw.get("privateKeyPem") or "").strip()
            if stored_id and pub_pem and priv_pem:
                canonical_id = _derive_device_id(pub_pem)
                loaded = DeviceIdentity(
                    device_id=canonical_id,
                    public_key_pem=pub_pem,
                    private_key_pem=priv_pem,
                )
                if canonical_id != stored_id:
                    _write_identity(path, loaded)
                return loaded
    except (OSError, ValueError, json.JSONDecodeError):
        # Corrupt or unreadable file — fall through to regenerate.
        pass
    fresh = _generate_identity()
    _write_identity(path, fresh)
    return fresh
def public_key_raw_base64url_from_pem(public_key_pem: str) -> str:
    """Convert a PEM Ed25519 public key to the base64url raw form OpenClaw expects."""
    raw_key = _derive_public_key_raw(public_key_pem)
    return _base64url_encode(raw_key)
def sign_device_payload(private_key_pem: str, payload: str) -> str:
    """Sign *payload* with the PEM Ed25519 private key; return base64url signature.

    Raises ValueError when the PEM does not contain an Ed25519 key.
    """
    key = serialization.load_pem_private_key(private_key_pem.encode("utf-8"), password=None)
    if not isinstance(key, Ed25519PrivateKey):
        raise ValueError("device identity private key is not Ed25519")
    return _base64url_encode(key.sign(payload.encode("utf-8")))
def build_device_auth_payload(
*,
device_id: str,
client_id: str,
client_mode: str,
role: str,
scopes: list[str],
signed_at_ms: int,
token: str | None,
nonce: str | None,
) -> str:
"""Build the OpenClaw canonical payload string for device signatures."""
version = "v2" if nonce else "v1"
parts = [
version,
device_id,
client_id,
client_mode,
role,
",".join(scopes),
str(signed_at_ms),
token or "",
]
if version == "v2":
parts.append(nonce or "")
return "|".join(parts)

View File

@@ -0,0 +1,31 @@
"""Normalization helpers for user-facing OpenClaw gateway errors."""
from __future__ import annotations
import re
# Matches e.g. "missing scope: operator.admin" (case-insensitive) and
# captures the scope name for the user-facing hint.
_MISSING_SCOPE_PATTERN = re.compile(
    r"missing\s+scope\s*:\s*(?P<scope>[A-Za-z0-9._:-]+)",
    re.IGNORECASE,
)


def normalize_gateway_error_message(message: str) -> str:
    """Map raw gateway auth errors onto actionable, user-friendly guidance.

    Messages that don't match a known auth-failure shape pass through
    unchanged (aside from whitespace trimming).
    """
    trimmed = message.strip()
    if not trimmed:
        return "Gateway authentication failed. Verify gateway token and operator scopes."
    scope_match = _MISSING_SCOPE_PATTERN.search(trimmed)
    if scope_match:
        return (
            f"Gateway token is missing required scope `{scope_match.group('scope')}`. "
            "Update the gateway token scopes and retry."
        )
    if any(marker in trimmed.lower() for marker in ("unauthorized", "forbidden")):
        return "Gateway authentication failed. Verify gateway token and operator scopes."
    return trimmed

View File

@@ -0,0 +1,181 @@
"""Gateway runtime version compatibility checks."""
from __future__ import annotations
import re
from dataclasses import dataclass
from app.core.config import settings
from app.core.logging import get_logger
from app.services.openclaw.gateway_rpc import (
GatewayConfig,
OpenClawGatewayError,
openclaw_call,
openclaw_connect_metadata,
)
_CALVER_PATTERN = re.compile(
r"^v?(?P<year>\d{4})\.(?P<month>\d{1,2})\.(?P<day>\d{1,2})(?:-(?P<rev>\d+))?$",
re.IGNORECASE,
)
_CONNECT_VERSION_PATH: tuple[str, ...] = ("server", "version")
_CONFIG_VERSION_PATH: tuple[str, ...] = ("config", "meta", "lastTouchedVersion")
logger = get_logger(__name__)
@dataclass(frozen=True, slots=True)
class GatewayVersionCheckResult:
    """Compatibility verdict for a gateway runtime version."""

    # True when the reported version parsed and met the configured minimum.
    compatible: bool
    # Minimum supported version the check compared against (CalVer string).
    minimum_version: str
    # Version reported by the gateway, or None when it could not be determined.
    current_version: str | None
    # Human-readable failure explanation; None on the compatible path.
    message: str | None = None
def _normalized_minimum_version() -> str:
    """Return the configured minimum gateway version, defaulting to 2026.1.30."""
    configured = (settings.gateway_min_version or "").strip()
    if configured:
        return configured
    return "2026.1.30"
def _parse_version_parts(value: str) -> tuple[int, ...] | None:
    """Parse CalVer 'YYYY.M.D' or 'YYYY.M.D-REV' into (year, month, day, rev).

    Returns None for unparseable strings or impossible calendar components;
    a missing revision suffix defaults to 0.
    """
    match = _CALVER_PATTERN.match(value.strip())
    if match is None:
        return None
    year, month, day = (int(match.group(name)) for name in ("year", "month", "day"))
    revision = int(match.group("rev") or 0)
    # Reject obviously invalid calendar components.
    if not 1 <= month <= 12:
        return None
    if not 1 <= day <= 31:
        return None
    return (year, month, day, revision)
def _compare_versions(left: tuple[int, ...], right: tuple[int, ...]) -> int:
    """Three-way compare version tuples, right-padding the shorter with zeros."""
    width = max(len(left), len(right))
    padded_left = left + (0,) * (width - len(left))
    padded_right = right + (0,) * (width - len(right))
    # (a > b) - (a < b) yields the conventional -1 / 0 / 1.
    return (padded_left > padded_right) - (padded_left < padded_right)
def _value_at_path(payload: object, path: tuple[str, ...]) -> object | None:
    """Walk nested dicts along *path*; return None when any segment is absent."""
    node = payload
    for key in path:
        if isinstance(node, dict) and key in node:
            node = node[key]
        else:
            return None
    return node
def _coerce_version_string(value: object) -> str | None:
    """Coerce a metadata value into a non-empty version string when possible."""
    if isinstance(value, str):
        stripped = value.strip()
        return stripped if stripped else None
    if isinstance(value, (int, float)):
        return str(value)
    return None
def extract_connect_server_version(payload: object) -> str | None:
    """Extract the canonical runtime version from connect metadata."""
    raw_value = _value_at_path(payload, _CONNECT_VERSION_PATH)
    return _coerce_version_string(raw_value)
def extract_config_last_touched_version(payload: object) -> str | None:
    """Extract a runtime version hint from a config.get payload."""
    raw_value = _value_at_path(payload, _CONFIG_VERSION_PATH)
    return _coerce_version_string(raw_value)
def evaluate_gateway_version(
    *,
    current_version: str | None,
    minimum_version: str | None = None,
) -> GatewayVersionCheckResult:
    """Return compatibility result for the reported gateway version.

    Failure order: invalid configured minimum, missing reported version,
    unparseable reported version, then version-too-old; anything else is
    compatible.
    """
    min_version = (minimum_version or _normalized_minimum_version()).strip()

    def incompatible(message: str) -> GatewayVersionCheckResult:
        # All failure verdicts differ only in their explanation.
        return GatewayVersionCheckResult(
            compatible=False,
            minimum_version=min_version,
            current_version=current_version,
            message=message,
        )

    min_parts = _parse_version_parts(min_version)
    if min_parts is None:
        return incompatible(
            "Server configuration error: GATEWAY_MIN_VERSION is invalid. "
            f"Expected CalVer 'YYYY.M.D' or 'YYYY.M.D-REV', got '{min_version}'."
        )
    if current_version is None:
        return incompatible(
            "Unable to determine gateway version from runtime metadata. "
            f"Minimum supported version is {min_version}."
        )
    current_parts = _parse_version_parts(current_version)
    if current_parts is None:
        return incompatible(
            f"Gateway reported an unsupported version format '{current_version}'. "
            f"Minimum supported version is {min_version}."
        )
    if _compare_versions(current_parts, min_parts) < 0:
        return incompatible(
            f"Gateway version {current_version} is not supported. "
            f"Minimum supported version is {min_version}."
        )
    return GatewayVersionCheckResult(
        compatible=True,
        minimum_version=min_version,
        current_version=current_version,
    )
async def check_gateway_version_compatibility(
    config: GatewayConfig,
    *,
    minimum_version: str | None = None,
) -> GatewayVersionCheckResult:
    """Evaluate gateway compatibility from connect metadata, with config fallback.

    When connect metadata yields no parseable version, a best-effort
    ``config.get`` call supplies a version hint; failures of that fallback
    are logged at debug level and otherwise ignored.
    """
    metadata = await openclaw_connect_metadata(config=config)
    version = extract_connect_server_version(metadata)
    if version is None or _parse_version_parts(version) is None:
        try:
            config_payload = await openclaw_call("config.get", config=config)
        except OpenClawGatewayError as exc:
            logger.debug(
                "gateway.compat.config_get_fallback_unavailable reason=%s",
                str(exc),
            )
        else:
            hinted = extract_config_last_touched_version(config_payload)
            if hinted is not None:
                version = hinted
    return evaluate_gateway_version(
        current_version=version,
        minimum_version=minimum_version,
    )

View File

@@ -28,11 +28,16 @@ def gateway_client_config(gateway: Gateway) -> GatewayClientConfig:
url = (gateway.url or "").strip()
if not url:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Gateway url is required",
)
token = (gateway.token or "").strip() or None
return GatewayClientConfig(url=url, token=token)
return GatewayClientConfig(
url=url,
token=token,
allow_insecure_tls=gateway.allow_insecure_tls,
disable_device_pairing=gateway.disable_device_pairing,
)
def optional_gateway_client_config(gateway: Gateway | None) -> GatewayClientConfig | None:
@@ -43,7 +48,12 @@ def optional_gateway_client_config(gateway: Gateway | None) -> GatewayClientConf
if not url:
return None
token = (gateway.token or "").strip() or None
return GatewayClientConfig(url=url, token=token)
return GatewayClientConfig(
url=url,
token=token,
allow_insecure_tls=gateway.allow_insecure_tls,
disable_device_pairing=gateway.disable_device_pairing,
)
def require_gateway_workspace_root(gateway: Gateway) -> str:
@@ -51,7 +61,7 @@ def require_gateway_workspace_root(gateway: Gateway) -> str:
workspace_root = (gateway.workspace_root or "").strip()
if not workspace_root:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Gateway workspace_root is required",
)
return workspace_root
@@ -82,13 +92,13 @@ async def require_gateway_for_board(
"""Return a board's gateway or raise a 422 with a stable error message."""
if board.gateway_id is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Board gateway_id is required",
)
gateway = await get_gateway_for_board(session, board)
if gateway is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Board gateway_id is invalid",
)
if require_workspace_root:

View File

@@ -9,9 +9,10 @@ from __future__ import annotations
import asyncio
import json
import ssl
from dataclasses import dataclass
from time import perf_counter
from typing import Any
from time import perf_counter, time
from typing import Any, Literal
from urllib.parse import urlencode, urlparse, urlunparse
from uuid import uuid4
@@ -19,9 +20,26 @@ import websockets
from websockets.exceptions import WebSocketException
from app.core.logging import TRACE_LEVEL, get_logger
from app.services.openclaw.device_identity import (
build_device_auth_payload,
load_or_create_device_identity,
public_key_raw_base64url_from_pem,
sign_device_payload,
)
PROTOCOL_VERSION = 3
logger = get_logger(__name__)
GATEWAY_OPERATOR_SCOPES = (
"operator.read",
"operator.admin",
"operator.approvals",
"operator.pairing",
)
DEFAULT_GATEWAY_CLIENT_ID = "gateway-client"
DEFAULT_GATEWAY_CLIENT_MODE = "backend"
CONTROL_UI_CLIENT_ID = "openclaw-control-ui"
CONTROL_UI_CLIENT_MODE = "ui"
GatewayConnectMode = Literal["device", "control_ui"]
# NOTE: These are the base gateway methods from the OpenClaw gateway repo.
# The gateway can expose additional methods at runtime via channel plugins.
@@ -154,6 +172,8 @@ class GatewayConfig:
url: str
token: str | None = None
allow_insecure_tls: bool = False
disable_device_pairing: bool = False
def _build_gateway_url(config: GatewayConfig) -> str:
@@ -174,6 +194,78 @@ def _redacted_url_for_log(raw_url: str) -> str:
return str(urlunparse(parsed._replace(query="", fragment="")))
def _create_ssl_context(config: GatewayConfig) -> ssl.SSLContext | None:
    """Create an insecure SSL context override for explicit opt-in TLS bypass.

    Host-agnostic by design: when ``allow_insecure_tls`` is enabled for a
    ``wss://`` gateway, certificate and hostname verification are disabled
    for that gateway connection. Returns None when no override applies
    (non-wss scheme, or the opt-in flag is off).
    """
    scheme = urlparse(config.url).scheme
    if scheme != "wss" or not config.allow_insecure_tls:
        return None
    insecure = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    insecure.check_hostname = False
    insecure.verify_mode = ssl.CERT_NONE
    return insecure
def _build_control_ui_origin(gateway_url: str) -> str | None:
    """Derive an http(s) Origin value from a ws(s)/http(s) gateway URL.

    Returns None when the URL has no hostname or an unrecognized scheme.
    """
    parsed = urlparse(gateway_url)
    if not parsed.hostname:
        return None
    scheme_map = {"ws": "http", "http": "http", "wss": "https", "https": "https"}
    origin_scheme = scheme_map.get(parsed.scheme)
    if origin_scheme is None:
        return None
    host = parsed.hostname
    # urlparse strips brackets from IPv6 literals; restore them for the Origin.
    if ":" in host and not host.startswith("["):
        host = f"[{host}]"
    if parsed.port is not None:
        host = f"{host}:{parsed.port}"
    return f"{origin_scheme}://{host}"
def _resolve_connect_mode(config: GatewayConfig) -> GatewayConnectMode:
    """Device pairing is the default; control-UI mode is the explicit opt-out."""
    if config.disable_device_pairing:
        return "control_ui"
    return "device"
def _build_device_connect_payload(
    *,
    client_id: str,
    client_mode: str,
    role: str,
    scopes: list[str],
    auth_token: str | None,
    connect_nonce: str | None,
) -> dict[str, Any]:
    """Assemble the signed device block for a gateway connect request.

    Loads (or creates) the persisted device identity, signs the canonical
    payload string with its private key, and includes the nonce when the
    gateway issued a connect challenge.
    """
    identity = load_or_create_device_identity()
    signed_at_ms = int(time() * 1000)
    canonical = build_device_auth_payload(
        device_id=identity.device_id,
        client_id=client_id,
        client_mode=client_mode,
        role=role,
        scopes=scopes,
        signed_at_ms=signed_at_ms,
        token=auth_token,
        nonce=connect_nonce,
    )
    device_block: dict[str, Any] = {
        "id": identity.device_id,
        "publicKey": public_key_raw_base64url_from_pem(identity.public_key_pem),
        "signature": sign_device_payload(identity.private_key_pem, canonical),
        "signedAt": signed_at_ms,
    }
    if connect_nonce:
        device_block["nonce"] = connect_nonce
    return device_block
async def _await_response(
ws: websockets.ClientConnection,
request_id: str,
@@ -225,17 +317,36 @@ async def _send_request(
return await _await_response(ws, request_id)
def _build_connect_params(
    config: GatewayConfig,
    *,
    connect_nonce: str | None = None,
) -> dict[str, Any]:
    """Build the ``connect`` request params for the gateway handshake.

    Control-UI mode (device pairing disabled) identifies as the control-UI
    client; device mode additionally attaches a signed device payload bound to
    the server-issued ``connect_nonce`` when one was received.
    """
    # NOTE: this span contained interleaved old/new lines from a diff render;
    # resolved to the current (control-UI aware) implementation.
    role = "operator"
    scopes = list(GATEWAY_OPERATOR_SCOPES)
    connect_mode = _resolve_connect_mode(config)
    use_control_ui = connect_mode == "control_ui"
    params: dict[str, Any] = {
        "minProtocol": PROTOCOL_VERSION,
        "maxProtocol": PROTOCOL_VERSION,
        "role": role,
        "scopes": scopes,
        "client": {
            "id": CONTROL_UI_CLIENT_ID if use_control_ui else DEFAULT_GATEWAY_CLIENT_ID,
            "version": "1.0.0",
            "platform": "python",
            "mode": CONTROL_UI_CLIENT_MODE if use_control_ui else DEFAULT_GATEWAY_CLIENT_MODE,
        },
    }
    if not use_control_ui:
        params["device"] = _build_device_connect_payload(
            client_id=DEFAULT_GATEWAY_CLIENT_ID,
            client_mode=DEFAULT_GATEWAY_CLIENT_MODE,
            role=role,
            scopes=scopes,
            auth_token=config.token,
            connect_nonce=connect_nonce,
        )
    if config.token:
        params["auth"] = {"token": config.token}
    return params
@@ -245,12 +356,19 @@ async def _ensure_connected(
ws: websockets.ClientConnection,
first_message: str | bytes | None,
config: GatewayConfig,
) -> None:
) -> object:
connect_nonce: str | None = None
if first_message:
if isinstance(first_message, bytes):
first_message = first_message.decode("utf-8")
data = json.loads(first_message)
if data.get("type") != "event" or data.get("event") != "connect.challenge":
if data.get("type") == "event" and data.get("event") == "connect.challenge":
payload = data.get("payload")
if isinstance(payload, dict):
nonce = payload.get("nonce")
if isinstance(nonce, str) and nonce.strip():
connect_nonce = nonce.strip()
else:
logger.warning(
"gateway.rpc.connect.unexpected_first_message type=%s event=%s",
data.get("type"),
@@ -261,10 +379,52 @@ async def _ensure_connected(
"type": "req",
"id": connect_id,
"method": "connect",
"params": _build_connect_params(config),
"params": _build_connect_params(config, connect_nonce=connect_nonce),
}
await ws.send(json.dumps(response))
await _await_response(ws, connect_id)
return await _await_response(ws, connect_id)
async def _recv_first_message_or_none(
ws: websockets.ClientConnection,
) -> str | bytes | None:
try:
return await asyncio.wait_for(ws.recv(), timeout=2)
except TimeoutError:
return None
async def _openclaw_call_once(
    method: str,
    params: dict[str, Any] | None,
    *,
    config: GatewayConfig,
    gateway_url: str,
) -> object:
    """Open one websocket to the gateway, handshake, and issue a single RPC."""
    connect_kwargs: dict[str, Any] = {"ping_interval": None}
    if config.disable_device_pairing:
        origin = _build_control_ui_origin(gateway_url)
        if origin is not None:
            connect_kwargs["origin"] = origin
    tls_override = _create_ssl_context(config)
    async with websockets.connect(gateway_url, ssl=tls_override, **connect_kwargs) as ws:
        greeting = await _recv_first_message_or_none(ws)
        await _ensure_connected(ws, greeting, config)
        return await _send_request(ws, method, params)
async def _openclaw_connect_metadata_once(
    *,
    config: GatewayConfig,
    gateway_url: str,
) -> object:
    """Open one websocket, perform the connect handshake, and return its payload."""
    connect_kwargs: dict[str, Any] = {"ping_interval": None}
    if config.disable_device_pairing:
        origin = _build_control_ui_origin(gateway_url)
        if origin is not None:
            connect_kwargs["origin"] = origin
    async with websockets.connect(
        gateway_url, ssl=_create_ssl_context(config), **connect_kwargs
    ) as ws:
        greeting = await _recv_first_message_or_none(ws)
        return await _ensure_connected(ws, greeting, config)
async def openclaw_call(
@@ -277,25 +437,28 @@ async def openclaw_call(
gateway_url = _build_gateway_url(config)
started_at = perf_counter()
logger.debug(
"gateway.rpc.call.start method=%s gateway_url=%s",
(
"gateway.rpc.call.start method=%s gateway_url=%s allow_insecure_tls=%s "
"disable_device_pairing=%s"
),
method,
_redacted_url_for_log(gateway_url),
config.allow_insecure_tls,
config.disable_device_pairing,
)
try:
async with websockets.connect(gateway_url, ping_interval=None) as ws:
first_message = None
try:
first_message = await asyncio.wait_for(ws.recv(), timeout=2)
except TimeoutError:
first_message = None
await _ensure_connected(ws, first_message, config)
payload = await _send_request(ws, method, params)
logger.debug(
"gateway.rpc.call.success method=%s duration_ms=%s",
method,
int((perf_counter() - started_at) * 1000),
)
return payload
payload = await _openclaw_call_once(
method,
params,
config=config,
gateway_url=gateway_url,
)
logger.debug(
"gateway.rpc.call.success method=%s duration_ms=%s",
method,
int((perf_counter() - started_at) * 1000),
)
return payload
except OpenClawGatewayError:
logger.warning(
"gateway.rpc.call.gateway_error method=%s duration_ms=%s",
@@ -319,6 +482,45 @@ async def openclaw_call(
raise OpenClawGatewayError(str(exc)) from exc
async def openclaw_connect_metadata(*, config: GatewayConfig) -> object:
    """Open a gateway connection and return the connect/hello payload.

    Gateway-level failures propagate as ``OpenClawGatewayError``; transport and
    protocol failures are wrapped into ``OpenClawGatewayError`` as well so
    callers only need to handle one exception type.
    """
    gateway_url = _build_gateway_url(config)
    started_at = perf_counter()

    def _elapsed_ms() -> int:
        return int((perf_counter() - started_at) * 1000)

    logger.debug(
        "gateway.rpc.connect_metadata.start gateway_url=%s",
        _redacted_url_for_log(gateway_url),
    )
    try:
        metadata = await _openclaw_connect_metadata_once(
            config=config,
            gateway_url=gateway_url,
        )
    except OpenClawGatewayError:
        logger.warning(
            "gateway.rpc.connect_metadata.gateway_error duration_ms=%s",
            _elapsed_ms(),
        )
        raise
    except (
        TimeoutError,
        ConnectionError,
        OSError,
        ValueError,
        WebSocketException,
    ) as exc:  # pragma: no cover - network/protocol errors
        logger.error(
            "gateway.rpc.connect_metadata.transport_error duration_ms=%s error_type=%s",
            _elapsed_ms(),
            exc.__class__.__name__,
        )
        raise OpenClawGatewayError(str(exc)) from exc
    logger.debug(
        "gateway.rpc.connect_metadata.success duration_ms=%s",
        _elapsed_ms(),
    )
    return metadata
async def send_message(
message: str,
*,

View File

@@ -0,0 +1,167 @@
"""Unified agent lifecycle orchestration.
This module centralizes DB-backed lifecycle transitions so call sites do not
duplicate provisioning/wake/state logic.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from fastapi import HTTPException, status
from sqlmodel import col, select
from app.core.time import utcnow
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.services.openclaw.constants import CHECKIN_DEADLINE_AFTER_WAKE
from app.services.openclaw.db_agent_state import (
mark_provision_complete,
mark_provision_requested,
mint_agent_token,
)
from app.services.openclaw.db_service import OpenClawDBService
from app.services.openclaw.gateway_rpc import OpenClawGatewayError
from app.services.openclaw.lifecycle_queue import (
QueuedAgentLifecycleReconcile,
enqueue_lifecycle_reconcile,
)
from app.services.openclaw.provisioning import OpenClawGatewayProvisioner
from app.services.organizations import get_org_owner_user
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.users import User
class AgentLifecycleOrchestrator(OpenClawDBService):
    """Single lifecycle writer for agent provision/update transitions."""

    def __init__(self, session: AsyncSession) -> None:
        super().__init__(session)

    async def _lock_agent(self, *, agent_id: UUID) -> Agent:
        # SELECT ... FOR UPDATE so only one lifecycle transition runs per agent
        # at a time; 404s when the agent row no longer exists.
        statement = select(Agent).where(col(Agent.id) == agent_id).with_for_update()
        agent = (await self.session.exec(statement)).first()
        if agent is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent not found")
        return agent

    async def run_lifecycle(
        self,
        *,
        gateway: Gateway,
        agent_id: UUID,
        board: Board | None,
        user: User | None,
        action: str,
        auth_token: str | None = None,
        force_bootstrap: bool = False,
        reset_session: bool = False,
        wake: bool = True,
        deliver_wakeup: bool = True,
        wakeup_verb: str | None = None,
        clear_confirm_token: bool = False,
        raise_gateway_errors: bool = True,
    ) -> Agent:
        """Provision or update any agent under a per-agent lock.

        Args:
            gateway: Gateway owning the agent; when it has no ``url`` the DB
                state is advanced but no remote provisioning happens.
            agent_id: Agent row to lock and transition.
            board: Board for board agents; ``None`` for gateway-main agents.
            user: User whose details render USER.md; when absent for a
                gateway-main agent the organization owner is used instead.
            action: ``"provision"`` or ``"update"`` — drives status strings.
            auth_token: Optional pre-minted token; otherwise one is minted.
            raise_gateway_errors: When False, gateway/unexpected errors are
                recorded on the agent instead of raising HTTP errors.

        Returns:
            The refreshed, committed agent row.

        Raises:
            HTTPException: 404 (missing agent), 422 (missing org owner),
                502 (gateway failure) or 500 (unexpected failure) — the last
                two only when ``raise_gateway_errors`` is True.
        """
        locked = await self._lock_agent(agent_id=agent_id)
        template_user = user
        if board is None and template_user is None:
            # Gateway-main agents have no board-scoped user; fall back to the
            # organization owner for USER.md rendering.
            template_user = await get_org_owner_user(
                self.session,
                organization_id=gateway.organization_id,
            )
            if template_user is None:
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
                    detail=(
                        "Organization owner not found "
                        "(required for gateway agent USER.md rendering)."
                    ),
                )
        raw_token = auth_token or mint_agent_token(locked)
        mark_provision_requested(
            locked,
            action=action,
            status="updating" if action == "update" else "provisioning",
        )
        # Bump the generation so stale queued reconcile messages are ignored.
        locked.lifecycle_generation += 1
        locked.last_provision_error = None
        locked.checkin_deadline_at = utcnow() + CHECKIN_DEADLINE_AFTER_WAKE if wake else None
        if wake:
            locked.wake_attempts += 1
            locked.last_wake_sent_at = utcnow()
        self.session.add(locked)
        await self.session.flush()
        if not gateway.url:
            # No remote endpoint configured: persist the DB transition only.
            await self.session.commit()
            await self.session.refresh(locked)
            return locked
        try:
            await OpenClawGatewayProvisioner().apply_agent_lifecycle(
                agent=locked,
                gateway=gateway,
                board=board,
                auth_token=raw_token,
                user=template_user,
                action=action,
                force_bootstrap=force_bootstrap,
                reset_session=reset_session,
                wake=wake,
                deliver_wakeup=deliver_wakeup,
                wakeup_verb=wakeup_verb,
            )
        except OpenClawGatewayError as exc:
            # Record the failure on the row before (optionally) surfacing it.
            locked.last_provision_error = str(exc)
            locked.updated_at = utcnow()
            self.session.add(locked)
            await self.session.commit()
            await self.session.refresh(locked)
            if raise_gateway_errors:
                raise HTTPException(
                    status_code=status.HTTP_502_BAD_GATEWAY,
                    detail=f"Gateway {action} failed: {exc}",
                ) from exc
            return locked
        except (OSError, RuntimeError, ValueError) as exc:
            locked.last_provision_error = str(exc)
            locked.updated_at = utcnow()
            self.session.add(locked)
            await self.session.commit()
            await self.session.refresh(locked)
            if raise_gateway_errors:
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail=f"Unexpected error {action}ing gateway provisioning.",
                ) from exc
            return locked
        mark_provision_complete(
            locked,
            status="online",
            clear_confirm_token=clear_confirm_token,
        )
        locked.last_provision_error = None
        locked.checkin_deadline_at = utcnow() + CHECKIN_DEADLINE_AFTER_WAKE if wake else None
        self.session.add(locked)
        await self.session.commit()
        await self.session.refresh(locked)
        if wake and locked.checkin_deadline_at is not None:
            # Schedule a delayed check that the agent actually came back up.
            enqueue_lifecycle_reconcile(
                QueuedAgentLifecycleReconcile(
                    agent_id=locked.id,
                    gateway_id=locked.gateway_id,
                    board_id=locked.board_id,
                    generation=locked.lifecycle_generation,
                    checkin_deadline_at=locked.checkin_deadline_at,
                )
            )
        return locked

View File

@@ -0,0 +1,122 @@
"""Queue payload helpers for stuck-agent lifecycle reconciliation."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from uuid import UUID
from app.core.config import settings
from app.core.logging import get_logger
from app.core.time import utcnow
from app.services.queue import QueuedTask, enqueue_task_with_delay
from app.services.queue import requeue_if_failed as generic_requeue_if_failed
logger = get_logger(__name__)
TASK_TYPE = "agent_lifecycle_reconcile"
@dataclass(frozen=True)
class QueuedAgentLifecycleReconcile:
    """Queued payload metadata for lifecycle reconciliation checks."""

    # Target agent and its owning gateway/board (board_id is None for
    # gateway-main agents).
    agent_id: UUID
    gateway_id: UUID
    board_id: UUID | None
    # Lifecycle generation at enqueue time; stale generations are dropped by
    # the worker.
    generation: int
    # When the agent is expected to have checked in after the wake signal.
    checkin_deadline_at: datetime
    # Retry counter carried through the queue.
    attempts: int = 0
def _task_from_payload(payload: QueuedAgentLifecycleReconcile) -> QueuedTask:
    """Serialize a reconcile payload into a generic queue task envelope."""
    body: dict[str, Any] = {
        "agent_id": str(payload.agent_id),
        "gateway_id": str(payload.gateway_id),
        "board_id": None if payload.board_id is None else str(payload.board_id),
        "generation": payload.generation,
        "checkin_deadline_at": payload.checkin_deadline_at.isoformat(),
    }
    return QueuedTask(
        task_type=TASK_TYPE,
        payload=body,
        created_at=utcnow(),
        attempts=payload.attempts,
    )
def decode_lifecycle_task(task: QueuedTask) -> QueuedAgentLifecycleReconcile:
    """Parse a queue task back into a typed reconcile payload.

    Raises:
        ValueError: for an unexpected task type or a missing deadline field.
    """
    if task.task_type not in {TASK_TYPE, "legacy"}:
        raise ValueError(f"Unexpected task_type={task.task_type!r}; expected {TASK_TYPE!r}")
    payload: dict[str, Any] = task.payload
    raw_board_id = payload.get("board_id")
    board_id: UUID | None = None
    if isinstance(raw_board_id, str) and raw_board_id:
        board_id = UUID(raw_board_id)
    raw_deadline = payload.get("checkin_deadline_at")
    if not isinstance(raw_deadline, str):
        raise ValueError("checkin_deadline_at is required")
    return QueuedAgentLifecycleReconcile(
        agent_id=UUID(str(payload["agent_id"])),
        gateway_id=UUID(str(payload["gateway_id"])),
        board_id=board_id,
        generation=int(payload["generation"]),
        checkin_deadline_at=datetime.fromisoformat(raw_deadline),
        attempts=int(payload.get("attempts", task.attempts)),
    )
def enqueue_lifecycle_reconcile(payload: QueuedAgentLifecycleReconcile) -> bool:
    """Enqueue a delayed reconcile check keyed to the expected check-in deadline."""
    wait_seconds = max(0.0, (payload.checkin_deadline_at - utcnow()).total_seconds())
    enqueued = enqueue_task_with_delay(
        _task_from_payload(payload),
        settings.rq_queue_name,
        delay_seconds=wait_seconds,
        redis_url=settings.rq_redis_url,
    )
    if enqueued:
        logger.info(
            "lifecycle.queue.enqueued",
            extra={
                "agent_id": str(payload.agent_id),
                "generation": payload.generation,
                "delay_seconds": wait_seconds,
                "attempt": payload.attempts,
            },
        )
    return enqueued
def defer_lifecycle_reconcile(
    task: QueuedTask,
    *,
    delay_seconds: float,
) -> bool:
    """Defer a reconcile task without incrementing retry attempts."""
    decoded = decode_lifecycle_task(task)
    # Rebuild the payload with the task's own attempt counter so a deferral
    # never counts as a retry.
    carried = QueuedAgentLifecycleReconcile(
        agent_id=decoded.agent_id,
        gateway_id=decoded.gateway_id,
        board_id=decoded.board_id,
        generation=decoded.generation,
        checkin_deadline_at=decoded.checkin_deadline_at,
        attempts=task.attempts,
    )
    return enqueue_task_with_delay(
        _task_from_payload(carried),
        settings.rq_queue_name,
        delay_seconds=max(0.0, delay_seconds),
        redis_url=settings.rq_redis_url,
    )
def requeue_lifecycle_queue_task(task: QueuedTask, *, delay_seconds: float = 0) -> bool:
    """Requeue a failed lifecycle task with capped retries."""
    effective_delay = max(0.0, delay_seconds)
    return generic_requeue_if_failed(
        task,
        settings.rq_queue_name,
        max_retries=settings.rq_dispatch_max_retries,
        redis_url=settings.rq_redis_url,
        delay_seconds=effective_delay,
    )

View File

@@ -0,0 +1,140 @@
"""Worker handlers for lifecycle reconciliation tasks."""
from __future__ import annotations
import asyncio
from app.core.logging import get_logger
from app.core.time import utcnow
from app.db.session import async_session_maker
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.services.openclaw.constants import MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN
from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator
from app.services.openclaw.lifecycle_queue import decode_lifecycle_task, defer_lifecycle_reconcile
from app.services.queue import QueuedTask
logger = get_logger(__name__)
_RECONCILE_TIMEOUT_SECONDS = 60.0
def _has_checked_in_since_wake(agent: Agent) -> bool:
if agent.last_seen_at is None:
return False
if agent.last_wake_sent_at is None:
return True
return agent.last_seen_at >= agent.last_wake_sent_at
async def process_lifecycle_queue_task(task: QueuedTask) -> None:
    """Re-run lifecycle provisioning when an agent misses post-provision check-in.

    Decodes the queued payload, then skips silently when the agent is gone,
    the payload's generation is stale, the agent already checked in, or the
    agent is being deleted. A not-yet-due task is re-deferred; an agent that
    exhausted its wake attempts is marked offline. Otherwise the lifecycle
    orchestrator is re-run (bounded by a timeout) to re-provision and re-wake.

    Raises:
        RuntimeError: when deferring a not-yet-due task fails.
        TimeoutError: when the orchestrator run exceeds the reconcile timeout.
    """
    payload = decode_lifecycle_task(task)
    now = utcnow()
    async with async_session_maker() as session:
        agent = await Agent.objects.by_id(payload.agent_id).first(session)
        if agent is None:
            logger.info(
                "lifecycle.reconcile.skip_missing_agent",
                extra={"agent_id": str(payload.agent_id)},
            )
            return
        # Ignore stale queue messages after a newer lifecycle generation.
        if agent.lifecycle_generation != payload.generation:
            logger.info(
                "lifecycle.reconcile.skip_stale_generation",
                extra={
                    "agent_id": str(agent.id),
                    "queued_generation": payload.generation,
                    "current_generation": agent.lifecycle_generation,
                },
            )
            return
        if _has_checked_in_since_wake(agent):
            # Agent came back on its own — nothing to reconcile.
            logger.info(
                "lifecycle.reconcile.skip_not_stuck",
                extra={"agent_id": str(agent.id), "status": agent.status},
            )
            return
        # Prefer the live DB deadline; fall back to the queued snapshot.
        deadline = agent.checkin_deadline_at or payload.checkin_deadline_at
        if agent.status == "deleting":
            logger.info(
                "lifecycle.reconcile.skip_deleting",
                extra={"agent_id": str(agent.id)},
            )
            return
        if now < deadline:
            # Arrived early (e.g. clock drift or deadline pushed out): re-defer
            # until the deadline without consuming a retry attempt.
            delay = max(0.0, (deadline - now).total_seconds())
            if not defer_lifecycle_reconcile(task, delay_seconds=delay):
                msg = "Failed to defer lifecycle reconcile task"
                raise RuntimeError(msg)
            logger.info(
                "lifecycle.reconcile.deferred",
                extra={"agent_id": str(agent.id), "delay_seconds": delay},
            )
            return
        if agent.wake_attempts >= MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN:
            # Give up: mark the agent offline and record why.
            agent.status = "offline"
            agent.checkin_deadline_at = None
            agent.last_provision_error = (
                "Agent did not check in after wake; max wake attempts reached"
            )
            agent.updated_at = utcnow()
            session.add(agent)
            await session.commit()
            logger.warning(
                "lifecycle.reconcile.max_attempts_reached",
                extra={
                    "agent_id": str(agent.id),
                    "wake_attempts": agent.wake_attempts,
                    "max_attempts": MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN,
                },
            )
            return
        gateway = await Gateway.objects.by_id(agent.gateway_id).first(session)
        if gateway is None:
            logger.warning(
                "lifecycle.reconcile.skip_missing_gateway",
                extra={"agent_id": str(agent.id), "gateway_id": str(agent.gateway_id)},
            )
            return
        board: Board | None = None
        if agent.board_id is not None:
            board = await Board.objects.by_id(agent.board_id).first(session)
            if board is None:
                logger.warning(
                    "lifecycle.reconcile.skip_missing_board",
                    extra={"agent_id": str(agent.id), "board_id": str(agent.board_id)},
                )
                return
        orchestrator = AgentLifecycleOrchestrator(session)
        # Re-run the full update path: fresh session, wake, retry tracking.
        await asyncio.wait_for(
            orchestrator.run_lifecycle(
                gateway=gateway,
                agent_id=agent.id,
                board=board,
                user=None,
                action="update",
                auth_token=None,
                force_bootstrap=False,
                reset_session=True,
                wake=True,
                deliver_wakeup=True,
                wakeup_verb="updated",
                clear_confirm_token=True,
                raise_gateway_errors=True,
            ),
            timeout=_RECONCILE_TIMEOUT_SECONDS,
        )
        logger.info(
            "lifecycle.reconcile.retriggered",
            extra={"agent_id": str(agent.id), "generation": payload.generation},
        )

View File

@@ -63,7 +63,7 @@ class OpenClawAuthorizationPolicy:
def require_gateway_configured(gateway: Gateway) -> None:
    """Raise HTTP 422 when the gateway has no configured URL."""
    # NOTE: this span carried both old (UNPROCESSABLE_ENTITY) and new
    # (UNPROCESSABLE_CONTENT) status lines from a diff render; resolved to the
    # current constant.
    if not gateway.url:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="Gateway url is required",
        )

View File

@@ -8,6 +8,7 @@ DB-backed workflows (template sync, lead-agent record creation) live in
from __future__ import annotations
import json
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
@@ -19,7 +20,9 @@ from app.core.config import settings
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.services import souls_directory
from app.services.openclaw.constants import (
BOARD_SHARED_TEMPLATE_MAP,
DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY,
DEFAULT_GATEWAY_FILES,
DEFAULT_HEARTBEAT_CONFIG,
@@ -28,6 +31,8 @@ from app.services.openclaw.constants import (
HEARTBEAT_AGENT_TEMPLATE,
HEARTBEAT_LEAD_TEMPLATE,
IDENTITY_PROFILE_FIELDS,
LEAD_GATEWAY_FILES,
LEAD_TEMPLATE_MAP,
MAIN_TEMPLATE_MAP,
PRESERVE_AGENT_EDITABLE_FILES,
)
@@ -56,6 +61,11 @@ class ProvisionOptions:
action: str = "provision"
force_bootstrap: bool = False
overwrite: bool = False
_ROLE_SOUL_MAX_CHARS = 24_000
_ROLE_SOUL_WORD_RE = re.compile(r"[a-z0-9]+")
def _is_missing_session_error(exc: OpenClawGatewayError) -> bool:
@@ -73,6 +83,17 @@ def _is_missing_session_error(exc: OpenClawGatewayError) -> bool:
)
def _is_missing_agent_error(exc: OpenClawGatewayError) -> bool:
message = str(exc).lower()
if not message:
return False
if any(
marker in message for marker in ("unknown agent", "no such agent", "agent does not exist")
):
return True
return "agent" in message and "not found" in message
def _repo_root() -> Path:
    # Three directory levels above this module — presumably the backend/repo
    # root given this file's depth in the tree; verify if the module moves.
    return Path(__file__).resolve().parents[3]
@@ -175,16 +196,41 @@ def _workspace_path(agent: Agent, workspace_root: str) -> str:
return f"{root}/workspace-{slugify(key)}"
def _email_local_part(email: str) -> str:
normalized = email.strip()
if not normalized:
return ""
local, _sep, _domain = normalized.partition("@")
return local.strip() or normalized
def _display_name(user: User | None) -> str:
if user is None:
return ""
name = (user.name or "").strip()
if name:
return name
return (user.email or "").strip()
def _preferred_name(user: User | None) -> str:
    """Short first-person name for templates.

    Resolution order: first word of the preferred name, else first word of the
    display name (or its email local part), else the email local part. Returns
    "" when nothing usable is set.
    """
    # Strip BEFORE the truthiness check: a whitespace-only preferred name was
    # previously truthy and crashed on ``.strip().split()[0]`` (IndexError).
    # Also drops an unreachable duplicated branch left over from a diff render.
    preferred = ((user.preferred_name or "") if user else "").strip()
    if preferred:
        return preferred.split()[0]
    display_name = _display_name(user)
    if display_name:
        if "@" in display_name:
            return _email_local_part(display_name)
        return display_name.split()[0]
    email = (user.email or "") if user else ""
    return _email_local_part(email)
def _user_context(user: User | None) -> dict[str, str]:
return {
"user_name": (user.name or "") if user else "",
"user_name": _display_name(user),
"user_preferred_name": _preferred_name(user),
"user_pronouns": (user.pronouns or "") if user else "",
"user_timezone": (user.timezone or "") if user else "",
@@ -226,6 +272,72 @@ def _identity_context(agent: Agent) -> dict[str, str]:
return {**identity_context, **extra_identity_context}
def _role_slug(role: str) -> str:
    """Lowercase a free-form role string and join its word tokens with hyphens."""
    tokens = _ROLE_SOUL_WORD_RE.findall(role.lower().strip())
    return "-".join(tokens)
def _select_role_soul_ref(
    refs: list[souls_directory.SoulRef],
    *,
    role: str,
) -> souls_directory.SoulRef | None:
    """Pick the directory soul that best matches *role*.

    Match order: exact slug, slug prefix, slug substring, then a token-overlap
    score over handle+slug requiring at least two shared tokens. Ties prefer
    the shortest slug (first encountered on a full tie).
    """
    role_slug = _role_slug(role)
    if not role_slug:
        return None
    for ref in refs:
        if ref.slug.lower() == role_slug:
            return ref
    prefixed = [ref for ref in refs if ref.slug.lower().startswith(f"{role_slug}-")]
    if prefixed:
        return min(prefixed, key=lambda ref: len(ref.slug))
    containing = [ref for ref in refs if role_slug in ref.slug.lower()]
    if containing:
        return min(containing, key=lambda ref: len(ref.slug))
    role_tokens = [token for token in role_slug.split("-") if token]
    if len(role_tokens) < 2:
        return None
    best: souls_directory.SoulRef | None = None
    best_key: tuple[int, int] | None = None
    for ref in refs:
        haystack = f"{ref.handle}-{ref.slug}".lower()
        hits = sum(1 for token in role_tokens if token in haystack)
        if hits < 2:
            continue
        key = (-hits, len(ref.slug))
        if best_key is None or key < best_key:
            best = ref
            best_key = key
    return best
async def _resolve_role_soul_markdown(role: str) -> tuple[str, str]:
    """Fetch directory soul markdown matching *role*.

    Returns ``(markdown, source_url)``; both empty when role is blank, no match
    is found, the content is empty, or the directory is unreachable. Content is
    truncated to the configured character cap.
    """
    if not role.strip():
        return "", ""
    try:
        refs = await souls_directory.list_souls_directory_refs()
        ref = _select_role_soul_ref(refs, role=role)
        if ref is None:
            return "", ""
        markdown = (
            await souls_directory.fetch_soul_markdown(handle=ref.handle, slug=ref.slug)
        ).strip()
        if not markdown:
            return "", ""
        return markdown[:_ROLE_SOUL_MAX_CHARS], ref.page_url
    except Exception:
        # Best effort only. Provisioning must remain robust even if directory is unavailable.
        return "", ""
def _build_context(
agent: Agent,
board: Board,
@@ -254,7 +366,15 @@ def _build_context(
"board_success_metrics": json.dumps(board.success_metrics or {}),
"board_target_date": board.target_date.isoformat() if board.target_date else "",
"board_goal_confirmed": str(board.goal_confirmed).lower(),
"board_rule_require_approval_for_done": str(board.require_approval_for_done).lower(),
"board_rule_require_review_before_done": str(board.require_review_before_done).lower(),
"board_rule_block_status_changes_with_pending_approval": str(
board.block_status_changes_with_pending_approval
).lower(),
"board_rule_only_lead_can_change_status": str(board.only_lead_can_change_status).lower(),
"board_rule_max_agents": str(board.max_agents),
"is_board_lead": str(agent.is_board_lead).lower(),
"is_main_agent": "false",
"session_key": session_key,
"workspace_path": workspace_path,
"base_url": base_url,
@@ -278,6 +398,7 @@ def _build_main_context(
return {
"agent_name": agent.name,
"agent_id": str(agent.id),
"is_main_agent": "true",
"session_key": agent.openclaw_session_id or "",
"base_url": base_url,
"auth_token": auth_token,
@@ -337,6 +458,9 @@ def _render_agent_files(
template_name = (
template_overrides[name] if template_overrides and name in template_overrides else name
)
if template_name == "SOUL.md":
# Use shared Jinja soul template as the default implementation.
template_name = "BOARD_SOUL.md.j2"
path = _templates_root() / template_name
if not path.exists():
msg = f"Missing template file: {template_name}"
@@ -394,6 +518,10 @@ class GatewayControlPlane(ABC):
async def set_agent_file(self, *, agent_id: str, name: str, content: str) -> None:
raise NotImplementedError
@abstractmethod
async def delete_agent_file(self, *, agent_id: str, name: str) -> None:
    """Remove a named file from an agent's gateway workspace (must override)."""
    raise NotImplementedError
@abstractmethod
async def patch_agent_heartbeats(
self,
@@ -501,6 +629,13 @@ class OpenClawGatewayControlPlane(GatewayControlPlane):
config=self._config,
)
async def delete_agent_file(self, *, agent_id: str, name: str) -> None:
    """Delete a single named file from the agent's gateway workspace via RPC."""
    request_params = {"agentId": agent_id, "name": name}
    await openclaw_call(
        "agents.files.delete",
        request_params,
        config=self._config,
    )
async def patch_agent_heartbeats(
self,
entries: list[tuple[str, str, dict[str, Any]]],
@@ -603,36 +738,62 @@ class BaseAgentLifecycleManager(ABC):
) -> dict[str, str]:
raise NotImplementedError
def _template_overrides(self) -> dict[str, str] | None:
async def _augment_context(
    self,
    *,
    agent: Agent,
    context: dict[str, str],
) -> dict[str, str]:
    """Hook: subclasses may enrich the template context; base is a pass-through."""
    _ = agent  # unused in the base implementation
    return context
def _template_overrides(self, agent: Agent) -> dict[str, str] | None:
    """Hook: map of file name -> alternate template; None means use defaults."""
    return None
def _file_names(self, agent: Agent) -> set[str]:
    """Hook: the set of workspace files to render for this agent."""
    # A stray obsolete `_preserve_files(self)` signature line (diff render
    # residue) preceded this method and has been removed.
    _ = agent
    return set(DEFAULT_GATEWAY_FILES)
def _preserve_files(self, agent: Agent) -> set[str]:
    """Files that are expected to evolve inside the agent workspace."""
    # The docstring previously sat AFTER the first statement, making it a
    # no-op expression instead of the function's docstring; moved to the top.
    _ = agent
    return set(PRESERVE_AGENT_EDITABLE_FILES)
def _allow_stale_file_deletion(self, agent: Agent) -> bool:
    """Hook: whether stale workspace files may be deleted; base says no."""
    _ = agent
    return False
def _stale_file_candidates(self, agent: Agent) -> set[str]:
    """Hook: file names eligible for stale-file cleanup; base has none."""
    _ = agent
    return set()
async def _set_agent_files(
self,
*,
agent: Agent | None = None,
agent_id: str,
rendered: dict[str, str],
desired_file_names: set[str] | None = None,
existing_files: dict[str, dict[str, Any]],
action: str,
overwrite: bool = False,
) -> None:
preserve_files = (
self._preserve_files(agent) if agent is not None else set(PRESERVE_AGENT_EDITABLE_FILES)
)
target_file_names = desired_file_names or set(rendered.keys())
unsupported_names: list[str] = []
for name, content in rendered.items():
if content == "":
continue
# Preserve "editable" files only during updates. During first-time provisioning,
# the gateway may pre-create defaults for USER/SELF/etc, and we still want to
# the gateway may pre-create defaults for USER/MEMORY/etc, and we still want to
# apply Mission Control's templates.
if action == "update" and name in self._preserve_files():
if action == "update" and not overwrite and name in preserve_files:
entry = existing_files.get(name)
if entry and not bool(entry.get("missing")):
size = entry.get("size")
if isinstance(size, int) and size == 0:
# Treat 0-byte placeholders as missing so update can fill them.
pass
else:
continue
continue
try:
await self._control_plane.set_agent_file(
agent_id=agent_id,
@@ -641,6 +802,38 @@ class BaseAgentLifecycleManager(ABC):
)
except OpenClawGatewayError as exc:
if "unsupported file" in str(exc).lower():
unsupported_names.append(name)
continue
raise
if agent is not None and agent.is_board_lead and unsupported_names:
unsupported_sorted = ", ".join(sorted(set(unsupported_names)))
msg = (
"Gateway rejected required lead workspace files as unsupported: "
f"{unsupported_sorted}"
)
raise RuntimeError(msg)
if agent is None or not self._allow_stale_file_deletion(agent):
return
stale_names = (
set(existing_files.keys()) & self._stale_file_candidates(agent)
) - target_file_names
for name in sorted(stale_names):
try:
await self._control_plane.delete_agent_file(agent_id=agent_id, name=name)
except OpenClawGatewayError as exc:
message = str(exc).lower()
if any(
marker in message
for marker in (
"unsupported",
"unknown method",
"not found",
"no such file",
)
):
continue
raise
@@ -679,9 +872,10 @@ class BaseAgentLifecycleManager(ABC):
user=user,
board=board,
)
context = await self._augment_context(agent=agent, context=context)
# Always attempt to sync Mission Control's full template set.
# Do not introspect gateway defaults (avoids touching gateway "main" agent state).
file_names = set(DEFAULT_GATEWAY_FILES)
file_names = self._file_names(agent)
existing_files = await self._control_plane.list_agent_files(agent_id)
include_bootstrap = _should_include_bootstrap(
action=options.action,
@@ -693,14 +887,17 @@ class BaseAgentLifecycleManager(ABC):
agent,
file_names,
include_bootstrap=include_bootstrap,
template_overrides=self._template_overrides(),
template_overrides=self._template_overrides(agent),
)
await self._set_agent_files(
agent=agent,
agent_id=agent_id,
rendered=rendered,
desired_file_names=set(rendered.keys()),
existing_files=existing_files,
action=options.action,
overwrite=options.overwrite,
)
@@ -723,6 +920,55 @@ class BoardAgentLifecycleManager(BaseAgentLifecycleManager):
raise ValueError(msg)
return _build_context(agent, board, self._gateway, auth_token, user)
async def _augment_context(
    self,
    *,
    agent: Agent,
    context: dict[str, str],
) -> dict[str, str]:
    """Add directory-soul variables; leads always get empty soul fields."""
    augmented = dict(context)
    if agent.is_board_lead:
        augmented["directory_role_soul_markdown"] = ""
        augmented["directory_role_soul_source_url"] = ""
        return augmented
    role = (context.get("identity_role") or "").strip()
    markdown, source_url = await _resolve_role_soul_markdown(role)
    augmented["directory_role_soul_markdown"] = markdown
    augmented["directory_role_soul_source_url"] = source_url
    return augmented
def _template_overrides(self, agent: Agent) -> dict[str, str] | None:
    """Board agents share the common map; leads additionally get lead templates."""
    merged = dict(BOARD_SHARED_TEMPLATE_MAP)
    if agent.is_board_lead:
        merged.update(LEAD_TEMPLATE_MAP)
    return merged
def _file_names(self, agent: Agent) -> set[str]:
    """Leads render the lead file set; regular board agents use the default."""
    if agent.is_board_lead:
        return set(LEAD_GATEWAY_FILES)
    return super()._file_names(agent)
def _allow_stale_file_deletion(self, agent: Agent) -> bool:
    """Only lead agents get stale workspace files cleaned up."""
    return bool(agent.is_board_lead)
def _stale_file_candidates(self, agent: Agent) -> set[str]:
    """Every file name a lead workspace may have accumulated historically."""
    if not agent.is_board_lead:
        return set()
    legacy_names = {
        "USER.md",
        "ROUTING.md",
        "LEARNINGS.md",
        "ROLE.md",
        "WORKFLOW.md",
        "STATUS.md",
        "APIS.md",
    }
    candidates = set(DEFAULT_GATEWAY_FILES)
    candidates |= set(LEAD_GATEWAY_FILES)
    candidates |= legacy_names
    return candidates
class GatewayMainAgentLifecycleManager(BaseAgentLifecycleManager):
"""Provisioning manager for organization gateway-main agents."""
@@ -741,13 +987,15 @@ class GatewayMainAgentLifecycleManager(BaseAgentLifecycleManager):
_ = board
return _build_main_context(agent, self._gateway, auth_token, user)
def _template_overrides(self, agent: Agent) -> dict[str, str] | None:
    """Gateway-main agents always render from the MAIN template map."""
    # An obsolete zero-arg signature line (diff render residue) preceded this
    # method and has been removed.
    _ = agent
    return MAIN_TEMPLATE_MAP
def _preserve_files(self, agent: Agent) -> set[str]:
    """Preserved files for gateway-main agents, minus system-managed USER.md."""
    # Obsolete zero-arg signature and call lines (diff render residue) have
    # been removed; only the agent-aware versions remain.
    _ = agent
    # For gateway-main agents, USER.md is system-managed (derived from org/user
    # context), so keep it in sync even during updates.
    preserved = super()._preserve_files(agent)
    preserved.discard("USER.md")
    return preserved
@@ -757,7 +1005,12 @@ def _control_plane_for_gateway(gateway: Gateway) -> OpenClawGatewayControlPlane:
msg = "Gateway url is required"
raise OpenClawGatewayError(msg)
return OpenClawGatewayControlPlane(
GatewayClientConfig(url=gateway.url, token=gateway.token),
GatewayClientConfig(
url=gateway.url,
token=gateway.token,
allow_insecure_tls=gateway.allow_insecure_tls,
disable_device_pairing=gateway.disable_device_pairing,
),
)
@@ -791,8 +1044,8 @@ def _should_include_bootstrap(
def _wakeup_text(agent: Agent, *, verb: str) -> str:
return (
f"Hello {agent.name}. Your workspace has been {verb}.\n\n"
"Start the agent, run BOOT.md, and if BOOTSTRAP.md exists run it once "
"then delete it. Begin heartbeats after startup."
"Start the agent. If BOOTSTRAP.md exists, read it first, then read AGENTS.md. "
"Begin heartbeats after startup."
)
@@ -824,6 +1077,7 @@ class OpenClawGatewayProvisioner:
user: User | None,
action: str = "provision",
force_bootstrap: bool = False,
overwrite: bool = False,
reset_session: bool = False,
wake: bool = True,
deliver_wakeup: bool = True,
@@ -833,7 +1087,7 @@ class OpenClawGatewayProvisioner:
Lifecycle steps (same for all agent types):
1) create agent (idempotent)
2) set/update all template files (best-effort for unsupported files)
2) set/update all template files
3) wake the agent session (chat.send)
"""
@@ -867,7 +1121,11 @@ class OpenClawGatewayProvisioner:
session_key=session_key,
auth_token=auth_token,
user=user,
options=ProvisionOptions(action=action, force_bootstrap=force_bootstrap),
options=ProvisionOptions(
action=action,
force_bootstrap=force_bootstrap,
overwrite=overwrite,
),
session_label=agent.name or "Gateway Agent",
)
@@ -881,7 +1139,12 @@ class OpenClawGatewayProvisioner:
if not wake:
return
client_config = GatewayClientConfig(url=gateway.url, token=gateway.token)
client_config = GatewayClientConfig(
url=gateway.url,
token=gateway.token,
allow_insecure_tls=gateway.allow_insecure_tls,
disable_device_pairing=gateway.disable_device_pairing,
)
await ensure_session(session_key, config=client_config, label=agent.name)
verb = wakeup_verb or ("provisioned" if action == "provision" else "updated")
await send_message(
@@ -915,7 +1178,11 @@ class OpenClawGatewayProvisioner:
agent_gateway_id = GatewayAgentIdentity.openclaw_agent_id(gateway)
else:
agent_gateway_id = _agent_key(agent)
await control_plane.delete_agent(agent_gateway_id, delete_files=delete_files)
try:
await control_plane.delete_agent(agent_gateway_id, delete_files=delete_files)
except OpenClawGatewayError as exc:
if not _is_missing_agent_error(exc):
raise
if delete_session:
if agent.board_id is None:

View File

@@ -29,7 +29,9 @@ from app.db.pagination import paginate
from app.db.session import async_session_maker
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.approvals import Approval
from app.models.board_memory import BoardMemory
from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.organizations import Organization
@@ -50,8 +52,6 @@ from app.services.openclaw.constants import (
OFFLINE_AFTER,
)
from app.services.openclaw.db_agent_state import (
mark_provision_complete,
mark_provision_requested,
mint_agent_token,
)
from app.services.openclaw.db_service import OpenClawDBService
@@ -72,6 +72,7 @@ from app.services.openclaw.internal.session_keys import (
board_agent_session_key,
board_lead_session_key,
)
from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator
from app.services.openclaw.policies import OpenClawAuthorizationPolicy
from app.services.openclaw.provisioning import (
OpenClawGatewayControlPlane,
@@ -108,9 +109,11 @@ class GatewayTemplateSyncOptions:
user: User | None
include_main: bool = True
lead_only: bool = False
reset_sessions: bool = False
rotate_tokens: bool = False
force_bootstrap: bool = False
overwrite: bool = False
board_id: UUID | None = None
@@ -139,7 +142,6 @@ class OpenClawProvisioningService(OpenClawDBService):
def __init__(self, session: AsyncSession) -> None:
super().__init__(session)
self._gateway = OpenClawGatewayProvisioner()
@staticmethod
def lead_session_key(board: Board) -> str:
@@ -209,25 +211,25 @@ class OpenClawProvisioningService(OpenClawDBService):
openclaw_session_id=self.lead_session_key(board),
)
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action=config_options.action, status="provisioning")
await self.add_commit_refresh(agent)
# Strict behavior: provisioning errors surface to the caller. The DB row exists
# so a later retry can succeed with the same deterministic identity/session key.
await self._gateway.apply_agent_lifecycle(
agent=agent,
agent = await AgentLifecycleOrchestrator(self.session).run_lifecycle(
gateway=request.gateway,
agent_id=agent.id,
board=board,
auth_token=raw_token,
user=request.user,
action=config_options.action,
auth_token=raw_token,
force_bootstrap=False,
reset_session=False,
wake=True,
deliver_wakeup=True,
wakeup_verb=None,
clear_confirm_token=False,
raise_gateway_errors=True,
)
mark_provision_complete(agent, status="online")
await self.add_commit_refresh(agent)
return agent, True
async def sync_gateway_templates(
@@ -245,12 +247,29 @@ class OpenClawProvisioningService(OpenClawDBService):
options = GatewayTemplateSyncOptions(
user=template_user,
include_main=options.include_main,
lead_only=options.lead_only,
reset_sessions=options.reset_sessions,
rotate_tokens=options.rotate_tokens,
force_bootstrap=options.force_bootstrap,
overwrite=options.overwrite,
board_id=options.board_id,
)
if template_user is None:
result = _base_result(
gateway,
include_main=options.include_main,
reset_sessions=options.reset_sessions,
)
_append_sync_error(
result,
message=(
"Organization owner not found (required for gateway template USER.md "
"rendering)."
),
)
return result
result = _base_result(
gateway,
include_main=options.include_main,
@@ -264,7 +283,12 @@ class OpenClawProvisioningService(OpenClawDBService):
return result
control_plane = OpenClawGatewayControlPlane(
GatewayClientConfig(url=gateway.url, token=gateway.token),
GatewayClientConfig(
url=gateway.url,
token=gateway.token,
allow_insecure_tls=gateway.allow_insecure_tls,
disable_device_pairing=gateway.disable_device_pairing,
),
)
ctx = _SyncContext(
session=self.session,
@@ -272,7 +296,6 @@ class OpenClawProvisioningService(OpenClawDBService):
control_plane=control_plane,
backoff=GatewayBackoff(timeout_s=10 * 60, timeout_context="template sync"),
options=options,
provisioner=self._gateway,
)
if not await _ping_gateway(ctx, result):
return result
@@ -287,11 +310,12 @@ class OpenClawProvisioningService(OpenClawDBService):
return result
paused_board_ids = await _paused_board_ids(self.session, list(boards_by_id.keys()))
if boards_by_id:
agents = await (
Agent.objects.by_field_in("board_id", list(boards_by_id.keys()))
.order_by(col(Agent.created_at).asc())
.all(self.session)
query = Agent.objects.by_field_in("board_id", list(boards_by_id.keys())).order_by(
col(Agent.created_at).asc(),
)
if options.lead_only:
query = query.filter(col(Agent.is_board_lead).is_(True))
agents = await query.all(self.session)
else:
agents = []
@@ -325,7 +349,6 @@ class _SyncContext:
control_plane: OpenClawGatewayControlPlane
backoff: GatewayBackoff
options: GatewayTemplateSyncOptions
provisioner: OpenClawGatewayProvisioner
def _parse_tools_md(content: str) -> dict[str, str]:
@@ -557,17 +580,26 @@ async def _sync_one_agent(
try:
async def _do_provision() -> bool:
await ctx.provisioner.apply_agent_lifecycle(
agent=agent,
gateway=ctx.gateway,
board=board,
auth_token=auth_token,
user=ctx.options.user,
action="update",
force_bootstrap=ctx.options.force_bootstrap,
reset_session=ctx.options.reset_sessions,
wake=False,
)
try:
await AgentLifecycleOrchestrator(ctx.session).run_lifecycle(
gateway=ctx.gateway,
agent_id=agent.id,
board=board,
user=ctx.options.user,
action="update",
auth_token=auth_token,
force_bootstrap=ctx.options.force_bootstrap,
reset_session=ctx.options.reset_sessions,
wake=False,
deliver_wakeup=False,
wakeup_verb="updated",
clear_confirm_token=False,
raise_gateway_errors=True,
)
except HTTPException as exc:
if exc.status_code == status.HTTP_502_BAD_GATEWAY:
raise OpenClawGatewayError(str(exc.detail)) from exc
raise
return True
await ctx.backoff.run(_do_provision)
@@ -585,6 +617,15 @@ async def _sync_one_agent(
message=f"Failed to sync templates: {exc}",
)
return False
except HTTPException as exc:
result.agents_skipped += 1
_append_sync_error(
result,
agent=agent,
board=board,
message=f"Failed to sync templates: {exc.detail}",
)
return False
else:
return False
@@ -627,17 +668,26 @@ async def _sync_main_agent(
try:
async def _do_provision_main() -> bool:
await ctx.provisioner.apply_agent_lifecycle(
agent=main_agent,
gateway=ctx.gateway,
board=None,
auth_token=token,
user=ctx.options.user,
action="update",
force_bootstrap=ctx.options.force_bootstrap,
reset_session=ctx.options.reset_sessions,
wake=False,
)
try:
await AgentLifecycleOrchestrator(ctx.session).run_lifecycle(
gateway=ctx.gateway,
agent_id=main_agent.id,
board=None,
user=ctx.options.user,
action="update",
auth_token=token,
force_bootstrap=ctx.options.force_bootstrap,
reset_session=ctx.options.reset_sessions,
wake=False,
deliver_wakeup=False,
wakeup_verb="updated",
clear_confirm_token=False,
raise_gateway_errors=True,
)
except HTTPException as exc:
if exc.status_code == status.HTTP_502_BAD_GATEWAY:
raise OpenClawGatewayError(str(exc.detail)) from exc
raise
return True
await ctx.backoff.run(_do_provision_main)
@@ -650,6 +700,12 @@ async def _sync_main_agent(
agent=main_agent,
message=f"Failed to sync gateway agent templates: {exc}",
)
except HTTPException as exc:
_append_sync_error(
result,
agent=main_agent,
message=f"Failed to sync gateway agent templates: {exc.detail}",
)
else:
result.main_updated = True
return stop_sync
@@ -732,7 +788,7 @@ class AgentLifecycleService(OpenClawDBService):
if existing:
return existing
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Gateway main agent session key is required",
)
if agent.is_board_lead:
@@ -743,7 +799,7 @@ class AgentLifecycleService(OpenClawDBService):
def workspace_path(cls, agent_name: str, workspace_root: str | None) -> str:
if not workspace_root:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Gateway workspace_root is required",
)
root = workspace_root.rstrip("/")
@@ -758,7 +814,7 @@ class AgentLifecycleService(OpenClawDBService):
) -> Board:
if not board_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="board_id is required",
)
board = await Board.objects.by_id(board_id).first(self.session)
@@ -922,6 +978,49 @@ class AgentLifecycleService(OpenClawDBService):
return payload
async def count_non_lead_agents_for_board(
    self,
    *,
    board_id: UUID,
) -> int:
    """Count board-scoped non-lead agents for spawn limit checks."""
    # The lead is excluded so the count lines up with the worker cap.
    statement = select(func.count(col(Agent.id))).where(
        col(Agent.board_id) == board_id,
        col(Agent.is_board_lead).is_(False),
    )
    total = (await self.session.exec(statement)).one()
    return int(total or 0)
async def enforce_board_spawn_limit_for_lead(
    self,
    *,
    board: Board,
    actor: ActorContextLike,
) -> None:
    """Enforce `board.max_agents` when creation is requested by a lead agent.

    The cap excludes the board lead itself. Non-agent actors and non-lead
    agent actors are never limited here.

    Raises:
        HTTPException: 409 when the worker cap is already reached.
    """
    lead_agent_actor = (
        actor.actor_type == "agent"
        and actor.agent is not None
        and actor.agent.is_board_lead
    )
    if not lead_agent_actor:
        return
    worker_count = await self.count_non_lead_agents_for_board(board_id=board.id)
    if worker_count < board.max_agents:
        return
    noun = "agent" if board.max_agents == 1 else "agents"
    raise HTTPException(
        status_code=status.HTTP_409_CONFLICT,
        detail=(
            "Board worker-agent limit reached: "
            f"max_agents={board.max_agents} (excluding the lead); "
            f"cannot create more than {board.max_agents} {noun}."
        ),
    )
async def ensure_unique_agent_name(
self,
*,
@@ -966,7 +1065,6 @@ class AgentLifecycleService(OpenClawDBService):
) -> tuple[Agent, str]:
agent = Agent.model_validate(data)
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action="provision", status="provisioning")
agent.openclaw_session_id = self.resolve_session_key(agent)
await self.add_commit_refresh(agent)
return agent, raw_token
@@ -996,92 +1094,63 @@ class AgentLifecycleService(OpenClawDBService):
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="board is required for non-main agent provisioning",
)
template_user = user
if target.is_main_agent and template_user is None:
template_user = await get_org_owner_user(
self.session,
organization_id=target.gateway.organization_id,
)
if template_user is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
"User context is required to provision the gateway main agent "
"(org owner not found)."
),
)
await OpenClawGatewayProvisioner().apply_agent_lifecycle(
agent=agent,
provisioned = await AgentLifecycleOrchestrator(self.session).run_lifecycle(
gateway=target.gateway,
agent_id=agent.id,
board=target.board if not target.is_main_agent else None,
auth_token=auth_token,
user=template_user,
user=user,
action=action,
auth_token=auth_token,
force_bootstrap=force_bootstrap,
reset_session=True,
wake=True,
deliver_wakeup=True,
wakeup_verb=wakeup_verb,
clear_confirm_token=True,
raise_gateway_errors=raise_gateway_errors,
)
mark_provision_complete(agent, status="online", clear_confirm_token=True)
self.session.add(agent)
await self.session.commit()
record_activity(
self.session,
event_type=f"agent.{action}.direct",
message=f"{action.capitalize()}d directly for {agent.name}.",
agent_id=agent.id,
message=f"{action.capitalize()}d directly for {provisioned.name}.",
agent_id=provisioned.id,
)
record_activity(
self.session,
event_type="agent.wakeup.sent",
message=f"Wakeup message sent to {agent.name}.",
agent_id=agent.id,
message=f"Wakeup message sent to {provisioned.name}.",
agent_id=provisioned.id,
)
await self.session.commit()
self.logger.info(
"agent.provision.success action=%s agent_id=%s",
action,
agent.id,
provisioned.id,
)
except OpenClawGatewayError as exc:
except HTTPException as exc:
self.record_instruction_failure(
self.session,
agent,
str(exc),
str(exc.detail),
action,
)
await self.session.commit()
self.logger.error(
"agent.provision.gateway_error action=%s agent_id=%s error=%s",
action,
agent.id,
str(exc),
)
if exc.status_code == status.HTTP_502_BAD_GATEWAY:
self.logger.error(
"agent.provision.gateway_error action=%s agent_id=%s error=%s",
action,
agent.id,
str(exc.detail),
)
else:
self.logger.critical(
"agent.provision.runtime_error action=%s agent_id=%s error=%s",
action,
agent.id,
str(exc.detail),
)
if raise_gateway_errors:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway {action} failed: {exc}",
) from exc
except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover
self.record_instruction_failure(
self.session,
agent,
str(exc),
action,
)
await self.session.commit()
self.logger.critical(
"agent.provision.runtime_error action=%s agent_id=%s error=%s",
action,
agent.id,
str(exc),
)
if raise_gateway_errors:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Unexpected error {action}ing agent provisioning.",
) from exc
raise
async def provision_new_agent(
self,
@@ -1154,7 +1223,7 @@ class AgentLifecycleService(OpenClawDBService):
elif make_main is not None:
if "board_id" not in updates or updates["board_id"] is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail=(
"board_id is required when converting a gateway-main agent "
"to board scope"
@@ -1163,7 +1232,7 @@ class AgentLifecycleService(OpenClawDBService):
board = await self.require_board(updates["board_id"])
if board.gateway_id is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Board gateway_id is required",
)
updates["gateway_id"] = board.gateway_id
@@ -1173,7 +1242,7 @@ class AgentLifecycleService(OpenClawDBService):
board = await self.require_board(updates["board_id"])
if board.gateway_id is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Board gateway_id is required",
)
updates["gateway_id"] = board.gateway_id
@@ -1188,7 +1257,7 @@ class AgentLifecycleService(OpenClawDBService):
board = await self.require_board(agent.board_id)
if board.gateway_id is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Board gateway_id is required",
)
agent.gateway_id = board.gateway_id
@@ -1211,7 +1280,7 @@ class AgentLifecycleService(OpenClawDBService):
if make_main:
if gateway_for_main is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Gateway agent requires a gateway configuration",
)
return AgentUpdateProvisionTarget(
@@ -1229,7 +1298,7 @@ class AgentLifecycleService(OpenClawDBService):
if agent.board_id is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="board_id is required for non-main agents",
)
board = await self.require_board(agent.board_id)
@@ -1243,7 +1312,6 @@ class AgentLifecycleService(OpenClawDBService):
@staticmethod
def mark_agent_update_pending(agent: Agent) -> str:
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action="update", status="updating")
return raw_token
async def provision_updated_agent(
@@ -1318,7 +1386,6 @@ class AgentLifecycleService(OpenClawDBService):
return
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action="provision", status="provisioning")
await self.add_commit_refresh(agent)
board = await self.require_board(
str(agent.board_id) if agent.board_id else None,
@@ -1364,6 +1431,10 @@ class AgentLifecycleService(OpenClawDBService):
elif agent.status == "provisioning":
agent.status = "online"
agent.last_seen_at = utcnow()
# Successful check-in ends the current wake escalation cycle.
agent.wake_attempts = 0
agent.checkin_deadline_at = None
agent.last_provision_error = None
agent.updated_at = utcnow()
self.record_heartbeat(self.session, agent)
self.session.add(agent)
@@ -1484,6 +1555,7 @@ class AgentLifecycleService(OpenClawDBService):
user=actor.user if actor.actor_type == "user" else None,
write=actor.actor_type == "user",
)
await self.enforce_board_spawn_limit_for_lead(board=board, actor=actor)
gateway, _client_config = await self.require_gateway(board)
data = payload.model_dump()
data["gateway_id"] = gateway.id
@@ -1775,6 +1847,21 @@ class AgentLifecycleService(OpenClawDBService):
agent_id=None,
commit=False,
)
await crud.update_where(
self.session,
Approval,
col(Approval.agent_id) == agent.id,
agent_id=None,
commit=False,
)
await crud.update_where(
self.session,
BoardWebhook,
col(BoardWebhook.agent_id) == agent.id,
agent_id=None,
updated_at=now,
commit=False,
)
await self.session.delete(agent)
await self.session.commit()

View File

@@ -20,6 +20,8 @@ from app.schemas.gateway_api import (
GatewaysStatusResponse,
)
from app.services.openclaw.db_service import OpenClawDBService
from app.services.openclaw.error_messages import normalize_gateway_error_message
from app.services.openclaw.gateway_compat import check_gateway_version_compatibility
from app.services.openclaw.gateway_resolver import gateway_client_config, require_gateway_for_board
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
from app.services.openclaw.gateway_rpc import (
@@ -44,9 +46,11 @@ class GatewayTemplateSyncQuery:
"""Sync options parsed from query args for gateway template operations."""
include_main: bool
lead_only: bool
reset_sessions: bool
rotate_tokens: bool
force_bootstrap: bool
overwrite: bool
board_id: UUID | None
@@ -61,11 +65,15 @@ class GatewaySessionService(OpenClawDBService):
board_id: str | None,
gateway_url: str | None,
gateway_token: str | None,
gateway_disable_device_pairing: bool = False,
gateway_allow_insecure_tls: bool = False,
) -> GatewayResolveQuery:
return GatewayResolveQuery(
board_id=board_id,
gateway_url=gateway_url,
gateway_token=gateway_token,
gateway_disable_device_pairing=gateway_disable_device_pairing,
gateway_allow_insecure_tls=gateway_allow_insecure_tls,
)
@staticmethod
@@ -98,7 +106,7 @@ class GatewaySessionService(OpenClawDBService):
raw_url = params.gateway_url.strip()
if not raw_url:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="board_id or gateway_url is required",
)
return (
@@ -106,12 +114,14 @@ class GatewaySessionService(OpenClawDBService):
GatewayClientConfig(
url=raw_url,
token=(params.gateway_token or "").strip() or None,
allow_insecure_tls=params.gateway_allow_insecure_tls,
disable_device_pairing=params.gateway_disable_device_pairing,
),
None,
)
if not params.board_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="board_id or gateway_url is required",
)
board = await Board.objects.by_id(params.board_id).first(self.session)
@@ -141,7 +151,7 @@ class GatewaySessionService(OpenClawDBService):
board, config, main_session = await self.resolve_gateway(params, user=user)
if board is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="board_id is required",
)
return board, config, main_session
@@ -186,6 +196,20 @@ class GatewaySessionService(OpenClawDBService):
) -> GatewaysStatusResponse:
board, config, main_session = await self.resolve_gateway(params, user=user)
self._require_same_org(board, organization_id)
try:
compatibility = await check_gateway_version_compatibility(config)
except OpenClawGatewayError as exc:
return GatewaysStatusResponse(
connected=False,
gateway_url=config.url,
error=normalize_gateway_error_message(str(exc)),
)
if not compatibility.compatible:
return GatewaysStatusResponse(
connected=False,
gateway_url=config.url,
error=compatibility.message,
)
try:
sessions = await openclaw_call("sessions.list", config=config)
if isinstance(sessions, dict):
@@ -217,7 +241,7 @@ class GatewaySessionService(OpenClawDBService):
return GatewaysStatusResponse(
connected=False,
gateway_url=config.url,
error=str(exc),
error=normalize_gateway_error_message(str(exc)),
)
async def get_sessions(

View File

@@ -2,12 +2,16 @@
from __future__ import annotations
from collections.abc import Iterable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterable
from datetime import datetime
from typing import TYPE_CHECKING
from fastapi import HTTPException, status
from sqlalchemy import or_
from sqlalchemy.exc import IntegrityError
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.time import utcnow
from app.db import crud
@@ -17,13 +21,13 @@ from app.models.organization_invite_board_access import OrganizationInviteBoardA
from app.models.organization_invites import OrganizationInvite
from app.models.organization_members import OrganizationMember
from app.models.organizations import Organization
from app.models.skills import SkillPack
from app.models.users import User
if TYPE_CHECKING:
from uuid import UUID
from sqlalchemy.sql.elements import ColumnElement
from sqlmodel.ext.asyncio.session import AsyncSession
from app.schemas.organizations import (
OrganizationBoardAccessSpec,
@@ -31,6 +35,30 @@ if TYPE_CHECKING:
)
DEFAULT_ORG_NAME = "Personal"
def _normalize_skill_pack_source_url(source_url: str) -> str:
"""Normalize pack source URL so duplicates with trivial formatting differences match."""
normalized = str(source_url).strip().rstrip("/")
if normalized.endswith(".git"):
return normalized[: -len(".git")]
return normalized
DEFAULT_INSTALLER_SKILL_PACKS = (
(
"sickn33/antigravity-awesome-skills",
"antigravity-awesome-skills",
"The Ultimate Collection of 800+ Agentic Skills for Claude Code/Antigravity/Cursor. "
"Battle-tested, high-performance skills for AI agents including official skills from "
"Anthropic and Vercel.",
),
(
"BrianRWagner/ai-marketing-skills",
"ai-marketing-skills",
"Marketing frameworks that AI actually executes. Use for Claude Code, OpenClaw, etc.",
),
)
ADMIN_ROLES = {"owner", "admin"}
ROLE_RANK = {"member": 0, "admin": 1, "owner": 2}
@@ -175,6 +203,8 @@ async def accept_invite(
session.add(member)
await session.flush()
# For scoped invites, copy invite board-access rows onto the member at accept
# time so effective permissions survive invite lifecycle cleanup.
if not (invite.all_boards_read or invite.all_boards_write):
access_rows = list(
await session.exec(
@@ -207,6 +237,42 @@ async def accept_invite(
return member
def _get_default_skill_pack_records(org_id: UUID, now: "datetime") -> list[SkillPack]:
    """Build default installer skill pack rows for a new organization."""
    github_base = "https://github.com"
    # Keyed by normalized URL so the first entry wins on duplicates,
    # preserving declaration order for the survivors.
    packs_by_url: dict[str, SkillPack] = {}
    for repo, name, description in DEFAULT_INSTALLER_SKILL_PACKS:
        url = _normalize_skill_pack_source_url(f"{github_base}/{repo}")
        if url in packs_by_url:
            continue
        packs_by_url[url] = SkillPack(
            organization_id=org_id,
            name=name,
            description=description,
            source_url=url,
            created_at=now,
            updated_at=now,
        )
    return list(packs_by_url.values())
async def _fetch_existing_default_pack_sources(
    session: AsyncSession,
    org_id: UUID,
) -> set[str]:
    """Return existing default skill pack URLs for the organization.

    NOTE(review): the isinstance guard bails out for non-AsyncSession
    session objects — presumably to tolerate test doubles; confirm.
    """
    if not isinstance(session, AsyncSession):
        return set()
    rows = await SkillPack.objects.filter_by(organization_id=org_id).all(session)
    return {_normalize_skill_pack_source_url(row.source_url) for row in rows}
async def ensure_member_for_user(
session: AsyncSession,
user: User,
@@ -248,10 +314,41 @@ async def ensure_member_for_user(
created_at=now,
updated_at=now,
)
default_skill_packs = _get_default_skill_pack_records(org_id=org_id, now=now)
existing_pack_urls = await _fetch_existing_default_pack_sources(session, org_id)
normalized_existing_pack_urls = {
_normalize_skill_pack_source_url(existing_pack_source)
for existing_pack_source in existing_pack_urls
}
user.active_organization_id = org_id
session.add(user)
session.add(member)
await session.commit()
try:
await session.commit()
except IntegrityError:
await session.rollback()
existing_member = await get_first_membership(session, user.id)
if existing_member is None:
raise
if user.active_organization_id != existing_member.organization_id:
user.active_organization_id = existing_member.organization_id
session.add(user)
await session.commit()
await session.refresh(existing_member)
return existing_member
for pack in default_skill_packs:
normalized_source_url = _normalize_skill_pack_source_url(pack.source_url)
if normalized_source_url in normalized_existing_pack_urls:
continue
session.add(pack)
try:
await session.commit()
except IntegrityError:
await session.rollback()
normalized_existing_pack_urls.add(normalized_source_url)
continue
await session.refresh(member)
return member

View File

@@ -0,0 +1,300 @@
"""Generic Redis-backed queue helpers for RQ-backed background workloads."""
from __future__ import annotations
import json
import time
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any, cast
import redis
from app.core.config import settings
from app.core.logging import get_logger
logger = get_logger(__name__)
_SCHEDULED_SUFFIX = ":scheduled"
_DRY_RUN_BATCH_SIZE = 100
@dataclass(frozen=True)
class QueuedTask:
"""Generic queued task envelope."""
task_type: str
payload: dict[str, Any]
created_at: datetime
attempts: int = 0
def to_json(self) -> str:
return json.dumps(
{
"task_type": self.task_type,
"payload": self.payload,
"created_at": self.created_at.isoformat(),
"attempts": self.attempts,
},
sort_keys=True,
)
def _redis_client(redis_url: str | None = None) -> redis.Redis:
    """Build a Redis client, falling back to the configured RQ Redis URL."""
    target_url = redis_url or settings.rq_redis_url
    return redis.Redis.from_url(target_url)
def _scheduled_queue_name(queue_name: str) -> str:
    """Name of the sorted-set companion that holds delayed tasks for a queue."""
    return queue_name + _SCHEDULED_SUFFIX
def _now_seconds() -> float:
return time.time()
def _drain_ready_scheduled_tasks(
    client: redis.Redis,
    queue_name: str,
    *,
    max_items: int = _DRY_RUN_BATCH_SIZE,
) -> float | None:
    """Promote due tasks from the scheduled sorted set onto the live queue.

    Returns the number of seconds until the next scheduled task becomes due,
    or ``None`` when nothing further is scheduled.
    """
    scheduled_queue = _scheduled_queue_name(queue_name)
    now = _now_seconds()
    due_items = cast(
        list[str | bytes],
        client.zrangebyscore(scheduled_queue, "-inf", now, start=0, num=max_items),
    )
    if due_items:
        batch = tuple(due_items)
        # Push before removing: a crash between the two calls duplicates a
        # task instead of losing it.
        client.lpush(queue_name, *batch)
        client.zrem(scheduled_queue, *batch)
        logger.debug(
            "rq.queue.drain_ready_scheduled",
            extra={
                "queue_name": queue_name,
                "count": len(due_items),
            },
        )
    upcoming = cast(
        list[tuple[str | bytes, float]],
        client.zrangebyscore(
            scheduled_queue,
            now,
            "+inf",
            start=0,
            num=1,
            withscores=True,
        ),
    )
    if not upcoming:
        return None
    _member, due_score = upcoming[0]
    return max(0.0, float(due_score) - now)
def _schedule_for_later(
    task: QueuedTask,
    queue_name: str,
    delay_seconds: float,
    *,
    redis_url: str | None = None,
) -> bool:
    """Store *task* in the scheduled sorted set, due *delay_seconds* from now."""
    client = _redis_client(redis_url=redis_url)
    due_at = _now_seconds() + delay_seconds
    client.zadd(_scheduled_queue_name(queue_name), {task.to_json(): due_at})
    logger.info(
        "rq.queue.scheduled",
        extra={
            "task_type": task.task_type,
            "queue_name": queue_name,
            "delay_seconds": delay_seconds,
        },
    )
    return True
def enqueue_task(
    task: QueuedTask,
    queue_name: str,
    *,
    redis_url: str | None = None,
) -> bool:
    """Persist a task envelope in a Redis list-backed queue.

    Best-effort: returns True on success, False (after logging a warning)
    when anything goes wrong.
    """
    try:
        connection = _redis_client(redis_url=redis_url)
        connection.lpush(queue_name, task.to_json())
        logger.info(
            "rq.queue.enqueued",
            extra={
                "task_type": task.task_type,
                "queue_name": queue_name,
                "attempt": task.attempts,
            },
        )
    except Exception as exc:  # noqa: BLE001 - deliberately best-effort
        logger.warning(
            "rq.queue.enqueue_failed",
            extra={"task_type": task.task_type, "queue_name": queue_name, "error": str(exc)},
        )
        return False
    return True
def enqueue_task_with_delay(
    task: QueuedTask,
    queue_name: str,
    *,
    delay_seconds: float,
    redis_url: str | None = None,
) -> bool:
    """Enqueue a task immediately or schedule it for delayed delivery."""
    delay = max(0.0, float(delay_seconds))
    if not delay:
        # Zero (or negative) delay degenerates to an immediate enqueue.
        return enqueue_task(task, queue_name, redis_url=redis_url)
    try:
        return _schedule_for_later(task, queue_name, delay, redis_url=redis_url)
    except Exception as exc:  # noqa: BLE001 - best-effort scheduling
        logger.warning(
            "rq.queue.schedule_failed",
            extra={
                "task_type": task.task_type,
                "queue_name": queue_name,
                "delay_seconds": delay,
                "error": str(exc),
            },
        )
        return False
def _coerce_datetime(raw: object | None) -> datetime:
if raw is None:
return datetime.now(UTC)
if isinstance(raw, str):
try:
return datetime.fromisoformat(raw)
except ValueError:
return datetime.now(UTC)
if isinstance(raw, (int, float)):
try:
return datetime.fromtimestamp(raw, tz=UTC)
except (TypeError, ValueError, OverflowError):
return datetime.now(UTC)
return datetime.now(UTC)
def dequeue_task(
    queue_name: str,
    *,
    redis_url: str | None = None,
    block: bool = False,
    block_timeout: float = 0,
) -> QueuedTask | None:
    """Pop one task envelope from the queue.

    Args:
        queue_name: Redis list to pop from.
        redis_url: Optional override for the configured Redis URL.
        block: When True, wait with BRPOP instead of returning immediately.
        block_timeout: Maximum seconds to wait when blocking; 0 means "wake
            when the next scheduled task is due" (or block indefinitely when
            nothing is scheduled).

    Returns:
        The decoded task, or None when the queue is empty / the wait timed out.
    """
    client = _redis_client(redis_url=redis_url)
    timeout = max(0.0, float(block_timeout))
    raw: str | bytes | None
    if block:
        # Promote any already-due scheduled tasks before waiting; next_delay
        # is the time until the next future task (None if nothing scheduled).
        next_delay = _drain_ready_scheduled_tasks(client, queue_name)
        if timeout == 0:
            # No caller timeout: wake exactly when the next scheduled task is
            # due. NOTE(review): a next_delay of 0.0 keeps timeout at 0, which
            # BRPOP treats as "block forever" — confirm that is intended.
            timeout = next_delay if next_delay is not None else 0
        else:
            # Never sleep past the next scheduled task's due time.
            timeout = min(timeout, next_delay) if next_delay is not None else timeout
        raw_result = cast(
            tuple[bytes | str, bytes | str] | None,
            client.brpop([queue_name], timeout=timeout),
        )
        if raw_result is None:
            # Timed out: drain again so tasks that became due while we were
            # blocked are visible to the next caller.
            _drain_ready_scheduled_tasks(client, queue_name)
            return None
        # BRPOP returns (queue_name, value); only the value is needed.
        raw = raw_result[1]
    else:
        raw = cast(str | bytes | None, client.rpop(queue_name))
    if raw is None:
        # Empty queue: opportunistically promote due scheduled tasks.
        _drain_ready_scheduled_tasks(client, queue_name)
        return None
    return _decode_task(raw, queue_name)
def _decode_task(raw: str | bytes, queue_name: str) -> QueuedTask:
    """Deserialize a queue entry, tolerating pre-envelope ("legacy") payloads."""
    text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
    try:
        payload: dict[str, Any] = json.loads(text)
        if "task_type" in payload or "payload" in payload:
            return QueuedTask(
                task_type=str(payload["task_type"]),
                payload=payload["payload"],
                created_at=datetime.fromisoformat(payload["created_at"]),
                attempts=int(payload.get("attempts", 0)),
            )
        # Legacy entries are a bare payload dict with no envelope fields.
        return QueuedTask(
            task_type="legacy",
            payload=payload,
            created_at=_coerce_datetime(
                payload.get("created_at") or payload.get("received_at")
            ),
            attempts=int(payload.get("attempts", 0)),
        )
    except Exception as exc:
        logger.error(
            "rq.queue.dequeue_failed",
            extra={"queue_name": queue_name, "raw_payload": str(text), "error": str(exc)},
        )
        raise
def _requeue_with_attempt(task: QueuedTask) -> QueuedTask:
    """Return a copy of *task* with its attempt counter advanced by one."""
    bumped_attempts = task.attempts + 1
    return QueuedTask(
        task_type=task.task_type,
        payload=task.payload,
        created_at=task.created_at,
        attempts=bumped_attempts,
    )
def requeue_if_failed(
    task: QueuedTask,
    queue_name: str,
    *,
    max_retries: int,
    redis_url: str | None = None,
    delay_seconds: float = 0,
) -> bool:
    """Requeue a failed task with capped retries.

    Returns True if requeued.
    """
    retry = _requeue_with_attempt(task)
    if retry.attempts > max_retries:
        # Retry budget exhausted: drop the task and record the decision.
        logger.warning(
            "rq.queue.drop_failed_task",
            extra={
                "task_type": task.task_type,
                "queue_name": queue_name,
                "attempts": retry.attempts,
            },
        )
        return False
    if delay_seconds > 0:
        return _schedule_for_later(retry, queue_name, delay_seconds, redis_url=redis_url)
    return enqueue_task(retry, queue_name, redis_url=redis_url)

View File

@@ -0,0 +1,152 @@
"""Generic queue worker with task-type dispatch."""
from __future__ import annotations
import asyncio
import random
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from app.core.config import settings
from app.core.logging import get_logger
from app.services.openclaw.lifecycle_queue import TASK_TYPE as LIFECYCLE_RECONCILE_TASK_TYPE
from app.services.openclaw.lifecycle_queue import (
requeue_lifecycle_queue_task,
)
from app.services.openclaw.lifecycle_reconcile import process_lifecycle_queue_task
from app.services.queue import QueuedTask, dequeue_task
from app.services.webhooks.dispatch import (
process_webhook_queue_task,
requeue_webhook_queue_task,
)
from app.services.webhooks.queue import TASK_TYPE as WEBHOOK_TASK_TYPE
logger = get_logger(__name__)
_WORKER_BLOCK_TIMEOUT_SECONDS = 5.0
@dataclass(frozen=True)
class _TaskHandler:
    """Dispatch entry binding a task type to its processing and retry policy."""

    # Async callable invoked with the dequeued task to perform the work.
    handler: Callable[[QueuedTask], Awaitable[None]]
    # Maps the current attempt count to a retry delay in seconds.
    attempts_to_delay: Callable[[int], float]
    # Requeues the task with the given delay; returns True when requeued.
    requeue: Callable[[QueuedTask, float], bool]
def _exponential_backoff(attempts: int) -> float:
    """Exponential retry delay (base * 2**attempts) capped at the configured max."""
    return min(
        settings.rq_dispatch_retry_base_seconds * (2 ** max(0, attempts)),
        settings.rq_dispatch_retry_max_seconds,
    )


# Registry of known task types. Both entries previously duplicated the same
# backoff lambda; it is factored into _exponential_backoff so the retry policy
# is defined once. Tasks with unregistered types are logged and dropped.
_TASK_HANDLERS: dict[str, _TaskHandler] = {
    LIFECYCLE_RECONCILE_TASK_TYPE: _TaskHandler(
        handler=process_lifecycle_queue_task,
        attempts_to_delay=_exponential_backoff,
        requeue=lambda task, delay: requeue_lifecycle_queue_task(task, delay_seconds=delay),
    ),
    WEBHOOK_TASK_TYPE: _TaskHandler(
        handler=process_webhook_queue_task,
        attempts_to_delay=_exponential_backoff,
        requeue=lambda task, delay: requeue_webhook_queue_task(task, delay_seconds=delay),
    ),
}
def _compute_jitter(base_delay: float) -> float:
    """Random jitter in [0, min(max_retry/10, 10% of base_delay)] to spread retries."""
    upper = min(settings.rq_dispatch_retry_max_seconds / 10, base_delay * 0.1)
    return random.uniform(0, upper)
async def flush_queue(*, block: bool = False, block_timeout: float = 0) -> int:
    """Consume one queue batch and dispatch by task type.

    Drains the queue until a dequeue returns None. Each task is routed to the
    handler registered for its ``task_type``; unknown types are logged and
    dropped. Failed tasks are requeued with exponential backoff plus jitter.

    Returns the number of tasks handled successfully.
    """
    processed = 0
    while True:
        try:
            task = dequeue_task(
                settings.rq_queue_name,
                redis_url=settings.rq_redis_url,
                block=block,
                block_timeout=block_timeout,
            )
        except Exception:
            logger.exception(
                "queue.worker.dequeue_failed",
                extra={"queue_name": settings.rq_queue_name},
            )
            # Previously this retried immediately, so a persistent Redis
            # outage spun the loop at full speed. Back off briefly instead.
            await asyncio.sleep(1)
            continue
        if task is None:
            break
        handler = _TASK_HANDLERS.get(task.task_type)
        if handler is None:
            # No registered handler: the task is dropped (the dequeue already
            # removed it from Redis), so log loudly for observability.
            logger.warning(
                "queue.worker.task_unhandled",
                extra={
                    "task_type": task.task_type,
                    "queue_name": settings.rq_queue_name,
                },
            )
            continue
        try:
            await handler.handler(task)
            processed += 1
            logger.info(
                "queue.worker.success",
                extra={
                    "task_type": task.task_type,
                    "attempt": task.attempts,
                },
            )
        except Exception as exc:
            logger.exception(
                "queue.worker.failed",
                extra={
                    "task_type": task.task_type,
                    "attempt": task.attempts,
                    "error": str(exc),
                },
            )
            base_delay = handler.attempts_to_delay(task.attempts)
            delay = base_delay + _compute_jitter(base_delay)
            if not handler.requeue(task, delay):
                # Requeue declined (retry cap reached or enqueue failed).
                logger.warning(
                    "queue.worker.drop_task",
                    extra={
                        "task_type": task.task_type,
                        "attempt": task.attempts,
                    },
                )
        # Throttle between tasks so a large backlog doesn't saturate downstream.
        await asyncio.sleep(settings.rq_dispatch_throttle_seconds)
    if processed > 0:
        logger.info("queue.worker.batch_complete", extra={"count": processed})
    return processed
async def _run_worker_loop() -> None:
    """Run blocking flush batches forever, surviving unexpected batch errors."""
    while True:
        try:
            # A finite timeout keeps the loop waking up so that delayed
            # (scheduled) tasks are periodically moved onto the main queue.
            await flush_queue(block=True, block_timeout=_WORKER_BLOCK_TIMEOUT_SECONDS)
        except Exception:
            logger.exception(
                "queue.worker.loop_failed",
                extra={"queue_name": settings.rq_queue_name},
            )
            await asyncio.sleep(1)
def run_worker() -> None:
    """RQ entrypoint: run the blocking worker loop until the process exits."""
    startup_extra = {"throttle_seconds": settings.rq_dispatch_throttle_seconds}
    logger.info("queue.worker.batch_started", extra=startup_extra)
    try:
        asyncio.run(_run_worker_loop())
    finally:
        # Emitted on clean shutdown and on crash alike.
        logger.info("queue.worker.stopped", extra={"queue_name": settings.rq_queue_name})

View File

@@ -164,7 +164,8 @@ async def validate_dependency_update(
},
)
# Ensure the dependency graph is acyclic after applying the update.
# Rebuild the board-wide graph and overlay the pending edit for this task so
# validation catches indirect cycles created through existing edges.
task_ids = list(
await session.exec(
select(col(Task.id)).where(col(Task.board_id) == board_id),

View File

@@ -0,0 +1,20 @@
"""Webhook queueing + dispatch utilities.
Prefer importing from this package when used by other modules.
"""
from app.services.webhooks.dispatch import run_flush_webhook_delivery_queue
from app.services.webhooks.queue import (
QueuedInboundDelivery,
dequeue_webhook_delivery,
enqueue_webhook_delivery,
requeue_if_failed,
)
__all__ = [
"QueuedInboundDelivery",
"dequeue_webhook_delivery",
"enqueue_webhook_delivery",
"requeue_if_failed",
"run_flush_webhook_delivery_queue",
]

View File

@@ -0,0 +1,286 @@
"""Webhook dispatch worker routines."""
from __future__ import annotations
import asyncio
import random
import time
from uuid import UUID
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.config import settings
from app.core.logging import get_logger
from app.db.session import async_session_maker
from app.models.agents import Agent
from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.queue import QueuedTask
from app.services.webhooks.queue import (
QueuedInboundDelivery,
decode_webhook_task,
requeue_if_failed,
)
logger = get_logger(__name__)
def _build_payload_preview(payload_value: object) -> str:
if isinstance(payload_value, str):
return payload_value
try:
import json
return json.dumps(payload_value, indent=2, ensure_ascii=True)
except TypeError:
return str(payload_value)
def _webhook_message(
    *,
    board: Board,
    webhook: BoardWebhook,
    payload: BoardWebhookPayload,
) -> str:
    """Compose the instruction message delivered to the target agent."""
    segments = [
        "WEBHOOK EVENT RECEIVED",
        f"Board: {board.name}",
        f"Webhook ID: {webhook.id}",
        f"Payload ID: {payload.id}",
        f"Instruction: {webhook.description}",
        "",
        "Take action:",
        "1) Triage this payload against the webhook instruction.",
        "2) Create/update tasks as needed.",
        f"3) Reference payload ID {payload.id} in task descriptions.",
        "",
        "Payload preview:",
        _build_payload_preview(payload.payload),
        "",
        "To inspect board memory entries:",
        f"GET /api/v1/agent/boards/{board.id}/memory?is_chat=false",
    ]
    return "\n".join(segments)
async def _notify_target_agent(
    *,
    session: AsyncSession,
    board: Board,
    webhook: BoardWebhook,
    payload: BoardWebhookPayload,
) -> None:
    """Send the webhook payload message to the webhook's agent or the board lead.

    Resolution order: the webhook's explicitly configured agent first, then the
    board lead as a fallback. Delivery is best-effort: the function silently
    returns when no candidate agent has an OpenClaw session or when the board
    has no gateway configuration.
    """
    target_agent: Agent | None = None
    if webhook.agent_id is not None:
        # Scope the lookup to this board so a stale cross-board agent_id is ignored.
        target_agent = await Agent.objects.filter_by(id=webhook.agent_id, board_id=board.id).first(
            session
        )
    if target_agent is None:
        # No explicit agent configured (or not found): fall back to the board lead.
        target_agent = await Agent.objects.filter_by(board_id=board.id, is_board_lead=True).first(
            session
        )
    if target_agent is None or not target_agent.openclaw_session_id:
        return
    dispatch = GatewayDispatchService(session)
    config = await dispatch.optional_gateway_config_for_board(board)
    if config is None:
        return
    message = _webhook_message(board=board, webhook=webhook, payload=payload)
    # NOTE(review): deliver=False presumably defers actual delivery to the
    # gateway session queue — confirm against GatewayDispatchService.
    await dispatch.try_send_agent_message(
        session_key=target_agent.openclaw_session_id,
        config=config,
        agent_name=target_agent.name,
        message=message,
        deliver=False,
    )
async def _load_webhook_payload(
    *,
    session: AsyncSession,
    payload_id: UUID,
    webhook_id: UUID,
    board_id: UUID,
) -> tuple[Board, BoardWebhook, BoardWebhookPayload] | None:
    """Load and cross-check the board, webhook, and payload rows for a queued item.

    Returns ``None`` (after logging a warning) when any row is missing or the
    rows do not belong together; the queued item is then dropped.
    """
    payload = await session.get(BoardWebhookPayload, payload_id)
    if payload is None:
        logger.warning(
            "webhook.queue.payload_missing",
            extra={
                "payload_id": str(payload_id),
                "webhook_id": str(webhook_id),
                "board_id": str(board_id),
            },
        )
        return None
    if payload.board_id != board_id or payload.webhook_id != webhook_id:
        logger.warning(
            "webhook.queue.payload_mismatch",
            extra={
                "payload_id": str(payload_id),
                "payload_webhook_id": str(payload.webhook_id),
                "payload_board_id": str(payload.board_id),
            },
        )
        return None
    board = await Board.objects.by_id(board_id).first(session)
    if board is None:
        logger.warning(
            "webhook.queue.board_missing",
            extra={"board_id": str(board_id), "payload_id": str(payload_id)},
        )
        return None
    webhook = await session.get(BoardWebhook, webhook_id)
    if webhook is None:
        logger.warning(
            "webhook.queue.webhook_missing",
            extra={"webhook_id": str(webhook_id), "board_id": str(board_id)},
        )
        return None
    if webhook.board_id != board_id:
        # Log the webhook's own board_id — it is the mismatched value here.
        # (Previously this logged payload.board_id, which the earlier guard
        # already proved equal to the expected id, hiding the discrepancy.)
        logger.warning(
            "webhook.queue.webhook_board_mismatch",
            extra={
                "webhook_id": str(webhook_id),
                "webhook_board_id": str(webhook.board_id),
                "expected_board_id": str(board_id),
            },
        )
        return None
    return board, webhook, payload
async def _process_single_item(item: QueuedInboundDelivery) -> None:
    """Resolve one queued delivery and notify the target agent in a fresh session."""
    async with async_session_maker() as session:
        resolved = await _load_webhook_payload(
            session=session,
            payload_id=item.payload_id,
            webhook_id=item.webhook_id,
            board_id=item.board_id,
        )
        if resolved is None:
            # Missing or inconsistent rows: nothing to deliver.
            return
        board, webhook, payload = resolved
        await _notify_target_agent(session=session, board=board, webhook=webhook, payload=payload)
        await session.commit()
def _compute_webhook_retry_delay(attempts: int) -> float:
    """Exponential backoff delay for the given attempt count, capped at the max."""
    uncapped = float(settings.rq_dispatch_retry_base_seconds) * float(2 ** max(attempts, 0))
    cap = float(settings.rq_dispatch_retry_max_seconds)
    return min(uncapped, cap)
def _compute_webhook_retry_jitter(base_delay: float) -> float:
    """Random jitter in [0, min(max/10, 10% of base_delay)] to de-synchronize retries."""
    cap = float(settings.rq_dispatch_retry_max_seconds) / 10.0
    upper_bound = min(cap, float(base_delay) * 0.1)
    return random.uniform(0.0, upper_bound)
async def process_webhook_queue_task(task: QueuedTask) -> None:
    """Decode a generic queue task as a webhook delivery and process it."""
    await _process_single_item(decode_webhook_task(task))
def requeue_webhook_queue_task(task: QueuedTask, *, delay_seconds: float = 0) -> bool:
    """Requeue a failed webhook queue task; returns True when it was requeued."""
    delivery = decode_webhook_task(task)
    return requeue_if_failed(delivery, delay_seconds=delay_seconds)
async def flush_webhook_delivery_queue(*, block: bool = False, block_timeout: float = 0) -> int:
    """Consume queued webhook events and notify board leads in a throttled batch.

    Drains the queue until empty. Failed items are requeued with exponential
    backoff plus jitter (retry capping is handled by ``requeue_if_failed``).

    Returns the number of items delivered successfully.
    """
    processed = 0
    while True:
        try:
            # The old `if block or block_timeout` branch was redundant — both
            # arms made the same call, since the kwargs default to False/0.
            item = dequeue_webhook_delivery(block=block, block_timeout=block_timeout)
        except Exception:
            logger.exception("webhook.dispatch.dequeue_failed")
            # Back off so a persistent Redis failure does not spin the loop hot.
            await asyncio.sleep(1)
            continue
        if item is None:
            break
        try:
            await _process_single_item(item)
            processed += 1
            logger.info(
                "webhook.dispatch.success",
                extra={
                    "payload_id": str(item.payload_id),
                    "webhook_id": str(item.webhook_id),
                    "board_id": str(item.board_id),
                    "attempt": item.attempts,
                },
            )
        except Exception as exc:
            logger.exception(
                "webhook.dispatch.failed",
                extra={
                    "payload_id": str(item.payload_id),
                    "webhook_id": str(item.webhook_id),
                    "board_id": str(item.board_id),
                    "attempt": item.attempts,
                    "error": str(exc),
                },
            )
            delay = _compute_webhook_retry_delay(item.attempts)
            jitter = _compute_webhook_retry_jitter(delay)
            # requeue_if_failed accepts delay_seconds directly; the previous
            # `except TypeError` signature-fallback was dead code that could
            # also mask genuine TypeErrors raised inside the requeue path.
            # (The blocking time.sleep(0.0) no-op was removed as well.)
            requeue_if_failed(item, delay_seconds=delay + jitter)
        await asyncio.sleep(settings.rq_dispatch_throttle_seconds)
    if processed > 0:
        logger.info("webhook.dispatch.batch_complete", extra={"count": processed})
    return processed
def dequeue_webhook_delivery(
    *,
    block: bool = False,
    block_timeout: float = 0,
) -> QueuedInboundDelivery | None:
    """Pop one queued webhook delivery payload."""
    # NOTE(review): imported locally — presumably to avoid an import cycle
    # with app.services.queue; confirm before hoisting to module level.
    from app.services.queue import dequeue_task

    envelope = dequeue_task(
        settings.rq_queue_name,
        redis_url=settings.rq_redis_url,
        block=block,
        block_timeout=block_timeout,
    )
    return None if envelope is None else decode_webhook_task(envelope)
def dequeue_webhook_delivery_task(
    *,
    block: bool = False,
    block_timeout: float = 0,
) -> QueuedInboundDelivery | None:
    """Alias kept for callers of the old helper name; delegates unchanged."""
    return dequeue_webhook_delivery(block=block, block_timeout=block_timeout)
def run_flush_webhook_delivery_queue() -> None:
    """RQ entrypoint for running the async queue flush from worker jobs."""
    logger.info(
        "webhook.dispatch.batch_started",
        extra={"throttle_seconds": settings.rq_dispatch_throttle_seconds},
    )
    # perf_counter is monotonic; time.time() can jump under clock adjustments
    # and would then report a wrong (even negative) batch duration.
    start = time.perf_counter()
    asyncio.run(flush_webhook_delivery_queue())
    elapsed_ms = int((time.perf_counter() - start) * 1000)
    logger.info("webhook.dispatch.batch_finished", extra={"duration_ms": elapsed_ms})

View File

@@ -0,0 +1,154 @@
"""Webhook queue persistence and delivery helpers."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any
from uuid import UUID
from app.core.config import settings
from app.core.logging import get_logger
from app.services.queue import QueuedTask, dequeue_task, enqueue_task
from app.services.queue import requeue_if_failed as generic_requeue_if_failed
logger = get_logger(__name__)
TASK_TYPE = "webhook_delivery"
@dataclass(frozen=True)
class QueuedInboundDelivery:
    """Payload metadata stored for deferred webhook lead dispatch."""

    # Board that owns the webhook; used to resolve the target agent.
    board_id: UUID
    # Webhook definition the payload arrived on.
    webhook_id: UUID
    # Persisted BoardWebhookPayload row to deliver.
    payload_id: UUID
    # When the inbound delivery was accepted; becomes the task's created_at.
    received_at: datetime
    # Delivery attempts already made (0 for a fresh delivery).
    attempts: int = 0
def _task_from_payload(payload: QueuedInboundDelivery) -> QueuedTask:
    """Wrap delivery metadata in the generic queue envelope."""
    envelope = {
        "board_id": str(payload.board_id),
        "webhook_id": str(payload.webhook_id),
        "payload_id": str(payload.payload_id),
        "received_at": payload.received_at.isoformat(),
    }
    return QueuedTask(
        task_type=TASK_TYPE,
        payload=envelope,
        created_at=payload.received_at,
        attempts=payload.attempts,
    )
def decode_webhook_task(task: QueuedTask) -> QueuedInboundDelivery:
    """Decode a generic queue envelope back into webhook delivery metadata.

    Accepts the current ``TASK_TYPE`` envelope as well as pre-envelope
    ("legacy") tasks. Raises ``ValueError`` for any other task type and
    ``KeyError``/``ValueError`` when required ids are missing or malformed.
    """
    if task.task_type not in {TASK_TYPE, "legacy"}:
        raise ValueError(f"Unexpected task_type={task.task_type!r}; expected {TASK_TYPE!r}")
    payload: dict[str, Any] = task.payload
    if task.task_type == "legacy":
        # Legacy payloads may carry the timestamp under either key, as a
        # string, a numeric epoch, or not at all — degrade to "now" instead of
        # failing the whole delivery (matches the generic legacy decoding).
        received_at = _coerce_legacy_timestamp(
            payload.get("received_at") or payload.get("created_at")
        )
    else:
        received_at = datetime.fromisoformat(payload["received_at"])
    return QueuedInboundDelivery(
        board_id=UUID(payload["board_id"]),
        webhook_id=UUID(payload["webhook_id"]),
        payload_id=UUID(payload["payload_id"]),
        received_at=received_at,
        attempts=int(payload.get("attempts", task.attempts)),
    )


def _coerce_legacy_timestamp(raw: object | None) -> datetime:
    """Parse a legacy timestamp value, defaulting to the current UTC time."""
    if isinstance(raw, str):
        try:
            return datetime.fromisoformat(raw)
        except ValueError:
            return datetime.now(UTC)
    if isinstance(raw, (int, float)):
        try:
            return datetime.fromtimestamp(raw, tz=UTC)
        except (TypeError, ValueError, OverflowError):
            return datetime.now(UTC)
    return datetime.now(UTC)
def enqueue_webhook_delivery(payload: QueuedInboundDelivery) -> bool:
    """Persist webhook metadata in a Redis queue for batch dispatch.

    Returns True only when the task actually reached the queue.
    """
    try:
        queued = _task_from_payload(payload)
        # enqueue_task swallows Redis errors internally and signals failure via
        # its return value; ignoring it (as before) logged "enqueued" and
        # returned True for deliveries that were silently dropped.
        if not enqueue_task(queued, settings.rq_queue_name, redis_url=settings.rq_redis_url):
            logger.warning(
                "webhook.queue.enqueue_failed",
                extra={
                    "board_id": str(payload.board_id),
                    "webhook_id": str(payload.webhook_id),
                    "payload_id": str(payload.payload_id),
                    "error": "enqueue_task returned False",
                },
            )
            return False
        logger.info(
            "webhook.queue.enqueued",
            extra={
                "board_id": str(payload.board_id),
                "webhook_id": str(payload.webhook_id),
                "payload_id": str(payload.payload_id),
                "attempt": payload.attempts,
            },
        )
        return True
    except Exception as exc:
        logger.warning(
            "webhook.queue.enqueue_failed",
            extra={
                "board_id": str(payload.board_id),
                "webhook_id": str(payload.webhook_id),
                "payload_id": str(payload.payload_id),
                "error": str(exc),
            },
        )
        return False
def dequeue_webhook_delivery(
    *,
    block: bool = False,
    block_timeout: float = 0,
) -> QueuedInboundDelivery | None:
    """Pop one queued webhook delivery payload."""
    try:
        # Keep the decode inside the try so deserialization failures are
        # logged with the queue name before propagating, like pop failures.
        envelope = dequeue_task(
            settings.rq_queue_name,
            redis_url=settings.rq_redis_url,
            block=block,
            block_timeout=block_timeout,
        )
        return None if envelope is None else decode_webhook_task(envelope)
    except Exception as exc:
        logger.error(
            "webhook.queue.dequeue_failed",
            extra={
                "queue_name": settings.rq_queue_name,
                "error": str(exc),
            },
        )
        raise
def requeue_if_failed(
    payload: QueuedInboundDelivery,
    *,
    delay_seconds: float = 0,
) -> bool:
    """Requeue payload delivery with capped retries.

    Returns True if requeued.
    """
    context = {
        "board_id": str(payload.board_id),
        "webhook_id": str(payload.webhook_id),
        "payload_id": str(payload.payload_id),
    }
    try:
        envelope = _task_from_payload(payload)
        return generic_requeue_if_failed(
            envelope,
            settings.rq_queue_name,
            max_retries=settings.rq_dispatch_max_retries,
            redis_url=settings.rq_redis_url,
            delay_seconds=delay_seconds,
        )
    except Exception as exc:
        # Record the delivery context, then surface the failure to the caller.
        logger.warning("webhook.queue.requeue_failed", extra={**context, "error": str(exc)})
        raise