refactor: introduce agent key and retry helpers for coordination flows
This commit is contained in:
@@ -34,11 +34,10 @@ from app.services.openclaw.exceptions import (
|
|||||||
map_gateway_error_message,
|
map_gateway_error_message,
|
||||||
map_gateway_error_to_http_exception,
|
map_gateway_error_to_http_exception,
|
||||||
)
|
)
|
||||||
|
from app.services.openclaw.internal import agent_key, with_coordination_gateway_retry
|
||||||
from app.services.openclaw.provisioning import (
|
from app.services.openclaw.provisioning import (
|
||||||
LeadAgentOptions,
|
LeadAgentOptions,
|
||||||
LeadAgentRequest,
|
LeadAgentRequest,
|
||||||
_agent_key,
|
|
||||||
_with_coordination_gateway_retry,
|
|
||||||
ensure_board_lead_agent,
|
ensure_board_lead_agent,
|
||||||
)
|
)
|
||||||
from app.services.openclaw.shared import (
|
from app.services.openclaw.shared import (
|
||||||
@@ -80,7 +79,7 @@ class AbstractGatewayMessagingService(ABC):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def _with_gateway_retry(fn: Callable[[], Awaitable[_T]]) -> _T:
|
async def _with_gateway_retry(fn: Callable[[], Awaitable[_T]]) -> _T:
|
||||||
return await _with_coordination_gateway_retry(fn)
|
return await with_coordination_gateway_retry(fn)
|
||||||
|
|
||||||
async def _dispatch_gateway_message(
|
async def _dispatch_gateway_message(
|
||||||
self,
|
self,
|
||||||
@@ -297,7 +296,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
|
|||||||
async def _do_get() -> object:
|
async def _do_get() -> object:
|
||||||
return await openclaw_call(
|
return await openclaw_call(
|
||||||
"agents.files.get",
|
"agents.files.get",
|
||||||
{"agentId": _agent_key(target), "name": "SOUL.md"},
|
{"agentId": agent_key(target), "name": "SOUL.md"},
|
||||||
config=config,
|
config=config,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -378,7 +377,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
|
|||||||
return await openclaw_call(
|
return await openclaw_call(
|
||||||
"agents.files.set",
|
"agents.files.set",
|
||||||
{
|
{
|
||||||
"agentId": _agent_key(target),
|
"agentId": agent_key(target),
|
||||||
"name": "SOUL.md",
|
"name": "SOUL.md",
|
||||||
"content": normalized_content,
|
"content": normalized_content,
|
||||||
},
|
},
|
||||||
|
|||||||
6
backend/app/services/openclaw/internal/__init__.py
Normal file
6
backend/app/services/openclaw/internal/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
"""Internal typed helpers shared across OpenClaw service modules."""
|
||||||
|
|
||||||
|
from .agent_key import agent_key
|
||||||
|
from .retry import with_coordination_gateway_retry
|
||||||
|
|
||||||
|
__all__ = ["agent_key", "with_coordination_gateway_retry"]
|
||||||
24
backend/app/services/openclaw/internal/agent_key.py
Normal file
24
backend/app/services/openclaw/internal/agent_key.py
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
"""Agent key derivation helpers shared across OpenClaw modules."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
|
from app.models.agents import Agent
|
||||||
|
from app.services.openclaw.constants import _SESSION_KEY_PARTS_MIN
|
||||||
|
|
||||||
|
|
||||||
|
def _slugify(value: str) -> str:
|
||||||
|
slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")
|
||||||
|
return slug or uuid4().hex
|
||||||
|
|
||||||
|
|
||||||
|
def agent_key(agent: Agent) -> str:
|
||||||
|
"""Return stable gateway agent id derived from session key or name fallback."""
|
||||||
|
session_key = agent.openclaw_session_id or ""
|
||||||
|
if session_key.startswith("agent:"):
|
||||||
|
parts = session_key.split(":")
|
||||||
|
if len(parts) >= _SESSION_KEY_PARTS_MIN and parts[1]:
|
||||||
|
return parts[1]
|
||||||
|
return _slugify(agent.name)
|
||||||
124
backend/app/services/openclaw/internal/retry.py
Normal file
124
backend/app/services/openclaw/internal/retry.py
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
"""Internal gateway retry helpers for coordination flows."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from collections.abc import Awaitable, Callable
|
||||||
|
from typing import TypeVar
|
||||||
|
|
||||||
|
from app.integrations.openclaw_gateway import OpenClawGatewayError
|
||||||
|
from app.services.openclaw.constants import (
|
||||||
|
_COORDINATION_GATEWAY_BASE_DELAY_S,
|
||||||
|
_COORDINATION_GATEWAY_MAX_DELAY_S,
|
||||||
|
_COORDINATION_GATEWAY_TIMEOUT_S,
|
||||||
|
_NON_TRANSIENT_GATEWAY_ERROR_MARKERS,
|
||||||
|
_SECURE_RANDOM,
|
||||||
|
_TRANSIENT_GATEWAY_ERROR_MARKERS,
|
||||||
|
)
|
||||||
|
|
||||||
|
_T = TypeVar("_T")
|
||||||
|
|
||||||
|
|
||||||
|
def _is_transient_gateway_error(exc: Exception) -> bool:
|
||||||
|
if not isinstance(exc, OpenClawGatewayError):
|
||||||
|
return False
|
||||||
|
message = str(exc).lower()
|
||||||
|
if not message:
|
||||||
|
return False
|
||||||
|
if any(marker in message for marker in _NON_TRANSIENT_GATEWAY_ERROR_MARKERS):
|
||||||
|
return False
|
||||||
|
return ("503" in message and "websocket" in message) or any(
|
||||||
|
marker in message for marker in _TRANSIENT_GATEWAY_ERROR_MARKERS
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _gateway_timeout_message(
|
||||||
|
exc: OpenClawGatewayError,
|
||||||
|
*,
|
||||||
|
timeout_s: float,
|
||||||
|
context: str,
|
||||||
|
) -> str:
|
||||||
|
rounded_timeout = int(timeout_s)
|
||||||
|
timeout_text = f"{rounded_timeout} seconds"
|
||||||
|
if rounded_timeout >= 120:
|
||||||
|
timeout_text = f"{rounded_timeout // 60} minutes"
|
||||||
|
return f"Gateway unreachable after {timeout_text} ({context} timeout). Last error: {exc}"
|
||||||
|
|
||||||
|
|
||||||
|
class GatewayBackoff:
|
||||||
|
"""Exponential backoff with jitter for transient gateway errors."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
timeout_s: float = 10 * 60,
|
||||||
|
base_delay_s: float = 0.75,
|
||||||
|
max_delay_s: float = 30.0,
|
||||||
|
jitter: float = 0.2,
|
||||||
|
timeout_context: str = "gateway operation",
|
||||||
|
) -> None:
|
||||||
|
self._timeout_s = timeout_s
|
||||||
|
self._base_delay_s = base_delay_s
|
||||||
|
self._max_delay_s = max_delay_s
|
||||||
|
self._jitter = jitter
|
||||||
|
self._timeout_context = timeout_context
|
||||||
|
self._delay_s = base_delay_s
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
self._delay_s = self._base_delay_s
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def _attempt(
|
||||||
|
fn: Callable[[], Awaitable[_T]],
|
||||||
|
) -> tuple[_T | None, OpenClawGatewayError | None]:
|
||||||
|
try:
|
||||||
|
return await fn(), None
|
||||||
|
except OpenClawGatewayError as exc:
|
||||||
|
return None, exc
|
||||||
|
|
||||||
|
async def run(self, fn: Callable[[], Awaitable[_T]]) -> _T:
|
||||||
|
deadline_s = asyncio.get_running_loop().time() + self._timeout_s
|
||||||
|
while True:
|
||||||
|
value, error = await self._attempt(fn)
|
||||||
|
if error is not None:
|
||||||
|
exc = error
|
||||||
|
if not _is_transient_gateway_error(exc):
|
||||||
|
raise exc
|
||||||
|
now = asyncio.get_running_loop().time()
|
||||||
|
remaining = deadline_s - now
|
||||||
|
if remaining <= 0:
|
||||||
|
raise TimeoutError(
|
||||||
|
_gateway_timeout_message(
|
||||||
|
exc,
|
||||||
|
timeout_s=self._timeout_s,
|
||||||
|
context=self._timeout_context,
|
||||||
|
),
|
||||||
|
) from exc
|
||||||
|
|
||||||
|
sleep_s = min(self._delay_s, remaining)
|
||||||
|
if self._jitter:
|
||||||
|
sleep_s *= 1.0 + _SECURE_RANDOM.uniform(
|
||||||
|
-self._jitter,
|
||||||
|
self._jitter,
|
||||||
|
)
|
||||||
|
sleep_s = max(0.0, min(sleep_s, remaining))
|
||||||
|
await asyncio.sleep(sleep_s)
|
||||||
|
self._delay_s = min(self._delay_s * 2.0, self._max_delay_s)
|
||||||
|
continue
|
||||||
|
self.reset()
|
||||||
|
if value is None:
|
||||||
|
msg = "Gateway retry produced no value without an error"
|
||||||
|
raise RuntimeError(msg)
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
async def with_coordination_gateway_retry(fn: Callable[[], Awaitable[_T]]) -> _T:
|
||||||
|
"""Run a gateway call under coordination retry policy."""
|
||||||
|
backoff = GatewayBackoff(
|
||||||
|
timeout_s=_COORDINATION_GATEWAY_TIMEOUT_S,
|
||||||
|
base_delay_s=_COORDINATION_GATEWAY_BASE_DELAY_S,
|
||||||
|
max_delay_s=_COORDINATION_GATEWAY_MAX_DELAY_S,
|
||||||
|
jitter=0.15,
|
||||||
|
timeout_context="gateway coordination",
|
||||||
|
)
|
||||||
|
return await backoff.run(fn)
|
||||||
Reference in New Issue
Block a user