Files
openclaw-mission-control/backend/app/api/board_memory.py

266 lines
9.7 KiB
Python

from __future__ import annotations
import asyncio
import json
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import func
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from sse_starlette.sse import EventSourceResponse
from app.api.deps import ActorContext, get_board_or_404, require_admin_or_agent
from app.core.config import settings
from app.core.time import utcnow
from app.db.pagination import paginate
from app.db.session import async_session_maker, get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
from app.models.agents import Agent
from app.models.board_memory import BoardMemory
from app.models.boards import Board
from app.models.gateways import Gateway
from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead
from app.schemas.pagination import DefaultLimitOffsetPage
from app.services.mentions import extract_mentions, matches_agent_mention
router = APIRouter(prefix="/boards/{board_id}/memory", tags=["board-memory"])
def _parse_since(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.strip()
if not normalized:
return None
normalized = normalized.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return parsed.astimezone(timezone.utc).replace(tzinfo=None)
return parsed
def _serialize_memory(memory: BoardMemory) -> dict[str, object]:
return BoardMemoryRead.model_validate(memory, from_attributes=True).model_dump(mode="json")
async def _gateway_config(session: AsyncSession, board: Board) -> GatewayClientConfig | None:
if board.gateway_id is None:
return None
gateway = await session.get(Gateway, board.gateway_id)
if gateway is None or not gateway.url:
return None
return GatewayClientConfig(url=gateway.url, token=gateway.token)
async def _send_agent_message(
*,
session_key: str,
config: GatewayClientConfig,
agent_name: str,
message: str,
deliver: bool = False,
) -> None:
await ensure_session(session_key, config=config, label=agent_name)
await send_message(message, session_key=session_key, config=config, deliver=deliver)
async def _fetch_memory_events(
session: AsyncSession,
board_id: UUID,
since: datetime,
is_chat: bool | None = None,
) -> list[BoardMemory]:
statement = (
select(BoardMemory).where(col(BoardMemory.board_id) == board_id)
# Old/invalid rows (empty/whitespace-only content) can exist; exclude them to
# satisfy the NonEmptyStr response schema.
.where(func.length(func.trim(col(BoardMemory.content))) > 0)
)
if is_chat is not None:
statement = statement.where(col(BoardMemory.is_chat) == is_chat)
statement = statement.where(col(BoardMemory.created_at) >= since).order_by(
col(BoardMemory.created_at)
)
return list(await session.exec(statement))
async def _notify_chat_targets(
*,
session: AsyncSession,
board: Board,
memory: BoardMemory,
actor: ActorContext,
) -> None:
if not memory.content:
return
config = await _gateway_config(session, board)
if config is None:
return
normalized = memory.content.strip()
command = normalized.lower()
# Special-case control commands to reach all board agents.
# These are intended to be parsed verbatim by agent runtimes.
if command in {"/pause", "/resume"}:
statement = select(Agent).where(col(Agent.board_id) == board.id)
pause_targets: list[Agent] = list(await session.exec(statement))
for agent in pause_targets:
if actor.actor_type == "agent" and actor.agent and agent.id == actor.agent.id:
continue
if not agent.openclaw_session_id:
continue
try:
await _send_agent_message(
session_key=agent.openclaw_session_id,
config=config,
agent_name=agent.name,
message=command,
deliver=True,
)
except OpenClawGatewayError:
continue
return
mentions = extract_mentions(memory.content)
statement = select(Agent).where(col(Agent.board_id) == board.id)
targets: dict[str, Agent] = {}
for agent in await session.exec(statement):
if agent.is_board_lead:
targets[str(agent.id)] = agent
continue
if mentions and matches_agent_mention(agent, mentions):
targets[str(agent.id)] = agent
if actor.actor_type == "agent" and actor.agent:
targets.pop(str(actor.agent.id), None)
if not targets:
return
actor_name = "User"
if actor.actor_type == "agent" and actor.agent:
actor_name = actor.agent.name
elif actor.user:
actor_name = actor.user.preferred_name or actor.user.name or actor_name
snippet = memory.content.strip()
if len(snippet) > 800:
snippet = f"{snippet[:797]}..."
base_url = settings.base_url or "http://localhost:8000"
for agent in targets.values():
if not agent.openclaw_session_id:
continue
mentioned = matches_agent_mention(agent, mentions)
header = "BOARD CHAT MENTION" if mentioned else "BOARD CHAT"
message = (
f"{header}\n"
f"Board: {board.name}\n"
f"From: {actor_name}\n\n"
f"{snippet}\n\n"
"Reply via board chat:\n"
f"POST {base_url}/api/v1/agent/boards/{board.id}/memory\n"
'Body: {"content":"...","tags":["chat"]}'
)
try:
await _send_agent_message(
session_key=agent.openclaw_session_id,
config=config,
agent_name=agent.name,
message=message,
)
except OpenClawGatewayError:
continue
@router.get("", response_model=DefaultLimitOffsetPage[BoardMemoryRead])
async def list_board_memory(
is_chat: bool | None = Query(default=None),
board: Board = Depends(get_board_or_404),
session: AsyncSession = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> DefaultLimitOffsetPage[BoardMemoryRead]:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
statement = (
select(BoardMemory).where(col(BoardMemory.board_id) == board.id)
# Old/invalid rows (empty/whitespace-only content) can exist; exclude them to
# satisfy the NonEmptyStr response schema.
.where(func.length(func.trim(col(BoardMemory.content))) > 0)
)
if is_chat is not None:
statement = statement.where(col(BoardMemory.is_chat) == is_chat)
statement = statement.order_by(col(BoardMemory.created_at).desc())
return await paginate(session, statement)
@router.get("/stream")
async def stream_board_memory(
request: Request,
board: Board = Depends(get_board_or_404),
actor: ActorContext = Depends(require_admin_or_agent),
since: str | None = Query(default=None),
is_chat: bool | None = Query(default=None),
) -> EventSourceResponse:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
since_dt = _parse_since(since) or utcnow()
last_seen = since_dt
async def event_generator() -> AsyncIterator[dict[str, str]]:
nonlocal last_seen
while True:
if await request.is_disconnected():
break
async with async_session_maker() as session:
memories = await _fetch_memory_events(
session,
board.id,
last_seen,
is_chat=is_chat,
)
for memory in memories:
if memory.created_at > last_seen:
last_seen = memory.created_at
payload = {"memory": _serialize_memory(memory)}
yield {"event": "memory", "data": json.dumps(payload)}
await asyncio.sleep(2)
return EventSourceResponse(event_generator(), ping=15)
@router.post("", response_model=BoardMemoryRead)
async def create_board_memory(
payload: BoardMemoryCreate,
board: Board = Depends(get_board_or_404),
session: AsyncSession = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> BoardMemory:
if actor.actor_type == "agent" and actor.agent:
if actor.agent.board_id and actor.agent.board_id != board.id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
is_chat = payload.tags is not None and "chat" in payload.tags
source = payload.source
if is_chat and not source:
if actor.actor_type == "agent" and actor.agent:
source = actor.agent.name
elif actor.user:
source = actor.user.preferred_name or actor.user.name or "User"
memory = BoardMemory(
board_id=board.id,
content=payload.content,
tags=payload.tags,
is_chat=is_chat,
source=source,
)
session.add(memory)
await session.commit()
await session.refresh(memory)
if is_chat:
await _notify_chat_targets(session=session, board=board, memory=memory, actor=actor)
return memory