Files
openclaw-mission-control/backend/app/api/agents.py
Abhimanyu Saharan b24e3e1dcd ref(backend): Centralize deps and add mypy
Extract reusable API dependencies and activity logging helpers.\nAdd mypy configuration and dev dependency for type checking.\n\nCo-Authored-By: Claude <noreply@anthropic.com>
2026-02-04 03:57:19 +05:30

259 lines
8.4 KiB
Python

from __future__ import annotations
import re
from datetime import datetime, timedelta
from uuid import uuid4
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select
from app.api.deps import require_admin_auth
from app.core.auth import AuthContext
from app.db.session import get_session
from app.integrations.openclaw_gateway import OpenClawGatewayError, openclaw_call
from app.models.agents import Agent
from app.schemas.agents import (
AgentCreate,
AgentHeartbeat,
AgentHeartbeatCreate,
AgentRead,
AgentUpdate,
)
from app.services.activity_log import record_activity
from app.services.agent_provisioning import send_provisioning_message
router = APIRouter(prefix="/agents", tags=["agents"])
OFFLINE_AFTER = timedelta(minutes=10)
AGENT_SESSION_PREFIX = "agent"
def _slugify(value: str) -> str:
slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")
return slug or uuid4().hex
def _build_session_key(agent_name: str) -> str:
return f"{AGENT_SESSION_PREFIX}:{_slugify(agent_name)}:main"
async def _ensure_gateway_session(agent_name: str) -> tuple[str, str | None]:
session_key = _build_session_key(agent_name)
try:
await openclaw_call("sessions.patch", {"key": session_key, "label": agent_name})
return session_key, None
except OpenClawGatewayError as exc:
return session_key, str(exc)
def _with_computed_status(agent: Agent) -> Agent:
now = datetime.utcnow()
if agent.last_seen_at and now - agent.last_seen_at > OFFLINE_AFTER:
agent.status = "offline"
return agent
def _record_heartbeat(session: Session, agent: Agent) -> None:
record_activity(
session,
event_type="agent.heartbeat",
message=f"Heartbeat received from {agent.name}.",
agent_id=agent.id,
)
def _record_provisioning_failure(session: Session, agent: Agent, error: str) -> None:
record_activity(
session,
event_type="agent.provision.failed",
message=f"Provisioning message failed: {error}",
agent_id=agent.id,
)
@router.get("", response_model=list[AgentRead])
def list_agents(
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> list[Agent]:
agents = list(session.exec(select(Agent)))
return [_with_computed_status(agent) for agent in agents]
@router.post("", response_model=AgentRead)
async def create_agent(
payload: AgentCreate,
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> Agent:
agent = Agent.model_validate(payload)
session_key, session_error = await _ensure_gateway_session(agent.name)
agent.openclaw_session_id = session_key
session.add(agent)
session.commit()
session.refresh(agent)
if session_error:
record_activity(
session,
event_type="agent.session.failed",
message=f"Session sync failed for {agent.name}: {session_error}",
agent_id=agent.id,
)
else:
record_activity(
session,
event_type="agent.session.created",
message=f"Session created for {agent.name}.",
agent_id=agent.id,
)
session.commit()
try:
await send_provisioning_message(agent)
except OpenClawGatewayError as exc:
_record_provisioning_failure(session, agent, str(exc))
session.commit()
except Exception as exc: # pragma: no cover - unexpected provisioning errors
_record_provisioning_failure(session, agent, str(exc))
session.commit()
return agent
@router.get("/{agent_id}", response_model=AgentRead)
def get_agent(
agent_id: str,
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> Agent:
agent = session.get(Agent, agent_id)
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return _with_computed_status(agent)
@router.patch("/{agent_id}", response_model=AgentRead)
def update_agent(
agent_id: str,
payload: AgentUpdate,
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> Agent:
agent = session.get(Agent, agent_id)
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
updates = payload.model_dump(exclude_unset=True)
for key, value in updates.items():
setattr(agent, key, value)
agent.updated_at = datetime.utcnow()
session.add(agent)
session.commit()
session.refresh(agent)
return _with_computed_status(agent)
@router.post("/{agent_id}/heartbeat", response_model=AgentRead)
def heartbeat_agent(
agent_id: str,
payload: AgentHeartbeat,
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> Agent:
agent = session.get(Agent, agent_id)
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if payload.status:
agent.status = payload.status
agent.last_seen_at = datetime.utcnow()
agent.updated_at = datetime.utcnow()
_record_heartbeat(session, agent)
session.add(agent)
session.commit()
session.refresh(agent)
return _with_computed_status(agent)
@router.post("/heartbeat", response_model=AgentRead)
async def heartbeat_or_create_agent(
payload: AgentHeartbeatCreate,
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> Agent:
agent = session.exec(select(Agent).where(Agent.name == payload.name)).first()
if agent is None:
agent = Agent(name=payload.name, status=payload.status or "online")
session_key, session_error = await _ensure_gateway_session(agent.name)
agent.openclaw_session_id = session_key
session.add(agent)
session.commit()
session.refresh(agent)
if session_error:
record_activity(
session,
event_type="agent.session.failed",
message=f"Session sync failed for {agent.name}: {session_error}",
agent_id=agent.id,
)
else:
record_activity(
session,
event_type="agent.session.created",
message=f"Session created for {agent.name}.",
agent_id=agent.id,
)
session.commit()
try:
await send_provisioning_message(agent)
except OpenClawGatewayError as exc:
_record_provisioning_failure(session, agent, str(exc))
session.commit()
except Exception as exc: # pragma: no cover - unexpected provisioning errors
_record_provisioning_failure(session, agent, str(exc))
session.commit()
elif not agent.openclaw_session_id:
session_key, session_error = await _ensure_gateway_session(agent.name)
agent.openclaw_session_id = session_key
if session_error:
record_activity(
session,
event_type="agent.session.failed",
message=f"Session sync failed for {agent.name}: {session_error}",
agent_id=agent.id,
)
else:
record_activity(
session,
event_type="agent.session.created",
message=f"Session created for {agent.name}.",
agent_id=agent.id,
)
session.commit()
try:
await send_provisioning_message(agent)
except OpenClawGatewayError as exc:
_record_provisioning_failure(session, agent, str(exc))
session.commit()
except Exception as exc: # pragma: no cover - unexpected provisioning errors
_record_provisioning_failure(session, agent, str(exc))
session.commit()
if payload.status:
agent.status = payload.status
agent.last_seen_at = datetime.utcnow()
agent.updated_at = datetime.utcnow()
_record_heartbeat(session, agent)
session.add(agent)
session.commit()
session.refresh(agent)
return _with_computed_status(agent)
@router.delete("/{agent_id}")
def delete_agent(
agent_id: str,
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> dict[str, bool]:
agent = session.get(Agent, agent_id)
if agent:
session.delete(agent)
session.commit()
return {"ok": True}