From 69a6597936671f4a195f261f22742c3b2f7e3225 Mon Sep 17 00:00:00 2001 From: Abhimanyu Saharan Date: Fri, 6 Feb 2026 00:44:03 +0530 Subject: [PATCH] feat: implement task creation endpoint for board leads and enhance board chat functionality --- backend/app/api/agent.py | 58 ++- backend/app/api/board_memory.py | 197 ++++++++- backend/app/api/tasks.py | 20 +- frontend/src/app/boards/[boardId]/page.tsx | 402 +++++++++++++++++- .../src/components/BoardApprovalsPanel.tsx | 340 ++++++++------- .../src/components/molecules/TaskCard.tsx | 35 +- .../src/components/organisms/TaskBoard.tsx | 26 +- templates/AGENTS.md | 1 + templates/HEARTBEAT_AGENT.md | 7 + templates/HEARTBEAT_LEAD.md | 22 +- 10 files changed, 911 insertions(+), 197 deletions(-) diff --git a/backend/app/api/agent.py b/backend/app/api/agent.py index e98bd370..78c2ecac 100644 --- a/backend/app/api/agent.py +++ b/backend/app/api/agent.py @@ -21,13 +21,20 @@ from app.integrations.openclaw_gateway import ( send_message, ) from app.models.agents import Agent +from app.models.tasks import Task from app.models.boards import Board from app.models.gateways import Gateway from app.schemas.approvals import ApprovalCreate, ApprovalRead from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead from app.schemas.board_onboarding import BoardOnboardingRead from app.schemas.boards import BoardRead -from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskRead, TaskUpdate +from app.schemas.tasks import ( + TaskCommentCreate, + TaskCommentRead, + TaskCreate, + TaskRead, + TaskUpdate, +) from app.schemas.agents import AgentCreate, AgentHeartbeatCreate, AgentNudge, AgentRead from app.services.activity_log import record_activity @@ -120,6 +127,55 @@ def list_tasks( ) +@router.post("/boards/{board_id}/tasks", response_model=TaskRead) +def create_task( + payload: TaskCreate, + board: Board = Depends(get_board_or_404), + session: Session = Depends(get_session), + agent_ctx: AgentAuthContext = Depends(get_agent_auth_context), +) -> TaskRead: + _guard_board_access(agent_ctx, board) + if not agent_ctx.agent.is_board_lead: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + tasks_api.validate_task_status(payload.status) + task = Task.model_validate(payload) + task.board_id = board.id + task.auto_created = True + task.auto_reason = f"lead_agent:{agent_ctx.agent.id}" + if task.assigned_agent_id: + agent = session.get(Agent, task.assigned_agent_id) + if agent is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) + if agent.is_board_lead: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Board leads cannot assign tasks to themselves.", + ) + if agent.board_id and agent.board_id != board.id: + raise HTTPException(status_code=status.HTTP_409_CONFLICT) + session.add(task) + session.commit() + session.refresh(task) + record_activity( + session, + event_type="task.created", + task_id=task.id, + message=f"Task created by lead: {task.title}.", + agent_id=agent_ctx.agent.id, + ) + session.commit() + if task.assigned_agent_id: + assigned_agent = session.get(Agent, task.assigned_agent_id) + if assigned_agent: + tasks_api._notify_agent_on_task_assign( + session=session, + board=board, + task=task, + agent=assigned_agent, + ) + return task + + @router.patch("/boards/{board_id}/tasks/{task_id}", response_model=TaskRead) def update_task( payload: TaskUpdate, diff --git a/backend/app/api/board_memory.py b/backend/app/api/board_memory.py index e9b1e98f..176ec154 100644 --- a/backend/app/api/board_memory.py +++ b/backend/app/api/board_memory.py @@ -1,15 +1,166 @@ from __future__ import annotations -from fastapi import APIRouter, Depends, HTTPException, Query, status +from datetime import datetime, timezone +import asyncio +import json +import re + +from fastapi import APIRouter, Depends, HTTPException, Query, Request from sqlmodel import Session, col, select +from sse_starlette.sse import EventSourceResponse +from starlette.concurrency import run_in_threadpool from app.api.deps import ActorContext, get_board_or_404, require_admin_or_agent -from app.db.session import get_session +from app.core.config import settings +from app.db.session import engine, get_session +from app.integrations.openclaw_gateway import ( + GatewayConfig as GatewayClientConfig, + OpenClawGatewayError, + ensure_session, + send_message, +) +from app.models.agents import Agent from app.models.board_memory import BoardMemory +from app.models.gateways import Gateway from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead router = APIRouter(prefix="/boards/{board_id}/memory", tags=["board-memory"]) +MENTION_PATTERN = re.compile(r"@([A-Za-z][\w-]{0,31})") + + +def _parse_since(value: str | None) -> datetime | None: + if not value: + return None + normalized = value.strip() + if not normalized: + return None + normalized = normalized.replace("Z", "+00:00") + try: + parsed = datetime.fromisoformat(normalized) + except ValueError: + return None + if parsed.tzinfo is not None: + return parsed.astimezone(timezone.utc).replace(tzinfo=None) + return parsed + + +def _serialize_memory(memory: BoardMemory) -> dict[str, object]: + return BoardMemoryRead.model_validate( + memory, from_attributes=True + ).model_dump(mode="json") + + +def _extract_mentions(message: str) -> set[str]: + return {match.group(1).lower() for match in MENTION_PATTERN.finditer(message)} + + +def _matches_mention(agent: Agent, mentions: set[str]) -> bool: + if not mentions: + return False + name = (agent.name or "").strip() + if not name: + return False + normalized = name.lower() + if normalized in mentions: + return True + first = normalized.split()[0] + return first in mentions + + +def _gateway_config(session: Session, board) -> GatewayClientConfig | None: + if not board.gateway_id: + return None + gateway = session.get(Gateway, board.gateway_id) + if gateway is None or not gateway.url: + return None + return GatewayClientConfig(url=gateway.url, token=gateway.token) + + +async def _send_agent_message( + *, + session_key: str, + config: GatewayClientConfig, + agent_name: str, + message: str, +) -> None: + await ensure_session(session_key, config=config, label=agent_name) + await send_message(message, session_key=session_key, config=config, deliver=False) + + +def _fetch_memory_events( + board_id, + since: datetime, +) -> list[BoardMemory]: + with Session(engine) as session: + statement = ( + select(BoardMemory) + .where(col(BoardMemory.board_id) == board_id) + .where(col(BoardMemory.created_at) >= since) + .order_by(col(BoardMemory.created_at)) + ) + return list(session.exec(statement)) + + +def _notify_chat_targets( + *, + session: Session, + board, + memory: BoardMemory, + actor: ActorContext, +) -> None: + if not memory.content: + return + config = _gateway_config(session, board) + if config is None: + return + mentions = _extract_mentions(memory.content) + statement = select(Agent).where(col(Agent.board_id) == board.id) + targets: dict[str, Agent] = {} + for agent in session.exec(statement): + if agent.is_board_lead: + targets[str(agent.id)] = agent + continue + if mentions and _matches_mention(agent, mentions): + targets[str(agent.id)] = agent + if actor.actor_type == "agent" and actor.agent: + targets.pop(str(actor.agent.id), None) + if not targets: + return + actor_name = "User" + if actor.actor_type == "agent" and actor.agent: + actor_name = actor.agent.name + elif actor.user: + actor_name = actor.user.preferred_name or actor.user.name or actor_name + snippet = memory.content.strip() + if len(snippet) > 800: + snippet = f"{snippet[:797]}..." + base_url = settings.base_url or "http://localhost:8000" + for agent in targets.values(): + if not agent.openclaw_session_id: + continue + mentioned = _matches_mention(agent, mentions) + header = "BOARD CHAT MENTION" if mentioned else "BOARD CHAT" + message = ( + f"{header}\n" + f"Board: {board.name}\n" + f"From: {actor_name}\n\n" + f"{snippet}\n\n" + "Reply via board chat:\n" + f"POST {base_url}/api/v1/agent/boards/{board.id}/memory\n" + 'Body: {"content":"...","tags":["chat"]}' + ) + try: + asyncio.run( + _send_agent_message( + session_key=agent.openclaw_session_id, + config=config, + agent_name=agent.name, + message=message, + ) + ) + except OpenClawGatewayError: + continue @router.get("", response_model=list[BoardMemoryRead]) def list_board_memory( @@ -32,6 +183,37 @@ def list_board_memory( return list(session.exec(statement)) +@router.get("/stream") +async def stream_board_memory( + request: Request, + board=Depends(get_board_or_404), + actor: ActorContext = Depends(require_admin_or_agent), + since: str | None = Query(default=None), +) -> EventSourceResponse: + if actor.actor_type == "agent" and actor.agent: + if actor.agent.board_id and actor.agent.board_id != board.id: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + since_dt = _parse_since(since) or datetime.utcnow() + last_seen = since_dt + + async def event_generator(): + nonlocal last_seen + while True: + if await request.is_disconnected(): + break + memories = await run_in_threadpool( + _fetch_memory_events, board.id, last_seen + ) + for memory in memories: + if memory.created_at > last_seen: + last_seen = memory.created_at + payload = {"memory": _serialize_memory(memory)} + yield {"event": "memory", "data": json.dumps(payload)} + await asyncio.sleep(2) + + return EventSourceResponse(event_generator(), ping=15) + + @router.post("", response_model=BoardMemoryRead) def create_board_memory( payload: BoardMemoryCreate, @@ -42,13 +224,22 @@ def create_board_memory( if actor.actor_type == "agent" and actor.agent: if actor.agent.board_id and actor.agent.board_id != board.id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + is_chat = payload.tags is not None and "chat" in payload.tags + source = payload.source + if is_chat and not source: + if actor.actor_type == "agent" and actor.agent: + source = actor.agent.name + elif actor.user: + source = actor.user.preferred_name or actor.user.name or "User" memory = BoardMemory( board_id=board.id, content=payload.content, tags=payload.tags, - source=payload.source, + source=source, ) session.add(memory) session.commit() session.refresh(memory) + if is_chat: + _notify_chat_targets(session=session, board=board, memory=memory, actor=actor) return memory diff --git a/backend/app/api/tasks.py b/backend/app/api/tasks.py index 70a73589..65ba17c5 100644 --- a/backend/app/api/tasks.py +++ b/backend/app/api/tasks.py @@ -140,6 +140,12 @@ def _lead_was_mentioned( return False +def _lead_created_task(task: Task, lead: Agent) -> bool: + if not task.auto_created or not task.auto_reason: + return False + return task.auto_reason == f"lead_agent:{lead.id}" + + def _fetch_task_events( board_id: UUID, since: datetime, @@ -692,11 +698,13 @@ def create_task_comment( ) -> ActivityEvent: if actor.actor_type == "agent" and actor.agent: if actor.agent.is_board_lead and task.status != "review": - if not _lead_was_mentioned(session, task, actor.agent): + if not _lead_was_mentioned(session, task, actor.agent) and not _lead_created_task( + task, actor.agent + ): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=( - "Board leads can only comment during review or when mentioned." + "Board leads can only comment during review, when mentioned, or on tasks they created." ), ) if actor.agent.board_id and task.board_id and actor.agent.board_id != task.board_id: @@ -714,15 +722,15 @@ def create_task_comment( session.refresh(event) mention_names = _extract_mentions(payload.message) targets: dict[UUID, Agent] = {} - if task.assigned_agent_id: - assigned_agent = session.get(Agent, task.assigned_agent_id) - if assigned_agent: - targets[assigned_agent.id] = assigned_agent if mention_names and task.board_id: statement = select(Agent).where(col(Agent.board_id) == task.board_id) for agent in session.exec(statement): if _matches_mention(agent, mention_names): targets[agent.id] = agent + if not mention_names and task.assigned_agent_id: + assigned_agent = session.get(Agent, task.assigned_agent_id) + if assigned_agent: + targets[assigned_agent.id] = assigned_agent if actor.actor_type == "agent" and actor.agent: targets.pop(actor.agent.id, None) if targets: diff --git a/frontend/src/app/boards/[boardId]/page.tsx b/frontend/src/app/boards/[boardId]/page.tsx index f4dc2e5d..314bcb43 100644 --- a/frontend/src/app/boards/[boardId]/page.tsx +++ b/frontend/src/app/boards/[boardId]/page.tsx @@ -4,7 +4,7 @@ import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { useParams, useRouter } from "next/navigation"; import { SignInButton, SignedIn, SignedOut, useAuth } from "@clerk/nextjs"; -import { Pencil, Settings, X } from "lucide-react"; +import { MessageSquare, Pencil, Settings, X } from "lucide-react"; import ReactMarkdown from "react-markdown"; import { BoardApprovalsPanel } from "@/components/BoardApprovalsPanel"; @@ -53,6 +53,8 @@ type Task = { assigned_agent_id?: string | null; created_at?: string | null; updated_at?: string | null; + approvalsCount?: number; + approvalsPendingCount?: number; }; type Agent = { @@ -87,8 +89,25 @@ type Approval = { resolved_at?: string | null; }; +type BoardChatMessage = { + id: string; + content: string; + tags?: string[] | null; + source?: string | null; + created_at: string; +}; + const apiBase = getApiBaseUrl(); +const approvalTaskId = (approval: Approval) => { + const payload = approval.payload ?? {}; + return ( + (payload as Record).task_id ?? + (payload as Record).taskId ?? + (payload as Record).taskID + ); +}; + const priorities = [ { value: "low", label: "Low" }, { value: "medium", label: "Medium" }, @@ -147,6 +166,12 @@ export default function BoardDetailPage() { const [approvalsUpdatingId, setApprovalsUpdatingId] = useState( null, ); + const [isChatOpen, setIsChatOpen] = useState(false); + const [chatMessages, setChatMessages] = useState([]); + const [chatInput, setChatInput] = useState(""); + const [isChatSending, setIsChatSending] = useState(false); + const [chatError, setChatError] = useState(null); + const chatMessagesRef = useRef([]); const [isDeletingTask, setIsDeletingTask] = useState(false); const [deleteTaskError, setDeleteTaskError] = useState(null); const [viewMode, setViewMode] = useState<"board" | "list">("board"); @@ -274,6 +299,10 @@ export default function BoardDetailPage() { agentsRef.current = agents; }, [agents]); + useEffect(() => { + chatMessagesRef.current = chatMessages; + }, [chatMessages]); + const loadApprovals = useCallback(async () => { if (!isSignedIn || !boardId) return; setIsApprovalsLoading(true); @@ -306,6 +335,138 @@ export default function BoardDetailPage() { loadApprovals(); }, [boardId, isSignedIn, loadApprovals]); + const loadBoardChat = useCallback(async () => { + if (!isSignedIn || !boardId) return; + setChatError(null); + try { + const token = await getToken(); + const response = await fetch( + `${apiBase}/api/v1/boards/${boardId}/memory?limit=200`, + { + headers: { + Authorization: token ? `Bearer ${token}` : "", + }, + }, + ); + if (!response.ok) { + throw new Error("Unable to load board chat."); + } + const data = (await response.json()) as BoardChatMessage[]; + const chatOnly = data.filter((item) => item.tags?.includes("chat")); + const ordered = chatOnly.sort((a, b) => { + const aTime = new Date(a.created_at).getTime(); + const bTime = new Date(b.created_at).getTime(); + return aTime - bTime; + }); + setChatMessages(ordered); + } catch (err) { + setChatError( + err instanceof Error ? err.message : "Unable to load board chat.", + ); + } + }, [boardId, getToken, isSignedIn]); + + useEffect(() => { + loadBoardChat(); + }, [boardId, isSignedIn, loadBoardChat]); + + const latestChatTimestamp = (items: BoardChatMessage[]) => { + if (!items.length) return undefined; + const latest = items.reduce((max, item) => { + const ts = new Date(item.created_at).getTime(); + return Number.isNaN(ts) ? max : Math.max(max, ts); + }, 0); + if (!latest) return undefined; + return new Date(latest).toISOString(); + }; + + useEffect(() => { + if (!isSignedIn || !boardId) return; + let isCancelled = false; + const abortController = new AbortController(); + + const connect = async () => { + try { + const token = await getToken(); + if (!token || isCancelled) return; + const url = new URL( + `${apiBase}/api/v1/boards/${boardId}/memory/stream`, + ); + const since = latestChatTimestamp(chatMessagesRef.current); + if (since) { + url.searchParams.set("since", since); + } + const response = await fetch(url.toString(), { + headers: { + Authorization: `Bearer ${token}`, + }, + signal: abortController.signal, + }); + if (!response.ok || !response.body) { + throw new Error("Unable to connect board chat stream."); + } + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (!isCancelled) { + const { value, done } = await reader.read(); + if (done) break; + buffer += decoder.decode(value, { stream: true }); + buffer = buffer.replace(/\r\n/g, "\n"); + let boundary = buffer.indexOf("\n\n"); + while (boundary !== -1) { + const raw = buffer.slice(0, boundary); + buffer = buffer.slice(boundary + 2); + const lines = raw.split("\n"); + let eventType = "message"; + let data = ""; + for (const line of lines) { + if (line.startsWith("event:")) { + eventType = line.slice(6).trim(); + } else if (line.startsWith("data:")) { + data += line.slice(5).trim(); + } + } + if (eventType === "memory" && data) { + try { + const payload = JSON.parse(data) as { memory?: BoardChatMessage }; + if (payload.memory?.tags?.includes("chat")) { + setChatMessages((prev) => { + const exists = prev.some( + (item) => item.id === payload.memory?.id, + ); + if (exists) return prev; + const next = [...prev, payload.memory as BoardChatMessage]; + next.sort((a, b) => { + const aTime = new Date(a.created_at).getTime(); + const bTime = new Date(b.created_at).getTime(); + return aTime - bTime; + }); + return next; + }); + } + } catch { + // ignore malformed + } + } + boundary = buffer.indexOf("\n\n"); + } + } + } catch { + if (!isCancelled) { + setTimeout(connect, 3000); + } + } + }; + + connect(); + return () => { + isCancelled = true; + abortController.abort(); + }; + }, [boardId, getToken, isSignedIn]); + useEffect(() => { if (!isSignedIn || !boardId) return; let isCancelled = false; @@ -642,6 +803,55 @@ export default function BoardDetailPage() { } }; + const handleSendChat = async () => { + if (!isSignedIn || !boardId) return; + const trimmed = chatInput.trim(); + if (!trimmed) return; + setIsChatSending(true); + setChatError(null); + try { + const token = await getToken(); + const response = await fetch( + `${apiBase}/api/v1/boards/${boardId}/memory`, + { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: token ? `Bearer ${token}` : "", + }, + body: JSON.stringify({ + content: trimmed, + tags: ["chat"], + }), + }, + ); + if (!response.ok) { + throw new Error("Unable to send message."); + } + const created = (await response.json()) as BoardChatMessage; + if (created.tags?.includes("chat")) { + setChatMessages((prev) => { + const exists = prev.some((item) => item.id === created.id); + if (exists) return prev; + const next = [...prev, created]; + next.sort((a, b) => { + const aTime = new Date(a.created_at).getTime(); + const bTime = new Date(b.created_at).getTime(); + return aTime - bTime; + }); + return next; + }); + } + setChatInput(""); + } catch (err) { + setChatError( + err instanceof Error ? err.message : "Unable to send message.", + ); + } finally { + setIsChatSending(false); + } + }; + const assigneeById = useMemo(() => { const map = new Map(); agents @@ -652,6 +862,28 @@ export default function BoardDetailPage() { return map; }, [agents, boardId]); + const pendingApprovalsByTaskId = useMemo(() => { + const map = new Map(); + approvals + .filter((approval) => approval.status === "pending") + .forEach((approval) => { + const taskId = approvalTaskId(approval); + if (!taskId || typeof taskId !== "string") return; + map.set(taskId, (map.get(taskId) ?? 0) + 1); + }); + return map; + }, [approvals]); + + const totalApprovalsByTaskId = useMemo(() => { + const map = new Map(); + approvals.forEach((approval) => { + const taskId = approvalTaskId(approval); + if (!taskId || typeof taskId !== "string") return; + map.set(taskId, (map.get(taskId) ?? 0) + 1); + }); + return map; + }, [approvals]); + const displayTasks = useMemo( () => tasks.map((task) => ({ @@ -659,8 +891,10 @@ export default function BoardDetailPage() { assignee: task.assigned_agent_id ? assigneeById.get(task.assigned_agent_id) : undefined, + approvalsCount: totalApprovalsByTaskId.get(task.id) ?? 0, + approvalsPendingCount: pendingApprovalsByTaskId.get(task.id) ?? 0, })), - [tasks, assigneeById], + [tasks, assigneeById, pendingApprovalsByTaskId, totalApprovalsByTaskId], ); const boardAgents = useMemo( @@ -712,11 +946,7 @@ export default function BoardDetailPage() { if (!selectedTask) return []; const taskId = selectedTask.id; return approvals.filter((approval) => { - const payload = approval.payload ?? {}; - const payloadTaskId = - (payload as Record).task_id ?? - (payload as Record).taskId ?? - (payload as Record).taskID; + const payloadTaskId = approvalTaskId(approval); return payloadTaskId === taskId; }); }, [approvals, selectedTask]); @@ -770,6 +1000,7 @@ export default function BoardDetailPage() { }; const openComments = (task: Task) => { + setIsChatOpen(false); setSelectedTask(task); setIsDetailOpen(true); void loadComments(task.id); @@ -785,6 +1016,18 @@ export default function BoardDetailPage() { setIsEditDialogOpen(false); }; + const openBoardChat = () => { + if (isDetailOpen) { + closeComments(); + } + setIsChatOpen(true); + }; + + const closeBoardChat = () => { + setIsChatOpen(false); + setChatError(null); + }; + const handlePostComment = async () => { if (!selectedTask || !boardId || !isSignedIn) return; const trimmed = newComment.trim(); @@ -1200,6 +1443,14 @@ export default function BoardDetailPage() { ) : null} + + +
+
+ {chatError ? ( +
+ {chatError} +
+ ) : null} + {chatMessages.length === 0 ? ( +

+ No messages yet. Start the conversation with your lead agent. +

+ ) : ( + chatMessages.map((message) => ( +
+
+

+ {message.source ?? "User"} +

+ + {formatTaskTimestamp(message.created_at)} + +
+
+ ( +

+ ), + ul: ({ ...props }) => ( +

    + ), + ol: ({ ...props }) => ( +
      + ), + strong: ({ ...props }) => ( + + ), + }} + > + {message.content} + +
+
+ )) + )} +
+
+