From 3f0475e6e4548340b038d78ba04ddb16b2112928 Mon Sep 17 00:00:00 2001 From: DevBot Date: Mon, 23 Feb 2026 11:51:00 +0000 Subject: [PATCH] feat(governor): board-scoped auto heartbeat policy endpoints --- backend/app/api/boards.py | 50 +++++++ backend/app/models/boards.py | 11 ++ backend/app/schemas/agents.py | 20 +++ .../app/schemas/auto_heartbeat_governor.py | 128 ++++++++++++++++++ .../app/services/auto_heartbeat_governor.py | 77 ++++++++--- ...dd_board_auto_heartbeat_governor_policy.py | 74 ++++++++++ 6 files changed, 341 insertions(+), 19 deletions(-) create mode 100644 backend/app/schemas/auto_heartbeat_governor.py create mode 100644 backend/migrations/versions/1c3a2b7d9e10_add_board_auto_heartbeat_governor_policy.py diff --git a/backend/app/api/boards.py b/backend/app/api/boards.py index 01d874d1..5b941084 100644 --- a/backend/app/api/boards.py +++ b/backend/app/api/boards.py @@ -26,6 +26,10 @@ from app.models.agents import Agent from app.models.board_groups import BoardGroup from app.models.boards import Board from app.models.gateways import Gateway +from app.schemas.auto_heartbeat_governor import ( + AutoHeartbeatGovernorPolicyRead, + AutoHeartbeatGovernorPolicyUpdate, +) from app.schemas.boards import BoardCreate, BoardRead, BoardUpdate from app.schemas.common import OkResponse from app.schemas.pagination import DefaultLimitOffsetPage @@ -408,6 +412,52 @@ def get_board( return board +@router.get( + "/{board_id}/auto-heartbeat-governor-policy", + response_model=AutoHeartbeatGovernorPolicyRead, +) +def get_auto_heartbeat_governor_policy( + board: Board = BOARD_USER_READ_DEP, +) -> AutoHeartbeatGovernorPolicyRead: + """Get board-scoped auto heartbeat governor policy.""" + return AutoHeartbeatGovernorPolicyRead( + enabled=bool(board.auto_heartbeat_governor_enabled), + run_interval_seconds=int(board.auto_heartbeat_governor_run_interval_seconds), + ladder=list(board.auto_heartbeat_governor_ladder or []), + lead_cap_every=str(board.auto_heartbeat_governor_lead_cap_every), + activity_trigger_type=str(board.auto_heartbeat_governor_activity_trigger_type), + ) + + +@router.patch( + "/{board_id}/auto-heartbeat-governor-policy", + response_model=AutoHeartbeatGovernorPolicyRead, +) +async def update_auto_heartbeat_governor_policy( + payload: AutoHeartbeatGovernorPolicyUpdate, + session: AsyncSession = SESSION_DEP, + board: Board = BOARD_USER_WRITE_DEP, +) -> AutoHeartbeatGovernorPolicyRead: + """Patch board-scoped auto heartbeat governor policy.""" + updates = payload.model_dump(exclude_unset=True) + if "enabled" in updates: + board.auto_heartbeat_governor_enabled = bool(updates["enabled"]) + if "run_interval_seconds" in updates: + board.auto_heartbeat_governor_run_interval_seconds = int(updates["run_interval_seconds"]) + if "ladder" in updates: + board.auto_heartbeat_governor_ladder = list(updates["ladder"]) + if "lead_cap_every" in updates: + board.auto_heartbeat_governor_lead_cap_every = str(updates["lead_cap_every"]) + if "activity_trigger_type" in updates: + trigger = updates["activity_trigger_type"] + board.auto_heartbeat_governor_activity_trigger_type = ( + trigger.value if hasattr(trigger, "value") else str(trigger) + ) + board.updated_at = utcnow() + await crud.save(session, board) + return get_auto_heartbeat_governor_policy(board) + + @router.get("/{board_id}/snapshot", response_model=BoardSnapshot) async def get_board_snapshot( board: Board = BOARD_ACTOR_READ_DEP, diff --git a/backend/app/models/boards.py b/backend/app/models/boards.py index 5d26340b..5fc25096 100644 --- a/backend/app/models/boards.py +++ b/backend/app/models/boards.py @@ -44,5 +44,16 @@ class Board(TenantScoped, table=True): block_status_changes_with_pending_approval: bool = Field(default=False) only_lead_can_change_status: bool = Field(default=False) max_agents: int = Field(default=1) + + # Auto heartbeat governor policy (board-scoped). + auto_heartbeat_governor_enabled: bool = Field(default=True) + auto_heartbeat_governor_run_interval_seconds: int = Field(default=300) + auto_heartbeat_governor_ladder: list[str] = Field( + default_factory=lambda: ["10m", "30m", "1h", "3h", "6h"], + sa_column=Column(JSON), + ) + auto_heartbeat_governor_lead_cap_every: str = Field(default="1h") + auto_heartbeat_governor_activity_trigger_type: str = Field(default="B") + created_at: datetime = Field(default_factory=utcnow) updated_at: datetime = Field(default_factory=utcnow) diff --git a/backend/app/schemas/agents.py b/backend/app/schemas/agents.py index 0e115d56..9c21f187 100644 --- a/backend/app/schemas/agents.py +++ b/backend/app/schemas/agents.py @@ -175,6 +175,10 @@ class AgentUpdate(SQLModel): description="Optional heartbeat policy override.", examples=[{"interval_seconds": 45}], ) + auto_heartbeat_enabled: bool | None = Field( + default=None, + description="If false, Mission Control's auto heartbeat governor will not manage this agent.", + ) identity_profile: dict[str, Any] | None = Field( default=None, description="Optional identity profile update values.", @@ -236,6 +240,22 @@ class AgentRead(AgentBase): default=False, description="Whether this agent is the primary gateway agent.", ) + auto_heartbeat_enabled: bool = Field( + default=True, + description="Whether Mission Control's auto heartbeat governor is allowed to manage this agent.", + ) + auto_heartbeat_step: int = Field( + default=0, + description="Current backoff ladder step maintained by the governor.", + ) + auto_heartbeat_off: bool = Field( + default=False, + description="Whether the governor has currently set this agent to fully-off (unset heartbeat).", + ) + auto_heartbeat_last_active_at: datetime | None = Field( + default=None, + description="Last time the governor considered this agent active.", + ) openclaw_session_id: str | None = Field( default=None, description="Optional openclaw session token.", diff --git a/backend/app/schemas/auto_heartbeat_governor.py b/backend/app/schemas/auto_heartbeat_governor.py new file mode 100644 index 00000000..653d0371 --- /dev/null +++ b/backend/app/schemas/auto_heartbeat_governor.py @@ -0,0 +1,128 @@ +"""Schemas for auto heartbeat governor policy configuration.""" + +from __future__ import annotations + +from enum import Enum +from typing import Annotated + +from pydantic import BaseModel, Field, field_validator + + +class ActivityTriggerType(str, Enum): + """Which events count as 'activity' for resetting the backoff ladder.""" + + A = "A" # board chat only + B = "B" # board chat OR has_work (assigned in-progress/review) + + +DurationStr = Annotated[ + str, + Field( + description="Duration string like 30s, 5m, 1h, 1d (no disabled).", + examples=["10m", "1h"], + ), +] + + +def _validate_duration(value: str) -> str: + value = (value or "").strip() + if not value: + raise ValueError("duration must be non-empty") + if value.lower() == "disabled": + raise ValueError('duration cannot be "disabled"') + # Simple format: integer + unit. + # Keep permissive for future; server-side logic still treats these as opaque. + import re + + if not re.match(r"^\d+\s*[smhd]$", value, flags=re.IGNORECASE): + raise ValueError("duration must match ^\\d+[smhd]$") + return value.replace(" ", "") + + +class AutoHeartbeatGovernorPolicyBase(BaseModel): + enabled: bool = Field( + default=True, + description="If false, the governor will not manage heartbeats for this board.", + ) + run_interval_seconds: int = Field( + default=300, + ge=30, + le=24 * 60 * 60, + description="Governor run cadence hint (seconds).", + ) + ladder: list[DurationStr] = Field( + default_factory=lambda: ["10m", "30m", "1h", "3h", "6h"], + description="Backoff ladder values (non-leads).", + ) + lead_cap_every: DurationStr = Field( + default="1h", + description="Max backoff interval for leads.", + ) + activity_trigger_type: ActivityTriggerType = Field( + default=ActivityTriggerType.B, + description="A = board chat only; B = board chat OR assigned work.", + ) + + @field_validator("ladder", mode="before") + @classmethod + def _normalize_ladder(cls, value: object) -> object: + # Accept comma-separated strings from UI forms. + if isinstance(value, str): + parts = [part.strip() for part in value.split(",")] + return [p for p in parts if p] + return value + + @field_validator("ladder") + @classmethod + def _validate_ladder(cls, ladder: list[str]) -> list[str]: + if not ladder: + raise ValueError("ladder must have at least one value") + normalized: list[str] = [] + for item in ladder: + normalized.append(_validate_duration(str(item))) + return normalized + + @field_validator("lead_cap_every") + @classmethod + def _validate_lead_cap(cls, value: str) -> str: + return _validate_duration(value) + + +class AutoHeartbeatGovernorPolicyRead(AutoHeartbeatGovernorPolicyBase): + """Read model for board-scoped governor policy.""" + + +class AutoHeartbeatGovernorPolicyUpdate(BaseModel): + """Patch model for board-scoped governor policy.""" + + enabled: bool | None = None + run_interval_seconds: int | None = Field(default=None, ge=30, le=24 * 60 * 60) + ladder: list[DurationStr] | str | None = None + lead_cap_every: DurationStr | None = None + activity_trigger_type: ActivityTriggerType | None = None + + @field_validator("ladder", mode="before") + @classmethod + def _normalize_ladder(cls, value: object) -> object: + if value is None: + return None + if isinstance(value, str): + parts = [part.strip() for part in value.split(",")] + return [p for p in parts if p] + return value + + @field_validator("ladder") + @classmethod + def _validate_ladder(cls, ladder: list[str] | None) -> list[str] | None: + if ladder is None: + return None + if not ladder: + raise ValueError("ladder must have at least one value") + return [_validate_duration(str(item)) for item in ladder] + + @field_validator("lead_cap_every") + @classmethod + def _validate_lead_cap(cls, value: str | None) -> str | None: + if value is None: + return None + return _validate_duration(value) diff --git a/backend/app/services/auto_heartbeat_governor.py b/backend/app/services/auto_heartbeat_governor.py index fcf6b2dc..b0d64ce5 100644 --- a/backend/app/services/auto_heartbeat_governor.py +++ b/backend/app/services/auto_heartbeat_governor.py @@ -30,6 +30,7 @@ from app.core.time import utcnow from app.db.session import async_session_maker from app.models.agents import Agent from app.models.board_memory import BoardMemory +from app.models.boards import Board from app.models.gateways import Gateway from app.models.tasks import Task from app.services.openclaw.internal.agent_key import agent_key as _agent_key @@ -41,10 +42,10 @@ from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConf logger = get_logger(__name__) -# Governor cadence + behaviour. -ACTIVE_EVERY = "5m" -LADDER: list[str] = ["10m", "30m", "1h", "3h", "6h"] -LEAD_CAP_EVERY = "1h" +# Governor cadence + behaviour (defaults; may be overridden by board policy). +DEFAULT_ACTIVE_EVERY = "5m" +DEFAULT_LADDER: list[str] = ["10m", "30m", "1h", "3h", "6h"] +DEFAULT_LEAD_CAP_EVERY = "1h" ACTIVE_WINDOW = timedelta(minutes=60) # Postgres advisory lock key (2x int32). Keep stable. @@ -72,6 +73,9 @@ def compute_desired_heartbeat( is_lead: bool, active: bool, step: int, + active_every: str = DEFAULT_ACTIVE_EVERY, + ladder: list[str] | None = None, + lead_cap_every: str = DEFAULT_LEAD_CAP_EVERY, ) -> DesiredHeartbeat: """Return desired heartbeat for an agent. @@ -81,30 +85,29 @@ def compute_desired_heartbeat( len(LADDER)+1 means off (for non-leads). """ + ladder = list(ladder or DEFAULT_LADDER) + if active: - return DesiredHeartbeat(every=ACTIVE_EVERY, step=0, off=False) + return DesiredHeartbeat(every=active_every, step=0, off=False) # If idle, advance one rung. next_step = max(1, int(step) + 1) if is_lead: - # Leads never go fully off; cap at 1h. + # Leads never go fully off; cap at lead_cap_every. if next_step <= 0: - next_every = ACTIVE_EVERY - elif next_step <= len(LADDER): - next_every = LADDER[next_step - 1] + next_every = active_every + elif next_step <= len(ladder): + next_every = ladder[next_step - 1] else: - next_every = LEAD_CAP_EVERY - # Enforce cap. - if next_every in ("3h", "6h"): - next_every = LEAD_CAP_EVERY - return DesiredHeartbeat(every=next_every, step=min(next_step, len(LADDER)), off=False) + next_every = lead_cap_every + return DesiredHeartbeat(every=next_every, step=min(next_step, len(ladder)), off=False) # Non-leads can go fully off after max backoff. - if next_step <= len(LADDER): - return DesiredHeartbeat(every=LADDER[next_step - 1], step=next_step, off=False) + if next_step <= len(ladder): + return DesiredHeartbeat(every=ladder[next_step - 1], step=next_step, off=False) - return DesiredHeartbeat(every=None, step=len(LADDER) + 1, off=True) + return DesiredHeartbeat(every=None, step=len(ladder) + 1, off=True) async def _acquire_lock(session) -> bool: @@ -207,6 +210,13 @@ async def run_governor_once() -> None: chat_by_board = await _latest_chat_by_board(session) has_work_by_agent = await _has_work_map(session) + # Load board policies referenced by these agents. + board_ids = {a.board_id for a in agents if a.board_id} + boards = ( + await session.exec(select(Board).where(col(Board.id).in_(board_ids))) + ).all() if board_ids else [] + board_by_id = {b.id: b for b in boards} + # Load gateways referenced by these agents. gateway_ids = {a.gateway_id for a in agents if a.gateway_id} gateways = ( @@ -225,16 +235,45 @@ async def run_governor_once() -> None: if gateway is None or not gateway.url or not gateway.workspace_root: continue + board = board_by_id.get(agent.board_id) if agent.board_id else None + if board is not None and not bool(board.auto_heartbeat_governor_enabled): + continue + last_chat_at = None if agent.board_id: last_chat_at = chat_by_board.get(agent.board_id) - has_work = has_work_by_agent.get(agent.id, False) - active = _is_active(now=now, last_chat_at=last_chat_at, has_work=has_work) + has_work = has_work_by_agent.get(agent.id, False) + trigger = ( + str(getattr(board, "auto_heartbeat_governor_activity_trigger_type", "B")) + if board is not None + else "B" + ) + active = _is_active( + now=now, + last_chat_at=last_chat_at, + has_work=(has_work if trigger != "A" else False), + ) + + ladder = ( + list(getattr(board, "auto_heartbeat_governor_ladder", None) or []) + if board is not None + else None + ) + if not ladder: + ladder = None + lead_cap = ( + str(getattr(board, "auto_heartbeat_governor_lead_cap_every", DEFAULT_LEAD_CAP_EVERY)) + if board is not None + else DEFAULT_LEAD_CAP_EVERY + ) + desired = compute_desired_heartbeat( is_lead=bool(agent.is_board_lead), active=active, step=int(agent.auto_heartbeat_step or 0), + ladder=ladder, + lead_cap_every=lead_cap, ) # Determine if we need to update DB state. diff --git a/backend/migrations/versions/1c3a2b7d9e10_add_board_auto_heartbeat_governor_policy.py b/backend/migrations/versions/1c3a2b7d9e10_add_board_auto_heartbeat_governor_policy.py new file mode 100644 index 00000000..e8b4d0d7 --- /dev/null +++ b/backend/migrations/versions/1c3a2b7d9e10_add_board_auto_heartbeat_governor_policy.py @@ -0,0 +1,74 @@ +"""add board auto heartbeat governor policy + +Revision ID: 1c3a2b7d9e10 +Revises: f7d54a8c5098 +Create Date: 2026-02-23 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "1c3a2b7d9e10" +down_revision = "f7d54a8c5098" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "boards", + sa.Column( + "auto_heartbeat_governor_enabled", + sa.Boolean(), + nullable=False, + server_default=sa.text("true"), + ), + ) + op.add_column( + "boards", + sa.Column( + "auto_heartbeat_governor_run_interval_seconds", + sa.Integer(), + nullable=False, + server_default="300", + ), + ) + op.add_column( + "boards", + sa.Column( + "auto_heartbeat_governor_ladder", + sa.JSON(), + nullable=False, + server_default=sa.text("'[\"10m\", \"30m\", \"1h\", \"3h\", \"6h\"]'"), + ), + ) + op.add_column( + "boards", + sa.Column( + "auto_heartbeat_governor_lead_cap_every", + sa.String(), + nullable=False, + server_default="1h", + ), + ) + op.add_column( + "boards", + sa.Column( + "auto_heartbeat_governor_activity_trigger_type", + sa.String(), + nullable=False, + server_default="B", + ), + ) + + +def downgrade() -> None: + op.drop_column("boards", "auto_heartbeat_governor_activity_trigger_type") + op.drop_column("boards", "auto_heartbeat_governor_lead_cap_every") + op.drop_column("boards", "auto_heartbeat_governor_ladder") + op.drop_column("boards", "auto_heartbeat_governor_run_interval_seconds") + op.drop_column("boards", "auto_heartbeat_governor_enabled")