feat(governor): board-scoped auto heartbeat policy endpoints

(cherry picked from commit 3f0475e6e4)
This commit is contained in:
DevBot
2026-02-23 11:51:00 +00:00
committed by Abhimanyu Saharan
parent 02b1709f3b
commit 1047a28f3c
6 changed files with 341 additions and 19 deletions

View File

@@ -28,6 +28,10 @@ from app.models.agents import Agent
from app.models.board_groups import BoardGroup
from app.models.boards import Board
from app.models.gateways import Gateway
from app.schemas.auto_heartbeat_governor import (
AutoHeartbeatGovernorPolicyRead,
AutoHeartbeatGovernorPolicyUpdate,
)
from app.schemas.boards import BoardCreate, BoardRead, BoardUpdate
from app.schemas.common import OkResponse
from app.schemas.pagination import DefaultLimitOffsetPage
@@ -498,6 +502,52 @@ def get_board(
return board
@router.get(
"/{board_id}/auto-heartbeat-governor-policy",
response_model=AutoHeartbeatGovernorPolicyRead,
)
def get_auto_heartbeat_governor_policy(
board: Board = BOARD_USER_READ_DEP,
) -> AutoHeartbeatGovernorPolicyRead:
"""Get board-scoped auto heartbeat governor policy."""
return AutoHeartbeatGovernorPolicyRead(
enabled=bool(board.auto_heartbeat_governor_enabled),
run_interval_seconds=int(board.auto_heartbeat_governor_run_interval_seconds),
ladder=list(board.auto_heartbeat_governor_ladder or []),
lead_cap_every=str(board.auto_heartbeat_governor_lead_cap_every),
activity_trigger_type=str(board.auto_heartbeat_governor_activity_trigger_type),
)
@router.patch(
"/{board_id}/auto-heartbeat-governor-policy",
response_model=AutoHeartbeatGovernorPolicyRead,
)
async def update_auto_heartbeat_governor_policy(
payload: AutoHeartbeatGovernorPolicyUpdate,
session: AsyncSession = SESSION_DEP,
board: Board = BOARD_USER_WRITE_DEP,
) -> AutoHeartbeatGovernorPolicyRead:
"""Patch board-scoped auto heartbeat governor policy."""
updates = payload.model_dump(exclude_unset=True)
if "enabled" in updates:
board.auto_heartbeat_governor_enabled = bool(updates["enabled"])
if "run_interval_seconds" in updates:
board.auto_heartbeat_governor_run_interval_seconds = int(updates["run_interval_seconds"])
if "ladder" in updates:
board.auto_heartbeat_governor_ladder = list(updates["ladder"])
if "lead_cap_every" in updates:
board.auto_heartbeat_governor_lead_cap_every = str(updates["lead_cap_every"])
if "activity_trigger_type" in updates:
trigger = updates["activity_trigger_type"]
board.auto_heartbeat_governor_activity_trigger_type = (
trigger.value if hasattr(trigger, "value") else str(trigger)
)
board.updated_at = utcnow()
await crud.save(session, board)
return get_auto_heartbeat_governor_policy(board)
@router.get("/{board_id}/snapshot", response_model=BoardSnapshot)
async def get_board_snapshot(
board: Board = BOARD_ACTOR_READ_DEP,

View File

@@ -45,5 +45,16 @@ class Board(TenantScoped, table=True):
block_status_changes_with_pending_approval: bool = Field(default=False)
only_lead_can_change_status: bool = Field(default=False)
max_agents: int = Field(default=1)
# Auto heartbeat governor policy (board-scoped).
auto_heartbeat_governor_enabled: bool = Field(default=True)
auto_heartbeat_governor_run_interval_seconds: int = Field(default=300)
auto_heartbeat_governor_ladder: list[str] = Field(
default_factory=lambda: ["10m", "30m", "1h", "3h", "6h"],
sa_column=Column(JSON),
)
auto_heartbeat_governor_lead_cap_every: str = Field(default="1h")
auto_heartbeat_governor_activity_trigger_type: str = Field(default="B")
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)

View File

@@ -175,6 +175,10 @@ class AgentUpdate(SQLModel):
description="Optional heartbeat policy override.",
examples=[{"interval_seconds": 45}],
)
auto_heartbeat_enabled: bool | None = Field(
default=None,
description="If false, Mission Control's auto heartbeat governor will not manage this agent.",
)
identity_profile: dict[str, Any] | None = Field(
default=None,
description="Optional identity profile update values.",
@@ -236,6 +240,22 @@ class AgentRead(AgentBase):
default=False,
description="Whether this agent is the primary gateway agent.",
)
auto_heartbeat_enabled: bool = Field(
default=True,
description="Whether Mission Control's auto heartbeat governor is allowed to manage this agent.",
)
auto_heartbeat_step: int = Field(
default=0,
description="Current backoff ladder step maintained by the governor.",
)
auto_heartbeat_off: bool = Field(
default=False,
description="Whether the governor has currently set this agent to fully-off (unset heartbeat).",
)
auto_heartbeat_last_active_at: datetime | None = Field(
default=None,
description="Last time the governor considered this agent active.",
)
openclaw_session_id: str | None = Field(
default=None,
description="Optional openclaw session token.",

View File

@@ -0,0 +1,128 @@
"""Schemas for auto heartbeat governor policy configuration."""
from __future__ import annotations
from enum import Enum
from typing import Annotated
from pydantic import BaseModel, Field, field_validator
class ActivityTriggerType(str, Enum):
"""Which events count as 'activity' for resetting the backoff ladder."""
A = "A" # board chat only
B = "B" # board chat OR has_work (assigned in-progress/review)
DurationStr = Annotated[
str,
Field(
description="Duration string like 30s, 5m, 1h, 1d (no disabled).",
examples=["10m", "1h"],
),
]
def _validate_duration(value: str) -> str:
value = (value or "").strip()
if not value:
raise ValueError("duration must be non-empty")
if value.lower() == "disabled":
raise ValueError('duration cannot be "disabled"')
# Simple format: integer + unit.
# Keep permissive for future; server-side logic still treats these as opaque.
import re
if not re.match(r"^\d+\s*[smhd]$", value, flags=re.IGNORECASE):
raise ValueError("duration must match ^\\d+[smhd]$")
return value.replace(" ", "")
class AutoHeartbeatGovernorPolicyBase(BaseModel):
enabled: bool = Field(
default=True,
description="If false, the governor will not manage heartbeats for this board.",
)
run_interval_seconds: int = Field(
default=300,
ge=30,
le=24 * 60 * 60,
description="Governor run cadence hint (seconds).",
)
ladder: list[DurationStr] = Field(
default_factory=lambda: ["10m", "30m", "1h", "3h", "6h"],
description="Backoff ladder values (non-leads).",
)
lead_cap_every: DurationStr = Field(
default="1h",
description="Max backoff interval for leads.",
)
activity_trigger_type: ActivityTriggerType = Field(
default=ActivityTriggerType.B,
description="A = board chat only; B = board chat OR assigned work.",
)
@field_validator("ladder", mode="before")
@classmethod
def _normalize_ladder(cls, value: object) -> object:
# Accept comma-separated strings from UI forms.
if isinstance(value, str):
parts = [part.strip() for part in value.split(",")]
return [p for p in parts if p]
return value
@field_validator("ladder")
@classmethod
def _validate_ladder(cls, ladder: list[str]) -> list[str]:
if not ladder:
raise ValueError("ladder must have at least one value")
normalized: list[str] = []
for item in ladder:
normalized.append(_validate_duration(str(item)))
return normalized
@field_validator("lead_cap_every")
@classmethod
def _validate_lead_cap(cls, value: str) -> str:
return _validate_duration(value)
class AutoHeartbeatGovernorPolicyRead(AutoHeartbeatGovernorPolicyBase):
"""Read model for board-scoped governor policy."""
class AutoHeartbeatGovernorPolicyUpdate(BaseModel):
"""Patch model for board-scoped governor policy."""
enabled: bool | None = None
run_interval_seconds: int | None = Field(default=None, ge=30, le=24 * 60 * 60)
ladder: list[DurationStr] | str | None = None
lead_cap_every: DurationStr | None = None
activity_trigger_type: ActivityTriggerType | None = None
@field_validator("ladder", mode="before")
@classmethod
def _normalize_ladder(cls, value: object) -> object:
if value is None:
return None
if isinstance(value, str):
parts = [part.strip() for part in value.split(",")]
return [p for p in parts if p]
return value
@field_validator("ladder")
@classmethod
def _validate_ladder(cls, ladder: list[str] | None) -> list[str] | None:
if ladder is None:
return None
if not ladder:
raise ValueError("ladder must have at least one value")
return [_validate_duration(str(item)) for item in ladder]
@field_validator("lead_cap_every")
@classmethod
def _validate_lead_cap(cls, value: str | None) -> str | None:
if value is None:
return None
return _validate_duration(value)

View File

@@ -30,6 +30,7 @@ from app.core.time import utcnow
from app.db.session import async_session_maker
from app.models.agents import Agent
from app.models.board_memory import BoardMemory
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.tasks import Task
from app.services.openclaw.internal.agent_key import agent_key as _agent_key
@@ -41,10 +42,10 @@ from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConf
logger = get_logger(__name__)
# Governor cadence + behaviour.
ACTIVE_EVERY = "5m"
LADDER: list[str] = ["10m", "30m", "1h", "3h", "6h"]
LEAD_CAP_EVERY = "1h"
# Governor cadence + behaviour (defaults; may be overridden by board policy).
DEFAULT_ACTIVE_EVERY = "5m"
DEFAULT_LADDER: list[str] = ["10m", "30m", "1h", "3h", "6h"]
DEFAULT_LEAD_CAP_EVERY = "1h"
ACTIVE_WINDOW = timedelta(minutes=60)
# Postgres advisory lock key (2x int32). Keep stable.
@@ -72,6 +73,9 @@ def compute_desired_heartbeat(
is_lead: bool,
active: bool,
step: int,
active_every: str = DEFAULT_ACTIVE_EVERY,
ladder: list[str] | None = None,
lead_cap_every: str = DEFAULT_LEAD_CAP_EVERY,
) -> DesiredHeartbeat:
"""Return desired heartbeat for an agent.
@@ -81,30 +85,29 @@ def compute_desired_heartbeat(
len(LADDER)+1 means off (for non-leads).
"""
ladder = list(ladder or DEFAULT_LADDER)
if active:
return DesiredHeartbeat(every=ACTIVE_EVERY, step=0, off=False)
return DesiredHeartbeat(every=active_every, step=0, off=False)
# If idle, advance one rung.
next_step = max(1, int(step) + 1)
if is_lead:
# Leads never go fully off; cap at 1h.
# Leads never go fully off; cap at lead_cap_every.
if next_step <= 0:
next_every = ACTIVE_EVERY
elif next_step <= len(LADDER):
next_every = LADDER[next_step - 1]
next_every = active_every
elif next_step <= len(ladder):
next_every = ladder[next_step - 1]
else:
next_every = LEAD_CAP_EVERY
# Enforce cap.
if next_every in ("3h", "6h"):
next_every = LEAD_CAP_EVERY
return DesiredHeartbeat(every=next_every, step=min(next_step, len(LADDER)), off=False)
next_every = lead_cap_every
return DesiredHeartbeat(every=next_every, step=min(next_step, len(ladder)), off=False)
# Non-leads can go fully off after max backoff.
if next_step <= len(LADDER):
return DesiredHeartbeat(every=LADDER[next_step - 1], step=next_step, off=False)
if next_step <= len(ladder):
return DesiredHeartbeat(every=ladder[next_step - 1], step=next_step, off=False)
return DesiredHeartbeat(every=None, step=len(LADDER) + 1, off=True)
return DesiredHeartbeat(every=None, step=len(ladder) + 1, off=True)
async def _acquire_lock(session) -> bool:
@@ -207,6 +210,13 @@ async def run_governor_once() -> None:
chat_by_board = await _latest_chat_by_board(session)
has_work_by_agent = await _has_work_map(session)
# Load board policies referenced by these agents.
board_ids = {a.board_id for a in agents if a.board_id}
boards = (
await session.exec(select(Board).where(col(Board.id).in_(board_ids)))
).all() if board_ids else []
board_by_id = {b.id: b for b in boards}
# Load gateways referenced by these agents.
gateway_ids = {a.gateway_id for a in agents if a.gateway_id}
gateways = (
@@ -225,16 +235,45 @@ async def run_governor_once() -> None:
if gateway is None or not gateway.url or not gateway.workspace_root:
continue
board = board_by_id.get(agent.board_id) if agent.board_id else None
if board is not None and not bool(board.auto_heartbeat_governor_enabled):
continue
last_chat_at = None
if agent.board_id:
last_chat_at = chat_by_board.get(agent.board_id)
has_work = has_work_by_agent.get(agent.id, False)
active = _is_active(now=now, last_chat_at=last_chat_at, has_work=has_work)
has_work = has_work_by_agent.get(agent.id, False)
trigger = (
str(getattr(board, "auto_heartbeat_governor_activity_trigger_type", "B"))
if board is not None
else "B"
)
active = _is_active(
now=now,
last_chat_at=last_chat_at,
has_work=(has_work if trigger != "A" else False),
)
ladder = (
list(getattr(board, "auto_heartbeat_governor_ladder", None) or [])
if board is not None
else None
)
if not ladder:
ladder = None
lead_cap = (
str(getattr(board, "auto_heartbeat_governor_lead_cap_every", DEFAULT_LEAD_CAP_EVERY))
if board is not None
else DEFAULT_LEAD_CAP_EVERY
)
desired = compute_desired_heartbeat(
is_lead=bool(agent.is_board_lead),
active=active,
step=int(agent.auto_heartbeat_step or 0),
ladder=ladder,
lead_cap_every=lead_cap,
)
# Determine if we need to update DB state.

View File

@@ -0,0 +1,74 @@
"""add board auto heartbeat governor policy
Revision ID: 1c3a2b7d9e10
Revises: f7d54a8c5098
Create Date: 2026-02-23
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "1c3a2b7d9e10"
down_revision = "f7d54a8c5098"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
"boards",
sa.Column(
"auto_heartbeat_governor_enabled",
sa.Boolean(),
nullable=False,
server_default=sa.text("true"),
),
)
op.add_column(
"boards",
sa.Column(
"auto_heartbeat_governor_run_interval_seconds",
sa.Integer(),
nullable=False,
server_default="300",
),
)
op.add_column(
"boards",
sa.Column(
"auto_heartbeat_governor_ladder",
sa.JSON(),
nullable=False,
server_default=sa.text("'[\"10m\", \"30m\", \"1h\", \"3h\", \"6h\"]'"),
),
)
op.add_column(
"boards",
sa.Column(
"auto_heartbeat_governor_lead_cap_every",
sa.String(),
nullable=False,
server_default="1h",
),
)
op.add_column(
"boards",
sa.Column(
"auto_heartbeat_governor_activity_trigger_type",
sa.String(),
nullable=False,
server_default="B",
),
)
def downgrade() -> None:
op.drop_column("boards", "auto_heartbeat_governor_activity_trigger_type")
op.drop_column("boards", "auto_heartbeat_governor_lead_cap_every")
op.drop_column("boards", "auto_heartbeat_governor_ladder")
op.drop_column("boards", "auto_heartbeat_governor_run_interval_seconds")
op.drop_column("boards", "auto_heartbeat_governor_enabled")