From a32c01e4a80e26ad973113cf6e629d77db6c9f76 Mon Sep 17 00:00:00 2001 From: Abhimanyu Saharan Date: Sat, 7 Feb 2026 15:20:36 +0530 Subject: [PATCH 1/4] feat(gateway): implement gateway board management and messaging features --- backend/app/api/agent.py | 311 ++++++++++++++++++++ backend/app/schemas/gateway_coordination.py | 75 +++++ backend/app/services/board_leads.py | 108 +++++++ templates/HEARTBEAT_LEAD.md | 9 + templates/MAIN_AGENTS.md | 35 +++ 5 files changed, 538 insertions(+) create mode 100644 backend/app/schemas/gateway_coordination.py create mode 100644 backend/app/services/board_leads.py diff --git a/backend/app/api/agent.py b/backend/app/api/agent.py index 83cd1c49..b7f9474b 100644 --- a/backend/app/api/agent.py +++ b/backend/app/api/agent.py @@ -5,6 +5,7 @@ from typing import Any, cast from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy import func from sqlmodel import col, select from sqlmodel.ext.asyncio.session import AsyncSession @@ -15,6 +16,7 @@ from app.api import board_onboarding as onboarding_api from app.api import tasks as tasks_api from app.api.deps import ActorContext, get_board_or_404, get_task_or_404 from app.core.agent_auth import AgentAuthContext, get_agent_auth_context +from app.core.config import settings from app.db.pagination import paginate from app.db.session import get_session from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig @@ -40,9 +42,19 @@ from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead from app.schemas.board_onboarding import BoardOnboardingAgentUpdate, BoardOnboardingRead from app.schemas.boards import BoardRead from app.schemas.common import OkResponse +from app.schemas.gateway_coordination import ( + GatewayBoardEnsureRequest, + GatewayBoardEnsureResponse, + GatewayLeadBroadcastBoardResult, + GatewayLeadBroadcastRequest, + GatewayLeadBroadcastResponse, + GatewayLeadMessageRequest, + GatewayLeadMessageResponse, +) from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate from app.services.activity_log import record_activity +from app.services.board_leads import ensure_board_lead_agent from app.services.task_dependencies import ( blocked_by_dependency_ids, dependency_status_by_id, @@ -70,6 +82,50 @@ async def _gateway_config(session: AsyncSession, board: Board) -> GatewayClientC return GatewayClientConfig(url=gateway.url, token=gateway.token) +def _slugify(value: str) -> str: + import re + + slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") + return slug or "board" + + +async def _require_gateway_main( + session: AsyncSession, + agent: Agent, +) -> tuple[Gateway, GatewayClientConfig]: + session_key = (agent.openclaw_session_id or "").strip() + if not session_key: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Agent missing session key") + gateway = ( + await session.exec(select(Gateway).where(col(Gateway.main_session_key) == session_key)) + ).first() + if gateway is None: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only the gateway main agent may call this endpoint.", + ) + if not gateway.url: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Gateway url is required", + ) + return gateway, GatewayClientConfig(url=gateway.url, token=gateway.token) + + +async def _require_gateway_board( + session: AsyncSession, + *, + gateway: Gateway, + board_id: UUID | str, +) -> Board: + board = await session.get(Board, board_id) + if board is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Board not found") + if board.gateway_id != gateway.id: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + return board + + @router.get("/boards", response_model=DefaultLimitOffsetPage[BoardRead]) async def list_boards( session: AsyncSession = Depends(get_session), @@ -440,3 +496,258 @@ async def agent_heartbeat( session=session, actor=_actor(agent_ctx), ) + + +@router.post("/gateway/boards/ensure", response_model=GatewayBoardEnsureResponse) +async def ensure_gateway_board( + payload: GatewayBoardEnsureRequest, + session: AsyncSession = Depends(get_session), + agent_ctx: AgentAuthContext = Depends(get_agent_auth_context), +) -> GatewayBoardEnsureResponse: + gateway, config = await _require_gateway_main(session, agent_ctx.agent) + + requested_name = payload.name.strip() + requested_slug = _slugify(payload.slug.strip() if payload.slug else requested_name) + + # Try slug match first, then case-insensitive name match. + existing = ( + await session.exec( + select(Board) + .where(col(Board.gateway_id) == gateway.id) + .where(col(Board.slug) == requested_slug) + ) + ).first() + if existing is None: + existing = ( + await session.exec( + select(Board) + .where(col(Board.gateway_id) == gateway.id) + .where(func.lower(col(Board.name)) == requested_name.lower()) + ) + ).first() + + created = False + board = existing + if board is None: + slug = requested_slug + suffix = 2 + while True: + conflict = ( + await session.exec( + select(Board.id) + .where(col(Board.gateway_id) == gateway.id) + .where(col(Board.slug) == slug) + ) + ).first() + if conflict is None: + break + slug = f"{requested_slug}-{suffix}" + suffix += 1 + + board = Board( + name=requested_name, + slug=slug, + gateway_id=gateway.id, + board_type=payload.board_type, + objective=payload.objective.strip() if payload.objective else None, + success_metrics=payload.success_metrics, + target_date=payload.target_date, + goal_confirmed=False, + goal_source="gateway_main_agent", + ) + session.add(board) + await session.commit() + await session.refresh(board) + created = True + + lead, lead_created = await ensure_board_lead_agent( + session, + board=board, + gateway=gateway, + config=config, + user=None, + agent_name=payload.lead_agent_name.strip() if payload.lead_agent_name else None, + identity_profile=payload.lead_identity_profile, + action="provision", + ) + + return GatewayBoardEnsureResponse( + created=created, + lead_created=lead_created, + board_id=board.id, + lead_agent_id=lead.id, + board_name=board.name, + board_slug=board.slug, + lead_agent_name=lead.name, + ) + + +@router.post( + "/gateway/boards/{board_id}/lead/message", + response_model=GatewayLeadMessageResponse, +) +async def message_gateway_board_lead( + board_id: UUID, + payload: GatewayLeadMessageRequest, + session: AsyncSession = Depends(get_session), + agent_ctx: AgentAuthContext = Depends(get_agent_auth_context), +) -> GatewayLeadMessageResponse: + import json + + gateway, config = await _require_gateway_main(session, agent_ctx.agent) + board = await _require_gateway_board(session, gateway=gateway, board_id=board_id) + lead, lead_created = await ensure_board_lead_agent( + session, + board=board, + gateway=gateway, + config=config, + user=None, + action="provision", + ) + if not lead.openclaw_session_id: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Lead agent has no session key", + ) + + base_url = settings.base_url or "http://localhost:8000" + header = "GATEWAY MAIN QUESTION" if payload.kind == "question" else "GATEWAY MAIN HANDOFF" + correlation = payload.correlation_id.strip() if payload.correlation_id else "" + correlation_line = f"Correlation ID: {correlation}\n" if correlation else "" + tags = payload.reply_tags or ["gateway_main", "lead_reply"] + tags_json = json.dumps(tags) + reply_source = payload.reply_source or "lead_to_gateway_main" + + message = ( + f"{header}\n" + f"Board: {board.name}\n" + f"Board ID: {board.id}\n" + f"From agent: {agent_ctx.agent.name}\n" + f"{correlation_line}\n" + f"{payload.content.strip()}\n\n" + "Reply to the gateway main by writing a NON-chat memory item on this board:\n" + f"POST {base_url}/api/v1/agent/boards/{board.id}/memory\n" + f'Body: {{"content":"...","tags":{tags_json},"source":"{reply_source}"}}\n' + "Do NOT reply in OpenClaw chat." + ) + + try: + await ensure_session(lead.openclaw_session_id, config=config, label=lead.name) + await send_message(message, session_key=lead.openclaw_session_id, config=config) + except OpenClawGatewayError as exc: + record_activity( + session, + event_type="gateway.main.lead_message.failed", + message=f"Lead message failed for {board.name}: {exc}", + agent_id=agent_ctx.agent.id, + ) + await session.commit() + raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc + + record_activity( + session, + event_type="gateway.main.lead_message.sent", + message=f"Sent {payload.kind} to lead for board: {board.name}.", + agent_id=agent_ctx.agent.id, + ) + await session.commit() + + return GatewayLeadMessageResponse( + board_id=board.id, + lead_agent_id=lead.id, + lead_agent_name=lead.name, + lead_created=lead_created, + ) + + +@router.post( + "/gateway/leads/broadcast", + response_model=GatewayLeadBroadcastResponse, +) +async def broadcast_gateway_lead_message( + payload: GatewayLeadBroadcastRequest, + session: AsyncSession = Depends(get_session), + agent_ctx: AgentAuthContext = Depends(get_agent_auth_context), +) -> GatewayLeadBroadcastResponse: + import json + + gateway, config = await _require_gateway_main(session, agent_ctx.agent) + + statement = select(Board).where(col(Board.gateway_id) == gateway.id).order_by( + col(Board.created_at).desc() + ) + if payload.board_ids: + statement = statement.where(col(Board.id).in_(payload.board_ids)) + boards = list(await session.exec(statement)) + + base_url = settings.base_url or "http://localhost:8000" + header = "GATEWAY MAIN QUESTION" if payload.kind == "question" else "GATEWAY MAIN HANDOFF" + correlation = payload.correlation_id.strip() if payload.correlation_id else "" + correlation_line = f"Correlation ID: {correlation}\n" if correlation else "" + tags = payload.reply_tags or ["gateway_main", "lead_reply"] + tags_json = json.dumps(tags) + reply_source = payload.reply_source or "lead_to_gateway_main" + + results: list[GatewayLeadBroadcastBoardResult] = [] + sent = 0 + failed = 0 + + for board in boards: + try: + lead, _lead_created = await ensure_board_lead_agent( + session, + board=board, + gateway=gateway, + config=config, + user=None, + action="provision", + ) + if not lead.openclaw_session_id: + raise ValueError("Lead agent has no session key") + message = ( + f"{header}\n" + f"Board: {board.name}\n" + f"Board ID: {board.id}\n" + f"From agent: {agent_ctx.agent.name}\n" + f"{correlation_line}\n" + f"{payload.content.strip()}\n\n" + "Reply to the gateway main by writing a NON-chat memory item on this board:\n" + f"POST {base_url}/api/v1/agent/boards/{board.id}/memory\n" + f'Body: {{"content":"...","tags":{tags_json},"source":"{reply_source}"}}\n' + "Do NOT reply in OpenClaw chat." + ) + await ensure_session(lead.openclaw_session_id, config=config, label=lead.name) + await send_message(message, session_key=lead.openclaw_session_id, config=config) + results.append( + GatewayLeadBroadcastBoardResult( + board_id=board.id, + lead_agent_id=lead.id, + lead_agent_name=lead.name, + ok=True, + ) + ) + sent += 1 + except Exception as exc: + results.append( + GatewayLeadBroadcastBoardResult( + board_id=board.id, + ok=False, + error=str(exc), + ) + ) + failed += 1 + + record_activity( + session, + event_type="gateway.main.lead_broadcast.sent", + message=f"Broadcast {payload.kind} to {sent} board leads (failed: {failed}).", + agent_id=agent_ctx.agent.id, + ) + await session.commit() + + return GatewayLeadBroadcastResponse( + ok=True, + sent=sent, + failed=failed, + results=results, + ) diff --git a/backend/app/schemas/gateway_coordination.py b/backend/app/schemas/gateway_coordination.py new file mode 100644 index 00000000..55a5b1e4 --- /dev/null +++ b/backend/app/schemas/gateway_coordination.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Literal +from uuid import UUID + +from sqlmodel import Field, SQLModel + +from app.schemas.common import NonEmptyStr + + +class GatewayBoardEnsureRequest(SQLModel): + name: NonEmptyStr + slug: str | None = None + board_type: Literal["goal", "general"] = "goal" + objective: str | None = None + success_metrics: dict[str, object] | None = None + target_date: datetime | None = None + lead_agent_name: str | None = None + lead_identity_profile: dict[str, str] | None = None + + +class GatewayBoardEnsureResponse(SQLModel): + created: bool = False + lead_created: bool = False + board_id: UUID + lead_agent_id: UUID | None = None + + # Convenience fields for callers that don't want to re-fetch. + board_name: str + board_slug: str + lead_agent_name: str | None = None + + +class GatewayLeadMessageRequest(SQLModel): + kind: Literal["question", "handoff"] = "question" + correlation_id: str | None = None + content: NonEmptyStr + + # How the lead should reply (defaults are interpreted by templates). + reply_tags: list[str] = Field(default_factory=lambda: ["gateway_main", "lead_reply"]) + reply_source: str | None = "lead_to_gateway_main" + + +class GatewayLeadMessageResponse(SQLModel): + ok: bool = True + board_id: UUID + lead_agent_id: UUID | None = None + lead_agent_name: str | None = None + lead_created: bool = False + + +class GatewayLeadBroadcastRequest(SQLModel): + kind: Literal["question", "handoff"] = "question" + correlation_id: str | None = None + content: NonEmptyStr + board_ids: list[UUID] | None = None + reply_tags: list[str] = Field(default_factory=lambda: ["gateway_main", "lead_reply"]) + reply_source: str | None = "lead_to_gateway_main" + + +class GatewayLeadBroadcastBoardResult(SQLModel): + board_id: UUID + lead_agent_id: UUID | None = None + lead_agent_name: str | None = None + ok: bool = False + error: str | None = None + + +class GatewayLeadBroadcastResponse(SQLModel): + ok: bool = True + sent: int = 0 + failed: int = 0 + results: list[GatewayLeadBroadcastBoardResult] = Field(default_factory=list) + diff --git a/backend/app/services/board_leads.py b/backend/app/services/board_leads.py new file mode 100644 index 00000000..89208a78 --- /dev/null +++ b/backend/app/services/board_leads.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +from typing import Any + +from sqlmodel import col, select +from sqlmodel.ext.asyncio.session import AsyncSession + +from app.core.agent_tokens import generate_agent_token, hash_agent_token +from app.core.time import utcnow +from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig +from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message +from app.models.agents import Agent +from app.models.boards import Board +from app.models.gateways import Gateway +from app.models.users import User +from app.services.agent_provisioning import DEFAULT_HEARTBEAT_CONFIG, provision_agent + + +def lead_session_key(board: Board) -> str: + return f"agent:lead-{board.id}:main" + + +def lead_agent_name(_: Board) -> str: + return "Lead Agent" + + +async def ensure_board_lead_agent( + session: AsyncSession, + *, + board: Board, + gateway: Gateway, + config: GatewayClientConfig, + user: User | None, + agent_name: str | None = None, + identity_profile: dict[str, str] | None = None, + action: str = "provision", +) -> tuple[Agent, bool]: + existing = ( + await session.exec( + select(Agent) + .where(Agent.board_id == board.id) + .where(col(Agent.is_board_lead).is_(True)) + ) + ).first() + if existing: + desired_name = agent_name or lead_agent_name(board) + changed = False + if existing.name != desired_name: + existing.name = desired_name + changed = True + desired_session_key = lead_session_key(board) + if not existing.openclaw_session_id: + existing.openclaw_session_id = desired_session_key + changed = True + if changed: + existing.updated_at = utcnow() + session.add(existing) + await session.commit() + await session.refresh(existing) + return existing, False + + merged_identity_profile: dict[str, Any] = { + "role": "Board Lead", + "communication_style": "direct, concise, practical", + "emoji": ":gear:", + } + if identity_profile: + merged_identity_profile.update( + {key: value.strip() for key, value in identity_profile.items() if value.strip()} + ) + + agent = Agent( + name=agent_name or lead_agent_name(board), + status="provisioning", + board_id=board.id, + is_board_lead=True, + heartbeat_config=DEFAULT_HEARTBEAT_CONFIG.copy(), + identity_profile=merged_identity_profile, + openclaw_session_id=lead_session_key(board), + provision_requested_at=utcnow(), + provision_action=action, + ) + raw_token = generate_agent_token() + agent.agent_token_hash = hash_agent_token(raw_token) + session.add(agent) + await session.commit() + await session.refresh(agent) + + try: + await provision_agent(agent, board, gateway, raw_token, user, action=action) + if agent.openclaw_session_id: + await ensure_session(agent.openclaw_session_id, config=config, label=agent.name) + await send_message( + ( + f"Hello {agent.name}. Your workspace has been provisioned.\n\n" + "Start the agent, run BOOT.md, and if BOOTSTRAP.md exists run it once " + "then delete it. Begin heartbeats after startup." + ), + session_key=agent.openclaw_session_id, + config=config, + deliver=True, + ) + except OpenClawGatewayError: + # Best-effort provisioning. The board/agent rows should still exist. + pass + + return agent, True + diff --git a/templates/HEARTBEAT_LEAD.md b/templates/HEARTBEAT_LEAD.md index a972a848..5f2484f6 100644 --- a/templates/HEARTBEAT_LEAD.md +++ b/templates/HEARTBEAT_LEAD.md @@ -66,6 +66,15 @@ Comment template (keep it small; 1-3 bullets per section; omit what is not appli - Board chat is your primary channel with the human; respond promptly and clearly. - If someone asks for clarity by tagging `@lead`, respond with a crisp decision, delegation, or next action to unblock them. +## Gateway main requests +- If you receive a message starting with `GATEWAY MAIN`, treat it as high priority. +- Do **not** reply in OpenClaw chat. Reply via Mission Control only. +- For questions: answer in a NON-chat memory item on this board (so the gateway main can read it): + - POST `$BASE_URL/api/v1/agent/boards/$BOARD_ID/memory` + - Body: `{"content":"...","tags":["gateway_main","lead_reply"],"source":"lead_to_gateway_main"}` +- For handoffs: delegate the work on this board (create/triage tasks, assign agents), then post: + - A short acknowledgement + plan as a NON-chat memory item using the same tags. + ## Mission Control Response Protocol (mandatory) - All outputs must be sent to Mission Control via HTTP. - Always include: `X-Agent-Token: {{ auth_token }}` diff --git a/templates/MAIN_AGENTS.md b/templates/MAIN_AGENTS.md index 96a5217c..5d8c09ca 100644 --- a/templates/MAIN_AGENTS.md +++ b/templates/MAIN_AGENTS.md @@ -27,6 +27,41 @@ Do this immediately. Do not ask permission to read your workspace. - You help with onboarding and gateway-wide requests. - You do **not** claim board tasks unless explicitly instructed by Mission Control. +## Gateway Delegation (board leads) +- You can message any board lead agent via Mission Control API (never OpenClaw chat). +- If the requested board does not exist, you must create it and provision its lead agent first. +- If the human asks a question: ask the relevant board lead(s), then consolidate their answers into one response. +- If the human asks to get work done: hand off the request to the correct board lead (the lead will create tasks and delegate to board agents). + +Ensure (create if needed) a board + lead: +```bash +curl -s -X POST "$BASE_URL/api/v1/agent/gateway/boards/ensure" \ + -H "X-Agent-Token: $AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"name":"","slug":"","board_type":"goal","objective":"","success_metrics":null,"target_date":null}' +``` + +Send a question or handoff to a board lead: +```bash +curl -s -X POST "$BASE_URL/api/v1/agent/gateway/boards//lead/message" \ + -H "X-Agent-Token: $AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"kind":"question","correlation_id":"","content":"..."}' +``` + +Broadcast to all board leads in this gateway: +```bash +curl -s -X POST "$BASE_URL/api/v1/agent/gateway/leads/broadcast" \ + -H "X-Agent-Token: $AUTH_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"kind":"question","correlation_id":"","content":"..."}' +``` + +Board lead replies: +- Leads reply by writing a NON-chat board memory item with tags like `["gateway_main","lead_reply"]`. +- Read replies via: + - GET `$BASE_URL/api/v1/agent/boards//memory?is_chat=false&limit=50` + ## Tools - Skills are authoritative. Follow SKILL.md instructions exactly. - Use TOOLS.md for environment-specific notes. From 0ce2e1e91ff44866aa3662659eff7ae62c220616 Mon Sep 17 00:00:00 2001 From: Abhimanyu Saharan Date: Sat, 7 Feb 2026 15:20:43 +0530 Subject: [PATCH 2/4] feat(activity): enhance live feed with new comment streaming and flash effects --- frontend/src/app/boards/[boardId]/page.tsx | 191 ++++++++++++++++++++- 1 file changed, 189 insertions(+), 2 deletions(-) diff --git a/frontend/src/app/boards/[boardId]/page.tsx b/frontend/src/app/boards/[boardId]/page.tsx index 84f3789a..9fd5b2d2 100644 --- a/frontend/src/app/boards/[boardId]/page.tsx +++ b/frontend/src/app/boards/[boardId]/page.tsx @@ -50,7 +50,10 @@ import { streamApprovalsApiV1BoardsBoardIdApprovalsStreamGet, updateApprovalApiV1BoardsBoardIdApprovalsApprovalIdPatch, } from "@/api/generated/approvals/approvals"; -import { listTaskCommentFeedApiV1ActivityTaskCommentsGet } from "@/api/generated/activity/activity"; +import { + listTaskCommentFeedApiV1ActivityTaskCommentsGet, + streamTaskCommentFeedApiV1ActivityTaskCommentsStreamGet, +} from "@/api/generated/activity/activity"; import { getBoardSnapshotApiV1BoardsBoardIdSnapshotGet } from "@/api/generated/boards/boards"; import { createBoardMemoryApiV1BoardsBoardIdMemoryPost, @@ -219,6 +222,7 @@ const LiveFeedCard = memo(function LiveFeedCard({ authorRole, authorAvatar, onViewTask, + isNew, }: { comment: TaskComment; taskTitle: string; @@ -226,10 +230,18 @@ const LiveFeedCard = memo(function LiveFeedCard({ authorRole?: string | null; authorAvatar: string; onViewTask?: () => void; + isNew?: boolean; }) { const message = (comment.message ?? "").trim(); return ( -
+
{authorAvatar} @@ -316,6 +328,11 @@ export default function BoardDetailPage() { const openedTaskIdFromUrlRef = useRef(null); const [comments, setComments] = useState([]); const [liveFeed, setLiveFeed] = useState([]); + const liveFeedRef = useRef([]); + const liveFeedFlashTimersRef = useRef>({}); + const [liveFeedFlashIds, setLiveFeedFlashIds] = useState< + Record + >({}); const [isLiveFeedHistoryLoading, setIsLiveFeedHistoryLoading] = useState(false); const [liveFeedHistoryError, setLiveFeedHistoryError] = useState< @@ -359,7 +376,9 @@ export default function BoardDetailPage() { const [deleteTaskError, setDeleteTaskError] = useState(null); const [viewMode, setViewMode] = useState<"board" | "list">("board"); const [isLiveFeedOpen, setIsLiveFeedOpen] = useState(false); + const isLiveFeedOpenRef = useRef(false); const pushLiveFeed = useCallback((comment: TaskComment) => { + const alreadySeen = liveFeedRef.current.some((item) => item.id === comment.id); setLiveFeed((prev) => { if (prev.some((item) => item.id === comment.id)) { return prev; @@ -367,6 +386,28 @@ export default function BoardDetailPage() { const next = [comment, ...prev]; return next.slice(0, 50); }); + + if (alreadySeen) return; + if (!isLiveFeedOpenRef.current) return; + + setLiveFeedFlashIds((prev) => + prev[comment.id] ? prev : { ...prev, [comment.id]: true }, + ); + + if (typeof window === "undefined") return; + const existingTimer = liveFeedFlashTimersRef.current[comment.id]; + if (existingTimer !== undefined) { + window.clearTimeout(existingTimer); + } + liveFeedFlashTimersRef.current[comment.id] = window.setTimeout(() => { + delete liveFeedFlashTimersRef.current[comment.id]; + setLiveFeedFlashIds((prev) => { + if (!prev[comment.id]) return prev; + const next = { ...prev }; + delete next[comment.id]; + return next; + }); + }, 2200); }, []); useEffect(() => { @@ -374,8 +415,26 @@ export default function BoardDetailPage() { setIsLiveFeedHistoryLoading(false); setLiveFeedHistoryError(null); setLiveFeed([]); + setLiveFeedFlashIds({}); + if (typeof window !== "undefined") { + Object.values(liveFeedFlashTimersRef.current).forEach((timerId) => { + window.clearTimeout(timerId); + }); + } + liveFeedFlashTimersRef.current = {}; }, [boardId]); + useEffect(() => { + return () => { + if (typeof window !== "undefined") { + Object.values(liveFeedFlashTimersRef.current).forEach((timerId) => { + window.clearTimeout(timerId); + }); + } + liveFeedFlashTimersRef.current = {}; + }; + }, []); + useEffect(() => { if (!isLiveFeedOpen) return; if (!isSignedIn || !boardId) return; @@ -576,6 +635,14 @@ export default function BoardDetailPage() { chatMessagesRef.current = chatMessages; }, [chatMessages]); + useEffect(() => { + liveFeedRef.current = liveFeed; + }, [liveFeed]); + + useEffect(() => { + isLiveFeedOpenRef.current = isLiveFeedOpen; + }, [isLiveFeedOpen]); + useEffect(() => { if (!isChatOpen) return; const timeout = window.setTimeout(() => { @@ -716,6 +783,125 @@ export default function BoardDetailPage() { }; }, [board, boardId, isChatOpen, isPageActive, isSignedIn]); + useEffect(() => { + if (!isPageActive) return; + if (!isLiveFeedOpen) return; + if (!isSignedIn || !boardId) return; + let isCancelled = false; + const abortController = new AbortController(); + const backoff = createExponentialBackoff(SSE_RECONNECT_BACKOFF); + let reconnectTimeout: number | undefined; + + const connect = async () => { + try { + const since = (() => { + let latestTime = 0; + liveFeedRef.current.forEach((comment) => { + const time = apiDatetimeToMs(comment.created_at); + if (time !== null && time > latestTime) { + latestTime = time; + } + }); + return latestTime ? new Date(latestTime).toISOString() : null; + })(); + + const streamResult = + await streamTaskCommentFeedApiV1ActivityTaskCommentsStreamGet( + { + board_id: boardId, + since: since ?? null, + }, + { + headers: { Accept: "text/event-stream" }, + signal: abortController.signal, + }, + ); + if (streamResult.status !== 200) { + throw new Error("Unable to connect live feed stream."); + } + const response = streamResult.data as Response; + if (!(response instanceof Response) || !response.body) { + throw new Error("Unable to connect live feed stream."); + } + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (!isCancelled) { + const { value, done } = await reader.read(); + if (done) break; + if (value && value.length) { + backoff.reset(); + } + buffer += decoder.decode(value, { stream: true }); + buffer = buffer.replace(/\r\n/g, "\n"); + let boundary = buffer.indexOf("\n\n"); + while (boundary !== -1) { + const raw = buffer.slice(0, boundary); + buffer = buffer.slice(boundary + 2); + const lines = raw.split("\n"); + let eventType = "message"; + let data = ""; + for (const line of lines) { + if (line.startsWith("event:")) { + eventType = line.slice(6).trim(); + } else if (line.startsWith("data:")) { + data += line.slice(5).trim(); + } + } + if (eventType === "comment" && data) { + try { + const payload = JSON.parse(data) as { + comment?: { + id: string; + created_at: string; + message?: string | null; + agent_id?: string | null; + task_id?: string | null; + }; + }; + if (payload.comment) { + pushLiveFeed({ + id: payload.comment.id, + created_at: payload.comment.created_at, + message: payload.comment.message ?? null, + agent_id: payload.comment.agent_id ?? null, + task_id: payload.comment.task_id ?? null, + }); + } + } catch { + // ignore malformed + } + } + boundary = buffer.indexOf("\n\n"); + } + } + } catch { + // Reconnect handled below. + } + + if (!isCancelled) { + if (reconnectTimeout !== undefined) { + window.clearTimeout(reconnectTimeout); + } + const delay = backoff.nextDelayMs(); + reconnectTimeout = window.setTimeout(() => { + reconnectTimeout = undefined; + void connect(); + }, delay); + } + }; + + void connect(); + return () => { + isCancelled = true; + abortController.abort(); + if (reconnectTimeout !== undefined) { + window.clearTimeout(reconnectTimeout); + } + }; + }, [boardId, isLiveFeedOpen, isPageActive, isSignedIn, pushLiveFeed]); + useEffect(() => { if (!isPageActive) return; if (!isSignedIn || !boardId || !board) return; @@ -2573,6 +2759,7 @@ export default function BoardDetailPage() { Date: Sat, 7 Feb 2026 15:27:42 +0530 Subject: [PATCH 3/4] feat(comments): update comment sorting to display latest comments first --- frontend/src/app/boards/[boardId]/page.tsx | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/frontend/src/app/boards/[boardId]/page.tsx b/frontend/src/app/boards/[boardId]/page.tsx index 9fd5b2d2..0778e73e 100644 --- a/frontend/src/app/boards/[boardId]/page.tsx +++ b/frontend/src/app/boards/[boardId]/page.tsx @@ -1135,18 +1135,23 @@ export default function BoardDetailPage() { payload.comment?.created_at, ); if (prev.length === 0 || createdMs === null) { - return [...prev, payload.comment as TaskComment]; + return [payload.comment as TaskComment, ...prev]; + } + const first = prev[0]; + const firstMs = apiDatetimeToMs(first?.created_at); + if (firstMs !== null && createdMs >= firstMs) { + return [payload.comment as TaskComment, ...prev]; } const last = prev[prev.length - 1]; const lastMs = apiDatetimeToMs(last?.created_at); - if (lastMs !== null && createdMs >= lastMs) { + if (lastMs !== null && createdMs <= lastMs) { return [...prev, payload.comment as TaskComment]; } const next = [...prev, payload.comment as TaskComment]; next.sort((a, b) => { const aTime = apiDatetimeToMs(a.created_at) ?? 0; const bTime = apiDatetimeToMs(b.created_at) ?? 0; - return aTime - bTime; + return bTime - aTime; }); return next; }); @@ -1607,7 +1612,7 @@ export default function BoardDetailPage() { items.sort((a, b) => { const aTime = apiDatetimeToMs(a.created_at) ?? 0; const bTime = apiDatetimeToMs(b.created_at) ?? 0; - return aTime - bTime; + return bTime - aTime; }); setComments(items); } catch (err) { From 7ef1f3e2f880b98d0bca62b59c9b2d1690b830af Mon Sep 17 00:00:00 2001 From: Abhimanyu Saharan Date: Sat, 7 Feb 2026 15:30:07 +0530 Subject: [PATCH 4/4] feat(chat): improve focus management for chat input after sending messages --- frontend/src/components/BoardChatComposer.tsx | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/frontend/src/components/BoardChatComposer.tsx b/frontend/src/components/BoardChatComposer.tsx index f2884402..68a07a06 100644 --- a/frontend/src/components/BoardChatComposer.tsx +++ b/frontend/src/components/BoardChatComposer.tsx @@ -1,6 +1,6 @@ "use client"; -import { memo, useCallback, useState } from "react"; +import { memo, useCallback, useEffect, useRef, useState } from "react"; import { Button } from "@/components/ui/button"; import { Textarea } from "@/components/ui/textarea"; @@ -17,12 +17,22 @@ function BoardChatComposerImpl({ onSend, }: BoardChatComposerProps) { const [value, setValue] = useState(""); + const textareaRef = useRef(null); + const shouldFocusAfterSendRef = useRef(false); + + useEffect(() => { + if (isSending) return; + if (!shouldFocusAfterSendRef.current) return; + shouldFocusAfterSendRef.current = false; + textareaRef.current?.focus(); + }, [isSending]); const send = useCallback(async () => { if (isSending) return; const trimmed = value.trim(); if (!trimmed) return; const ok = await onSend(trimmed); + shouldFocusAfterSendRef.current = true; if (ok) { setValue(""); } @@ -31,6 +41,7 @@ function BoardChatComposerImpl({ return (