feat: add board group models and update related interfaces
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import random
|
||||
import re
|
||||
from collections.abc import Awaitable, Callable
|
||||
from typing import TypeVar
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from sqlalchemy import func
|
||||
from sqlmodel import col, select
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
@@ -14,6 +16,7 @@ from app.core.time import utcnow
|
||||
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
|
||||
from app.integrations.openclaw_gateway import OpenClawGatewayError, openclaw_call
|
||||
from app.models.agents import Agent
|
||||
from app.models.board_memory import BoardMemory
|
||||
from app.models.boards import Board
|
||||
from app.models.gateways import Gateway
|
||||
from app.models.users import User
|
||||
@@ -38,10 +41,22 @@ def _is_transient_gateway_error(exc: Exception) -> bool:
|
||||
return False
|
||||
if "unsupported file" in message:
|
||||
return False
|
||||
if "connect call failed" in message or "connection refused" in message:
|
||||
return True
|
||||
if "errno 111" in message or "econnrefused" in message:
|
||||
return True
|
||||
if "did not receive a valid http response" in message:
|
||||
return True
|
||||
if "no route to host" in message or "network is unreachable" in message:
|
||||
return True
|
||||
if "host is down" in message or "name or service not known" in message:
|
||||
return True
|
||||
if "received 1012" in message or "service restart" in message:
|
||||
return True
|
||||
if "http 503" in message or ("503" in message and "websocket" in message):
|
||||
return True
|
||||
if "http 502" in message or "http 504" in message:
|
||||
return True
|
||||
if "temporar" in message:
|
||||
return True
|
||||
if "timeout" in message or "timed out" in message:
|
||||
@@ -51,20 +66,58 @@ def _is_transient_gateway_error(exc: Exception) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
class _GatewayBackoff:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
timeout_s: float = 10 * 60,
|
||||
base_delay_s: float = 0.75,
|
||||
max_delay_s: float = 30.0,
|
||||
jitter: float = 0.2,
|
||||
) -> None:
|
||||
self._timeout_s = timeout_s
|
||||
self._base_delay_s = base_delay_s
|
||||
self._max_delay_s = max_delay_s
|
||||
self._jitter = jitter
|
||||
self._delay_s = base_delay_s
|
||||
|
||||
def reset(self) -> None:
|
||||
self._delay_s = self._base_delay_s
|
||||
|
||||
async def run(self, fn: Callable[[], Awaitable[T]]) -> T:
|
||||
# Use per-call deadlines so long-running syncs can still tolerate a later
|
||||
# gateway restart without having an already-expired retry window.
|
||||
deadline_s = asyncio.get_running_loop().time() + self._timeout_s
|
||||
while True:
|
||||
try:
|
||||
value = await fn()
|
||||
self.reset()
|
||||
return value
|
||||
except Exception as exc:
|
||||
if not _is_transient_gateway_error(exc):
|
||||
raise
|
||||
now = asyncio.get_running_loop().time()
|
||||
remaining = deadline_s - now
|
||||
if remaining <= 0:
|
||||
raise TimeoutError(
|
||||
"Gateway unreachable after 10 minutes (template sync timeout). "
|
||||
f"Last error: {exc}"
|
||||
) from exc
|
||||
|
||||
sleep_s = min(self._delay_s, remaining)
|
||||
if self._jitter:
|
||||
sleep_s *= 1.0 + random.uniform(-self._jitter, self._jitter)
|
||||
sleep_s = max(0.0, min(sleep_s, remaining))
|
||||
await asyncio.sleep(sleep_s)
|
||||
self._delay_s = min(self._delay_s * 2.0, self._max_delay_s)
|
||||
|
||||
|
||||
async def _with_gateway_retry(
|
||||
fn: Callable[[], Awaitable[T]],
|
||||
*,
|
||||
attempts: int = 3,
|
||||
base_delay_s: float = 0.75,
|
||||
backoff: _GatewayBackoff,
|
||||
) -> T:
|
||||
for attempt in range(attempts):
|
||||
try:
|
||||
return await fn()
|
||||
except Exception as exc:
|
||||
if attempt >= attempts - 1 or not _is_transient_gateway_error(exc):
|
||||
raise
|
||||
await asyncio.sleep(base_delay_s * (2**attempt))
|
||||
raise AssertionError("unreachable")
|
||||
return await backoff.run(fn)
|
||||
|
||||
|
||||
def _agent_id_from_session_key(session_key: str | None) -> str | None:
|
||||
@@ -137,13 +190,18 @@ async def _get_agent_file(
|
||||
agent_gateway_id: str,
|
||||
name: str,
|
||||
config: GatewayClientConfig,
|
||||
backoff: _GatewayBackoff | None = None,
|
||||
) -> str | None:
|
||||
try:
|
||||
payload = await openclaw_call(
|
||||
"agents.files.get",
|
||||
{"agentId": agent_gateway_id, "name": name},
|
||||
config=config,
|
||||
)
|
||||
|
||||
async def _do_get() -> object:
|
||||
return await openclaw_call(
|
||||
"agents.files.get",
|
||||
{"agentId": agent_gateway_id, "name": name},
|
||||
config=config,
|
||||
)
|
||||
|
||||
payload = await (backoff.run(_do_get) if backoff else _do_get())
|
||||
except OpenClawGatewayError:
|
||||
return None
|
||||
if isinstance(payload, str):
|
||||
@@ -167,8 +225,14 @@ async def _get_existing_auth_token(
|
||||
*,
|
||||
agent_gateway_id: str,
|
||||
config: GatewayClientConfig,
|
||||
backoff: _GatewayBackoff | None = None,
|
||||
) -> str | None:
|
||||
tools = await _get_agent_file(agent_gateway_id=agent_gateway_id, name="TOOLS.md", config=config)
|
||||
tools = await _get_agent_file(
|
||||
agent_gateway_id=agent_gateway_id,
|
||||
name="TOOLS.md",
|
||||
config=config,
|
||||
backoff=backoff,
|
||||
)
|
||||
if not tools:
|
||||
return None
|
||||
values = _parse_tools_md(tools)
|
||||
@@ -183,32 +247,45 @@ async def _gateway_default_agent_id(
|
||||
config: GatewayClientConfig,
|
||||
*,
|
||||
fallback_session_key: str | None = None,
|
||||
backoff: _GatewayBackoff | None = None,
|
||||
) -> str | None:
|
||||
last_error: OpenClawGatewayError | None = None
|
||||
# Gateways may reject WS connects transiently under load (HTTP 503).
|
||||
for attempt in range(3):
|
||||
try:
|
||||
payload = await openclaw_call("agents.list", config=config)
|
||||
agent_id = _extract_agent_id(payload)
|
||||
if agent_id:
|
||||
return agent_id
|
||||
break
|
||||
except OpenClawGatewayError as exc:
|
||||
last_error = exc
|
||||
message = str(exc).lower()
|
||||
if (
|
||||
"503" not in message
|
||||
and "temporar" not in message
|
||||
and "rejected" not in message
|
||||
and "timeout" not in message
|
||||
):
|
||||
break
|
||||
await asyncio.sleep(0.5 * (2**attempt))
|
||||
try:
|
||||
|
||||
_ = last_error
|
||||
async def _do_list() -> object:
|
||||
return await openclaw_call("agents.list", config=config)
|
||||
|
||||
payload = await (backoff.run(_do_list) if backoff else _do_list())
|
||||
agent_id = _extract_agent_id(payload)
|
||||
if agent_id:
|
||||
return agent_id
|
||||
except OpenClawGatewayError:
|
||||
pass
|
||||
return _agent_id_from_session_key(fallback_session_key)
|
||||
|
||||
|
||||
async def _paused_board_ids(session: AsyncSession, board_ids: list[UUID]) -> set[UUID]:
|
||||
if not board_ids:
|
||||
return set()
|
||||
|
||||
commands = {"/pause", "/resume"}
|
||||
statement = (
|
||||
select(BoardMemory.board_id, BoardMemory.content)
|
||||
.where(col(BoardMemory.board_id).in_(board_ids))
|
||||
.where(col(BoardMemory.is_chat).is_(True))
|
||||
.where(func.lower(func.trim(col(BoardMemory.content))).in_(commands))
|
||||
.order_by(col(BoardMemory.board_id), col(BoardMemory.created_at).desc())
|
||||
# Postgres: DISTINCT ON (board_id) to get latest command per board.
|
||||
.distinct(col(BoardMemory.board_id))
|
||||
)
|
||||
|
||||
paused: set[UUID] = set()
|
||||
for board_id, content in await session.exec(statement):
|
||||
cmd = (content or "").strip().lower()
|
||||
if cmd == "/pause":
|
||||
paused.add(board_id)
|
||||
return paused
|
||||
|
||||
|
||||
async def sync_gateway_templates(
|
||||
session: AsyncSession,
|
||||
gateway: Gateway,
|
||||
@@ -235,6 +312,21 @@ async def sync_gateway_templates(
|
||||
return result
|
||||
|
||||
client_config = GatewayClientConfig(url=gateway.url, token=gateway.token)
|
||||
backoff = _GatewayBackoff(timeout_s=10 * 60)
|
||||
|
||||
# First, wait for the gateway to be reachable (e.g. while it is restarting).
|
||||
try:
|
||||
|
||||
async def _do_ping() -> object:
|
||||
return await openclaw_call("agents.list", config=client_config)
|
||||
|
||||
await backoff.run(_do_ping)
|
||||
except TimeoutError as exc:
|
||||
result.errors.append(GatewayTemplatesSyncError(message=str(exc)))
|
||||
return result
|
||||
except OpenClawGatewayError as exc:
|
||||
result.errors.append(GatewayTemplatesSyncError(message=str(exc)))
|
||||
return result
|
||||
|
||||
boards = list(await session.exec(select(Board).where(col(Board.gateway_id) == gateway.id)))
|
||||
boards_by_id = {board.id: board for board in boards}
|
||||
@@ -250,6 +342,8 @@ async def sync_gateway_templates(
|
||||
return result
|
||||
boards_by_id = {board_id: board}
|
||||
|
||||
paused_board_ids = await _paused_board_ids(session, list(boards_by_id.keys()))
|
||||
|
||||
if boards_by_id:
|
||||
agents = list(
|
||||
await session.exec(
|
||||
@@ -275,10 +369,27 @@ async def sync_gateway_templates(
|
||||
)
|
||||
continue
|
||||
|
||||
if board.id in paused_board_ids:
|
||||
result.agents_skipped += 1
|
||||
continue
|
||||
|
||||
agent_gateway_id = _gateway_agent_id(agent)
|
||||
auth_token = await _get_existing_auth_token(
|
||||
agent_gateway_id=agent_gateway_id, config=client_config
|
||||
)
|
||||
try:
|
||||
auth_token = await _get_existing_auth_token(
|
||||
agent_gateway_id=agent_gateway_id,
|
||||
config=client_config,
|
||||
backoff=backoff,
|
||||
)
|
||||
except TimeoutError as exc:
|
||||
result.errors.append(
|
||||
GatewayTemplatesSyncError(
|
||||
agent_id=agent.id,
|
||||
agent_name=agent.name,
|
||||
board_id=board.id,
|
||||
message=str(exc),
|
||||
)
|
||||
)
|
||||
return result
|
||||
|
||||
if not auth_token:
|
||||
if not rotate_tokens:
|
||||
@@ -321,6 +432,7 @@ async def sync_gateway_templates(
|
||||
)
|
||||
|
||||
try:
|
||||
|
||||
async def _do_provision() -> None:
|
||||
await provision_agent(
|
||||
agent,
|
||||
@@ -333,8 +445,19 @@ async def sync_gateway_templates(
|
||||
reset_session=reset_sessions,
|
||||
)
|
||||
|
||||
await _with_gateway_retry(_do_provision)
|
||||
await _with_gateway_retry(_do_provision, backoff=backoff)
|
||||
result.agents_updated += 1
|
||||
except TimeoutError as exc: # pragma: no cover - gateway/network dependent
|
||||
result.agents_skipped += 1
|
||||
result.errors.append(
|
||||
GatewayTemplatesSyncError(
|
||||
agent_id=agent.id,
|
||||
agent_name=agent.name,
|
||||
board_id=board.id,
|
||||
message=str(exc),
|
||||
)
|
||||
)
|
||||
return result
|
||||
except Exception as exc: # pragma: no cover - gateway/network dependent
|
||||
result.agents_skipped += 1
|
||||
result.errors.append(
|
||||
@@ -360,10 +483,21 @@ async def sync_gateway_templates(
|
||||
)
|
||||
return result
|
||||
|
||||
main_gateway_agent_id = await _gateway_default_agent_id(
|
||||
client_config,
|
||||
fallback_session_key=gateway.main_session_key,
|
||||
)
|
||||
try:
|
||||
main_gateway_agent_id = await _gateway_default_agent_id(
|
||||
client_config,
|
||||
fallback_session_key=gateway.main_session_key,
|
||||
backoff=backoff,
|
||||
)
|
||||
except TimeoutError as exc:
|
||||
result.errors.append(
|
||||
GatewayTemplatesSyncError(
|
||||
agent_id=main_agent.id,
|
||||
agent_name=main_agent.name,
|
||||
message=str(exc),
|
||||
)
|
||||
)
|
||||
return result
|
||||
if not main_gateway_agent_id:
|
||||
result.errors.append(
|
||||
GatewayTemplatesSyncError(
|
||||
@@ -374,9 +508,21 @@ async def sync_gateway_templates(
|
||||
)
|
||||
return result
|
||||
|
||||
main_token = await _get_existing_auth_token(
|
||||
agent_gateway_id=main_gateway_agent_id, config=client_config
|
||||
)
|
||||
try:
|
||||
main_token = await _get_existing_auth_token(
|
||||
agent_gateway_id=main_gateway_agent_id,
|
||||
config=client_config,
|
||||
backoff=backoff,
|
||||
)
|
||||
except TimeoutError as exc:
|
||||
result.errors.append(
|
||||
GatewayTemplatesSyncError(
|
||||
agent_id=main_agent.id,
|
||||
agent_name=main_agent.name,
|
||||
message=str(exc),
|
||||
)
|
||||
)
|
||||
return result
|
||||
if not main_token:
|
||||
if rotate_tokens:
|
||||
raw_token = generate_agent_token()
|
||||
@@ -417,6 +563,7 @@ async def sync_gateway_templates(
|
||||
)
|
||||
|
||||
try:
|
||||
|
||||
async def _do_provision_main() -> None:
|
||||
await provision_main_agent(
|
||||
main_agent,
|
||||
@@ -428,8 +575,17 @@ async def sync_gateway_templates(
|
||||
reset_session=reset_sessions,
|
||||
)
|
||||
|
||||
await _with_gateway_retry(_do_provision_main)
|
||||
await _with_gateway_retry(_do_provision_main, backoff=backoff)
|
||||
result.main_updated = True
|
||||
except TimeoutError as exc: # pragma: no cover - gateway/network dependent
|
||||
result.errors.append(
|
||||
GatewayTemplatesSyncError(
|
||||
agent_id=main_agent.id,
|
||||
agent_name=main_agent.name,
|
||||
message=str(exc),
|
||||
)
|
||||
)
|
||||
return result
|
||||
except Exception as exc: # pragma: no cover - gateway/network dependent
|
||||
result.errors.append(
|
||||
GatewayTemplatesSyncError(
|
||||
|
||||
Reference in New Issue
Block a user