from __future__ import annotations from datetime import timedelta from uuid import UUID from sqlalchemy import case, func from sqlmodel import col, select from sqlmodel.ext.asyncio.session import AsyncSession from app.core.time import utcnow from app.models.agents import Agent from app.models.approvals import Approval from app.models.board_memory import BoardMemory from app.models.boards import Board from app.models.gateways import Gateway from app.models.tasks import Task from app.schemas.agents import AgentRead from app.schemas.approvals import ApprovalRead from app.schemas.board_memory import BoardMemoryRead from app.schemas.boards import BoardRead from app.schemas.view_models import BoardSnapshot, TaskCardRead from app.services.task_dependencies import ( blocked_by_dependency_ids, dependency_ids_by_task_id, dependency_status_by_id, ) OFFLINE_AFTER = timedelta(minutes=10) def _computed_agent_status(agent: Agent) -> str: now = utcnow() if agent.status in {"deleting", "updating"}: return agent.status if agent.last_seen_at is None: return "provisioning" if now - agent.last_seen_at > OFFLINE_AFTER: return "offline" return agent.status async def _gateway_main_session_keys(session: AsyncSession) -> set[str]: keys = (await session.exec(select(Gateway.main_session_key))).all() return {key for key in keys if key} def _agent_to_read(agent: Agent, main_session_keys: set[str]) -> AgentRead: model = AgentRead.model_validate(agent, from_attributes=True) computed_status = _computed_agent_status(agent) is_gateway_main = bool( agent.openclaw_session_id and agent.openclaw_session_id in main_session_keys ) return model.model_copy(update={"status": computed_status, "is_gateway_main": is_gateway_main}) def _memory_to_read(memory: BoardMemory) -> BoardMemoryRead: return BoardMemoryRead.model_validate(memory, from_attributes=True) def _approval_to_read(approval: Approval) -> ApprovalRead: return ApprovalRead.model_validate(approval, from_attributes=True) def _task_to_card( task: Task, *, agent_name_by_id: dict[UUID, str], counts_by_task_id: dict[UUID, tuple[int, int]], deps_by_task_id: dict[UUID, list[UUID]], dependency_status_by_id_map: dict[UUID, str], ) -> TaskCardRead: card = TaskCardRead.model_validate(task, from_attributes=True) approvals_count, approvals_pending_count = counts_by_task_id.get(task.id, (0, 0)) assignee = ( agent_name_by_id.get(task.assigned_agent_id) if task.assigned_agent_id is not None else None ) depends_on_task_ids = deps_by_task_id.get(task.id, []) blocked_by_task_ids = blocked_by_dependency_ids( dependency_ids=depends_on_task_ids, status_by_id=dependency_status_by_id_map, ) if task.status == "done": blocked_by_task_ids = [] return card.model_copy( update={ "assignee": assignee, "approvals_count": approvals_count, "approvals_pending_count": approvals_pending_count, "depends_on_task_ids": depends_on_task_ids, "blocked_by_task_ids": blocked_by_task_ids, "is_blocked": bool(blocked_by_task_ids), } ) async def build_board_snapshot(session: AsyncSession, board: Board) -> BoardSnapshot: board_read = BoardRead.model_validate(board, from_attributes=True) tasks = list( await session.exec( select(Task).where(col(Task.board_id) == board.id).order_by(col(Task.created_at).desc()) ) ) task_ids = [task.id for task in tasks] deps_by_task_id = await dependency_ids_by_task_id(session, board_id=board.id, task_ids=task_ids) all_dependency_ids: list[UUID] = [] for values in deps_by_task_id.values(): all_dependency_ids.extend(values) dependency_status_by_id_map = await dependency_status_by_id( session, board_id=board.id, dependency_ids=list({*all_dependency_ids}), ) main_session_keys = await _gateway_main_session_keys(session) agents = list( await session.exec( select(Agent) .where(col(Agent.board_id) == board.id) .order_by(col(Agent.created_at).desc()) ) ) agent_reads = [_agent_to_read(agent, main_session_keys) for agent in agents] agent_name_by_id = {agent.id: agent.name for agent in agents} pending_approvals_count = int( ( await session.exec( select(func.count(col(Approval.id))) .where(col(Approval.board_id) == board.id) .where(col(Approval.status) == "pending") ) ).one() ) approvals = list( await session.exec( select(Approval) .where(col(Approval.board_id) == board.id) .order_by(col(Approval.created_at).desc()) .limit(200) ) ) approval_reads = [_approval_to_read(approval) for approval in approvals] counts_by_task_id: dict[UUID, tuple[int, int]] = {} rows = list( await session.exec( select( col(Approval.task_id), func.count(col(Approval.id)).label("total"), func.sum(case((col(Approval.status) == "pending", 1), else_=0)).label("pending"), ) .where(col(Approval.board_id) == board.id) .where(col(Approval.task_id).is_not(None)) .group_by(col(Approval.task_id)) ) ) for task_id, total, pending in rows: if task_id is None: continue counts_by_task_id[task_id] = (int(total or 0), int(pending or 0)) task_cards = [ _task_to_card( task, agent_name_by_id=agent_name_by_id, counts_by_task_id=counts_by_task_id, deps_by_task_id=deps_by_task_id, dependency_status_by_id_map=dependency_status_by_id_map, ) for task in tasks ] chat_messages = list( await session.exec( select(BoardMemory) .where(col(BoardMemory.board_id) == board.id) .where(col(BoardMemory.is_chat).is_(True)) # Old/invalid rows (empty/whitespace-only content) can exist; exclude them to # satisfy the NonEmptyStr response schema. .where(func.length(func.trim(col(BoardMemory.content))) > 0) .order_by(col(BoardMemory.created_at).desc()) .limit(200) ) ) chat_messages.sort(key=lambda item: item.created_at) chat_reads = [_memory_to_read(memory) for memory in chat_messages] return BoardSnapshot( board=board_read, tasks=task_cards, agents=agent_reads, approvals=approval_reads, chat_messages=chat_reads, pending_approvals_count=pending_approvals_count, )