feat(tasks): Enhance task streaming and comment validation with markdown support

This commit is contained in:
Abhimanyu Saharan
2026-02-05 03:05:14 +05:30
parent af3c437c0a
commit 5e342e6906
6 changed files with 1420 additions and 42 deletions

View File

@@ -1,9 +1,14 @@
from __future__ import annotations
from datetime import datetime
from datetime import datetime, timezone
import asyncio
import json
from collections import deque
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sse_starlette.sse import EventSourceResponse
from starlette.concurrency import run_in_threadpool
from sqlalchemy import asc, desc
from sqlmodel import Session, col, select
@@ -15,7 +20,7 @@ from app.api.deps import (
require_admin_or_agent,
)
from app.core.auth import AuthContext
from app.db.session import get_session
from app.db.session import engine, get_session
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.boards import Board
@@ -25,8 +30,14 @@ from app.services.activity_log import record_activity
router = APIRouter(prefix="/boards/{board_id}/tasks", tags=["tasks"])
REQUIRED_COMMENT_FIELDS = ("summary:", "details:", "next:")
ALLOWED_STATUSES = {"inbox", "in_progress", "review", "done"}
TASK_EVENT_TYPES = {
"task.created",
"task.updated",
"task.status_changed",
"task.comment",
}
SSE_SEEN_MAX = 2000
def validate_task_status(status_value: str) -> None:
@@ -37,16 +48,11 @@ def validate_task_status(status_value: str) -> None:
)
def is_valid_markdown_comment(message: str) -> bool:
content = message.strip()
if not content:
return False
lowered = content.lower()
if not all(field in lowered for field in REQUIRED_COMMENT_FIELDS):
return False
if "- " not in content and "* " not in content:
return False
return True
def _comment_validation_error() -> HTTPException:
return HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Comment is required.",
)
def has_valid_recent_comment(
@@ -68,7 +74,92 @@ def has_valid_recent_comment(
event = session.exec(statement).first()
if event is None or event.message is None:
return False
return is_valid_markdown_comment(event.message)
return bool(event.message.strip())
def _parse_since(value: str | None) -> datetime | None:
if not value:
return None
normalized = value.strip()
if not normalized:
return None
normalized = normalized.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return parsed.astimezone(timezone.utc).replace(tzinfo=None)
return parsed
def _fetch_task_events(
board_id: UUID,
since: datetime,
) -> list[tuple[ActivityEvent, Task | None]]:
with Session(engine) as session:
task_ids = list(
session.exec(select(Task.id).where(col(Task.board_id) == board_id))
)
if not task_ids:
return []
statement = (
select(ActivityEvent, Task)
.outerjoin(Task, ActivityEvent.task_id == Task.id)
.where(col(ActivityEvent.task_id).in_(task_ids))
.where(col(ActivityEvent.event_type).in_(TASK_EVENT_TYPES))
.where(col(ActivityEvent.created_at) >= since)
.order_by(asc(col(ActivityEvent.created_at)))
)
return list(session.exec(statement))
def _serialize_task(task: Task | None) -> dict[str, object] | None:
if task is None:
return None
return TaskRead.model_validate(task).model_dump(mode="json")
def _serialize_comment(event: ActivityEvent) -> dict[str, object]:
return TaskCommentRead.model_validate(event).model_dump(mode="json")
@router.get("/stream")
async def stream_tasks(
request: Request,
board: Board = Depends(get_board_or_404),
actor: ActorContext = Depends(require_admin_or_agent),
since: str | None = Query(default=None),
) -> EventSourceResponse:
since_dt = _parse_since(since) or datetime.utcnow()
seen_ids: set[UUID] = set()
seen_queue: deque[UUID] = deque()
async def event_generator():
last_seen = since_dt
while True:
if await request.is_disconnected():
break
rows = await run_in_threadpool(_fetch_task_events, board.id, last_seen)
for event, task in rows:
if event.id in seen_ids:
continue
seen_ids.add(event.id)
seen_queue.append(event.id)
if len(seen_queue) > SSE_SEEN_MAX:
oldest = seen_queue.popleft()
seen_ids.discard(oldest)
if event.created_at > last_seen:
last_seen = event.created_at
payload: dict[str, object] = {"type": event.event_type}
if event.event_type == "task.comment":
payload["comment"] = _serialize_comment(event)
else:
payload["task"] = _serialize_task(task)
yield {"event": "task", "data": json.dumps(payload)}
await asyncio.sleep(2)
return EventSourceResponse(event_generator(), ping=15)
@router.get("", response_model=list[TaskRead])
@@ -85,7 +176,7 @@ def list_tasks(
if status_filter:
statuses = [s.strip() for s in status_filter.split(",") if s.strip()]
if statuses:
if any(status not in ALLOWED_STATUSES for status in statuses):
if any(status_value not in ALLOWED_STATUSES for status_value in statuses):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Unsupported task status filter.",
@@ -136,6 +227,8 @@ def update_task(
previous_status = task.status
updates = payload.model_dump(exclude_unset=True)
comment = updates.pop("comment", None)
if comment is not None and not comment.strip():
comment = None
if actor.actor_type == "agent":
if actor.agent and actor.agent.board_id and task.board_id:
if actor.agent.board_id != task.board_id:
@@ -171,8 +264,8 @@ def update_task(
if "status" in updates and updates["status"] == "review":
if comment is not None and comment.strip():
if not is_valid_markdown_comment(comment):
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
if not comment.strip():
raise _comment_validation_error()
else:
if not has_valid_recent_comment(
session,
@@ -180,18 +273,16 @@ def update_task(
task.assigned_agent_id,
task.in_progress_at,
):
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
raise _comment_validation_error()
session.add(task)
session.commit()
session.refresh(task)
if comment is not None and comment.strip():
if actor.actor_type == "agent" and not is_valid_markdown_comment(comment):
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
event = ActivityEvent(
event_type="task.comment",
message=comment.strip(),
message=comment,
task_id=task.id,
agent_id=actor.agent.id if actor.actor_type == "agent" and actor.agent else None,
)
@@ -255,12 +346,10 @@ def create_task_comment(
if actor.agent.board_id and task.board_id and actor.agent.board_id != task.board_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
if not payload.message.strip():
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
if actor.actor_type == "agent" and not is_valid_markdown_comment(payload.message):
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
raise _comment_validation_error()
event = ActivityEvent(
event_type="task.comment",
message=payload.message.strip(),
message=payload.message,
task_id=task.id,
agent_id=actor.agent.id if actor.actor_type == "agent" and actor.agent else None,
)