fix(governor): satisfy backend CI checks
Align the cherry-picked governor code with the repository's lint and\ntype-check expectations. This fixes import ordering, formats the new\nservice, tightens session and heartbeat typing, and removes stale\nannotations so the backend CI job passes on current master. Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -18,27 +18,27 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlmodel import col, select
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.logging import get_logger
|
||||
from app.core.time import utcnow
|
||||
from app.db.session import async_session_maker
|
||||
from app.models.agents import Agent
|
||||
from app.models.board_memory import BoardMemory
|
||||
from app.models.boards import Board
|
||||
from app.models.gateways import Gateway
|
||||
from app.models.tasks import Task
|
||||
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
|
||||
from app.services.openclaw.internal.agent_key import agent_key as _agent_key
|
||||
from app.services.openclaw.provisioning import (
|
||||
OpenClawGatewayControlPlane,
|
||||
_workspace_path,
|
||||
)
|
||||
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -108,8 +108,8 @@ def compute_desired_heartbeat(
|
||||
return DesiredHeartbeat(every=None, step=len(ladder) + 1, off=True)
|
||||
|
||||
|
||||
async def _acquire_lock(session) -> bool:
|
||||
result = await session.exec(
|
||||
async def _acquire_lock(session: AsyncSession) -> bool:
|
||||
result = await session.execute(
|
||||
text("SELECT pg_try_advisory_lock(:k1, :k2)"),
|
||||
params={"k1": _ADVISORY_LOCK_KEY_1, "k2": _ADVISORY_LOCK_KEY_2},
|
||||
)
|
||||
@@ -117,16 +117,16 @@ async def _acquire_lock(session) -> bool:
|
||||
return bool(row and row[0])
|
||||
|
||||
|
||||
async def _release_lock(session) -> None:
|
||||
await session.exec(
|
||||
async def _release_lock(session: AsyncSession) -> None:
|
||||
await session.execute(
|
||||
text("SELECT pg_advisory_unlock(:k1, :k2)"),
|
||||
params={"k1": _ADVISORY_LOCK_KEY_1, "k2": _ADVISORY_LOCK_KEY_2},
|
||||
)
|
||||
|
||||
|
||||
async def _latest_chat_by_board(session) -> dict[Any, datetime]:
|
||||
async def _latest_chat_by_board(session: AsyncSession) -> dict[UUID, datetime]:
|
||||
# Only chat memory items.
|
||||
rows = await session.exec(
|
||||
rows = await session.execute(
|
||||
text(
|
||||
"""
|
||||
SELECT board_id, MAX(created_at) AS last_chat_at
|
||||
@@ -136,17 +136,17 @@ async def _latest_chat_by_board(session) -> dict[Any, datetime]:
|
||||
""",
|
||||
),
|
||||
)
|
||||
result: dict[Any, datetime] = {}
|
||||
result: dict[UUID, datetime] = {}
|
||||
for board_id, last_chat_at in rows.all():
|
||||
if board_id and last_chat_at:
|
||||
result[board_id] = last_chat_at
|
||||
return result
|
||||
|
||||
|
||||
async def _has_work_map(session) -> dict[Any, bool]:
|
||||
async def _has_work_map(session: AsyncSession) -> dict[UUID, bool]:
|
||||
# Work = tasks assigned to the agent that are in_progress or review.
|
||||
# Map by agent_id.
|
||||
rows = await session.exec(
|
||||
rows = await session.execute(
|
||||
text(
|
||||
"""
|
||||
SELECT assigned_agent_id, COUNT(*)
|
||||
@@ -157,7 +157,7 @@ async def _has_work_map(session) -> dict[Any, bool]:
|
||||
""",
|
||||
),
|
||||
)
|
||||
result: dict[Any, bool] = {}
|
||||
result: dict[UUID, bool] = {}
|
||||
for agent_id, count in rows.all():
|
||||
if agent_id:
|
||||
result[agent_id] = bool(count and int(count) > 0)
|
||||
@@ -185,7 +185,7 @@ async def _patch_gateway(
|
||||
GatewayClientConfig(url=gateway.url or "", token=gateway.token),
|
||||
)
|
||||
# patch_agent_heartbeats supports None heartbeat entries after our patch.
|
||||
await control_plane.patch_agent_heartbeats(agent_entries) # type: ignore[arg-type]
|
||||
await control_plane.patch_agent_heartbeats(agent_entries)
|
||||
|
||||
|
||||
async def run_governor_once() -> None:
|
||||
@@ -211,8 +211,10 @@ async def run_governor_once() -> None:
|
||||
# Load board policies referenced by these agents.
|
||||
board_ids = {a.board_id for a in agents if a.board_id}
|
||||
boards = (
|
||||
await session.exec(select(Board).where(col(Board.id).in_(board_ids)))
|
||||
).all() if board_ids else []
|
||||
(await session.exec(select(Board).where(col(Board.id).in_(board_ids)))).all()
|
||||
if board_ids
|
||||
else []
|
||||
)
|
||||
board_by_id = {b.id: b for b in boards}
|
||||
|
||||
# Load gateways referenced by these agents.
|
||||
@@ -223,7 +225,7 @@ async def run_governor_once() -> None:
|
||||
gateway_by_id = {g.id: g for g in gateways}
|
||||
|
||||
# Group patches per gateway.
|
||||
patches_by_gateway: dict[Any, list[tuple[str, str, dict[str, Any] | None]]] = {}
|
||||
patches_by_gateway: dict[UUID, list[tuple[str, str, dict[str, Any] | None]]] = {}
|
||||
changed = 0
|
||||
|
||||
for agent in agents:
|
||||
@@ -261,7 +263,11 @@ async def run_governor_once() -> None:
|
||||
if not ladder:
|
||||
ladder = None
|
||||
lead_cap = (
|
||||
str(getattr(board, "auto_heartbeat_governor_lead_cap_every", DEFAULT_LEAD_CAP_EVERY))
|
||||
str(
|
||||
getattr(
|
||||
board, "auto_heartbeat_governor_lead_cap_every", DEFAULT_LEAD_CAP_EVERY
|
||||
)
|
||||
)
|
||||
if board is not None
|
||||
else DEFAULT_LEAD_CAP_EVERY
|
||||
)
|
||||
@@ -340,7 +346,11 @@ async def run_governor_once() -> None:
|
||||
|
||||
logger.info(
|
||||
"auto_heartbeat.run_complete",
|
||||
extra={"agents": len(agents), "db_changed": changed, "gateways": len(patches_by_gateway)},
|
||||
extra={
|
||||
"agents": len(agents),
|
||||
"db_changed": changed,
|
||||
"gateways": len(patches_by_gateway),
|
||||
},
|
||||
)
|
||||
finally:
|
||||
try:
|
||||
|
||||
@@ -768,7 +768,7 @@ def _updated_agent_list(
|
||||
for agent_id, (workspace_path, heartbeat) in entry_by_id.items():
|
||||
if agent_id in updated_ids:
|
||||
continue
|
||||
entry = {"id": agent_id, "workspace": workspace_path}
|
||||
entry: dict[str, Any] = {"id": agent_id, "workspace": workspace_path}
|
||||
if heartbeat is not None:
|
||||
entry["heartbeat"] = heartbeat
|
||||
new_list.append(entry)
|
||||
@@ -1077,7 +1077,7 @@ def _control_plane_for_gateway(gateway: Gateway) -> OpenClawGatewayControlPlane:
|
||||
async def _patch_gateway_agent_heartbeats(
|
||||
gateway: Gateway,
|
||||
*,
|
||||
entries: list[tuple[str, str, dict[str, Any]]],
|
||||
entries: list[tuple[str, str, dict[str, Any] | None]],
|
||||
) -> None:
|
||||
"""Patch multiple agent heartbeat configs in a single gateway config.patch call.
|
||||
|
||||
@@ -1117,7 +1117,7 @@ class OpenClawGatewayProvisioner:
|
||||
if not gateway.workspace_root:
|
||||
msg = "gateway workspace_root is required"
|
||||
raise OpenClawGatewayError(msg)
|
||||
entries: list[tuple[str, str, dict[str, Any]]] = []
|
||||
entries: list[tuple[str, str, dict[str, Any] | None]] = []
|
||||
for agent in agents:
|
||||
agent_id = _agent_key(agent)
|
||||
workspace_path = _workspace_path(agent, gateway.workspace_root)
|
||||
|
||||
Reference in New Issue
Block a user