feat: auto heartbeat governor (elastic backoff)
(cherry picked from commit 2d1d691879)
This commit is contained in:
committed by
Abhimanyu Saharan
parent
fa445127d9
commit
2a3b1022c2
@@ -84,6 +84,10 @@ class Settings(BaseSettings):
|
||||
# OpenClaw gateway runtime compatibility
|
||||
gateway_min_version: str = "2026.02.9"
|
||||
|
||||
# Auto heartbeat governor
|
||||
auto_heartbeat_governor_enabled: bool = False
|
||||
auto_heartbeat_governor_interval_seconds: int = 300
|
||||
|
||||
# Logging
|
||||
log_level: str = "INFO"
|
||||
log_format: str = "text"
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
@@ -439,15 +440,27 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]:
|
||||
settings.db_auto_migrate,
|
||||
)
|
||||
await init_db()
|
||||
|
||||
governor_task = None
|
||||
if settings.rate_limit_backend == RateLimitBackend.REDIS:
|
||||
validate_rate_limit_redis(settings.rate_limit_redis_url)
|
||||
logger.info("app.lifecycle.rate_limit backend=redis")
|
||||
else:
|
||||
logger.info("app.lifecycle.rate_limit backend=memory")
|
||||
if settings.auto_heartbeat_governor_enabled:
|
||||
from app.services.auto_heartbeat_governor import governor_loop
|
||||
|
||||
governor_task = asyncio.create_task(governor_loop())
|
||||
logger.info(
|
||||
"app.auto_heartbeat.enabled",
|
||||
extra={"interval_seconds": settings.auto_heartbeat_governor_interval_seconds},
|
||||
)
|
||||
logger.info("app.lifecycle.started")
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
if governor_task is not None:
|
||||
governor_task.cancel()
|
||||
logger.info("app.lifecycle.stopped")
|
||||
|
||||
|
||||
|
||||
@@ -49,5 +49,12 @@ class Agent(QueryModel, table=True):
|
||||
checkin_deadline_at: datetime | None = Field(default=None)
|
||||
last_provision_error: str | None = Field(default=None, sa_column=Column(Text))
|
||||
is_board_lead: bool = Field(default=False, index=True)
|
||||
|
||||
# Auto heartbeat governor state (Mission Control managed)
|
||||
auto_heartbeat_enabled: bool = Field(default=True, index=True)
|
||||
auto_heartbeat_step: int = Field(default=0)
|
||||
auto_heartbeat_off: bool = Field(default=False, index=True)
|
||||
auto_heartbeat_last_active_at: datetime | None = Field(default=None)
|
||||
|
||||
created_at: datetime = Field(default_factory=utcnow)
|
||||
updated_at: datetime = Field(default_factory=utcnow)
|
||||
|
||||
325
backend/app/services/auto_heartbeat_governor.py
Normal file
325
backend/app/services/auto_heartbeat_governor.py
Normal file
@@ -0,0 +1,325 @@
|
||||
"""Auto-idle heartbeat governor.
|
||||
|
||||
Goal: reduce background model usage by dynamically backing off agent heartbeats when boards are idle,
|
||||
while keeping agents responsive when there is activity.
|
||||
|
||||
Design notes:
|
||||
- Runs periodically (every N seconds) from the backend process.
|
||||
- Uses a DB advisory lock to ensure only one governor instance runs at a time.
|
||||
- Activity trigger (per Tes spec): ANY new board chat counts as activity.
|
||||
- Leads never go fully off; they cap at 1h.
|
||||
- "Fully off" is implemented by unsetting the agent heartbeat in the gateway config (not by using
|
||||
an invalid value like every="disabled").
|
||||
|
||||
This module only decides and applies desired heartbeats. It does not wake sessions.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlmodel import col, select
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.logging import get_logger
|
||||
from app.core.time import utcnow
|
||||
from app.db.session import async_session_maker
|
||||
from app.models.agents import Agent
|
||||
from app.models.board_memory import BoardMemory
|
||||
from app.models.gateways import Gateway
|
||||
from app.models.tasks import Task
|
||||
from app.services.openclaw.internal.agent_key import agent_key as _agent_key
|
||||
from app.services.openclaw.provisioning import (
|
||||
OpenClawGatewayControlPlane,
|
||||
_workspace_path,
|
||||
)
|
||||
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# Governor cadence + behaviour.
|
||||
ACTIVE_EVERY = "5m"
|
||||
LADDER: list[str] = ["10m", "30m", "1h", "3h", "6h"]
|
||||
LEAD_CAP_EVERY = "1h"
|
||||
ACTIVE_WINDOW = timedelta(minutes=60)
|
||||
|
||||
# Postgres advisory lock key (2x int32). Keep stable.
|
||||
_ADVISORY_LOCK_KEY_1 = 424242
|
||||
_ADVISORY_LOCK_KEY_2 = 1701
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DesiredHeartbeat:
|
||||
every: str | None # None means "off" (unset heartbeat in gateway config)
|
||||
step: int
|
||||
off: bool
|
||||
|
||||
|
||||
def _is_active(*, now: datetime, last_chat_at: datetime | None, has_work: bool) -> bool:
|
||||
if has_work:
|
||||
return True
|
||||
if last_chat_at is None:
|
||||
return False
|
||||
return (now - last_chat_at) <= ACTIVE_WINDOW
|
||||
|
||||
|
||||
def compute_desired_heartbeat(
|
||||
*,
|
||||
is_lead: bool,
|
||||
active: bool,
|
||||
step: int,
|
||||
) -> DesiredHeartbeat:
|
||||
"""Return desired heartbeat for an agent.
|
||||
|
||||
step: governor-managed backoff index.
|
||||
0 means just became active.
|
||||
1..len(LADDER) means ladder index-1.
|
||||
len(LADDER)+1 means off (for non-leads).
|
||||
"""
|
||||
|
||||
if active:
|
||||
return DesiredHeartbeat(every=ACTIVE_EVERY, step=0, off=False)
|
||||
|
||||
# If idle, advance one rung.
|
||||
next_step = max(1, int(step) + 1)
|
||||
|
||||
if is_lead:
|
||||
# Leads never go fully off; cap at 1h.
|
||||
if next_step <= 0:
|
||||
next_every = ACTIVE_EVERY
|
||||
elif next_step <= len(LADDER):
|
||||
next_every = LADDER[next_step - 1]
|
||||
else:
|
||||
next_every = LEAD_CAP_EVERY
|
||||
# Enforce cap.
|
||||
if next_every in ("3h", "6h"):
|
||||
next_every = LEAD_CAP_EVERY
|
||||
return DesiredHeartbeat(every=next_every, step=min(next_step, len(LADDER)), off=False)
|
||||
|
||||
# Non-leads can go fully off after max backoff.
|
||||
if next_step <= len(LADDER):
|
||||
return DesiredHeartbeat(every=LADDER[next_step - 1], step=next_step, off=False)
|
||||
|
||||
return DesiredHeartbeat(every=None, step=len(LADDER) + 1, off=True)
|
||||
|
||||
|
||||
async def _acquire_lock(session) -> bool:
|
||||
result = await session.exec(
|
||||
text("SELECT pg_try_advisory_lock(:k1, :k2)"),
|
||||
params={"k1": _ADVISORY_LOCK_KEY_1, "k2": _ADVISORY_LOCK_KEY_2},
|
||||
)
|
||||
row = result.first()
|
||||
return bool(row and row[0])
|
||||
|
||||
|
||||
async def _release_lock(session) -> None:
|
||||
await session.exec(
|
||||
text("SELECT pg_advisory_unlock(:k1, :k2)"),
|
||||
params={"k1": _ADVISORY_LOCK_KEY_1, "k2": _ADVISORY_LOCK_KEY_2},
|
||||
)
|
||||
|
||||
|
||||
async def _latest_chat_by_board(session) -> dict[Any, datetime]:
|
||||
# Only chat memory items.
|
||||
rows = await session.exec(
|
||||
text(
|
||||
"""
|
||||
SELECT board_id, MAX(created_at) AS last_chat_at
|
||||
FROM boardmemory
|
||||
WHERE is_chat = true
|
||||
GROUP BY board_id
|
||||
""",
|
||||
),
|
||||
)
|
||||
result: dict[Any, datetime] = {}
|
||||
for board_id, last_chat_at in rows.all():
|
||||
if board_id and last_chat_at:
|
||||
result[board_id] = last_chat_at
|
||||
return result
|
||||
|
||||
|
||||
async def _has_work_map(session) -> dict[Any, bool]:
|
||||
# Work = tasks assigned to the agent that are in_progress or review.
|
||||
# Map by agent_id.
|
||||
rows = await session.exec(
|
||||
text(
|
||||
"""
|
||||
SELECT assigned_agent_id, COUNT(*)
|
||||
FROM task
|
||||
WHERE assigned_agent_id IS NOT NULL
|
||||
AND status IN ('in_progress', 'review')
|
||||
GROUP BY assigned_agent_id
|
||||
""",
|
||||
),
|
||||
)
|
||||
result: dict[Any, bool] = {}
|
||||
for agent_id, count in rows.all():
|
||||
if agent_id:
|
||||
result[agent_id] = bool(count and int(count) > 0)
|
||||
return result
|
||||
|
||||
|
||||
def _merge_heartbeat_config(agent: Agent, every: str) -> dict[str, Any]:
|
||||
merged: dict[str, Any] = {
|
||||
"every": every,
|
||||
"target": "last",
|
||||
"includeReasoning": False,
|
||||
}
|
||||
if isinstance(agent.heartbeat_config, dict):
|
||||
merged.update({k: v for k, v in agent.heartbeat_config.items() if k != "every"})
|
||||
merged["every"] = every
|
||||
return merged
|
||||
|
||||
|
||||
async def _patch_gateway(
|
||||
*,
|
||||
gateway: Gateway,
|
||||
agent_entries: list[tuple[str, str, dict[str, Any] | None]],
|
||||
) -> None:
|
||||
control_plane = OpenClawGatewayControlPlane(
|
||||
GatewayClientConfig(url=gateway.url or "", token=gateway.token),
|
||||
)
|
||||
# patch_agent_heartbeats supports None heartbeat entries after our patch.
|
||||
await control_plane.patch_agent_heartbeats(agent_entries) # type: ignore[arg-type]
|
||||
|
||||
|
||||
async def run_governor_once() -> None:
|
||||
if not getattr(settings, "auto_heartbeat_governor_enabled", False):
|
||||
return
|
||||
|
||||
async with async_session_maker() as session:
|
||||
locked = await _acquire_lock(session)
|
||||
if not locked:
|
||||
logger.debug("auto_heartbeat.skip_locked")
|
||||
return
|
||||
|
||||
try:
|
||||
now = utcnow()
|
||||
agents = (await session.exec(select(Agent))).all()
|
||||
if not agents:
|
||||
return
|
||||
|
||||
# Batch compute activity signals.
|
||||
chat_by_board = await _latest_chat_by_board(session)
|
||||
has_work_by_agent = await _has_work_map(session)
|
||||
|
||||
# Load gateways referenced by these agents.
|
||||
gateway_ids = {a.gateway_id for a in agents if a.gateway_id}
|
||||
gateways = (
|
||||
await session.exec(select(Gateway).where(col(Gateway.id).in_(gateway_ids)))
|
||||
).all()
|
||||
gateway_by_id = {g.id: g for g in gateways}
|
||||
|
||||
# Group patches per gateway.
|
||||
patches_by_gateway: dict[Any, list[tuple[str, str, dict[str, Any] | None]]] = {}
|
||||
changed = 0
|
||||
|
||||
for agent in agents:
|
||||
if not agent.auto_heartbeat_enabled:
|
||||
continue
|
||||
gateway = gateway_by_id.get(agent.gateway_id)
|
||||
if gateway is None or not gateway.url or not gateway.workspace_root:
|
||||
continue
|
||||
|
||||
last_chat_at = None
|
||||
if agent.board_id:
|
||||
last_chat_at = chat_by_board.get(agent.board_id)
|
||||
has_work = has_work_by_agent.get(agent.id, False)
|
||||
|
||||
active = _is_active(now=now, last_chat_at=last_chat_at, has_work=has_work)
|
||||
desired = compute_desired_heartbeat(
|
||||
is_lead=bool(agent.is_board_lead),
|
||||
active=active,
|
||||
step=int(agent.auto_heartbeat_step or 0),
|
||||
)
|
||||
|
||||
# Determine if we need to update DB state.
|
||||
needs_db = (
|
||||
bool(agent.auto_heartbeat_off) != desired.off
|
||||
or int(agent.auto_heartbeat_step or 0) != desired.step
|
||||
)
|
||||
|
||||
# Determine desired heartbeat payload.
|
||||
agent_id = _agent_key(agent)
|
||||
workspace_path = _workspace_path(agent, gateway.workspace_root)
|
||||
|
||||
heartbeat_payload: dict[str, Any] | None
|
||||
if desired.every is None:
|
||||
heartbeat_payload = None
|
||||
else:
|
||||
heartbeat_payload = _merge_heartbeat_config(agent, desired.every)
|
||||
|
||||
# Compare with current (only best-effort; gateway patch is idempotent).
|
||||
# If agent is off, patch regardless (None removes heartbeat).
|
||||
if desired.every is None:
|
||||
patches_by_gateway.setdefault(gateway.id, []).append(
|
||||
(agent_id, workspace_path, None),
|
||||
)
|
||||
else:
|
||||
# Only patch when 'every' differs or we were previously off.
|
||||
current_every = None
|
||||
if isinstance(agent.heartbeat_config, dict):
|
||||
current_every = agent.heartbeat_config.get("every")
|
||||
if current_every != desired.every or bool(agent.auto_heartbeat_off):
|
||||
patches_by_gateway.setdefault(gateway.id, []).append(
|
||||
(agent_id, workspace_path, heartbeat_payload),
|
||||
)
|
||||
|
||||
if needs_db:
|
||||
agent.auto_heartbeat_step = desired.step
|
||||
agent.auto_heartbeat_off = desired.off
|
||||
if active:
|
||||
agent.auto_heartbeat_last_active_at = now
|
||||
agent.updated_at = now
|
||||
session.add(agent)
|
||||
changed += 1
|
||||
|
||||
# Keep heartbeat_config in sync for non-off entries.
|
||||
if desired.every is not None:
|
||||
agent.heartbeat_config = heartbeat_payload
|
||||
|
||||
if changed:
|
||||
await session.commit()
|
||||
|
||||
# Apply patches gateway-by-gateway.
|
||||
for gateway_id, entries in patches_by_gateway.items():
|
||||
gateway = gateway_by_id.get(gateway_id)
|
||||
if gateway is None:
|
||||
continue
|
||||
try:
|
||||
await _patch_gateway(gateway=gateway, agent_entries=entries)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"auto_heartbeat.patch_failed",
|
||||
extra={"gateway_id": str(gateway_id), "error": str(exc)},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"auto_heartbeat.run_complete",
|
||||
extra={"agents": len(agents), "db_changed": changed, "gateways": len(patches_by_gateway)},
|
||||
)
|
||||
finally:
|
||||
try:
|
||||
await _release_lock(session)
|
||||
except Exception:
|
||||
logger.exception("auto_heartbeat.unlock_failed")
|
||||
|
||||
|
||||
async def governor_loop() -> None:
|
||||
"""Run the governor forever."""
|
||||
interval = getattr(settings, "auto_heartbeat_governor_interval_seconds", 300)
|
||||
interval = max(30, int(interval))
|
||||
logger.info(
|
||||
"auto_heartbeat.loop_start",
|
||||
extra={"interval_seconds": interval},
|
||||
)
|
||||
while True:
|
||||
try:
|
||||
await run_governor_once()
|
||||
except Exception:
|
||||
logger.exception("auto_heartbeat.loop_error")
|
||||
await asyncio.sleep(interval)
|
||||
@@ -544,7 +544,7 @@ class GatewayControlPlane(ABC):
|
||||
@abstractmethod
|
||||
async def patch_agent_heartbeats(
|
||||
self,
|
||||
entries: list[tuple[str, str, dict[str, Any]]],
|
||||
entries: list[tuple[str, str, dict[str, Any] | None]],
|
||||
) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -684,7 +684,7 @@ class OpenClawGatewayControlPlane(GatewayControlPlane):
|
||||
|
||||
async def patch_agent_heartbeats(
|
||||
self,
|
||||
entries: list[tuple[str, str, dict[str, Any]]],
|
||||
entries: list[tuple[str, str, dict[str, Any] | None]],
|
||||
) -> None:
|
||||
base_hash, raw_list, config_data = await _gateway_config_agent_list(self._config)
|
||||
entry_by_id = _heartbeat_entry_map(entries)
|
||||
@@ -732,8 +732,8 @@ async def _gateway_config_agent_list(
|
||||
|
||||
|
||||
def _heartbeat_entry_map(
|
||||
entries: list[tuple[str, str, dict[str, Any]]],
|
||||
) -> dict[str, tuple[str, dict[str, Any]]]:
|
||||
entries: list[tuple[str, str, dict[str, Any] | None]],
|
||||
) -> dict[str, tuple[str, dict[str, Any] | None]]:
|
||||
return {
|
||||
agent_id: (workspace_path, heartbeat) for agent_id, workspace_path, heartbeat in entries
|
||||
}
|
||||
@@ -741,7 +741,7 @@ def _heartbeat_entry_map(
|
||||
|
||||
def _updated_agent_list(
|
||||
raw_list: list[object],
|
||||
entry_by_id: dict[str, tuple[str, dict[str, Any]]],
|
||||
entry_by_id: dict[str, tuple[str, dict[str, Any] | None]],
|
||||
) -> list[object]:
|
||||
updated_ids: set[str] = set()
|
||||
new_list: list[object] = []
|
||||
@@ -758,16 +758,20 @@ def _updated_agent_list(
|
||||
workspace_path, heartbeat = entry_by_id[agent_id]
|
||||
new_entry = dict(raw_entry)
|
||||
new_entry["workspace"] = workspace_path
|
||||
new_entry["heartbeat"] = heartbeat
|
||||
if heartbeat is None:
|
||||
new_entry.pop("heartbeat", None)
|
||||
else:
|
||||
new_entry["heartbeat"] = heartbeat
|
||||
new_list.append(new_entry)
|
||||
updated_ids.add(agent_id)
|
||||
|
||||
for agent_id, (workspace_path, heartbeat) in entry_by_id.items():
|
||||
if agent_id in updated_ids:
|
||||
continue
|
||||
new_list.append(
|
||||
{"id": agent_id, "workspace": workspace_path, "heartbeat": heartbeat},
|
||||
)
|
||||
entry = {"id": agent_id, "workspace": workspace_path}
|
||||
if heartbeat is not None:
|
||||
entry["heartbeat"] = heartbeat
|
||||
new_list.append(entry)
|
||||
|
||||
return new_list
|
||||
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
"""add auto heartbeat governor fields
|
||||
|
||||
Revision ID: f7d54a8c5098
|
||||
Revises: b7a1d9c3e4f5
|
||||
Create Date: 2026-02-23 11:04:51.730825
|
||||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'f7d54a8c5098'
|
||||
down_revision = 'b7a1d9c3e4f5'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.add_column(
|
||||
"agents",
|
||||
sa.Column("auto_heartbeat_enabled", sa.Boolean(), nullable=False, server_default=sa.text("true")),
|
||||
)
|
||||
op.add_column(
|
||||
"agents",
|
||||
sa.Column("auto_heartbeat_step", sa.Integer(), nullable=False, server_default="0"),
|
||||
)
|
||||
op.add_column(
|
||||
"agents",
|
||||
sa.Column("auto_heartbeat_off", sa.Boolean(), nullable=False, server_default=sa.text("false")),
|
||||
)
|
||||
op.add_column(
|
||||
"agents",
|
||||
sa.Column("auto_heartbeat_last_active_at", sa.DateTime(timezone=True), nullable=True),
|
||||
)
|
||||
|
||||
op.create_index(
|
||||
op.f("ix_agents_auto_heartbeat_enabled"),
|
||||
"agents",
|
||||
["auto_heartbeat_enabled"],
|
||||
unique=False,
|
||||
)
|
||||
op.create_index(
|
||||
op.f("ix_agents_auto_heartbeat_off"),
|
||||
"agents",
|
||||
["auto_heartbeat_off"],
|
||||
unique=False,
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.drop_index(op.f("ix_agents_auto_heartbeat_off"), table_name="agents")
|
||||
op.drop_index(op.f("ix_agents_auto_heartbeat_enabled"), table_name="agents")
|
||||
|
||||
op.drop_column("agents", "auto_heartbeat_last_active_at")
|
||||
op.drop_column("agents", "auto_heartbeat_off")
|
||||
op.drop_column("agents", "auto_heartbeat_step")
|
||||
op.drop_column("agents", "auto_heartbeat_enabled")
|
||||
42
backend/tests/test_auto_heartbeat_governor_policy.py
Normal file
42
backend/tests/test_auto_heartbeat_governor_policy.py
Normal file
@@ -0,0 +1,42 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from app.services.auto_heartbeat_governor import compute_desired_heartbeat
|
||||
|
||||
|
||||
def test_policy_active_resets_to_5m() -> None:
|
||||
desired = compute_desired_heartbeat(is_lead=False, active=True, step=3)
|
||||
assert desired.every == "5m"
|
||||
assert desired.step == 0
|
||||
assert desired.off is False
|
||||
|
||||
|
||||
def test_policy_backoff_steps_non_lead() -> None:
|
||||
# idle step 0 -> 10m
|
||||
d1 = compute_desired_heartbeat(is_lead=False, active=False, step=0)
|
||||
assert d1.every == "10m"
|
||||
assert d1.off is False
|
||||
|
||||
# keep idling up the ladder
|
||||
d2 = compute_desired_heartbeat(is_lead=False, active=False, step=d1.step)
|
||||
assert d2.every == "30m"
|
||||
|
||||
d3 = compute_desired_heartbeat(is_lead=False, active=False, step=d2.step)
|
||||
assert d3.every == "1h"
|
||||
|
||||
d4 = compute_desired_heartbeat(is_lead=False, active=False, step=d3.step)
|
||||
assert d4.every == "3h"
|
||||
|
||||
d5 = compute_desired_heartbeat(is_lead=False, active=False, step=d4.step)
|
||||
assert d5.every == "6h"
|
||||
|
||||
# next step goes fully off
|
||||
d6 = compute_desired_heartbeat(is_lead=False, active=False, step=d5.step)
|
||||
assert d6.every is None
|
||||
assert d6.off is True
|
||||
|
||||
|
||||
def test_policy_lead_caps_at_1h_never_off() -> None:
|
||||
# step beyond ladder should still cap at 1h
|
||||
d = compute_desired_heartbeat(is_lead=True, active=False, step=999)
|
||||
assert d.every == "1h"
|
||||
assert d.off is False
|
||||
Reference in New Issue
Block a user