feat: enhance agent management with session handling and UI improvements

This commit is contained in:
Abhimanyu Saharan
2026-02-04 14:58:14 +05:30
parent f6105fa0d2
commit a33c539860
16 changed files with 1181 additions and 472 deletions

View File

@@ -4,8 +4,7 @@ from fastapi import APIRouter, Depends, Query
from sqlalchemy import desc
from sqlmodel import Session, col, select
from app.api.deps import require_admin_auth
from app.core.auth import AuthContext
from app.api.deps import ActorContext, require_admin_or_agent
from app.db.session import get_session
from app.models.activity_events import ActivityEvent
from app.schemas.activity_events import ActivityEventRead
@@ -18,11 +17,13 @@ def list_activity(
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
actor: ActorContext = Depends(require_admin_or_agent),
) -> list[ActivityEvent]:
statement = select(ActivityEvent)
if actor.actor_type == "agent" and actor.agent:
statement = statement.where(ActivityEvent.agent_id == actor.agent.id)
statement = (
select(ActivityEvent)
.order_by(desc(col(ActivityEvent.created_at)))
statement.order_by(desc(col(ActivityEvent.created_at)))
.offset(offset)
.limit(limit)
)

View File

@@ -5,13 +5,22 @@ from datetime import datetime, timedelta
from uuid import uuid4
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select
from sqlmodel import Session, col, select
from sqlalchemy import update
from app.api.deps import require_admin_auth
from app.api.deps import ActorContext, require_admin_auth, require_admin_or_agent
from app.core.agent_tokens import generate_agent_token, hash_agent_token
from app.core.auth import AuthContext
from app.core.config import settings
from app.db.session import get_session
from app.integrations.openclaw_gateway import OpenClawGatewayError, openclaw_call
from app.integrations.openclaw_gateway import (
OpenClawGatewayError,
delete_session,
ensure_session,
send_message,
)
from app.models.agents import Agent
from app.models.activity_events import ActivityEvent
from app.schemas.agents import (
AgentCreate,
AgentHeartbeat,
@@ -40,7 +49,7 @@ def _build_session_key(agent_name: str) -> str:
async def _ensure_gateway_session(agent_name: str) -> tuple[str, str | None]:
session_key = _build_session_key(agent_name)
try:
await openclaw_call("sessions.patch", {"key": session_key, "label": agent_name})
await ensure_session(session_key, label=agent_name)
return session_key, None
except OpenClawGatewayError as exc:
return session_key, str(exc)
@@ -87,6 +96,8 @@ async def create_agent(
auth: AuthContext = Depends(require_admin_auth),
) -> Agent:
agent = Agent.model_validate(payload)
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
session_key, session_error = await _ensure_gateway_session(agent.name)
agent.openclaw_session_id = session_key
session.add(agent)
@@ -108,7 +119,7 @@ async def create_agent(
)
session.commit()
try:
await send_provisioning_message(agent)
await send_provisioning_message(agent, raw_token)
except OpenClawGatewayError as exc:
_record_provisioning_failure(session, agent, str(exc))
session.commit()
@@ -155,11 +166,13 @@ def heartbeat_agent(
agent_id: str,
payload: AgentHeartbeat,
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
actor: ActorContext = Depends(require_admin_or_agent),
) -> Agent:
agent = session.get(Agent, agent_id)
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if actor.actor_type == "agent" and actor.agent and actor.agent.id != agent.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
if payload.status:
agent.status = payload.status
agent.last_seen_at = datetime.utcnow()
@@ -175,11 +188,15 @@ def heartbeat_agent(
async def heartbeat_or_create_agent(
payload: AgentHeartbeatCreate,
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
actor: ActorContext = Depends(require_admin_or_agent),
) -> Agent:
agent = session.exec(select(Agent).where(Agent.name == payload.name)).first()
if agent is None:
if actor.actor_type == "agent":
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
agent = Agent(name=payload.name, status=payload.status or "online")
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
session_key, session_error = await _ensure_gateway_session(agent.name)
agent.openclaw_session_id = session_key
session.add(agent)
@@ -201,7 +218,23 @@ async def heartbeat_or_create_agent(
)
session.commit()
try:
await send_provisioning_message(agent)
await send_provisioning_message(agent, raw_token)
except OpenClawGatewayError as exc:
_record_provisioning_failure(session, agent, str(exc))
session.commit()
except Exception as exc: # pragma: no cover - unexpected provisioning errors
_record_provisioning_failure(session, agent, str(exc))
session.commit()
elif actor.actor_type == "agent" and actor.agent and actor.agent.id != agent.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
elif agent.agent_token_hash is None and actor.actor_type == "user":
raw_token = generate_agent_token()
agent.agent_token_hash = hash_agent_token(raw_token)
session.add(agent)
session.commit()
session.refresh(agent)
try:
await send_provisioning_message(agent, raw_token)
except OpenClawGatewayError as exc:
_record_provisioning_failure(session, agent, str(exc))
session.commit()
@@ -226,14 +259,6 @@ async def heartbeat_or_create_agent(
agent_id=agent.id,
)
session.commit()
try:
await send_provisioning_message(agent)
except OpenClawGatewayError as exc:
_record_provisioning_failure(session, agent, str(exc))
session.commit()
except Exception as exc: # pragma: no cover - unexpected provisioning errors
_record_provisioning_failure(session, agent, str(exc))
session.commit()
if payload.status:
agent.status = payload.status
agent.last_seen_at = datetime.utcnow()
@@ -253,6 +278,41 @@ def delete_agent(
) -> dict[str, bool]:
agent = session.get(Agent, agent_id)
if agent:
async def _gateway_cleanup() -> None:
if agent.openclaw_session_id:
await delete_session(agent.openclaw_session_id)
main_session = settings.openclaw_main_session_key
if main_session:
workspace_root = settings.openclaw_workspace_root or "~/.openclaw/workspaces"
workspace_path = f"{workspace_root.rstrip('/')}/{_slugify(agent.name)}"
cleanup_message = (
"Cleanup request for deleted agent.\n\n"
f"Agent name: {agent.name}\n"
f"Agent id: {agent.id}\n"
f"Session key: {agent.openclaw_session_id or _build_session_key(agent.name)}\n"
f"Workspace path: {workspace_path}\n\n"
"Actions:\n"
"1) Remove the workspace directory.\n"
"2) Delete any lingering session artifacts.\n"
"Reply NO_REPLY."
)
await ensure_session(main_session, label="Main Agent")
await send_message(cleanup_message, session_key=main_session, deliver=False)
try:
import asyncio
asyncio.run(_gateway_cleanup())
except OpenClawGatewayError as exc:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway cleanup failed: {exc}",
) from exc
session.execute(
update(ActivityEvent)
.where(col(ActivityEvent.agent_id) == agent.id)
.values(agent_id=None)
)
session.delete(agent)
session.commit()
return {"ok": True}

View File

@@ -3,7 +3,12 @@ from __future__ import annotations
from fastapi import APIRouter, Depends
from sqlmodel import Session, select
from app.api.deps import get_board_or_404, require_admin_auth
from app.api.deps import (
ActorContext,
get_board_or_404,
require_admin_auth,
require_admin_or_agent,
)
from app.core.auth import AuthContext
from app.db.session import get_session
from app.models.boards import Board
@@ -15,7 +20,7 @@ router = APIRouter(prefix="/boards", tags=["boards"])
@router.get("", response_model=list[BoardRead])
def list_boards(
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
actor: ActorContext = Depends(require_admin_or_agent),
) -> list[Board]:
return list(session.exec(select(Board)))
@@ -36,7 +41,7 @@ def create_board(
@router.get("/{board_id}", response_model=BoardRead)
def get_board(
board: Board = Depends(get_board_or_404),
auth: AuthContext = Depends(require_admin_auth),
actor: ActorContext = Depends(require_admin_or_agent),
) -> Board:
return board

View File

@@ -1,12 +1,18 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal
from fastapi import Depends, HTTPException, status
from sqlmodel import Session
from app.core.auth import AuthContext, get_auth_context
from app.core.agent_auth import AgentAuthContext, get_agent_auth_context_optional
from app.core.auth import AuthContext, get_auth_context, get_auth_context_optional
from app.db.session import get_session
from app.models.agents import Agent
from app.models.boards import Board
from app.models.tasks import Task
from app.models.users import User
from app.services.admin_access import require_admin
@@ -15,6 +21,25 @@ def require_admin_auth(auth: AuthContext = Depends(get_auth_context)) -> AuthCon
return auth
@dataclass
class ActorContext:
actor_type: Literal["user", "agent"]
user: User | None = None
agent: Agent | None = None
def require_admin_or_agent(
auth: AuthContext | None = Depends(get_auth_context_optional),
agent_auth: AgentAuthContext | None = Depends(get_agent_auth_context_optional),
) -> ActorContext:
if auth is not None:
require_admin(auth)
return ActorContext(actor_type="user", user=auth.user)
if agent_auth is not None:
return ActorContext(actor_type="agent", agent=agent_auth.agent)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
def get_board_or_404(
board_id: str,
session: Session = Depends(get_session),

View File

@@ -7,6 +7,7 @@ from app.core.auth import AuthContext
from app.core.config import settings
from app.integrations.openclaw_gateway import (
OpenClawGatewayError,
ensure_session,
get_chat_history,
openclaw_call,
send_message,
@@ -24,11 +25,24 @@ async def gateway_status(auth: AuthContext = Depends(require_admin_auth)) -> dic
sessions_list = list(sessions.get("sessions") or [])
else:
sessions_list = list(sessions or [])
main_session = settings.openclaw_main_session_key
main_session_entry: object | None = None
main_session_error: str | None = None
if main_session:
try:
ensured = await ensure_session(main_session, label="Main Agent")
if isinstance(ensured, dict):
main_session_entry = ensured.get("entry") or ensured
except OpenClawGatewayError as exc:
main_session_error = str(exc)
return {
"connected": True,
"gateway_url": gateway_url,
"sessions_count": len(sessions_list),
"sessions": sessions_list,
"main_session_key": main_session,
"main_session": main_session_entry,
"main_session_error": main_session_error,
}
except OpenClawGatewayError as exc:
return {
@@ -45,8 +59,21 @@ async def list_sessions(auth: AuthContext = Depends(require_admin_auth)) -> dict
except OpenClawGatewayError as exc:
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc
if isinstance(sessions, dict):
return {"sessions": list(sessions.get("sessions") or [])}
return {"sessions": list(sessions or [])}
sessions_list = list(sessions.get("sessions") or [])
else:
sessions_list = list(sessions or [])
main_session = settings.openclaw_main_session_key
main_session_entry: object | None = None
if main_session:
try:
ensured = await ensure_session(main_session, label="Main Agent")
if isinstance(ensured, dict):
main_session_entry = ensured.get("entry") or ensured
except OpenClawGatewayError:
main_session_entry = None
return {"sessions": sessions_list, "main_session_key": main_session, "main_session": main_session_entry}
@router.get("/sessions/{session_id}")
@@ -61,7 +88,27 @@ async def get_session(
sessions_list = list(sessions.get("sessions") or [])
else:
sessions_list = list(sessions or [])
main_session = settings.openclaw_main_session_key
if main_session and not any(
session.get("key") == main_session for session in sessions_list
):
try:
await ensure_session(main_session, label="Main Agent")
refreshed = await openclaw_call("sessions.list")
if isinstance(refreshed, dict):
sessions_list = list(refreshed.get("sessions") or [])
else:
sessions_list = list(refreshed or [])
except OpenClawGatewayError:
pass
session = next((item for item in sessions_list if item.get("key") == session_id), None)
if session is None and main_session and session_id == main_session:
try:
ensured = await ensure_session(main_session, label="Main Agent")
if isinstance(ensured, dict):
session = ensured.get("entry") or ensured
except OpenClawGatewayError:
session = None
if session is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
return {"session": session}
@@ -92,6 +139,9 @@ async def send_session_message(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="content is required"
)
try:
main_session = settings.openclaw_main_session_key
if main_session and session_id == main_session:
await ensure_session(main_session, label="Main Agent")
await send_message(content, session_key=session_id)
except OpenClawGatewayError as exc:
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc

View File

@@ -2,10 +2,16 @@ from __future__ import annotations
from datetime import datetime
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select
from app.api.deps import get_board_or_404, get_task_or_404, require_admin_auth
from app.api.deps import (
ActorContext,
get_board_or_404,
get_task_or_404,
require_admin_auth,
require_admin_or_agent,
)
from app.core.auth import AuthContext
from app.db.session import get_session
from app.models.boards import Board
@@ -20,7 +26,7 @@ router = APIRouter(prefix="/boards/{board_id}/tasks", tags=["tasks"])
def list_tasks(
board: Board = Depends(get_board_or_404),
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
actor: ActorContext = Depends(require_admin_or_agent),
) -> list[Task]:
return list(session.exec(select(Task).where(Task.board_id == board.id)))
@@ -55,10 +61,14 @@ def update_task(
payload: TaskUpdate,
task: Task = Depends(get_task_or_404),
session: Session = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
actor: ActorContext = Depends(require_admin_or_agent),
) -> Task:
previous_status = task.status
updates = payload.model_dump(exclude_unset=True)
if actor.actor_type == "agent":
allowed_fields = {"status"}
if not set(updates).issubset(allowed_fields):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
for key, value in updates.items():
setattr(task, key, value)
task.updated_at = datetime.utcnow()
@@ -73,7 +83,13 @@ def update_task(
else:
event_type = "task.updated"
message = f"Task updated: {task.title}."
record_activity(session, event_type=event_type, task_id=task.id, message=message)
record_activity(
session,
event_type=event_type,
task_id=task.id,
message=message,
agent_id=actor.agent.id if actor.actor_type == "agent" and actor.agent else None,
)
session.commit()
return task

View File

@@ -95,3 +95,46 @@ async def get_auth_context(
actor_type="user",
user=user,
)
async def get_auth_context_optional(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(security),
session: Session = Depends(get_session),
) -> AuthContext | None:
if credentials is None:
return None
try:
guard = _build_clerk_http_bearer(auto_error=False)
clerk_credentials = await guard(request)
except (RuntimeError, ValueError) as exc:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) from exc
except HTTPException as exc:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) from exc
auth_data = _resolve_clerk_auth(request, clerk_credentials)
try:
clerk_user_id = _parse_subject(auth_data)
except ValidationError as exc:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) from exc
if not clerk_user_id:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
user = session.exec(select(User).where(User.clerk_user_id == clerk_user_id)).first()
if user is None:
claims = auth_data.decoded if auth_data and auth_data.decoded else {}
user = User(
clerk_user_id=clerk_user_id,
email=claims.get("email"),
name=claims.get("name"),
)
session.add(user)
session.commit()
session.refresh(user)
return AuthContext(
actor_type="user",
user=user,
)

View File

@@ -126,3 +126,14 @@ async def get_chat_history(session_key: str, limit: int | None = None) -> Any:
if limit is not None:
params["limit"] = limit
return await openclaw_call("chat.history", params)
async def delete_session(session_key: str) -> Any:
return await openclaw_call("sessions.delete", {"key": session_key})
async def ensure_session(session_key: str, label: str | None = None) -> Any:
params: dict[str, Any] = {"key": session_key}
if label:
params["label"] = label
return await openclaw_call("sessions.patch", params)

View File

@@ -13,6 +13,7 @@ class Agent(SQLModel, table=True):
name: str = Field(index=True)
status: str = Field(default="online", index=True)
openclaw_session_id: str | None = Field(default=None, index=True)
agent_token_hash: str | None = Field(default=None, index=True)
last_seen_at: datetime = Field(default_factory=datetime.utcnow)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)

View File

@@ -7,7 +7,7 @@ from uuid import uuid4
from jinja2 import Environment, FileSystemLoader, StrictUndefined
from app.core.config import settings
from app.integrations.openclaw_gateway import send_message
from app.integrations.openclaw_gateway import ensure_session, send_message
from app.models.agents import Agent
TEMPLATE_FILES = [
@@ -68,12 +68,11 @@ def _workspace_path(agent_name: str) -> str:
return f"{root}/{_slugify(agent_name)}"
def build_provisioning_message(agent: Agent) -> str:
def build_provisioning_message(agent: Agent, auth_token: str) -> str:
agent_id = str(agent.id)
workspace_path = _workspace_path(agent.name)
session_key = agent.openclaw_session_id or ""
base_url = settings.base_url or "REPLACE_WITH_BASE_URL"
auth_token = "REPLACE_WITH_AUTH_TOKEN"
context = {
"agent_name": agent.name,
@@ -114,9 +113,10 @@ def build_provisioning_message(agent: Agent) -> str:
)
async def send_provisioning_message(agent: Agent) -> None:
async def send_provisioning_message(agent: Agent, auth_token: str) -> None:
main_session = settings.openclaw_main_session_key
if not main_session:
return
message = build_provisioning_message(agent)
await ensure_session(main_session, label="Main Agent")
message = build_provisioning_message(agent, auth_token)
await send_message(message, session_key=main_session, deliver=False)