feat(governor): board-scoped auto heartbeat policy endpoints
This commit is contained in:
@@ -26,6 +26,10 @@ from app.models.agents import Agent
|
||||
from app.models.board_groups import BoardGroup
|
||||
from app.models.boards import Board
|
||||
from app.models.gateways import Gateway
|
||||
from app.schemas.auto_heartbeat_governor import (
|
||||
AutoHeartbeatGovernorPolicyRead,
|
||||
AutoHeartbeatGovernorPolicyUpdate,
|
||||
)
|
||||
from app.schemas.boards import BoardCreate, BoardRead, BoardUpdate
|
||||
from app.schemas.common import OkResponse
|
||||
from app.schemas.pagination import DefaultLimitOffsetPage
|
||||
@@ -408,6 +412,52 @@ def get_board(
|
||||
return board
|
||||
|
||||
|
||||
@router.get(
|
||||
"/{board_id}/auto-heartbeat-governor-policy",
|
||||
response_model=AutoHeartbeatGovernorPolicyRead,
|
||||
)
|
||||
def get_auto_heartbeat_governor_policy(
|
||||
board: Board = BOARD_USER_READ_DEP,
|
||||
) -> AutoHeartbeatGovernorPolicyRead:
|
||||
"""Get board-scoped auto heartbeat governor policy."""
|
||||
return AutoHeartbeatGovernorPolicyRead(
|
||||
enabled=bool(board.auto_heartbeat_governor_enabled),
|
||||
run_interval_seconds=int(board.auto_heartbeat_governor_run_interval_seconds),
|
||||
ladder=list(board.auto_heartbeat_governor_ladder or []),
|
||||
lead_cap_every=str(board.auto_heartbeat_governor_lead_cap_every),
|
||||
activity_trigger_type=str(board.auto_heartbeat_governor_activity_trigger_type),
|
||||
)
|
||||
|
||||
|
||||
@router.patch(
|
||||
"/{board_id}/auto-heartbeat-governor-policy",
|
||||
response_model=AutoHeartbeatGovernorPolicyRead,
|
||||
)
|
||||
async def update_auto_heartbeat_governor_policy(
|
||||
payload: AutoHeartbeatGovernorPolicyUpdate,
|
||||
session: AsyncSession = SESSION_DEP,
|
||||
board: Board = BOARD_USER_WRITE_DEP,
|
||||
) -> AutoHeartbeatGovernorPolicyRead:
|
||||
"""Patch board-scoped auto heartbeat governor policy."""
|
||||
updates = payload.model_dump(exclude_unset=True)
|
||||
if "enabled" in updates:
|
||||
board.auto_heartbeat_governor_enabled = bool(updates["enabled"])
|
||||
if "run_interval_seconds" in updates:
|
||||
board.auto_heartbeat_governor_run_interval_seconds = int(updates["run_interval_seconds"])
|
||||
if "ladder" in updates:
|
||||
board.auto_heartbeat_governor_ladder = list(updates["ladder"])
|
||||
if "lead_cap_every" in updates:
|
||||
board.auto_heartbeat_governor_lead_cap_every = str(updates["lead_cap_every"])
|
||||
if "activity_trigger_type" in updates:
|
||||
trigger = updates["activity_trigger_type"]
|
||||
board.auto_heartbeat_governor_activity_trigger_type = (
|
||||
trigger.value if hasattr(trigger, "value") else str(trigger)
|
||||
)
|
||||
board.updated_at = utcnow()
|
||||
await crud.save(session, board)
|
||||
return get_auto_heartbeat_governor_policy(board)
|
||||
|
||||
|
||||
@router.get("/{board_id}/snapshot", response_model=BoardSnapshot)
|
||||
async def get_board_snapshot(
|
||||
board: Board = BOARD_ACTOR_READ_DEP,
|
||||
|
||||
@@ -44,5 +44,16 @@ class Board(TenantScoped, table=True):
|
||||
block_status_changes_with_pending_approval: bool = Field(default=False)
|
||||
only_lead_can_change_status: bool = Field(default=False)
|
||||
max_agents: int = Field(default=1)
|
||||
|
||||
# Auto heartbeat governor policy (board-scoped).
|
||||
auto_heartbeat_governor_enabled: bool = Field(default=True)
|
||||
auto_heartbeat_governor_run_interval_seconds: int = Field(default=300)
|
||||
auto_heartbeat_governor_ladder: list[str] = Field(
|
||||
default_factory=lambda: ["10m", "30m", "1h", "3h", "6h"],
|
||||
sa_column=Column(JSON),
|
||||
)
|
||||
auto_heartbeat_governor_lead_cap_every: str = Field(default="1h")
|
||||
auto_heartbeat_governor_activity_trigger_type: str = Field(default="B")
|
||||
|
||||
created_at: datetime = Field(default_factory=utcnow)
|
||||
updated_at: datetime = Field(default_factory=utcnow)
|
||||
|
||||
@@ -175,6 +175,10 @@ class AgentUpdate(SQLModel):
|
||||
description="Optional heartbeat policy override.",
|
||||
examples=[{"interval_seconds": 45}],
|
||||
)
|
||||
auto_heartbeat_enabled: bool | None = Field(
|
||||
default=None,
|
||||
description="If false, Mission Control's auto heartbeat governor will not manage this agent.",
|
||||
)
|
||||
identity_profile: dict[str, Any] | None = Field(
|
||||
default=None,
|
||||
description="Optional identity profile update values.",
|
||||
@@ -236,6 +240,22 @@ class AgentRead(AgentBase):
|
||||
default=False,
|
||||
description="Whether this agent is the primary gateway agent.",
|
||||
)
|
||||
auto_heartbeat_enabled: bool = Field(
|
||||
default=True,
|
||||
description="Whether Mission Control's auto heartbeat governor is allowed to manage this agent.",
|
||||
)
|
||||
auto_heartbeat_step: int = Field(
|
||||
default=0,
|
||||
description="Current backoff ladder step maintained by the governor.",
|
||||
)
|
||||
auto_heartbeat_off: bool = Field(
|
||||
default=False,
|
||||
description="Whether the governor has currently set this agent to fully-off (unset heartbeat).",
|
||||
)
|
||||
auto_heartbeat_last_active_at: datetime | None = Field(
|
||||
default=None,
|
||||
description="Last time the governor considered this agent active.",
|
||||
)
|
||||
openclaw_session_id: str | None = Field(
|
||||
default=None,
|
||||
description="Optional openclaw session token.",
|
||||
|
||||
128
backend/app/schemas/auto_heartbeat_governor.py
Normal file
128
backend/app/schemas/auto_heartbeat_governor.py
Normal file
@@ -0,0 +1,128 @@
|
||||
"""Schemas for auto heartbeat governor policy configuration."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum
|
||||
from typing import Annotated
|
||||
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
|
||||
class ActivityTriggerType(str, Enum):
|
||||
"""Which events count as 'activity' for resetting the backoff ladder."""
|
||||
|
||||
A = "A" # board chat only
|
||||
B = "B" # board chat OR has_work (assigned in-progress/review)
|
||||
|
||||
|
||||
DurationStr = Annotated[
|
||||
str,
|
||||
Field(
|
||||
description="Duration string like 30s, 5m, 1h, 1d (no disabled).",
|
||||
examples=["10m", "1h"],
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def _validate_duration(value: str) -> str:
|
||||
value = (value or "").strip()
|
||||
if not value:
|
||||
raise ValueError("duration must be non-empty")
|
||||
if value.lower() == "disabled":
|
||||
raise ValueError('duration cannot be "disabled"')
|
||||
# Simple format: integer + unit.
|
||||
# Keep permissive for future; server-side logic still treats these as opaque.
|
||||
import re
|
||||
|
||||
if not re.match(r"^\d+\s*[smhd]$", value, flags=re.IGNORECASE):
|
||||
raise ValueError("duration must match ^\\d+[smhd]$")
|
||||
return value.replace(" ", "")
|
||||
|
||||
|
||||
class AutoHeartbeatGovernorPolicyBase(BaseModel):
|
||||
enabled: bool = Field(
|
||||
default=True,
|
||||
description="If false, the governor will not manage heartbeats for this board.",
|
||||
)
|
||||
run_interval_seconds: int = Field(
|
||||
default=300,
|
||||
ge=30,
|
||||
le=24 * 60 * 60,
|
||||
description="Governor run cadence hint (seconds).",
|
||||
)
|
||||
ladder: list[DurationStr] = Field(
|
||||
default_factory=lambda: ["10m", "30m", "1h", "3h", "6h"],
|
||||
description="Backoff ladder values (non-leads).",
|
||||
)
|
||||
lead_cap_every: DurationStr = Field(
|
||||
default="1h",
|
||||
description="Max backoff interval for leads.",
|
||||
)
|
||||
activity_trigger_type: ActivityTriggerType = Field(
|
||||
default=ActivityTriggerType.B,
|
||||
description="A = board chat only; B = board chat OR assigned work.",
|
||||
)
|
||||
|
||||
@field_validator("ladder", mode="before")
|
||||
@classmethod
|
||||
def _normalize_ladder(cls, value: object) -> object:
|
||||
# Accept comma-separated strings from UI forms.
|
||||
if isinstance(value, str):
|
||||
parts = [part.strip() for part in value.split(",")]
|
||||
return [p for p in parts if p]
|
||||
return value
|
||||
|
||||
@field_validator("ladder")
|
||||
@classmethod
|
||||
def _validate_ladder(cls, ladder: list[str]) -> list[str]:
|
||||
if not ladder:
|
||||
raise ValueError("ladder must have at least one value")
|
||||
normalized: list[str] = []
|
||||
for item in ladder:
|
||||
normalized.append(_validate_duration(str(item)))
|
||||
return normalized
|
||||
|
||||
@field_validator("lead_cap_every")
|
||||
@classmethod
|
||||
def _validate_lead_cap(cls, value: str) -> str:
|
||||
return _validate_duration(value)
|
||||
|
||||
|
||||
class AutoHeartbeatGovernorPolicyRead(AutoHeartbeatGovernorPolicyBase):
|
||||
"""Read model for board-scoped governor policy."""
|
||||
|
||||
|
||||
class AutoHeartbeatGovernorPolicyUpdate(BaseModel):
|
||||
"""Patch model for board-scoped governor policy."""
|
||||
|
||||
enabled: bool | None = None
|
||||
run_interval_seconds: int | None = Field(default=None, ge=30, le=24 * 60 * 60)
|
||||
ladder: list[DurationStr] | str | None = None
|
||||
lead_cap_every: DurationStr | None = None
|
||||
activity_trigger_type: ActivityTriggerType | None = None
|
||||
|
||||
@field_validator("ladder", mode="before")
|
||||
@classmethod
|
||||
def _normalize_ladder(cls, value: object) -> object:
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, str):
|
||||
parts = [part.strip() for part in value.split(",")]
|
||||
return [p for p in parts if p]
|
||||
return value
|
||||
|
||||
@field_validator("ladder")
|
||||
@classmethod
|
||||
def _validate_ladder(cls, ladder: list[str] | None) -> list[str] | None:
|
||||
if ladder is None:
|
||||
return None
|
||||
if not ladder:
|
||||
raise ValueError("ladder must have at least one value")
|
||||
return [_validate_duration(str(item)) for item in ladder]
|
||||
|
||||
@field_validator("lead_cap_every")
|
||||
@classmethod
|
||||
def _validate_lead_cap(cls, value: str | None) -> str | None:
|
||||
if value is None:
|
||||
return None
|
||||
return _validate_duration(value)
|
||||
@@ -30,6 +30,7 @@ from app.core.time import utcnow
|
||||
from app.db.session import async_session_maker
|
||||
from app.models.agents import Agent
|
||||
from app.models.board_memory import BoardMemory
|
||||
from app.models.boards import Board
|
||||
from app.models.gateways import Gateway
|
||||
from app.models.tasks import Task
|
||||
from app.services.openclaw.internal.agent_key import agent_key as _agent_key
|
||||
@@ -41,10 +42,10 @@ from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConf
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# Governor cadence + behaviour.
|
||||
ACTIVE_EVERY = "5m"
|
||||
LADDER: list[str] = ["10m", "30m", "1h", "3h", "6h"]
|
||||
LEAD_CAP_EVERY = "1h"
|
||||
# Governor cadence + behaviour (defaults; may be overridden by board policy).
|
||||
DEFAULT_ACTIVE_EVERY = "5m"
|
||||
DEFAULT_LADDER: list[str] = ["10m", "30m", "1h", "3h", "6h"]
|
||||
DEFAULT_LEAD_CAP_EVERY = "1h"
|
||||
ACTIVE_WINDOW = timedelta(minutes=60)
|
||||
|
||||
# Postgres advisory lock key (2x int32). Keep stable.
|
||||
@@ -72,6 +73,9 @@ def compute_desired_heartbeat(
|
||||
is_lead: bool,
|
||||
active: bool,
|
||||
step: int,
|
||||
active_every: str = DEFAULT_ACTIVE_EVERY,
|
||||
ladder: list[str] | None = None,
|
||||
lead_cap_every: str = DEFAULT_LEAD_CAP_EVERY,
|
||||
) -> DesiredHeartbeat:
|
||||
"""Return desired heartbeat for an agent.
|
||||
|
||||
@@ -81,30 +85,29 @@ def compute_desired_heartbeat(
|
||||
len(LADDER)+1 means off (for non-leads).
|
||||
"""
|
||||
|
||||
ladder = list(ladder or DEFAULT_LADDER)
|
||||
|
||||
if active:
|
||||
return DesiredHeartbeat(every=ACTIVE_EVERY, step=0, off=False)
|
||||
return DesiredHeartbeat(every=active_every, step=0, off=False)
|
||||
|
||||
# If idle, advance one rung.
|
||||
next_step = max(1, int(step) + 1)
|
||||
|
||||
if is_lead:
|
||||
# Leads never go fully off; cap at 1h.
|
||||
# Leads never go fully off; cap at lead_cap_every.
|
||||
if next_step <= 0:
|
||||
next_every = ACTIVE_EVERY
|
||||
elif next_step <= len(LADDER):
|
||||
next_every = LADDER[next_step - 1]
|
||||
next_every = active_every
|
||||
elif next_step <= len(ladder):
|
||||
next_every = ladder[next_step - 1]
|
||||
else:
|
||||
next_every = LEAD_CAP_EVERY
|
||||
# Enforce cap.
|
||||
if next_every in ("3h", "6h"):
|
||||
next_every = LEAD_CAP_EVERY
|
||||
return DesiredHeartbeat(every=next_every, step=min(next_step, len(LADDER)), off=False)
|
||||
next_every = lead_cap_every
|
||||
return DesiredHeartbeat(every=next_every, step=min(next_step, len(ladder)), off=False)
|
||||
|
||||
# Non-leads can go fully off after max backoff.
|
||||
if next_step <= len(LADDER):
|
||||
return DesiredHeartbeat(every=LADDER[next_step - 1], step=next_step, off=False)
|
||||
if next_step <= len(ladder):
|
||||
return DesiredHeartbeat(every=ladder[next_step - 1], step=next_step, off=False)
|
||||
|
||||
return DesiredHeartbeat(every=None, step=len(LADDER) + 1, off=True)
|
||||
return DesiredHeartbeat(every=None, step=len(ladder) + 1, off=True)
|
||||
|
||||
|
||||
async def _acquire_lock(session) -> bool:
|
||||
@@ -207,6 +210,13 @@ async def run_governor_once() -> None:
|
||||
chat_by_board = await _latest_chat_by_board(session)
|
||||
has_work_by_agent = await _has_work_map(session)
|
||||
|
||||
# Load board policies referenced by these agents.
|
||||
board_ids = {a.board_id for a in agents if a.board_id}
|
||||
boards = (
|
||||
await session.exec(select(Board).where(col(Board.id).in_(board_ids)))
|
||||
).all() if board_ids else []
|
||||
board_by_id = {b.id: b for b in boards}
|
||||
|
||||
# Load gateways referenced by these agents.
|
||||
gateway_ids = {a.gateway_id for a in agents if a.gateway_id}
|
||||
gateways = (
|
||||
@@ -225,16 +235,45 @@ async def run_governor_once() -> None:
|
||||
if gateway is None or not gateway.url or not gateway.workspace_root:
|
||||
continue
|
||||
|
||||
board = board_by_id.get(agent.board_id) if agent.board_id else None
|
||||
if board is not None and not bool(board.auto_heartbeat_governor_enabled):
|
||||
continue
|
||||
|
||||
last_chat_at = None
|
||||
if agent.board_id:
|
||||
last_chat_at = chat_by_board.get(agent.board_id)
|
||||
has_work = has_work_by_agent.get(agent.id, False)
|
||||
|
||||
active = _is_active(now=now, last_chat_at=last_chat_at, has_work=has_work)
|
||||
has_work = has_work_by_agent.get(agent.id, False)
|
||||
trigger = (
|
||||
str(getattr(board, "auto_heartbeat_governor_activity_trigger_type", "B"))
|
||||
if board is not None
|
||||
else "B"
|
||||
)
|
||||
active = _is_active(
|
||||
now=now,
|
||||
last_chat_at=last_chat_at,
|
||||
has_work=(has_work if trigger != "A" else False),
|
||||
)
|
||||
|
||||
ladder = (
|
||||
list(getattr(board, "auto_heartbeat_governor_ladder", None) or [])
|
||||
if board is not None
|
||||
else None
|
||||
)
|
||||
if not ladder:
|
||||
ladder = None
|
||||
lead_cap = (
|
||||
str(getattr(board, "auto_heartbeat_governor_lead_cap_every", DEFAULT_LEAD_CAP_EVERY))
|
||||
if board is not None
|
||||
else DEFAULT_LEAD_CAP_EVERY
|
||||
)
|
||||
|
||||
desired = compute_desired_heartbeat(
|
||||
is_lead=bool(agent.is_board_lead),
|
||||
active=active,
|
||||
step=int(agent.auto_heartbeat_step or 0),
|
||||
ladder=ladder,
|
||||
lead_cap_every=lead_cap,
|
||||
)
|
||||
|
||||
# Determine if we need to update DB state.
|
||||
|
||||
@@ -0,0 +1,74 @@
|
||||
"""add board auto heartbeat governor policy
|
||||
|
||||
Revision ID: 1c3a2b7d9e10
|
||||
Revises: f7d54a8c5098
|
||||
Create Date: 2026-02-23
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "1c3a2b7d9e10"
|
||||
down_revision = "f7d54a8c5098"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.add_column(
|
||||
"boards",
|
||||
sa.Column(
|
||||
"auto_heartbeat_governor_enabled",
|
||||
sa.Boolean(),
|
||||
nullable=False,
|
||||
server_default=sa.text("true"),
|
||||
),
|
||||
)
|
||||
op.add_column(
|
||||
"boards",
|
||||
sa.Column(
|
||||
"auto_heartbeat_governor_run_interval_seconds",
|
||||
sa.Integer(),
|
||||
nullable=False,
|
||||
server_default="300",
|
||||
),
|
||||
)
|
||||
op.add_column(
|
||||
"boards",
|
||||
sa.Column(
|
||||
"auto_heartbeat_governor_ladder",
|
||||
sa.JSON(),
|
||||
nullable=False,
|
||||
server_default=sa.text("'[\"10m\", \"30m\", \"1h\", \"3h\", \"6h\"]'"),
|
||||
),
|
||||
)
|
||||
op.add_column(
|
||||
"boards",
|
||||
sa.Column(
|
||||
"auto_heartbeat_governor_lead_cap_every",
|
||||
sa.String(),
|
||||
nullable=False,
|
||||
server_default="1h",
|
||||
),
|
||||
)
|
||||
op.add_column(
|
||||
"boards",
|
||||
sa.Column(
|
||||
"auto_heartbeat_governor_activity_trigger_type",
|
||||
sa.String(),
|
||||
nullable=False,
|
||||
server_default="B",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.drop_column("boards", "auto_heartbeat_governor_activity_trigger_type")
|
||||
op.drop_column("boards", "auto_heartbeat_governor_lead_cap_every")
|
||||
op.drop_column("boards", "auto_heartbeat_governor_ladder")
|
||||
op.drop_column("boards", "auto_heartbeat_governor_run_interval_seconds")
|
||||
op.drop_column("boards", "auto_heartbeat_governor_enabled")
|
||||
Reference in New Issue
Block a user