from __future__ import annotations import asyncio import json from collections.abc import AsyncIterator from datetime import datetime, timezone from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from sqlalchemy import func from sqlmodel import col, select from sqlmodel.ext.asyncio.session import AsyncSession from sse_starlette.sse import EventSourceResponse from app.api.deps import ActorContext, get_board_or_404, require_admin_or_agent from app.core.config import settings from app.core.time import utcnow from app.db.pagination import paginate from app.db.session import async_session_maker, get_session from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message from app.models.agents import Agent from app.models.board_memory import BoardMemory from app.models.boards import Board from app.models.gateways import Gateway from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead from app.schemas.pagination import DefaultLimitOffsetPage from app.services.mentions import extract_mentions, matches_agent_mention router = APIRouter(prefix="/boards/{board_id}/memory", tags=["board-memory"]) def _parse_since(value: str | None) -> datetime | None: if not value: return None normalized = value.strip() if not normalized: return None normalized = normalized.replace("Z", "+00:00") try: parsed = datetime.fromisoformat(normalized) except ValueError: return None if parsed.tzinfo is not None: return parsed.astimezone(timezone.utc).replace(tzinfo=None) return parsed def _serialize_memory(memory: BoardMemory) -> dict[str, object]: return BoardMemoryRead.model_validate(memory, from_attributes=True).model_dump(mode="json") async def _gateway_config(session: AsyncSession, board: Board) -> GatewayClientConfig | None: if board.gateway_id is None: return None gateway = await session.get(Gateway, board.gateway_id) if gateway is None or not gateway.url: return None return GatewayClientConfig(url=gateway.url, token=gateway.token) async def _send_agent_message( *, session_key: str, config: GatewayClientConfig, agent_name: str, message: str, ) -> None: await ensure_session(session_key, config=config, label=agent_name) await send_message(message, session_key=session_key, config=config, deliver=False) async def _fetch_memory_events( session: AsyncSession, board_id: UUID, since: datetime, is_chat: bool | None = None, ) -> list[BoardMemory]: statement = ( select(BoardMemory) .where(col(BoardMemory.board_id) == board_id) # Old/invalid rows (empty/whitespace-only content) can exist; exclude them to # satisfy the NonEmptyStr response schema. .where(func.length(func.trim(col(BoardMemory.content))) > 0) ) if is_chat is not None: statement = statement.where(col(BoardMemory.is_chat) == is_chat) statement = ( statement.where(col(BoardMemory.created_at) >= since) .order_by(col(BoardMemory.created_at)) ) return list(await session.exec(statement)) async def _notify_chat_targets( *, session: AsyncSession, board: Board, memory: BoardMemory, actor: ActorContext, ) -> None: if not memory.content: return config = await _gateway_config(session, board) if config is None: return mentions = extract_mentions(memory.content) statement = select(Agent).where(col(Agent.board_id) == board.id) targets: dict[str, Agent] = {} for agent in await session.exec(statement): if agent.is_board_lead: targets[str(agent.id)] = agent continue if mentions and matches_agent_mention(agent, mentions): targets[str(agent.id)] = agent if actor.actor_type == "agent" and actor.agent: targets.pop(str(actor.agent.id), None) if not targets: return actor_name = "User" if actor.actor_type == "agent" and actor.agent: actor_name = actor.agent.name elif actor.user: actor_name = actor.user.preferred_name or actor.user.name or actor_name snippet = memory.content.strip() if len(snippet) > 800: snippet = f"{snippet[:797]}..." base_url = settings.base_url or "http://localhost:8000" for agent in targets.values(): if not agent.openclaw_session_id: continue mentioned = matches_agent_mention(agent, mentions) header = "BOARD CHAT MENTION" if mentioned else "BOARD CHAT" message = ( f"{header}\n" f"Board: {board.name}\n" f"From: {actor_name}\n\n" f"{snippet}\n\n" "Reply via board chat:\n" f"POST {base_url}/api/v1/agent/boards/{board.id}/memory\n" 'Body: {"content":"...","tags":["chat"]}' ) try: await _send_agent_message( session_key=agent.openclaw_session_id, config=config, agent_name=agent.name, message=message, ) except OpenClawGatewayError: continue @router.get("", response_model=DefaultLimitOffsetPage[BoardMemoryRead]) async def list_board_memory( is_chat: bool | None = Query(default=None), board: Board = Depends(get_board_or_404), session: AsyncSession = Depends(get_session), actor: ActorContext = Depends(require_admin_or_agent), ) -> DefaultLimitOffsetPage[BoardMemoryRead]: if actor.actor_type == "agent" and actor.agent: if actor.agent.board_id and actor.agent.board_id != board.id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) statement = ( select(BoardMemory) .where(col(BoardMemory.board_id) == board.id) # Old/invalid rows (empty/whitespace-only content) can exist; exclude them to # satisfy the NonEmptyStr response schema. .where(func.length(func.trim(col(BoardMemory.content))) > 0) ) if is_chat is not None: statement = statement.where(col(BoardMemory.is_chat) == is_chat) statement = statement.order_by(col(BoardMemory.created_at).desc()) return await paginate(session, statement) @router.get("/stream") async def stream_board_memory( request: Request, board: Board = Depends(get_board_or_404), actor: ActorContext = Depends(require_admin_or_agent), since: str | None = Query(default=None), is_chat: bool | None = Query(default=None), ) -> EventSourceResponse: if actor.actor_type == "agent" and actor.agent: if actor.agent.board_id and actor.agent.board_id != board.id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) since_dt = _parse_since(since) or utcnow() last_seen = since_dt async def event_generator() -> AsyncIterator[dict[str, str]]: nonlocal last_seen while True: if await request.is_disconnected(): break async with async_session_maker() as session: memories = await _fetch_memory_events( session, board.id, last_seen, is_chat=is_chat, ) for memory in memories: if memory.created_at > last_seen: last_seen = memory.created_at payload = {"memory": _serialize_memory(memory)} yield {"event": "memory", "data": json.dumps(payload)} await asyncio.sleep(2) return EventSourceResponse(event_generator(), ping=15) @router.post("", response_model=BoardMemoryRead) async def create_board_memory( payload: BoardMemoryCreate, board: Board = Depends(get_board_or_404), session: AsyncSession = Depends(get_session), actor: ActorContext = Depends(require_admin_or_agent), ) -> BoardMemory: if actor.actor_type == "agent" and actor.agent: if actor.agent.board_id and actor.agent.board_id != board.id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) is_chat = payload.tags is not None and "chat" in payload.tags source = payload.source if is_chat and not source: if actor.actor_type == "agent" and actor.agent: source = actor.agent.name elif actor.user: source = actor.user.preferred_name or actor.user.name or "User" memory = BoardMemory( board_id=board.id, content=payload.content, tags=payload.tags, is_chat=is_chat, source=source, ) session.add(memory) await session.commit() await session.refresh(memory) if is_chat: await _notify_chat_targets(session=session, board=board, memory=memory, actor=actor) return memory