From eba090a3d315fb9c3a96c9f4f44a83201241cecf Mon Sep 17 00:00:00 2001 From: Abhimanyu Saharan Date: Sun, 8 Mar 2026 00:34:24 +0530 Subject: [PATCH] fix(governor): satisfy backend CI checks Align the cherry-picked governor code with the repository's lint and\ntype-check expectations. This fixes import ordering, formats the new\nservice, tightens session and heartbeat typing, and removes stale\nannotations so the backend CI job passes on current master. Co-Authored-By: Claude --- .../app/services/auto_heartbeat_governor.py | 50 +++++++++++-------- backend/app/services/openclaw/provisioning.py | 6 +-- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/backend/app/services/auto_heartbeat_governor.py b/backend/app/services/auto_heartbeat_governor.py index 18662d02..f5994e24 100644 --- a/backend/app/services/auto_heartbeat_governor.py +++ b/backend/app/services/auto_heartbeat_governor.py @@ -18,27 +18,27 @@ from __future__ import annotations import asyncio from dataclasses import dataclass -from datetime import UTC, datetime, timedelta +from datetime import datetime, timedelta from typing import Any +from uuid import UUID from sqlalchemy import text from sqlmodel import col, select +from sqlmodel.ext.asyncio.session import AsyncSession from app.core.config import settings from app.core.logging import get_logger from app.core.time import utcnow from app.db.session import async_session_maker from app.models.agents import Agent -from app.models.board_memory import BoardMemory from app.models.boards import Board from app.models.gateways import Gateway -from app.models.tasks import Task +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig from app.services.openclaw.internal.agent_key import agent_key as _agent_key from app.services.openclaw.provisioning import ( OpenClawGatewayControlPlane, _workspace_path, ) -from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig logger = get_logger(__name__) @@ -108,8 +108,8 @@ def compute_desired_heartbeat( return DesiredHeartbeat(every=None, step=len(ladder) + 1, off=True) -async def _acquire_lock(session) -> bool: - result = await session.exec( +async def _acquire_lock(session: AsyncSession) -> bool: + result = await session.execute( text("SELECT pg_try_advisory_lock(:k1, :k2)"), params={"k1": _ADVISORY_LOCK_KEY_1, "k2": _ADVISORY_LOCK_KEY_2}, ) @@ -117,16 +117,16 @@ async def _acquire_lock(session) -> bool: return bool(row and row[0]) -async def _release_lock(session) -> None: - await session.exec( +async def _release_lock(session: AsyncSession) -> None: + await session.execute( text("SELECT pg_advisory_unlock(:k1, :k2)"), params={"k1": _ADVISORY_LOCK_KEY_1, "k2": _ADVISORY_LOCK_KEY_2}, ) -async def _latest_chat_by_board(session) -> dict[Any, datetime]: +async def _latest_chat_by_board(session: AsyncSession) -> dict[UUID, datetime]: # Only chat memory items. - rows = await session.exec( + rows = await session.execute( text( """ SELECT board_id, MAX(created_at) AS last_chat_at @@ -136,17 +136,17 @@ async def _latest_chat_by_board(session) -> dict[Any, datetime]: """, ), ) - result: dict[Any, datetime] = {} + result: dict[UUID, datetime] = {} for board_id, last_chat_at in rows.all(): if board_id and last_chat_at: result[board_id] = last_chat_at return result -async def _has_work_map(session) -> dict[Any, bool]: +async def _has_work_map(session: AsyncSession) -> dict[UUID, bool]: # Work = tasks assigned to the agent that are in_progress or review. # Map by agent_id. - rows = await session.exec( + rows = await session.execute( text( """ SELECT assigned_agent_id, COUNT(*) @@ -157,7 +157,7 @@ async def _has_work_map(session) -> dict[Any, bool]: """, ), ) - result: dict[Any, bool] = {} + result: dict[UUID, bool] = {} for agent_id, count in rows.all(): if agent_id: result[agent_id] = bool(count and int(count) > 0) @@ -185,7 +185,7 @@ async def _patch_gateway( GatewayClientConfig(url=gateway.url or "", token=gateway.token), ) # patch_agent_heartbeats supports None heartbeat entries after our patch. - await control_plane.patch_agent_heartbeats(agent_entries) # type: ignore[arg-type] + await control_plane.patch_agent_heartbeats(agent_entries) async def run_governor_once() -> None: @@ -211,8 +211,10 @@ async def run_governor_once() -> None: # Load board policies referenced by these agents. board_ids = {a.board_id for a in agents if a.board_id} boards = ( - await session.exec(select(Board).where(col(Board.id).in_(board_ids))) - ).all() if board_ids else [] + (await session.exec(select(Board).where(col(Board.id).in_(board_ids)))).all() + if board_ids + else [] + ) board_by_id = {b.id: b for b in boards} # Load gateways referenced by these agents. @@ -223,7 +225,7 @@ async def run_governor_once() -> None: gateway_by_id = {g.id: g for g in gateways} # Group patches per gateway. - patches_by_gateway: dict[Any, list[tuple[str, str, dict[str, Any] | None]]] = {} + patches_by_gateway: dict[UUID, list[tuple[str, str, dict[str, Any] | None]]] = {} changed = 0 for agent in agents: @@ -261,7 +263,11 @@ async def run_governor_once() -> None: if not ladder: ladder = None lead_cap = ( - str(getattr(board, "auto_heartbeat_governor_lead_cap_every", DEFAULT_LEAD_CAP_EVERY)) + str( + getattr( + board, "auto_heartbeat_governor_lead_cap_every", DEFAULT_LEAD_CAP_EVERY + ) + ) if board is not None else DEFAULT_LEAD_CAP_EVERY ) @@ -340,7 +346,11 @@ async def run_governor_once() -> None: logger.info( "auto_heartbeat.run_complete", - extra={"agents": len(agents), "db_changed": changed, "gateways": len(patches_by_gateway)}, + extra={ + "agents": len(agents), + "db_changed": changed, + "gateways": len(patches_by_gateway), + }, ) finally: try: diff --git a/backend/app/services/openclaw/provisioning.py b/backend/app/services/openclaw/provisioning.py index 522ef52a..4c6c7152 100644 --- a/backend/app/services/openclaw/provisioning.py +++ b/backend/app/services/openclaw/provisioning.py @@ -768,7 +768,7 @@ def _updated_agent_list( for agent_id, (workspace_path, heartbeat) in entry_by_id.items(): if agent_id in updated_ids: continue - entry = {"id": agent_id, "workspace": workspace_path} + entry: dict[str, Any] = {"id": agent_id, "workspace": workspace_path} if heartbeat is not None: entry["heartbeat"] = heartbeat new_list.append(entry) @@ -1077,7 +1077,7 @@ def _control_plane_for_gateway(gateway: Gateway) -> OpenClawGatewayControlPlane: async def _patch_gateway_agent_heartbeats( gateway: Gateway, *, - entries: list[tuple[str, str, dict[str, Any]]], + entries: list[tuple[str, str, dict[str, Any] | None]], ) -> None: """Patch multiple agent heartbeat configs in a single gateway config.patch call. @@ -1117,7 +1117,7 @@ class OpenClawGatewayProvisioner: if not gateway.workspace_root: msg = "gateway workspace_root is required" raise OpenClawGatewayError(msg) - entries: list[tuple[str, str, dict[str, Any]]] = [] + entries: list[tuple[str, str, dict[str, Any] | None]] = [] for agent in agents: agent_id = _agent_key(agent) workspace_path = _workspace_path(agent, gateway.workspace_root)