diff --git a/backend/app/services/openclaw/coordination_service.py b/backend/app/services/openclaw/coordination_service.py index 202b2a21..3c4b6241 100644 --- a/backend/app/services/openclaw/coordination_service.py +++ b/backend/app/services/openclaw/coordination_service.py @@ -34,11 +34,10 @@ from app.services.openclaw.exceptions import ( map_gateway_error_message, map_gateway_error_to_http_exception, ) +from app.services.openclaw.internal import agent_key, with_coordination_gateway_retry from app.services.openclaw.provisioning import ( LeadAgentOptions, LeadAgentRequest, - _agent_key, - _with_coordination_gateway_retry, ensure_board_lead_agent, ) from app.services.openclaw.shared import ( @@ -80,7 +79,7 @@ class AbstractGatewayMessagingService(ABC): @staticmethod async def _with_gateway_retry(fn: Callable[[], Awaitable[_T]]) -> _T: - return await _with_coordination_gateway_retry(fn) + return await with_coordination_gateway_retry(fn) async def _dispatch_gateway_message( self, @@ -297,7 +296,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): async def _do_get() -> object: return await openclaw_call( "agents.files.get", - {"agentId": _agent_key(target), "name": "SOUL.md"}, + {"agentId": agent_key(target), "name": "SOUL.md"}, config=config, ) @@ -378,7 +377,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService): return await openclaw_call( "agents.files.set", { - "agentId": _agent_key(target), + "agentId": agent_key(target), "name": "SOUL.md", "content": normalized_content, }, diff --git a/backend/app/services/openclaw/internal/__init__.py b/backend/app/services/openclaw/internal/__init__.py new file mode 100644 index 00000000..89680707 --- /dev/null +++ b/backend/app/services/openclaw/internal/__init__.py @@ -0,0 +1,6 @@ +"""Internal typed helpers shared across OpenClaw service modules.""" + +from .agent_key import agent_key +from .retry import with_coordination_gateway_retry + +__all__ = ["agent_key", "with_coordination_gateway_retry"] diff --git a/backend/app/services/openclaw/internal/agent_key.py b/backend/app/services/openclaw/internal/agent_key.py new file mode 100644 index 00000000..073d083c --- /dev/null +++ b/backend/app/services/openclaw/internal/agent_key.py @@ -0,0 +1,24 @@ +"""Agent key derivation helpers shared across OpenClaw modules.""" + +from __future__ import annotations + +import re +from uuid import uuid4 + +from app.models.agents import Agent +from app.services.openclaw.constants import _SESSION_KEY_PARTS_MIN + + +def _slugify(value: str) -> str: + slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") + return slug or uuid4().hex + + +def agent_key(agent: Agent) -> str: + """Return stable gateway agent id derived from session key or name fallback.""" + session_key = agent.openclaw_session_id or "" + if session_key.startswith("agent:"): + parts = session_key.split(":") + if len(parts) >= _SESSION_KEY_PARTS_MIN and parts[1]: + return parts[1] + return _slugify(agent.name) diff --git a/backend/app/services/openclaw/internal/retry.py b/backend/app/services/openclaw/internal/retry.py new file mode 100644 index 00000000..df097061 --- /dev/null +++ b/backend/app/services/openclaw/internal/retry.py @@ -0,0 +1,124 @@ +"""Internal gateway retry helpers for coordination flows.""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable +from typing import TypeVar + +from app.integrations.openclaw_gateway import OpenClawGatewayError +from app.services.openclaw.constants import ( + _COORDINATION_GATEWAY_BASE_DELAY_S, + _COORDINATION_GATEWAY_MAX_DELAY_S, + _COORDINATION_GATEWAY_TIMEOUT_S, + _NON_TRANSIENT_GATEWAY_ERROR_MARKERS, + _SECURE_RANDOM, + _TRANSIENT_GATEWAY_ERROR_MARKERS, +) + +_T = TypeVar("_T") + + +def _is_transient_gateway_error(exc: Exception) -> bool: + if not isinstance(exc, OpenClawGatewayError): + return False + message = str(exc).lower() + if not message: + return False + if any(marker in message for marker in _NON_TRANSIENT_GATEWAY_ERROR_MARKERS): + return False + return ("503" in message and "websocket" in message) or any( + marker in message for marker in _TRANSIENT_GATEWAY_ERROR_MARKERS + ) + + +def _gateway_timeout_message( + exc: OpenClawGatewayError, + *, + timeout_s: float, + context: str, +) -> str: + rounded_timeout = int(timeout_s) + timeout_text = f"{rounded_timeout} seconds" + if rounded_timeout >= 120: + timeout_text = f"{rounded_timeout // 60} minutes" + return f"Gateway unreachable after {timeout_text} ({context} timeout). Last error: {exc}" + + +class GatewayBackoff: + """Exponential backoff with jitter for transient gateway errors.""" + + def __init__( + self, + *, + timeout_s: float = 10 * 60, + base_delay_s: float = 0.75, + max_delay_s: float = 30.0, + jitter: float = 0.2, + timeout_context: str = "gateway operation", + ) -> None: + self._timeout_s = timeout_s + self._base_delay_s = base_delay_s + self._max_delay_s = max_delay_s + self._jitter = jitter + self._timeout_context = timeout_context + self._delay_s = base_delay_s + + def reset(self) -> None: + self._delay_s = self._base_delay_s + + @staticmethod + async def _attempt( + fn: Callable[[], Awaitable[_T]], + ) -> tuple[_T | None, OpenClawGatewayError | None]: + try: + return await fn(), None + except OpenClawGatewayError as exc: + return None, exc + + async def run(self, fn: Callable[[], Awaitable[_T]]) -> _T: + deadline_s = asyncio.get_running_loop().time() + self._timeout_s + while True: + value, error = await self._attempt(fn) + if error is not None: + exc = error + if not _is_transient_gateway_error(exc): + raise exc + now = asyncio.get_running_loop().time() + remaining = deadline_s - now + if remaining <= 0: + raise TimeoutError( + _gateway_timeout_message( + exc, + timeout_s=self._timeout_s, + context=self._timeout_context, + ), + ) from exc + + sleep_s = min(self._delay_s, remaining) + if self._jitter: + sleep_s *= 1.0 + _SECURE_RANDOM.uniform( + -self._jitter, + self._jitter, + ) + sleep_s = max(0.0, min(sleep_s, remaining)) + await asyncio.sleep(sleep_s) + self._delay_s = min(self._delay_s * 2.0, self._max_delay_s) + continue + self.reset() + if value is None: + msg = "Gateway retry produced no value without an error" + raise RuntimeError(msg) + return value + + +async def with_coordination_gateway_retry(fn: Callable[[], Awaitable[_T]]) -> _T: + """Run a gateway call under coordination retry policy.""" + backoff = GatewayBackoff( + timeout_s=_COORDINATION_GATEWAY_TIMEOUT_S, + base_delay_s=_COORDINATION_GATEWAY_BASE_DELAY_S, + max_delay_s=_COORDINATION_GATEWAY_MAX_DELAY_S, + jitter=0.15, + timeout_context="gateway coordination", + ) + return await backoff.run(fn)