diff --git a/backend/app/core/config.py b/backend/app/core/config.py index a5b7f1f9..dd43cafe 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -84,6 +84,10 @@ class Settings(BaseSettings): # OpenClaw gateway runtime compatibility gateway_min_version: str = "2026.02.9" + # Auto heartbeat governor + auto_heartbeat_governor_enabled: bool = False + auto_heartbeat_governor_interval_seconds: int = 300 + # Logging log_level: str = "INFO" log_format: str = "text" diff --git a/backend/app/main.py b/backend/app/main.py index 857c29c8..f79c4477 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio from contextlib import asynccontextmanager from typing import TYPE_CHECKING, Any @@ -439,15 +440,27 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: settings.db_auto_migrate, ) await init_db() + + governor_task = None if settings.rate_limit_backend == RateLimitBackend.REDIS: validate_rate_limit_redis(settings.rate_limit_redis_url) logger.info("app.lifecycle.rate_limit backend=redis") else: logger.info("app.lifecycle.rate_limit backend=memory") + if settings.auto_heartbeat_governor_enabled: + from app.services.auto_heartbeat_governor import governor_loop + + governor_task = asyncio.create_task(governor_loop()) + logger.info( + "app.auto_heartbeat.enabled", + extra={"interval_seconds": settings.auto_heartbeat_governor_interval_seconds}, + ) logger.info("app.lifecycle.started") try: yield finally: + if governor_task is not None: + governor_task.cancel() logger.info("app.lifecycle.stopped") diff --git a/backend/app/models/agents.py b/backend/app/models/agents.py index 9c97e970..73930fd8 100644 --- a/backend/app/models/agents.py +++ b/backend/app/models/agents.py @@ -49,5 +49,12 @@ class Agent(QueryModel, table=True): checkin_deadline_at: datetime | None = Field(default=None) last_provision_error: str | None = Field(default=None, sa_column=Column(Text)) is_board_lead: bool = Field(default=False, index=True) + + # Auto heartbeat governor state (Mission Control managed) + auto_heartbeat_enabled: bool = Field(default=True, index=True) + auto_heartbeat_step: int = Field(default=0) + auto_heartbeat_off: bool = Field(default=False, index=True) + auto_heartbeat_last_active_at: datetime | None = Field(default=None) + created_at: datetime = Field(default_factory=utcnow) updated_at: datetime = Field(default_factory=utcnow) diff --git a/backend/app/services/auto_heartbeat_governor.py b/backend/app/services/auto_heartbeat_governor.py new file mode 100644 index 00000000..deb9eb1e --- /dev/null +++ b/backend/app/services/auto_heartbeat_governor.py @@ -0,0 +1,325 @@ +"""Auto-idle heartbeat governor. + +Goal: reduce background model usage by dynamically backing off agent heartbeats when boards are idle, +while keeping agents responsive when there is activity. + +Design notes: +- Runs periodically (every N seconds) from the backend process. +- Uses a DB advisory lock to ensure only one governor instance runs at a time. +- Activity trigger (per Tes spec): ANY new board chat counts as activity. +- Leads never go fully off; they cap at 1h. +- "Fully off" is implemented by unsetting the agent heartbeat in the gateway config (not by using + an invalid value like every="disabled"). + +This module only decides and applies desired heartbeats. It does not wake sessions. +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from typing import Any + +from sqlalchemy import text +from sqlmodel import col, select + +from app.core.config import settings +from app.core.logging import get_logger +from app.core.time import utcnow +from app.db.session import async_session_maker +from app.models.agents import Agent +from app.models.board_memory import BoardMemory +from app.models.gateways import Gateway +from app.models.tasks import Task +from app.services.openclaw.internal.agent_key import agent_key as _agent_key +from app.services.openclaw.provisioning import ( + OpenClawGatewayControlPlane, + _workspace_path, +) +from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig + +logger = get_logger(__name__) + +# Governor cadence + behaviour. +ACTIVE_EVERY = "5m" +LADDER: list[str] = ["10m", "30m", "1h", "3h", "6h"] +LEAD_CAP_EVERY = "1h" +ACTIVE_WINDOW = timedelta(minutes=60) + +# Postgres advisory lock key (2x int32). Keep stable. +_ADVISORY_LOCK_KEY_1 = 424242 +_ADVISORY_LOCK_KEY_2 = 1701 + + +@dataclass(frozen=True) +class DesiredHeartbeat: + every: str | None # None means "off" (unset heartbeat in gateway config) + step: int + off: bool + + +def _is_active(*, now: datetime, last_chat_at: datetime | None, has_work: bool) -> bool: + if has_work: + return True + if last_chat_at is None: + return False + return (now - last_chat_at) <= ACTIVE_WINDOW + + +def compute_desired_heartbeat( + *, + is_lead: bool, + active: bool, + step: int, +) -> DesiredHeartbeat: + """Return desired heartbeat for an agent. + + step: governor-managed backoff index. + 0 means just became active. + 1..len(LADDER) means ladder index-1. + len(LADDER)+1 means off (for non-leads). + """ + + if active: + return DesiredHeartbeat(every=ACTIVE_EVERY, step=0, off=False) + + # If idle, advance one rung. + next_step = max(1, int(step) + 1) + + if is_lead: + # Leads never go fully off; cap at 1h. + if next_step <= 0: + next_every = ACTIVE_EVERY + elif next_step <= len(LADDER): + next_every = LADDER[next_step - 1] + else: + next_every = LEAD_CAP_EVERY + # Enforce cap. + if next_every in ("3h", "6h"): + next_every = LEAD_CAP_EVERY + return DesiredHeartbeat(every=next_every, step=min(next_step, len(LADDER)), off=False) + + # Non-leads can go fully off after max backoff. + if next_step <= len(LADDER): + return DesiredHeartbeat(every=LADDER[next_step - 1], step=next_step, off=False) + + return DesiredHeartbeat(every=None, step=len(LADDER) + 1, off=True) + + +async def _acquire_lock(session) -> bool: + result = await session.exec( + text("SELECT pg_try_advisory_lock(:k1, :k2)"), + params={"k1": _ADVISORY_LOCK_KEY_1, "k2": _ADVISORY_LOCK_KEY_2}, + ) + row = result.first() + return bool(row and row[0]) + + +async def _release_lock(session) -> None: + await session.exec( + text("SELECT pg_advisory_unlock(:k1, :k2)"), + params={"k1": _ADVISORY_LOCK_KEY_1, "k2": _ADVISORY_LOCK_KEY_2}, + ) + + +async def _latest_chat_by_board(session) -> dict[Any, datetime]: + # Only chat memory items. + rows = await session.exec( + text( + """ + SELECT board_id, MAX(created_at) AS last_chat_at + FROM boardmemory + WHERE is_chat = true + GROUP BY board_id + """, + ), + ) + result: dict[Any, datetime] = {} + for board_id, last_chat_at in rows.all(): + if board_id and last_chat_at: + result[board_id] = last_chat_at + return result + + +async def _has_work_map(session) -> dict[Any, bool]: + # Work = tasks assigned to the agent that are in_progress or review. + # Map by agent_id. + rows = await session.exec( + text( + """ + SELECT assigned_agent_id, COUNT(*) + FROM task + WHERE assigned_agent_id IS NOT NULL + AND status IN ('in_progress', 'review') + GROUP BY assigned_agent_id + """, + ), + ) + result: dict[Any, bool] = {} + for agent_id, count in rows.all(): + if agent_id: + result[agent_id] = bool(count and int(count) > 0) + return result + + +def _merge_heartbeat_config(agent: Agent, every: str) -> dict[str, Any]: + merged: dict[str, Any] = { + "every": every, + "target": "last", + "includeReasoning": False, + } + if isinstance(agent.heartbeat_config, dict): + merged.update({k: v for k, v in agent.heartbeat_config.items() if k != "every"}) + merged["every"] = every + return merged + + +async def _patch_gateway( + *, + gateway: Gateway, + agent_entries: list[tuple[str, str, dict[str, Any] | None]], +) -> None: + control_plane = OpenClawGatewayControlPlane( + GatewayClientConfig(url=gateway.url or "", token=gateway.token), + ) + # patch_agent_heartbeats supports None heartbeat entries after our patch. + await control_plane.patch_agent_heartbeats(agent_entries) # type: ignore[arg-type] + + +async def run_governor_once() -> None: + if not getattr(settings, "auto_heartbeat_governor_enabled", False): + return + + async with async_session_maker() as session: + locked = await _acquire_lock(session) + if not locked: + logger.debug("auto_heartbeat.skip_locked") + return + + try: + now = utcnow() + agents = (await session.exec(select(Agent))).all() + if not agents: + return + + # Batch compute activity signals. + chat_by_board = await _latest_chat_by_board(session) + has_work_by_agent = await _has_work_map(session) + + # Load gateways referenced by these agents. + gateway_ids = {a.gateway_id for a in agents if a.gateway_id} + gateways = ( + await session.exec(select(Gateway).where(col(Gateway.id).in_(gateway_ids))) + ).all() + gateway_by_id = {g.id: g for g in gateways} + + # Group patches per gateway. + patches_by_gateway: dict[Any, list[tuple[str, str, dict[str, Any] | None]]] = {} + changed = 0 + + for agent in agents: + if not agent.auto_heartbeat_enabled: + continue + gateway = gateway_by_id.get(agent.gateway_id) + if gateway is None or not gateway.url or not gateway.workspace_root: + continue + + last_chat_at = None + if agent.board_id: + last_chat_at = chat_by_board.get(agent.board_id) + has_work = has_work_by_agent.get(agent.id, False) + + active = _is_active(now=now, last_chat_at=last_chat_at, has_work=has_work) + desired = compute_desired_heartbeat( + is_lead=bool(agent.is_board_lead), + active=active, + step=int(agent.auto_heartbeat_step or 0), + ) + + # Determine if we need to update DB state. + needs_db = ( + bool(agent.auto_heartbeat_off) != desired.off + or int(agent.auto_heartbeat_step or 0) != desired.step + ) + + # Determine desired heartbeat payload. + agent_id = _agent_key(agent) + workspace_path = _workspace_path(agent, gateway.workspace_root) + + heartbeat_payload: dict[str, Any] | None + if desired.every is None: + heartbeat_payload = None + else: + heartbeat_payload = _merge_heartbeat_config(agent, desired.every) + + # Compare with current (only best-effort; gateway patch is idempotent). + # If agent is off, patch regardless (None removes heartbeat). + if desired.every is None: + patches_by_gateway.setdefault(gateway.id, []).append( + (agent_id, workspace_path, None), + ) + else: + # Only patch when 'every' differs or we were previously off. + current_every = None + if isinstance(agent.heartbeat_config, dict): + current_every = agent.heartbeat_config.get("every") + if current_every != desired.every or bool(agent.auto_heartbeat_off): + patches_by_gateway.setdefault(gateway.id, []).append( + (agent_id, workspace_path, heartbeat_payload), + ) + + if needs_db: + agent.auto_heartbeat_step = desired.step + agent.auto_heartbeat_off = desired.off + if active: + agent.auto_heartbeat_last_active_at = now + agent.updated_at = now + session.add(agent) + changed += 1 + + # Keep heartbeat_config in sync for non-off entries. + if desired.every is not None: + agent.heartbeat_config = heartbeat_payload + + if changed: + await session.commit() + + # Apply patches gateway-by-gateway. + for gateway_id, entries in patches_by_gateway.items(): + gateway = gateway_by_id.get(gateway_id) + if gateway is None: + continue + try: + await _patch_gateway(gateway=gateway, agent_entries=entries) + except Exception as exc: + logger.warning( + "auto_heartbeat.patch_failed", + extra={"gateway_id": str(gateway_id), "error": str(exc)}, + ) + + logger.info( + "auto_heartbeat.run_complete", + extra={"agents": len(agents), "db_changed": changed, "gateways": len(patches_by_gateway)}, + ) + finally: + try: + await _release_lock(session) + except Exception: + logger.exception("auto_heartbeat.unlock_failed") + + +async def governor_loop() -> None: + """Run the governor forever.""" + interval = getattr(settings, "auto_heartbeat_governor_interval_seconds", 300) + interval = max(30, int(interval)) + logger.info( + "auto_heartbeat.loop_start", + extra={"interval_seconds": interval}, + ) + while True: + try: + await run_governor_once() + except Exception: + logger.exception("auto_heartbeat.loop_error") + await asyncio.sleep(interval) diff --git a/backend/app/services/openclaw/provisioning.py b/backend/app/services/openclaw/provisioning.py index f49fecf6..522ef52a 100644 --- a/backend/app/services/openclaw/provisioning.py +++ b/backend/app/services/openclaw/provisioning.py @@ -544,7 +544,7 @@ class GatewayControlPlane(ABC): @abstractmethod async def patch_agent_heartbeats( self, - entries: list[tuple[str, str, dict[str, Any]]], + entries: list[tuple[str, str, dict[str, Any] | None]], ) -> None: raise NotImplementedError @@ -684,7 +684,7 @@ class OpenClawGatewayControlPlane(GatewayControlPlane): async def patch_agent_heartbeats( self, - entries: list[tuple[str, str, dict[str, Any]]], + entries: list[tuple[str, str, dict[str, Any] | None]], ) -> None: base_hash, raw_list, config_data = await _gateway_config_agent_list(self._config) entry_by_id = _heartbeat_entry_map(entries) @@ -732,8 +732,8 @@ async def _gateway_config_agent_list( def _heartbeat_entry_map( - entries: list[tuple[str, str, dict[str, Any]]], -) -> dict[str, tuple[str, dict[str, Any]]]: + entries: list[tuple[str, str, dict[str, Any] | None]], +) -> dict[str, tuple[str, dict[str, Any] | None]]: return { agent_id: (workspace_path, heartbeat) for agent_id, workspace_path, heartbeat in entries } @@ -741,7 +741,7 @@ def _heartbeat_entry_map( def _updated_agent_list( raw_list: list[object], - entry_by_id: dict[str, tuple[str, dict[str, Any]]], + entry_by_id: dict[str, tuple[str, dict[str, Any] | None]], ) -> list[object]: updated_ids: set[str] = set() new_list: list[object] = [] @@ -758,16 +758,20 @@ def _updated_agent_list( workspace_path, heartbeat = entry_by_id[agent_id] new_entry = dict(raw_entry) new_entry["workspace"] = workspace_path - new_entry["heartbeat"] = heartbeat + if heartbeat is None: + new_entry.pop("heartbeat", None) + else: + new_entry["heartbeat"] = heartbeat new_list.append(new_entry) updated_ids.add(agent_id) for agent_id, (workspace_path, heartbeat) in entry_by_id.items(): if agent_id in updated_ids: continue - new_list.append( - {"id": agent_id, "workspace": workspace_path, "heartbeat": heartbeat}, - ) + entry = {"id": agent_id, "workspace": workspace_path} + if heartbeat is not None: + entry["heartbeat"] = heartbeat + new_list.append(entry) return new_list diff --git a/backend/migrations/versions/f7d54a8c5098_add_auto_heartbeat_governor_fields.py b/backend/migrations/versions/f7d54a8c5098_add_auto_heartbeat_governor_fields.py new file mode 100644 index 00000000..c8a8f7ff --- /dev/null +++ b/backend/migrations/versions/f7d54a8c5098_add_auto_heartbeat_governor_fields.py @@ -0,0 +1,60 @@ +"""add auto heartbeat governor fields + +Revision ID: f7d54a8c5098 +Revises: b7a1d9c3e4f5 +Create Date: 2026-02-23 11:04:51.730825 + +""" +from __future__ import annotations + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'f7d54a8c5098' +down_revision = 'b7a1d9c3e4f5' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "agents", + sa.Column("auto_heartbeat_enabled", sa.Boolean(), nullable=False, server_default=sa.text("true")), + ) + op.add_column( + "agents", + sa.Column("auto_heartbeat_step", sa.Integer(), nullable=False, server_default="0"), + ) + op.add_column( + "agents", + sa.Column("auto_heartbeat_off", sa.Boolean(), nullable=False, server_default=sa.text("false")), + ) + op.add_column( + "agents", + sa.Column("auto_heartbeat_last_active_at", sa.DateTime(timezone=True), nullable=True), + ) + + op.create_index( + op.f("ix_agents_auto_heartbeat_enabled"), + "agents", + ["auto_heartbeat_enabled"], + unique=False, + ) + op.create_index( + op.f("ix_agents_auto_heartbeat_off"), + "agents", + ["auto_heartbeat_off"], + unique=False, + ) + + +def downgrade() -> None: + op.drop_index(op.f("ix_agents_auto_heartbeat_off"), table_name="agents") + op.drop_index(op.f("ix_agents_auto_heartbeat_enabled"), table_name="agents") + + op.drop_column("agents", "auto_heartbeat_last_active_at") + op.drop_column("agents", "auto_heartbeat_off") + op.drop_column("agents", "auto_heartbeat_step") + op.drop_column("agents", "auto_heartbeat_enabled") diff --git a/backend/tests/test_auto_heartbeat_governor_policy.py b/backend/tests/test_auto_heartbeat_governor_policy.py new file mode 100644 index 00000000..628708eb --- /dev/null +++ b/backend/tests/test_auto_heartbeat_governor_policy.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from app.services.auto_heartbeat_governor import compute_desired_heartbeat + + +def test_policy_active_resets_to_5m() -> None: + desired = compute_desired_heartbeat(is_lead=False, active=True, step=3) + assert desired.every == "5m" + assert desired.step == 0 + assert desired.off is False + + +def test_policy_backoff_steps_non_lead() -> None: + # idle step 0 -> 10m + d1 = compute_desired_heartbeat(is_lead=False, active=False, step=0) + assert d1.every == "10m" + assert d1.off is False + + # keep idling up the ladder + d2 = compute_desired_heartbeat(is_lead=False, active=False, step=d1.step) + assert d2.every == "30m" + + d3 = compute_desired_heartbeat(is_lead=False, active=False, step=d2.step) + assert d3.every == "1h" + + d4 = compute_desired_heartbeat(is_lead=False, active=False, step=d3.step) + assert d4.every == "3h" + + d5 = compute_desired_heartbeat(is_lead=False, active=False, step=d4.step) + assert d5.every == "6h" + + # next step goes fully off + d6 = compute_desired_heartbeat(is_lead=False, active=False, step=d5.step) + assert d6.every is None + assert d6.off is True + + +def test_policy_lead_caps_at_1h_never_off() -> None: + # step beyond ladder should still cap at 1h + d = compute_desired_heartbeat(is_lead=True, active=False, step=999) + assert d.every == "1h" + assert d.off is False