diff --git a/backend/app/api/approvals.py b/backend/app/api/approvals.py index 5619e84e..f7258652 100644 --- a/backend/app/api/approvals.py +++ b/backend/app/api/approvals.py @@ -30,8 +30,8 @@ from app.schemas.approvals import ApprovalCreate, ApprovalRead, ApprovalStatus, from app.schemas.pagination import DefaultLimitOffsetPage from app.services.activity_log import record_activity from app.services.approval_task_links import ( - lock_tasks_for_approval, load_task_ids_by_approval, + lock_tasks_for_approval, normalize_task_ids, pending_approval_conflicts_by_task, replace_approval_task_links, @@ -408,7 +408,9 @@ async def update_approval( if "status" in updates: target_status = updates["status"] if target_status == "pending" and prior_status != "pending": - task_ids_by_approval = await load_task_ids_by_approval(session, approval_ids=[approval.id]) + task_ids_by_approval = await load_task_ids_by_approval( + session, approval_ids=[approval.id] + ) approval_task_ids = task_ids_by_approval.get(approval.id) if not approval_task_ids and approval.task_id is not None: approval_task_ids = [approval.task_id] diff --git a/backend/app/api/tasks.py b/backend/app/api/tasks.py index 4874eabd..91ce21d2 100644 --- a/backend/app/api/tasks.py +++ b/backend/app/api/tasks.py @@ -35,6 +35,7 @@ from app.models.boards import Board from app.models.task_dependencies import TaskDependency from app.models.task_fingerprints import TaskFingerprint from app.models.tasks import Task +from app.schemas.activity_events import ActivityEventRead from app.schemas.common import OkResponse from app.schemas.errors import BlockedTaskError from app.schemas.pagination import DefaultLimitOffsetPage @@ -648,7 +649,10 @@ def _task_event_payload( deps_map: dict[UUID, list[UUID]], dep_status: dict[UUID, str], ) -> dict[str, object]: - payload: dict[str, object] = {"type": event.event_type} + payload: dict[str, object] = { + "type": event.event_type, + "activity": ActivityEventRead.model_validate(event).model_dump(mode="json"), + } if event.event_type == "task.comment": payload["comment"] = _serialize_comment(event) return payload diff --git a/backend/app/services/approval_task_links.py b/backend/app/services/approval_task_links.py index a1dc7a6d..6222c8f1 100644 --- a/backend/app/services/approval_task_links.py +++ b/backend/app/services/approval_task_links.py @@ -196,10 +196,10 @@ async def pending_approval_conflicts_by_task( legacy_statement = legacy_statement.where(col(Approval.id) != exclude_approval_id) legacy_rows = list(await session.exec(legacy_statement)) - for task_id, approval_id, _created_at in legacy_rows: - if task_id is None: + for legacy_task_id, approval_id, _created_at in legacy_rows: + if legacy_task_id is None: continue - conflicts.setdefault(task_id, approval_id) + conflicts.setdefault(legacy_task_id, approval_id) return conflicts diff --git a/backend/tests/test_agent_auth_token_lookup_regression.py b/backend/tests/test_agent_auth_token_lookup_regression.py index 6dc78c2d..63397aa6 100644 --- a/backend/tests/test_agent_auth_token_lookup_regression.py +++ b/backend/tests/test_agent_auth_token_lookup_regression.py @@ -35,7 +35,9 @@ async def test_agent_token_lookup_should_not_verify_more_than_once( async def exec(self, _stmt: object) -> list[object]: agents = [] for i in range(50): - agents.append(SimpleNamespace(agent_token_hash=f"pbkdf2_sha256$1$salt{i}$digest{i}")) + agents.append( + SimpleNamespace(agent_token_hash=f"pbkdf2_sha256$1$salt{i}$digest{i}") + ) return agents calls = {"n": 0} diff --git a/backend/tests/test_tasks_api_rows.py b/backend/tests/test_tasks_api_rows.py index 0c457b11..2b5d6ab6 100644 --- a/backend/tests/test_tasks_api_rows.py +++ b/backend/tests/test_tasks_api_rows.py @@ -5,7 +5,7 @@ from uuid import uuid4 import pytest -from app.api.tasks import _coerce_task_event_rows +from app.api.tasks import _coerce_task_event_rows, _task_event_payload from app.models.activity_events import ActivityEvent from app.models.tasks import Task @@ -51,3 +51,65 @@ def test_coerce_task_event_rows_accepts_row_like_values(): def test_coerce_task_event_rows_rejects_invalid_values(): with pytest.raises(TypeError, match="Expected \\(ActivityEvent, Task \\| None\\) rows"): _coerce_task_event_rows([("bad", "row")]) + + +def test_task_event_payload_includes_activity_for_comment_event() -> None: + task = Task(board_id=uuid4(), title="Ship patch") + event = ActivityEvent( + event_type="task.comment", + message="Looks good.", + task_id=task.id, + agent_id=uuid4(), + ) + + payload = _task_event_payload( + event, + task, + deps_map={}, + dep_status={}, + ) + + assert payload["type"] == "task.comment" + assert payload["activity"] == { + "id": str(event.id), + "event_type": "task.comment", + "message": "Looks good.", + "agent_id": str(event.agent_id), + "task_id": str(task.id), + "created_at": event.created_at.isoformat(), + } + comment = payload["comment"] + assert isinstance(comment, dict) + assert comment["id"] == str(event.id) + assert comment["task_id"] == str(task.id) + assert comment["message"] == "Looks good." + + +def test_task_event_payload_includes_activity_for_non_comment_event() -> None: + task = Task(board_id=uuid4(), title="Wire stream events", status="in_progress") + event = ActivityEvent( + event_type="task.updated", + message="Task updated: Wire stream events.", + task_id=task.id, + ) + + payload = _task_event_payload( + event, + task, + deps_map={}, + dep_status={}, + ) + + assert payload["type"] == "task.updated" + assert payload["activity"] == { + "id": str(event.id), + "event_type": "task.updated", + "message": "Task updated: Wire stream events.", + "agent_id": None, + "task_id": str(task.id), + "created_at": event.created_at.isoformat(), + } + task_payload = payload["task"] + assert isinstance(task_payload, dict) + assert task_payload["id"] == str(task.id) + assert task_payload["is_blocked"] is False diff --git a/frontend/src/app/activity/page.tsx b/frontend/src/app/activity/page.tsx index 30f64baa..f4a65c26 100644 --- a/frontend/src/app/activity/page.tsx +++ b/frontend/src/app/activity/page.tsx @@ -4,15 +4,31 @@ import { memo, useCallback, useEffect, useMemo, useRef, useState } from "react"; import Link from "next/link"; import { SignedIn, SignedOut, useAuth } from "@/auth/clerk"; -import { ArrowUpRight, Activity as ActivityIcon } from "lucide-react"; +import { Activity as ActivityIcon } from "lucide-react"; import { ApiError } from "@/api/mutator"; +import { streamAgentsApiV1AgentsStreamGet } from "@/api/generated/agents/agents"; +import { listActivityApiV1ActivityGet } from "@/api/generated/activity/activity"; import { - type listTaskCommentFeedApiV1ActivityTaskCommentsGetResponse, - streamTaskCommentFeedApiV1ActivityTaskCommentsStreamGet, - useListTaskCommentFeedApiV1ActivityTaskCommentsGet, -} from "@/api/generated/activity/activity"; -import type { ActivityTaskCommentFeedItemRead } from "@/api/generated/model"; + getBoardSnapshotApiV1BoardsBoardIdSnapshotGet, + listBoardsApiV1BoardsGet, +} from "@/api/generated/boards/boards"; +import { streamBoardMemoryApiV1BoardsBoardIdMemoryStreamGet } from "@/api/generated/board-memory/board-memory"; +import { streamApprovalsApiV1BoardsBoardIdApprovalsStreamGet } from "@/api/generated/approvals/approvals"; +import { streamTasksApiV1BoardsBoardIdTasksStreamGet } from "@/api/generated/tasks/tasks"; +import { + type getMyMembershipApiV1OrganizationsMeMemberGetResponse, + useGetMyMembershipApiV1OrganizationsMeMemberGet, +} from "@/api/generated/organizations/organizations"; +import type { + ActivityEventRead, + AgentRead, + ApprovalRead, + BoardMemoryRead, + BoardRead, + TaskCommentRead, + TaskRead, +} from "@/api/generated/model"; import { Markdown } from "@/components/atoms/Markdown"; import { ActivityFeed } from "@/components/activity/ActivityFeed"; import { SignedOutPanel } from "@/components/auth/SignedOutPanel"; @@ -30,6 +46,62 @@ const SSE_RECONNECT_BACKOFF = { maxMs: 5 * 60_000, } as const; +const STREAM_CONNECT_SPACING_MS = 120; +const MAX_FEED_ITEMS = 300; +const PAGED_LIMIT = 200; +const PAGED_MAX = 1000; + +type Agent = AgentRead & { status: string }; + +type TaskEventType = + | "task.comment" + | "task.created" + | "task.updated" + | "task.status_changed"; + +type FeedEventType = + | TaskEventType + | "board.chat" + | "board.command" + | "agent.created" + | "agent.online" + | "agent.offline" + | "agent.updated" + | "approval.created" + | "approval.updated" + | "approval.approved" + | "approval.rejected"; + +type FeedItem = { + id: string; + created_at: string; + event_type: FeedEventType; + message: string | null; + agent_id: string | null; + actor_name: string; + actor_role: string | null; + board_id: string | null; + board_name: string | null; + task_id: string | null; + task_title: string | null; + title: string; +}; + +type TaskMeta = { + title: string; + boardId: string | null; +}; + +const TASK_EVENT_TYPES = new Set([ + "task.comment", + "task.created", + "task.updated", + "task.status_changed", +]); + +const isTaskEventType = (value: string): value is TaskEventType => + TASK_EVENT_TYPES.has(value as TaskEventType); + const formatShortTimestamp = (value: string) => { const date = parseApiDatetime(value); if (!date) return "—"; @@ -41,27 +113,101 @@ const formatShortTimestamp = (value: string) => { }); }; -const latestTimestamp = (items: ActivityTaskCommentFeedItemRead[]) => { - let latest = 0; - for (const item of items) { - const time = apiDatetimeToMs(item.created_at) ?? 0; - latest = Math.max(latest, time); - } - return latest ? new Date(latest).toISOString() : null; +const normalizeAgent = (agent: AgentRead): Agent => ({ + ...agent, + status: (agent.status ?? "offline").trim() || "offline", +}); + +const normalizeStatus = (value?: string | null) => + (value ?? "").trim().toLowerCase() || "offline"; + +const humanizeApprovalAction = (value: string): string => { + const cleaned = value.replace(/[._-]+/g, " ").trim(); + if (!cleaned) return "Approval"; + return cleaned.charAt(0).toUpperCase() + cleaned.slice(1); }; -const FeedCard = memo(function FeedCard({ - item, -}: { - item: ActivityTaskCommentFeedItemRead; -}) { - const message = (item.message ?? "").trim(); - const authorName = item.agent_name?.trim() || "Admin"; - const authorRole = item.agent_role?.trim() || null; - const authorAvatar = (authorName[0] ?? "A").toUpperCase(); +const humanizeStatus = (value: string): string => + value.replace(/_/g, " ").trim() || "offline"; - const taskHref = `/boards/${item.board_id}?taskId=${item.task_id}`; - const boardHref = `/boards/${item.board_id}`; +const roleFromAgent = (agent?: Agent | null): string | null => { + if (!agent) return null; + const profile = agent.identity_profile; + if (!profile || typeof profile !== "object") return null; + const role = profile.role; + if (typeof role !== "string") return null; + const trimmed = role.trim(); + return trimmed || null; +}; + +const eventLabel = (eventType: FeedEventType): string => { + if (eventType === "task.comment") return "Comment"; + if (eventType === "task.created") return "Created"; + if (eventType === "task.status_changed") return "Status"; + if (eventType === "board.chat") return "Chat"; + if (eventType === "board.command") return "Command"; + if (eventType === "agent.created") return "Agent"; + if (eventType === "agent.online") return "Online"; + if (eventType === "agent.offline") return "Offline"; + if (eventType === "agent.updated") return "Agent update"; + if (eventType === "approval.created") return "Approval"; + if (eventType === "approval.updated") return "Approval update"; + if (eventType === "approval.approved") return "Approved"; + if (eventType === "approval.rejected") return "Rejected"; + return "Updated"; +}; + +const eventPillClass = (eventType: FeedEventType): string => { + if (eventType === "task.comment") { + return "border-blue-200 bg-blue-50 text-blue-700"; + } + if (eventType === "task.created") { + return "border-emerald-200 bg-emerald-50 text-emerald-700"; + } + if (eventType === "task.status_changed") { + return "border-amber-200 bg-amber-50 text-amber-700"; + } + if (eventType === "board.chat") { + return "border-teal-200 bg-teal-50 text-teal-700"; + } + if (eventType === "board.command") { + return "border-fuchsia-200 bg-fuchsia-50 text-fuchsia-700"; + } + if (eventType === "agent.created") { + return "border-violet-200 bg-violet-50 text-violet-700"; + } + if (eventType === "agent.online") { + return "border-lime-200 bg-lime-50 text-lime-700"; + } + if (eventType === "agent.offline") { + return "border-slate-300 bg-slate-100 text-slate-700"; + } + if (eventType === "agent.updated") { + return "border-indigo-200 bg-indigo-50 text-indigo-700"; + } + if (eventType === "approval.created") { + return "border-cyan-200 bg-cyan-50 text-cyan-700"; + } + if (eventType === "approval.updated") { + return "border-sky-200 bg-sky-50 text-sky-700"; + } + if (eventType === "approval.approved") { + return "border-emerald-200 bg-emerald-50 text-emerald-700"; + } + if (eventType === "approval.rejected") { + return "border-rose-200 bg-rose-50 text-rose-700"; + } + return "border-slate-200 bg-slate-100 text-slate-700"; +}; + +const FeedCard = memo(function FeedCard({ item }: { item: FeedItem }) { + const message = (item.message ?? "").trim(); + const authorAvatar = (item.actor_name[0] ?? "A").toUpperCase(); + const taskHref = + item.board_id && item.task_id + ? `/boards/${item.board_id}?taskId=${item.task_id}` + : null; + const boardHref = item.board_id ? `/boards/${item.board_id}` : null; return (
@@ -70,14 +216,12 @@ const FeedCard = memo(function FeedCard({ {authorAvatar}
-
-
+
+ {taskHref ? ( - {item.task_title} + {item.title} -
+ ) : ( +

+ {item.title} +

+ )} +
+ + {eventLabel(item.event_type)} + + {boardHref && item.board_name ? ( {item.board_name} - · - {authorName} - {authorRole ? ( - <> - · - {authorRole} - - ) : null} - · - - {formatShortTimestamp(item.created_at)} + ) : item.board_name ? ( + + {item.board_name} -
+ ) : null} + {item.board_name ? ( + · + ) : null} + + {item.actor_name} + + {item.actor_role ? ( + <> + · + {item.actor_role} + + ) : null} + · + + {formatShortTimestamp(item.created_at)} +
- - View task - -
@@ -133,10 +291,6 @@ const FeedCard = memo(function FeedCard({ FeedCard.displayName = "FeedCard"; export default function ActivityPage() { - // Clerk auth state can differ between the server render and client hydration. - // When that happens, / can cause a React hydration mismatch - // that fails Cypress (and is noisy in the console). Gate the initial render until - // after mount so the first client render matches the server markup. const [isMounted, setIsMounted] = useState(false); useEffect(() => { setIsMounted(true); @@ -145,92 +299,894 @@ export default function ActivityPage() { const { isSignedIn } = useAuth(); const isPageActive = usePageActive(); - const feedQuery = useListTaskCommentFeedApiV1ActivityTaskCommentsGet< - listTaskCommentFeedApiV1ActivityTaskCommentsGetResponse, + const membershipQuery = useGetMyMembershipApiV1OrganizationsMeMemberGet< + getMyMembershipApiV1OrganizationsMeMemberGetResponse, ApiError - >( - { limit: 200 }, - { - query: { - enabled: Boolean(isSignedIn), - refetchOnMount: "always", - refetchOnWindowFocus: false, - retry: false, - }, + >({ + query: { + enabled: Boolean(isSignedIn), + refetchOnMount: "always", + refetchOnWindowFocus: false, + retry: false, }, - ); + }); + const isOrgAdmin = useMemo(() => { + const member = + membershipQuery.data?.status === 200 ? membershipQuery.data.data : null; + return member ? ["owner", "admin"].includes(member.role) : false; + }, [membershipQuery.data]); - const [feedItems, setFeedItems] = useState( - [], - ); - const feedItemsRef = useRef([]); + const [isFeedLoading, setIsFeedLoading] = useState(false); + const [feedError, setFeedError] = useState(null); + const [feedItems, setFeedItems] = useState([]); + const [boards, setBoards] = useState([]); + + const feedItemsRef = useRef([]); const seenIdsRef = useRef>(new Set()); - const initializedRef = useRef(false); + const boardsByIdRef = useRef>(new Map()); + const taskMetaByIdRef = useRef>(new Map()); + const agentsByIdRef = useRef>(new Map()); + const approvalsByIdRef = useRef>(new Map()); useEffect(() => { feedItemsRef.current = feedItems; }, [feedItems]); - useEffect(() => { - if (initializedRef.current) return; - if (feedQuery.data?.status !== 200) return; - const items = feedQuery.data.data.items ?? []; - initializedRef.current = true; - setFeedItems((prev) => { - const map = new Map(); - [...prev, ...items].forEach((item) => map.set(item.id, item)); - const merged = [...map.values()]; - merged.sort((a, b) => { - const aTime = apiDatetimeToMs(a.created_at) ?? 0; - const bTime = apiDatetimeToMs(b.created_at) ?? 0; - return bTime - aTime; - }); - const next = merged.slice(0, 200); - seenIdsRef.current = new Set(next.map((item) => item.id)); - return next; - }); - }, [feedQuery.data]); + const boardIds = useMemo(() => boards.map((board) => board.id), [boards]); - const pushFeedItem = useCallback((item: ActivityTaskCommentFeedItemRead) => { + const pushFeedItem = useCallback((item: FeedItem) => { setFeedItems((prev) => { if (seenIdsRef.current.has(item.id)) return prev; seenIdsRef.current.add(item.id); const next = [item, ...prev]; - return next.slice(0, 200); + return next.slice(0, MAX_FEED_ITEMS); }); }, []); + const resolveAuthor = useCallback( + (agentId: string | null | undefined, fallbackName = "Admin") => { + if (agentId) { + const agent = agentsByIdRef.current.get(agentId); + if (agent) { + return { + id: agent.id, + name: agent.name, + role: roleFromAgent(agent), + }; + } + } + return { + id: agentId ?? null, + name: fallbackName, + role: null, + }; + }, + [], + ); + + const boardNameForId = useCallback((boardId: string | null | undefined) => { + if (!boardId) return null; + return boardsByIdRef.current.get(boardId)?.name ?? null; + }, []); + + const updateTaskMeta = useCallback( + ( + task: { id: string; title: string; board_id?: string | null }, + fallbackBoardId: string, + ) => { + const boardId = task.board_id ?? fallbackBoardId; + taskMetaByIdRef.current.set(task.id, { + title: task.title, + boardId, + }); + }, + [], + ); + + const mapTaskActivity = useCallback( + (event: ActivityEventRead): FeedItem | null => { + if (!isTaskEventType(event.event_type)) return null; + const meta = event.task_id + ? taskMetaByIdRef.current.get(event.task_id) + : null; + const boardId = meta?.boardId ?? null; + const author = resolveAuthor(event.agent_id, "Admin"); + return { + id: `activity:${event.id}`, + created_at: event.created_at, + event_type: event.event_type, + message: event.message ?? null, + agent_id: author.id, + actor_name: author.name, + actor_role: author.role, + board_id: boardId, + board_name: boardNameForId(boardId), + task_id: event.task_id ?? null, + task_title: meta?.title ?? null, + title: + meta?.title ?? (event.task_id ? "Unknown task" : "Task activity"), + }; + }, + [boardNameForId, resolveAuthor], + ); + + const mapTaskComment = useCallback( + (comment: TaskCommentRead, fallbackBoardId: string): FeedItem => { + const meta = comment.task_id + ? taskMetaByIdRef.current.get(comment.task_id) + : null; + const boardId = meta?.boardId ?? fallbackBoardId; + const author = resolveAuthor(comment.agent_id, "Admin"); + return { + id: `comment:${comment.id}`, + created_at: comment.created_at, + event_type: "task.comment", + message: comment.message ?? null, + agent_id: author.id, + actor_name: author.name, + actor_role: author.role, + board_id: boardId, + board_name: boardNameForId(boardId), + task_id: comment.task_id ?? null, + task_title: meta?.title ?? null, + title: + meta?.title ?? (comment.task_id ? "Unknown task" : "Task activity"), + }; + }, + [boardNameForId, resolveAuthor], + ); + + const mapApprovalEvent = useCallback( + ( + approval: ApprovalRead, + boardId: string, + previous: ApprovalRead | null = null, + ): FeedItem => { + const nextStatus = approval.status ?? "pending"; + const previousStatus = previous?.status ?? null; + const kind: FeedEventType = + previousStatus === null + ? nextStatus === "approved" + ? "approval.approved" + : nextStatus === "rejected" + ? "approval.rejected" + : "approval.created" + : nextStatus !== previousStatus + ? nextStatus === "approved" + ? "approval.approved" + : nextStatus === "rejected" + ? "approval.rejected" + : "approval.updated" + : "approval.updated"; + + const stamp = + kind === "approval.created" + ? approval.created_at + : (approval.resolved_at ?? approval.created_at); + const action = humanizeApprovalAction(approval.action_type); + const author = resolveAuthor(approval.agent_id, "Admin"); + const statusText = + nextStatus === "approved" + ? "approved" + : nextStatus === "rejected" + ? "rejected" + : "pending"; + const message = + kind === "approval.created" + ? `${action} requested (${approval.confidence}% confidence).` + : kind === "approval.approved" + ? `${action} approved (${approval.confidence}% confidence).` + : kind === "approval.rejected" + ? `${action} rejected (${approval.confidence}% confidence).` + : `${action} updated (${statusText}, ${approval.confidence}% confidence).`; + + const taskMeta = approval.task_id + ? taskMetaByIdRef.current.get(approval.task_id) + : null; + + return { + id: `approval:${approval.id}:${kind}:${stamp}`, + created_at: stamp, + event_type: kind, + message, + agent_id: author.id, + actor_name: author.name, + actor_role: author.role, + board_id: boardId, + board_name: boardNameForId(boardId), + task_id: approval.task_id ?? null, + task_title: taskMeta?.title ?? null, + title: `Approval · ${action}`, + }; + }, + [boardNameForId, resolveAuthor], + ); + + const mapBoardChat = useCallback( + (memory: BoardMemoryRead, boardId: string): FeedItem => { + const content = (memory.content ?? "").trim(); + const actorName = (memory.source ?? "User").trim() || "User"; + const command = content.startsWith("/"); + return { + id: `chat:${memory.id}`, + created_at: memory.created_at, + event_type: command ? "board.command" : "board.chat", + message: content || null, + agent_id: null, + actor_name: actorName, + actor_role: null, + board_id: boardId, + board_name: boardNameForId(boardId), + task_id: null, + task_title: null, + title: command ? "Board command" : "Board chat", + }; + }, + [boardNameForId], + ); + + const mapAgentEvent = useCallback( + ( + agent: Agent, + previous: Agent | null, + isSnapshot = false, + ): FeedItem | null => { + const nextStatus = normalizeStatus(agent.status); + const previousStatus = previous ? normalizeStatus(previous.status) : null; + const statusChanged = + previousStatus !== null && nextStatus !== previousStatus; + const profileChanged = + Boolean(previous) && + (previous?.name !== agent.name || + previous?.is_board_lead !== agent.is_board_lead || + JSON.stringify(previous?.identity_profile ?? {}) !== + JSON.stringify(agent.identity_profile ?? {})); + + let kind: FeedEventType; + if (isSnapshot) { + kind = + nextStatus === "online" + ? "agent.online" + : nextStatus === "offline" + ? "agent.offline" + : "agent.updated"; + } else if (!previous) { + kind = "agent.created"; + } else if (statusChanged && nextStatus === "online") { + kind = "agent.online"; + } else if (statusChanged && nextStatus === "offline") { + kind = "agent.offline"; + } else if (statusChanged || profileChanged) { + kind = "agent.updated"; + } else { + return null; + } + + const stamp = agent.last_seen_at ?? agent.updated_at ?? agent.created_at; + const message = + kind === "agent.created" + ? `${agent.name} joined this board.` + : kind === "agent.online" + ? `${agent.name} is online.` + : kind === "agent.offline" + ? `${agent.name} is offline.` + : `${agent.name} updated (${humanizeStatus(nextStatus)}).`; + + return { + id: `agent:${agent.id}:${isSnapshot ? "snapshot" : kind}:${stamp}`, + created_at: stamp, + event_type: kind, + message, + agent_id: agent.id, + actor_name: agent.name, + actor_role: roleFromAgent(agent), + board_id: agent.board_id ?? null, + board_name: boardNameForId(agent.board_id), + task_id: null, + task_title: null, + title: `Agent · ${agent.name}`, + }; + }, + [boardNameForId], + ); + + const latestTimestamp = useCallback( + (predicate: (item: FeedItem) => boolean): string | null => { + let latest = 0; + for (const item of feedItemsRef.current) { + if (!predicate(item)) continue; + const time = apiDatetimeToMs(item.created_at) ?? 0; + if (time > latest) latest = time; + } + return latest ? new Date(latest).toISOString() : null; + }, + [], + ); + + useEffect(() => { + if (!isSignedIn) { + setBoards([]); + setFeedItems([]); + setFeedError(null); + setIsFeedLoading(false); + seenIdsRef.current = new Set(); + boardsByIdRef.current = new Map(); + taskMetaByIdRef.current = new Map(); + agentsByIdRef.current = new Map(); + approvalsByIdRef.current = new Map(); + return; + } + + let cancelled = false; + setIsFeedLoading(true); + setFeedError(null); + + const loadInitial = async () => { + try { + const nextBoards: BoardRead[] = []; + for (let offset = 0; offset < PAGED_MAX; offset += PAGED_LIMIT) { + const result = await listBoardsApiV1BoardsGet({ + limit: PAGED_LIMIT, + offset, + }); + if (cancelled) return; + if (result.status !== 200) { + throw new Error("Unable to load boards."); + } + const items = result.data.items ?? []; + nextBoards.push(...items); + if (items.length < PAGED_LIMIT) { + break; + } + } + + if (cancelled) return; + setBoards(nextBoards); + boardsByIdRef.current = new Map( + nextBoards.map((board) => [board.id, board]), + ); + + const seeded: FeedItem[] = []; + const seedSeen = new Set(); + + // Snapshot seeding gives org-level approvals/agents/chat and task metadata. + const snapshotResults = await Promise.allSettled( + nextBoards.map((board) => + getBoardSnapshotApiV1BoardsBoardIdSnapshotGet(board.id), + ), + ); + if (cancelled) return; + + snapshotResults.forEach((result, index) => { + if (result.status !== "fulfilled") return; + if (result.value.status !== 200) return; + const board = nextBoards[index]; + const snapshot = result.value.data; + + (snapshot.tasks ?? []).forEach((task) => { + taskMetaByIdRef.current.set(task.id, { + title: task.title, + boardId: board.id, + }); + }); + + (snapshot.agents ?? []).forEach((agent) => { + const normalized = normalizeAgent(agent); + agentsByIdRef.current.set(normalized.id, normalized); + const agentItem = mapAgentEvent(normalized, null, true); + if (!agentItem || seedSeen.has(agentItem.id)) return; + seedSeen.add(agentItem.id); + seeded.push(agentItem); + }); + + (snapshot.approvals ?? []).forEach((approval) => { + approvalsByIdRef.current.set(approval.id, approval); + const approvalItem = mapApprovalEvent(approval, board.id, null); + if (seedSeen.has(approvalItem.id)) return; + seedSeen.add(approvalItem.id); + seeded.push(approvalItem); + }); + + (snapshot.chat_messages ?? []).forEach((memory) => { + const chatItem = mapBoardChat(memory, board.id); + if (seedSeen.has(chatItem.id)) return; + seedSeen.add(chatItem.id); + seeded.push(chatItem); + }); + }); + + for (let offset = 0; offset < PAGED_MAX; offset += PAGED_LIMIT) { + const result = await listActivityApiV1ActivityGet({ + limit: PAGED_LIMIT, + offset, + }); + if (cancelled) return; + if (result.status !== 200) { + throw new Error("Unable to load activity feed."); + } + const items = result.data.items ?? []; + for (const event of items) { + const mapped = mapTaskActivity(event); + if (!mapped || seedSeen.has(mapped.id)) continue; + seedSeen.add(mapped.id); + seeded.push(mapped); + } + if (items.length < PAGED_LIMIT) { + break; + } + } + + seeded.sort((a, b) => { + const aTime = apiDatetimeToMs(a.created_at) ?? 0; + const bTime = apiDatetimeToMs(b.created_at) ?? 0; + return bTime - aTime; + }); + const next = seeded.slice(0, MAX_FEED_ITEMS); + if (cancelled) return; + setFeedItems(next); + seenIdsRef.current = new Set(next.map((item) => item.id)); + } catch (err) { + if (cancelled) return; + setFeedError( + err instanceof Error ? err.message : "Unable to load activity feed.", + ); + } finally { + if (cancelled) return; + setIsFeedLoading(false); + } + }; + + void loadInitial(); + return () => { + cancelled = true; + }; + }, [ + isSignedIn, + mapAgentEvent, + mapApprovalEvent, + mapBoardChat, + mapTaskActivity, + ]); + useEffect(() => { if (!isPageActive) return; if (!isSignedIn) return; - let isCancelled = false; + if (boardIds.length === 0) return; + + let cancelled = false; + const cleanups: Array<() => void> = []; + + boardIds.forEach((boardId, index) => { + const boardDelay = index * STREAM_CONNECT_SPACING_MS; + const abortController = new AbortController(); + const backoff = createExponentialBackoff(SSE_RECONNECT_BACKOFF); + let reconnectTimeout: number | undefined; + let connectTimer: number | undefined; + + const connect = async () => { + try { + const since = latestTimestamp( + (item) => + item.board_id === boardId && isTaskEventType(item.event_type), + ); + const streamResult = + await streamTasksApiV1BoardsBoardIdTasksStreamGet( + boardId, + since ? { since } : undefined, + { + headers: { Accept: "text/event-stream" }, + signal: abortController.signal, + }, + ); + if (streamResult.status !== 200) { + throw new Error("Unable to connect task stream."); + } + const response = streamResult.data as Response; + if (!(response instanceof Response) || !response.body) { + throw new Error("Unable to connect task stream."); + } + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (!cancelled) { + const { value, done } = await reader.read(); + if (done) break; + if (value && value.length) { + backoff.reset(); + } + buffer += decoder.decode(value, { stream: true }); + buffer = buffer.replace(/\r\n/g, "\n"); + let boundary = buffer.indexOf("\n\n"); + while (boundary !== -1) { + const raw = buffer.slice(0, boundary); + buffer = buffer.slice(boundary + 2); + const lines = raw.split("\n"); + let eventType = "message"; + let data = ""; + for (const line of lines) { + if (line.startsWith("event:")) { + eventType = line.slice(6).trim(); + } else if (line.startsWith("data:")) { + data += line.slice(5).trim(); + } + } + if (eventType === "task" && data) { + try { + const payload = JSON.parse(data) as { + type?: string; + activity?: ActivityEventRead; + task?: TaskRead; + comment?: TaskCommentRead; + }; + if (payload.task) { + updateTaskMeta(payload.task, boardId); + } + if (payload.activity) { + const mapped = mapTaskActivity(payload.activity); + if (mapped) { + if (!mapped.board_id) { + mapped.board_id = boardId; + mapped.board_name = boardNameForId(boardId); + } + if (!mapped.task_title && payload.task?.title) { + mapped.task_title = payload.task.title; + mapped.title = payload.task.title; + } + pushFeedItem(mapped); + } + } else if ( + payload.type === "task.comment" && + payload.comment + ) { + pushFeedItem(mapTaskComment(payload.comment, boardId)); + } + } catch { + // Ignore malformed payloads. + } + } + boundary = buffer.indexOf("\n\n"); + } + } + } catch { + // Reconnect handled below. + } + + if (!cancelled) { + if (reconnectTimeout !== undefined) { + window.clearTimeout(reconnectTimeout); + } + const delay = backoff.nextDelayMs(); + reconnectTimeout = window.setTimeout(() => { + reconnectTimeout = undefined; + void connect(); + }, delay); + } + }; + + connectTimer = window.setTimeout(() => { + connectTimer = undefined; + void connect(); + }, boardDelay); + + cleanups.push(() => { + abortController.abort(); + if (connectTimer !== undefined) { + window.clearTimeout(connectTimer); + } + if (reconnectTimeout !== undefined) { + window.clearTimeout(reconnectTimeout); + } + }); + }); + + return () => { + cancelled = true; + cleanups.forEach((fn) => fn()); + }; + }, [ + boardIds, + boardNameForId, + isPageActive, + isSignedIn, + latestTimestamp, + mapTaskActivity, + mapTaskComment, + pushFeedItem, + updateTaskMeta, + ]); + + useEffect(() => { + if (!isPageActive) return; + if (!isSignedIn) return; + if (boardIds.length === 0) return; + + let cancelled = false; + const cleanups: Array<() => void> = []; + + boardIds.forEach((boardId, index) => { + const boardDelay = index * STREAM_CONNECT_SPACING_MS; + const abortController = new AbortController(); + const backoff = createExponentialBackoff(SSE_RECONNECT_BACKOFF); + let reconnectTimeout: number | undefined; + let connectTimer: number | undefined; + + const connect = async () => { + try { + const since = latestTimestamp( + (item) => + item.board_id === boardId && + item.event_type.startsWith("approval."), + ); + const streamResult = + await streamApprovalsApiV1BoardsBoardIdApprovalsStreamGet( + boardId, + since ? { since } : undefined, + { + headers: { Accept: "text/event-stream" }, + signal: abortController.signal, + }, + ); + if (streamResult.status !== 200) { + throw new Error("Unable to connect approvals stream."); + } + const response = streamResult.data as Response; + if (!(response instanceof Response) || !response.body) { + throw new Error("Unable to connect approvals stream."); + } + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (!cancelled) { + const { value, done } = await reader.read(); + if (done) break; + if (value && value.length) { + backoff.reset(); + } + buffer += decoder.decode(value, { stream: true }); + buffer = buffer.replace(/\r\n/g, "\n"); + let boundary = buffer.indexOf("\n\n"); + while (boundary !== -1) { + const raw = buffer.slice(0, boundary); + buffer = buffer.slice(boundary + 2); + const lines = raw.split("\n"); + let eventType = "message"; + let data = ""; + for (const line of lines) { + if (line.startsWith("event:")) { + eventType = line.slice(6).trim(); + } else if (line.startsWith("data:")) { + data += line.slice(5).trim(); + } + } + if (eventType === "approval" && data) { + try { + const payload = JSON.parse(data) as { + approval?: ApprovalRead; + }; + if (payload.approval) { + const previous = + approvalsByIdRef.current.get(payload.approval.id) ?? null; + approvalsByIdRef.current.set( + payload.approval.id, + payload.approval, + ); + pushFeedItem( + mapApprovalEvent(payload.approval, boardId, previous), + ); + } + } catch { + // Ignore malformed payloads. + } + } + boundary = buffer.indexOf("\n\n"); + } + } + } catch { + // Reconnect handled below. + } + + if (!cancelled) { + if (reconnectTimeout !== undefined) { + window.clearTimeout(reconnectTimeout); + } + const delay = backoff.nextDelayMs(); + reconnectTimeout = window.setTimeout(() => { + reconnectTimeout = undefined; + void connect(); + }, delay); + } + }; + + connectTimer = window.setTimeout(() => { + connectTimer = undefined; + void connect(); + }, boardDelay); + + cleanups.push(() => { + abortController.abort(); + if (connectTimer !== undefined) { + window.clearTimeout(connectTimer); + } + if (reconnectTimeout !== undefined) { + window.clearTimeout(reconnectTimeout); + } + }); + }); + + return () => { + cancelled = true; + cleanups.forEach((fn) => fn()); + }; + }, [ + boardIds, + isPageActive, + isSignedIn, + latestTimestamp, + mapApprovalEvent, + pushFeedItem, + ]); + + useEffect(() => { + if (!isPageActive) return; + if (!isSignedIn) return; + if (boardIds.length === 0) return; + + let cancelled = false; + const cleanups: Array<() => void> = []; + + boardIds.forEach((boardId, index) => { + const boardDelay = index * STREAM_CONNECT_SPACING_MS; + const abortController = new AbortController(); + const backoff = createExponentialBackoff(SSE_RECONNECT_BACKOFF); + let reconnectTimeout: number | undefined; + let connectTimer: number | undefined; + + const connect = async () => { + try { + const since = latestTimestamp( + (item) => + item.board_id === boardId && + (item.event_type === "board.chat" || + item.event_type === "board.command"), + ); + const params = { is_chat: true, ...(since ? { since } : {}) }; + const streamResult = + await streamBoardMemoryApiV1BoardsBoardIdMemoryStreamGet( + boardId, + params, + { + headers: { Accept: "text/event-stream" }, + signal: abortController.signal, + }, + ); + if (streamResult.status !== 200) { + throw new Error("Unable to connect board chat stream."); + } + const response = streamResult.data as Response; + if (!(response instanceof Response) || !response.body) { + throw new Error("Unable to connect board chat stream."); + } + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (!cancelled) { + const { value, done } = await reader.read(); + if (done) break; + if (value && value.length) { + backoff.reset(); + } + buffer += decoder.decode(value, { stream: true }); + buffer = buffer.replace(/\r\n/g, "\n"); + let boundary = buffer.indexOf("\n\n"); + while (boundary !== -1) { + const raw = buffer.slice(0, boundary); + buffer = buffer.slice(boundary + 2); + const lines = raw.split("\n"); + let eventType = "message"; + let data = ""; + for (const line of lines) { + if (line.startsWith("event:")) { + eventType = line.slice(6).trim(); + } else if (line.startsWith("data:")) { + data += line.slice(5).trim(); + } + } + if (eventType === "memory" && data) { + try { + const payload = JSON.parse(data) as { + memory?: BoardMemoryRead; + }; + if (payload.memory?.tags?.includes("chat")) { + pushFeedItem(mapBoardChat(payload.memory, boardId)); + } + } catch { + // Ignore malformed payloads. + } + } + boundary = buffer.indexOf("\n\n"); + } + } + } catch { + // Reconnect handled below. + } + + if (!cancelled) { + if (reconnectTimeout !== undefined) { + window.clearTimeout(reconnectTimeout); + } + const delay = backoff.nextDelayMs(); + reconnectTimeout = window.setTimeout(() => { + reconnectTimeout = undefined; + void connect(); + }, delay); + } + }; + + connectTimer = window.setTimeout(() => { + connectTimer = undefined; + void connect(); + }, boardDelay); + + cleanups.push(() => { + abortController.abort(); + if (connectTimer !== undefined) { + window.clearTimeout(connectTimer); + } + if (reconnectTimeout !== undefined) { + window.clearTimeout(reconnectTimeout); + } + }); + }); + + return () => { + cancelled = true; + cleanups.forEach((fn) => fn()); + }; + }, [ + boardIds, + isPageActive, + isSignedIn, + latestTimestamp, + mapBoardChat, + pushFeedItem, + ]); + + useEffect(() => { + if (!isPageActive) return; + if (!isSignedIn || !isOrgAdmin) return; + + let cancelled = false; const abortController = new AbortController(); const backoff = createExponentialBackoff(SSE_RECONNECT_BACKOFF); let reconnectTimeout: number | undefined; const connect = async () => { try { - const since = latestTimestamp(feedItemsRef.current); - const streamResult = - await streamTaskCommentFeedApiV1ActivityTaskCommentsStreamGet( - since ? { since } : undefined, - { - headers: { Accept: "text/event-stream" }, - signal: abortController.signal, - }, - ); + const since = latestTimestamp((item) => + item.event_type.startsWith("agent."), + ); + const streamResult = await streamAgentsApiV1AgentsStreamGet( + since ? { since } : undefined, + { + headers: { Accept: "text/event-stream" }, + signal: abortController.signal, + }, + ); if (streamResult.status !== 200) { - throw new Error("Unable to connect task comment feed stream."); + throw new Error("Unable to connect agent stream."); } const response = streamResult.data as Response; if (!(response instanceof Response) || !response.body) { - throw new Error("Unable to connect task comment feed stream."); + throw new Error("Unable to connect agent stream."); } const reader = response.body.getReader(); const decoder = new TextDecoder(); let buffer = ""; - while (!isCancelled) { + while (!cancelled) { const { value, done } = await reader.read(); if (done) break; if (value && value.length) { @@ -252,16 +1208,21 @@ export default function ActivityPage() { data += line.slice(5).trim(); } } - if (eventType === "comment" && data) { + if (eventType === "agent" && data) { try { - const payload = JSON.parse(data) as { - comment?: ActivityTaskCommentFeedItemRead; - }; - if (payload.comment) { - pushFeedItem(payload.comment); + const payload = JSON.parse(data) as { agent?: AgentRead }; + if (payload.agent) { + const normalized = normalizeAgent(payload.agent); + const previous = + agentsByIdRef.current.get(normalized.id) ?? null; + agentsByIdRef.current.set(normalized.id, normalized); + const mapped = mapAgentEvent(normalized, previous, false); + if (mapped) { + pushFeedItem(mapped); + } } } catch { - // ignore malformed + // Ignore malformed payloads. } } boundary = buffer.indexOf("\n\n"); @@ -271,7 +1232,7 @@ export default function ActivityPage() { // Reconnect handled below. } - if (!isCancelled) { + if (!cancelled) { if (reconnectTimeout !== undefined) { window.clearTimeout(reconnectTimeout); } @@ -285,13 +1246,20 @@ export default function ActivityPage() { void connect(); return () => { - isCancelled = true; + cancelled = true; abortController.abort(); if (reconnectTimeout !== undefined) { window.clearTimeout(reconnectTimeout); } }; - }, [isPageActive, isSignedIn, pushFeedItem]); + }, [ + isOrgAdmin, + isPageActive, + isSignedIn, + latestTimestamp, + mapAgentEvent, + pushFeedItem, + ]); const orderedFeed = useMemo(() => { return [...feedItems].sort((a, b) => { @@ -328,7 +1296,8 @@ export default function ActivityPage() {

- Realtime task comments across all boards. + Realtime task, approval, agent, and board-chat activity + across all boards.

@@ -337,8 +1306,8 @@ export default function ActivityPage() {
} /> diff --git a/frontend/src/app/boards/[boardId]/page.tsx b/frontend/src/app/boards/[boardId]/page.tsx index 3906c1b1..4bee4332 100644 --- a/frontend/src/app/boards/[boardId]/page.tsx +++ b/frontend/src/app/boards/[boardId]/page.tsx @@ -53,10 +53,7 @@ import { streamApprovalsApiV1BoardsBoardIdApprovalsStreamGet, updateApprovalApiV1BoardsBoardIdApprovalsApprovalIdPatch, } from "@/api/generated/approvals/approvals"; -import { - listTaskCommentFeedApiV1ActivityTaskCommentsGet, - streamTaskCommentFeedApiV1ActivityTaskCommentsStreamGet, -} from "@/api/generated/activity/activity"; +import { listActivityApiV1ActivityGet } from "@/api/generated/activity/activity"; import { getBoardGroupSnapshotApiV1BoardsBoardIdGroupSnapshotGet, getBoardSnapshotApiV1BoardsBoardIdSnapshotGet, @@ -83,6 +80,7 @@ import type { BoardGroupSnapshot, BoardMemoryRead, BoardRead, + ActivityEventRead, OrganizationMemberRead, TaskCardRead, TaskCommentRead, @@ -115,6 +113,295 @@ type Approval = ApprovalRead & { status: string }; type BoardChatMessage = BoardMemoryRead; +type LiveFeedEventType = + | "task.comment" + | "task.created" + | "task.updated" + | "task.status_changed" + | "board.chat" + | "board.command" + | "agent.created" + | "agent.online" + | "agent.offline" + | "agent.updated" + | "approval.created" + | "approval.updated" + | "approval.approved" + | "approval.rejected"; + +type LiveFeedItem = { + id: string; + created_at: string; + message: string | null; + agent_id: string | null; + actor_name?: string | null; + task_id: string | null; + title?: string | null; + event_type: LiveFeedEventType; +}; + +const LIVE_FEED_EVENT_TYPES = new Set([ + "task.comment", + "task.created", + "task.updated", + "task.status_changed", + "board.chat", + "board.command", + "agent.created", + "agent.online", + "agent.offline", + "agent.updated", + "approval.created", + "approval.updated", + "approval.approved", + "approval.rejected", +]); + +const isLiveFeedEventType = (value: string): value is LiveFeedEventType => + LIVE_FEED_EVENT_TYPES.has(value as LiveFeedEventType); + +const toLiveFeedFromActivity = ( + event: ActivityEventRead, +): LiveFeedItem | null => { + if (!isLiveFeedEventType(event.event_type)) { + return null; + } + return { + id: event.id, + created_at: event.created_at, + message: event.message ?? null, + agent_id: event.agent_id ?? null, + task_id: event.task_id ?? null, + title: null, + event_type: event.event_type, + }; +}; + +const toLiveFeedFromComment = (comment: TaskCommentRead): LiveFeedItem => ({ + id: comment.id, + created_at: comment.created_at, + message: comment.message ?? null, + agent_id: comment.agent_id ?? null, + actor_name: null, + task_id: comment.task_id ?? null, + title: null, + event_type: "task.comment", +}); + +const toLiveFeedFromBoardChat = (memory: BoardChatMessage): LiveFeedItem => { + const content = (memory.content ?? "").trim(); + const actorName = (memory.source ?? "User").trim() || "User"; + const isCommand = content.startsWith("/"); + return { + id: `chat:${memory.id}`, + created_at: memory.created_at, + message: content || null, + agent_id: null, + actor_name: actorName, + task_id: null, + title: isCommand ? "Board command" : "Board chat", + event_type: isCommand ? "board.command" : "board.chat", + }; +}; + +const normalizeAgentStatus = (value?: string | null): string => { + const status = (value ?? "").trim().toLowerCase(); + return status || "offline"; +}; + +const humanizeAgentStatus = (value: string): string => + value.replace(/_/g, " ").trim() || "offline"; + +const toLiveFeedFromAgentSnapshot = (agent: Agent): LiveFeedItem => { + const status = normalizeAgentStatus(agent.status); + const stamp = agent.last_seen_at ?? agent.updated_at ?? agent.created_at; + const eventType: LiveFeedEventType = + status === "online" + ? "agent.online" + : status === "offline" + ? "agent.offline" + : "agent.updated"; + return { + id: `agent:${agent.id}:snapshot:${status}:${stamp}`, + created_at: stamp, + message: `${agent.name} is ${humanizeAgentStatus(status)}.`, + agent_id: agent.id, + actor_name: null, + task_id: null, + title: `Agent · ${agent.name}`, + event_type: eventType, + }; +}; + +const toLiveFeedFromAgentUpdate = ( + agent: Agent, + previous: Agent | null, +): LiveFeedItem | null => { + const nextStatus = normalizeAgentStatus(agent.status); + const previousStatus = previous + ? normalizeAgentStatus(previous.status) + : null; + const statusChanged = + previousStatus !== null && nextStatus !== previousStatus; + const isNew = previous === null; + const profileChanged = + Boolean(previous) && + (previous?.name !== agent.name || + previous?.is_board_lead !== agent.is_board_lead || + JSON.stringify(previous?.identity_profile ?? {}) !== + JSON.stringify(agent.identity_profile ?? {})); + + let eventType: LiveFeedEventType; + if (isNew) { + eventType = "agent.created"; + } else if (statusChanged && nextStatus === "online") { + eventType = "agent.online"; + } else if (statusChanged && nextStatus === "offline") { + eventType = "agent.offline"; + } else if (statusChanged || profileChanged) { + eventType = "agent.updated"; + } else { + return null; + } + + const stamp = agent.last_seen_at ?? agent.updated_at ?? agent.created_at; + const message = + eventType === "agent.created" + ? `${agent.name} joined this board.` + : eventType === "agent.online" + ? `${agent.name} is online.` + : eventType === "agent.offline" + ? `${agent.name} is offline.` + : `${agent.name} updated (${humanizeAgentStatus(nextStatus)}).`; + return { + id: `agent:${agent.id}:${eventType}:${stamp}`, + created_at: stamp, + message, + agent_id: agent.id, + actor_name: null, + task_id: null, + title: `Agent · ${agent.name}`, + event_type: eventType, + }; +}; + +const humanizeLiveFeedApprovalAction = (value: string): string => { + const cleaned = value.replace(/[._-]+/g, " ").trim(); + if (!cleaned) return "Approval"; + return cleaned.charAt(0).toUpperCase() + cleaned.slice(1); +}; + +const toLiveFeedFromApproval = ( + approval: ApprovalRead, + previous: ApprovalRead | null = null, +): LiveFeedItem => { + const nextStatus = approval.status ?? "pending"; + const previousStatus = previous?.status ?? null; + const eventType: LiveFeedEventType = + previousStatus === null + ? nextStatus === "approved" + ? "approval.approved" + : nextStatus === "rejected" + ? "approval.rejected" + : "approval.created" + : nextStatus !== previousStatus + ? nextStatus === "approved" + ? "approval.approved" + : nextStatus === "rejected" + ? "approval.rejected" + : "approval.updated" + : "approval.updated"; + const stamp = + eventType === "approval.created" + ? approval.created_at + : (approval.resolved_at ?? approval.created_at); + const action = humanizeLiveFeedApprovalAction(approval.action_type); + const statusText = + nextStatus === "approved" + ? "approved" + : nextStatus === "rejected" + ? "rejected" + : "pending"; + const message = + eventType === "approval.created" + ? `${action} requested (${approval.confidence}% confidence).` + : eventType === "approval.approved" + ? `${action} approved (${approval.confidence}% confidence).` + : eventType === "approval.rejected" + ? `${action} rejected (${approval.confidence}% confidence).` + : `${action} updated (${statusText}, ${approval.confidence}% confidence).`; + return { + id: `approval:${approval.id}:${eventType}:${stamp}`, + created_at: stamp, + message, + agent_id: approval.agent_id ?? null, + actor_name: null, + task_id: approval.task_id ?? null, + title: `Approval · ${action}`, + event_type: eventType, + }; +}; + +const liveFeedEventLabel = (eventType: LiveFeedEventType): string => { + if (eventType === "task.comment") return "Comment"; + if (eventType === "task.created") return "Created"; + if (eventType === "task.status_changed") return "Status"; + if (eventType === "board.chat") return "Chat"; + if (eventType === "board.command") return "Command"; + if (eventType === "agent.created") return "Agent"; + if (eventType === "agent.online") return "Online"; + if (eventType === "agent.offline") return "Offline"; + if (eventType === "agent.updated") return "Agent update"; + if (eventType === "approval.created") return "Approval"; + if (eventType === "approval.updated") return "Approval update"; + if (eventType === "approval.approved") return "Approved"; + if (eventType === "approval.rejected") return "Rejected"; + return "Updated"; +}; + +const liveFeedEventPillClass = (eventType: LiveFeedEventType): string => { + if (eventType === "task.comment") { + return "border-blue-200 bg-blue-50 text-blue-700"; + } + if (eventType === "task.created") { + return "border-emerald-200 bg-emerald-50 text-emerald-700"; + } + if (eventType === "task.status_changed") { + return "border-amber-200 bg-amber-50 text-amber-700"; + } + if (eventType === "board.chat") { + return "border-teal-200 bg-teal-50 text-teal-700"; + } + if (eventType === "board.command") { + return "border-fuchsia-200 bg-fuchsia-50 text-fuchsia-700"; + } + if (eventType === "agent.created") { + return "border-violet-200 bg-violet-50 text-violet-700"; + } + if (eventType === "agent.online") { + return "border-lime-200 bg-lime-50 text-lime-700"; + } + if (eventType === "agent.offline") { + return "border-slate-300 bg-slate-100 text-slate-700"; + } + if (eventType === "agent.updated") { + return "border-indigo-200 bg-indigo-50 text-indigo-700"; + } + if (eventType === "approval.created") { + return "border-cyan-200 bg-cyan-50 text-cyan-700"; + } + if (eventType === "approval.updated") { + return "border-sky-200 bg-sky-50 text-sky-700"; + } + if (eventType === "approval.approved") { + return "border-emerald-200 bg-emerald-50 text-emerald-700"; + } + if (eventType === "approval.rejected") { + return "border-rose-200 bg-rose-50 text-rose-700"; + } + return "border-slate-200 bg-slate-100 text-slate-700"; +}; + const normalizeTask = (task: TaskCardRead): Task => ({ ...task, status: task.status ?? "inbox", @@ -271,7 +558,7 @@ const ChatMessageCard = memo(function ChatMessageCard({ ChatMessageCard.displayName = "ChatMessageCard"; const LiveFeedCard = memo(function LiveFeedCard({ - comment, + item, taskTitle, authorName, authorRole, @@ -279,7 +566,7 @@ const LiveFeedCard = memo(function LiveFeedCard({ onViewTask, isNew, }: { - comment: TaskComment; + item: LiveFeedItem; taskTitle: string; authorName: string; authorRole?: string | null; @@ -287,7 +574,9 @@ const LiveFeedCard = memo(function LiveFeedCard({ onViewTask?: () => void; isNew?: boolean; }) { - const message = (comment.message ?? "").trim(); + const message = (item.message ?? "").trim(); + const eventLabel = liveFeedEventLabel(item.event_type); + const eventPillClass = liveFeedEventPillClass(item.event_type); return (
{taskTitle} - {onViewTask ? ( - - ) : null}
+ + {eventLabel} + {authorName} {authorRole ? ( <> @@ -345,7 +631,7 @@ const LiveFeedCard = memo(function LiveFeedCard({ ) : null} · - {formatShortTimestamp(comment.created_at)} + {formatShortTimestamp(item.created_at)}
@@ -413,8 +699,8 @@ export default function BoardDetailPage() { const selectedTaskIdRef = useRef(null); const openedTaskIdFromUrlRef = useRef(null); const [comments, setComments] = useState([]); - const [liveFeed, setLiveFeed] = useState([]); - const liveFeedRef = useRef([]); + const [liveFeed, setLiveFeed] = useState([]); + const liveFeedRef = useRef([]); const liveFeedFlashTimersRef = useRef>({}); const [liveFeedFlashIds, setLiveFeedFlashIds] = useState< Record @@ -467,15 +753,15 @@ export default function BoardDetailPage() { const isLiveFeedOpenRef = useRef(false); const toastIdRef = useRef(0); const toastTimersRef = useRef>({}); - const pushLiveFeed = useCallback((comment: TaskComment) => { + const pushLiveFeed = useCallback((item: LiveFeedItem) => { const alreadySeen = liveFeedRef.current.some( - (item) => item.id === comment.id, + (existing) => existing.id === item.id, ); setLiveFeed((prev) => { - if (prev.some((item) => item.id === comment.id)) { + if (prev.some((existing) => existing.id === item.id)) { return prev; } - const next = [comment, ...prev]; + const next = [item, ...prev]; return next.slice(0, 50); }); @@ -483,20 +769,20 @@ export default function BoardDetailPage() { if (!isLiveFeedOpenRef.current) return; setLiveFeedFlashIds((prev) => - prev[comment.id] ? prev : { ...prev, [comment.id]: true }, + prev[item.id] ? prev : { ...prev, [item.id]: true }, ); if (typeof window === "undefined") return; - const existingTimer = liveFeedFlashTimersRef.current[comment.id]; + const existingTimer = liveFeedFlashTimersRef.current[item.id]; if (existingTimer !== undefined) { window.clearTimeout(existingTimer); } - liveFeedFlashTimersRef.current[comment.id] = window.setTimeout(() => { - delete liveFeedFlashTimersRef.current[comment.id]; + liveFeedFlashTimersRef.current[item.id] = window.setTimeout(() => { + delete liveFeedFlashTimersRef.current[item.id]; setLiveFeedFlashIds((prev) => { - if (!prev[comment.id]) return prev; + if (!prev[item.id]) return prev; const next = { ...prev }; - delete next[comment.id]; + delete next[item.id]; return next; }); }, 2200); @@ -566,6 +852,7 @@ export default function BoardDetailPage() { useEffect(() => { if (!isLiveFeedOpen) return; if (!isSignedIn || !boardId) return; + if (isLoading) return; if (liveFeedHistoryLoadedRef.current) return; let cancelled = false; @@ -574,28 +861,83 @@ export default function BoardDetailPage() { const fetchHistory = async () => { try { - const result = await listTaskCommentFeedApiV1ActivityTaskCommentsGet({ - board_id: boardId, - limit: 200, - }); - if (cancelled) return; - if (result.status !== 200) { - throw new Error("Unable to load live feed."); + const sourceTasks = + tasksRef.current.length > 0 ? tasksRef.current : tasks; + const sourceApprovals = + approvalsRef.current.length > 0 ? approvalsRef.current : approvals; + const sourceAgents = + agentsRef.current.length > 0 ? agentsRef.current : agents; + const sourceChatMessages = + chatMessagesRef.current.length > 0 + ? chatMessagesRef.current + : chatMessages; + const boardTaskIds = new Set(sourceTasks.map((task) => task.id)); + const collected: LiveFeedItem[] = []; + const seen = new Set(); + const limit = 200; + const recentChatMessages = [...sourceChatMessages] + .sort((a, b) => { + const aTime = apiDatetimeToMs(a.created_at) ?? 0; + const bTime = apiDatetimeToMs(b.created_at) ?? 0; + return bTime - aTime; + }) + .slice(0, 50); + for (const memory of recentChatMessages) { + const chatItem = toLiveFeedFromBoardChat(memory); + if (seen.has(chatItem.id)) continue; + seen.add(chatItem.id); + collected.push(chatItem); + if (collected.length >= 200) break; + } + for (const agent of sourceAgents) { + if (collected.length >= 200) break; + const agentItem = toLiveFeedFromAgentSnapshot(agent); + if (seen.has(agentItem.id)) continue; + seen.add(agentItem.id); + collected.push(agentItem); + if (collected.length >= 200) break; + } + for (const approval of sourceApprovals) { + if (collected.length >= 200) break; + const approvalItem = toLiveFeedFromApproval(approval); + if (seen.has(approvalItem.id)) continue; + seen.add(approvalItem.id); + collected.push(approvalItem); + if (collected.length >= 200) break; + } + + for ( + let offset = 0; + collected.length < 200 && offset < 1000; + offset += limit + ) { + const result = await listActivityApiV1ActivityGet({ + limit, + offset, + }); + if (cancelled) return; + if (result.status !== 200) { + throw new Error("Unable to load live feed."); + } + const items = result.data.items ?? []; + for (const event of items) { + const mapped = toLiveFeedFromActivity(event); + if (!mapped?.task_id) continue; + if (!boardTaskIds.has(mapped.task_id)) continue; + if (seen.has(mapped.id)) continue; + seen.add(mapped.id); + collected.push(mapped); + if (collected.length >= 200) break; + } + if (collected.length >= 200 || items.length < limit) { + break; + } } - const items = result.data.items ?? []; liveFeedHistoryLoadedRef.current = true; - const mapped: TaskComment[] = items.map((item) => ({ - id: item.id, - message: item.message ?? null, - agent_id: item.agent_id ?? null, - task_id: item.task_id ?? null, - created_at: item.created_at, - })); - setLiveFeed((prev) => { - const map = new Map(); - [...prev, ...mapped].forEach((item) => map.set(item.id, item)); + const map = new Map(); + [...prev, ...collected].forEach((item) => map.set(item.id, item)); const merged = [...map.values()]; merged.sort((a, b) => { const aTime = apiDatetimeToMs(a.created_at) ?? 0; @@ -619,7 +961,16 @@ export default function BoardDetailPage() { return () => { cancelled = true; }; - }, [boardId, isLiveFeedOpen, isSignedIn]); + }, [ + agents, + approvals, + boardId, + chatMessages, + isLiveFeedOpen, + isLoading, + isSignedIn, + tasks, + ]); const [isDialogOpen, setIsDialogOpen] = useState(false); const [title, setTitle] = useState(""); @@ -831,7 +1182,7 @@ export default function BoardDetailPage() { useEffect(() => { if (!isPageActive) return; if (!isSignedIn || !boardId || !board) return; - if (!isChatOpen) return; + if (!isChatOpen && !isLiveFeedOpen) return; let isCancelled = false; const abortController = new AbortController(); const backoff = createExponentialBackoff(SSE_RECONNECT_BACKOFF); @@ -891,6 +1242,7 @@ export default function BoardDetailPage() { memory?: BoardChatMessage; }; if (payload.memory?.tags?.includes("chat")) { + pushLiveFeed(toLiveFeedFromBoardChat(payload.memory)); setChatMessages((prev) => { const exists = prev.some( (item) => item.id === payload.memory?.id, @@ -936,126 +1288,15 @@ export default function BoardDetailPage() { window.clearTimeout(reconnectTimeout); } }; - }, [board, boardId, isChatOpen, isPageActive, isSignedIn]); - - useEffect(() => { - if (!isPageActive) return; - if (!isLiveFeedOpen) return; - if (!isSignedIn || !boardId) return; - let isCancelled = false; - const abortController = new AbortController(); - const backoff = createExponentialBackoff(SSE_RECONNECT_BACKOFF); - let reconnectTimeout: number | undefined; - - const connect = async () => { - try { - const since = (() => { - let latestTime = 0; - liveFeedRef.current.forEach((comment) => { - const time = apiDatetimeToMs(comment.created_at); - if (time !== null && time > latestTime) { - latestTime = time; - } - }); - return latestTime ? new Date(latestTime).toISOString() : null; - })(); - - const streamResult = - await streamTaskCommentFeedApiV1ActivityTaskCommentsStreamGet( - { - board_id: boardId, - since: since ?? null, - }, - { - headers: { Accept: "text/event-stream" }, - signal: abortController.signal, - }, - ); - if (streamResult.status !== 200) { - throw new Error("Unable to connect live feed stream."); - } - const response = streamResult.data as Response; - if (!(response instanceof Response) || !response.body) { - throw new Error("Unable to connect live feed stream."); - } - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ""; - - while (!isCancelled) { - const { value, done } = await reader.read(); - if (done) break; - if (value && value.length) { - backoff.reset(); - } - buffer += decoder.decode(value, { stream: true }); - buffer = buffer.replace(/\r\n/g, "\n"); - let boundary = buffer.indexOf("\n\n"); - while (boundary !== -1) { - const raw = buffer.slice(0, boundary); - buffer = buffer.slice(boundary + 2); - const lines = raw.split("\n"); - let eventType = "message"; - let data = ""; - for (const line of lines) { - if (line.startsWith("event:")) { - eventType = line.slice(6).trim(); - } else if (line.startsWith("data:")) { - data += line.slice(5).trim(); - } - } - if (eventType === "comment" && data) { - try { - const payload = JSON.parse(data) as { - comment?: { - id: string; - created_at: string; - message?: string | null; - agent_id?: string | null; - task_id?: string | null; - }; - }; - if (payload.comment) { - pushLiveFeed({ - id: payload.comment.id, - created_at: payload.comment.created_at, - message: payload.comment.message ?? null, - agent_id: payload.comment.agent_id ?? null, - task_id: payload.comment.task_id ?? null, - }); - } - } catch { - // ignore malformed - } - } - boundary = buffer.indexOf("\n\n"); - } - } - } catch { - // Reconnect handled below. - } - - if (!isCancelled) { - if (reconnectTimeout !== undefined) { - window.clearTimeout(reconnectTimeout); - } - const delay = backoff.nextDelayMs(); - reconnectTimeout = window.setTimeout(() => { - reconnectTimeout = undefined; - void connect(); - }, delay); - } - }; - - void connect(); - return () => { - isCancelled = true; - abortController.abort(); - if (reconnectTimeout !== undefined) { - window.clearTimeout(reconnectTimeout); - } - }; - }, [boardId, isLiveFeedOpen, isPageActive, isSignedIn, pushLiveFeed]); + }, [ + board, + boardId, + isChatOpen, + isLiveFeedOpen, + isPageActive, + isSignedIn, + pushLiveFeed, + ]); useEffect(() => { if (!isPageActive) return; @@ -1129,6 +1370,13 @@ export default function BoardDetailPage() { }; if (payload.approval) { const normalized = normalizeApproval(payload.approval); + const previousApproval = + approvalsRef.current.find( + (item) => item.id === normalized.id, + ) ?? null; + pushLiveFeed( + toLiveFeedFromApproval(normalized, previousApproval), + ); setApprovals((prev) => { const index = prev.findIndex( (item) => item.id === normalized.id, @@ -1201,7 +1449,7 @@ export default function BoardDetailPage() { window.clearTimeout(reconnectTimeout); } }; - }, [board, boardId, isPageActive, isSignedIn]); + }, [board, boardId, isPageActive, isSignedIn, pushLiveFeed]); useEffect(() => { if (!selectedTask) { @@ -1279,14 +1527,22 @@ export default function BoardDetailPage() { try { const payload = JSON.parse(data) as { type?: string; + activity?: ActivityEventRead; task?: TaskRead; comment?: TaskCommentRead; }; + const liveEvent = payload.activity + ? toLiveFeedFromActivity(payload.activity) + : payload.type === "task.comment" && payload.comment + ? toLiveFeedFromComment(payload.comment) + : null; + if (liveEvent) { + pushLiveFeed(liveEvent); + } if ( payload.comment?.task_id && payload.type === "task.comment" ) { - pushLiveFeed(payload.comment); setComments((prev) => { if ( selectedTaskIdRef.current !== payload.comment?.task_id @@ -1454,6 +1710,17 @@ export default function BoardDetailPage() { const payload = JSON.parse(data) as { agent?: AgentRead }; if (payload.agent) { const normalized = normalizeAgent(payload.agent); + const previousAgent = + agentsRef.current.find( + (item) => item.id === normalized.id, + ) ?? null; + const liveEvent = toLiveFeedFromAgentUpdate( + normalized, + previousAgent, + ); + if (liveEvent) { + pushLiveFeed(liveEvent); + } setAgents((prev) => { const index = prev.findIndex( (item) => item.id === normalized.id, @@ -1500,7 +1767,7 @@ export default function BoardDetailPage() { window.clearTimeout(reconnectTimeout); } }; - }, [board, boardId, isOrgAdmin, isPageActive, isSignedIn]); + }, [board, boardId, isOrgAdmin, isPageActive, isSignedIn, pushLiveFeed]); const resetForm = () => { setTitle(""); @@ -1568,6 +1835,7 @@ export default function BoardDetailPage() { } const created = result.data; if (created.tags?.includes("chat")) { + pushLiveFeed(toLiveFeedFromBoardChat(created)); setChatMessages((prev) => { const exists = prev.some((item) => item.id === created.id); if (exists) return prev; @@ -1586,7 +1854,7 @@ export default function BoardDetailPage() { return { ok: false, error: message }; } }, - [boardId, isSignedIn], + [boardId, isSignedIn, pushLiveFeed], ); const handleSendChat = useCallback( @@ -3237,7 +3505,7 @@ export default function BoardDetailPage() { Live feed

- Realtime task comments across this board. + Realtime task, approval, agent, and board-chat activity.