feat: implement task dependencies with validation and update handling

This commit is contained in:
Abhimanyu Saharan
2026-02-07 00:21:44 +05:30
parent 8970ee6742
commit 4bab455912
34 changed files with 1241 additions and 157 deletions

View File

@@ -3,16 +3,16 @@ from __future__ import annotations
import asyncio
import json
from collections import deque
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Sequence
from datetime import datetime, timezone
from typing import cast
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import asc, delete, desc
from sqlalchemy import asc, delete, desc, or_
from sqlmodel import col, select
from sqlmodel.sql.expression import Select
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel.sql.expression import Select
from sse_starlette.sse import EventSourceResponse
from app.api.deps import (
@@ -33,13 +33,23 @@ from app.models.agents import Agent
from app.models.approvals import Approval
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.task_dependencies import TaskDependency
from app.models.task_fingerprints import TaskFingerprint
from app.models.tasks import Task
from app.schemas.common import OkResponse
from app.schemas.errors import BlockedTaskError
from app.schemas.pagination import DefaultLimitOffsetPage
from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate
from app.services.activity_log import record_activity
from app.services.mentions import extract_mentions, matches_agent_mention
from app.services.task_dependencies import (
blocked_by_dependency_ids,
dependency_ids_by_task_id,
dependency_status_by_id,
dependent_task_ids,
replace_task_dependencies,
validate_dependency_update,
)
router = APIRouter(prefix="/boards/{board_id}/tasks", tags=["tasks"])
@@ -60,6 +70,16 @@ def _comment_validation_error() -> HTTPException:
)
def _blocked_task_error(blocked_by_task_ids: Sequence[UUID]) -> HTTPException:
return HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
"message": "Task is blocked by incomplete dependencies.",
"blocked_by_task_ids": [str(value) for value in blocked_by_task_ids],
},
)
async def has_valid_recent_comment(
session: AsyncSession,
task: Task,
@@ -124,6 +144,75 @@ def _lead_created_task(task: Task, lead: Agent) -> bool:
return task.auto_reason == f"lead_agent:{lead.id}"
async def _reconcile_dependents_for_dependency_toggle(
session: AsyncSession,
*,
board_id: UUID,
dependency_task: Task,
previous_status: str,
actor_agent_id: UUID | None,
) -> None:
done_toggled = (previous_status == "done") != (dependency_task.status == "done")
if not done_toggled:
return
dependent_ids = await dependent_task_ids(
session,
board_id=board_id,
dependency_task_id=dependency_task.id,
)
if not dependent_ids:
return
dependents = list(
await session.exec(
select(Task)
.where(col(Task.board_id) == board_id)
.where(col(Task.id).in_(dependent_ids))
)
)
reopened = previous_status == "done" and dependency_task.status != "done"
for dependent in dependents:
if dependent.status == "done":
continue
if reopened:
should_reset = (
dependent.status != "inbox"
or dependent.assigned_agent_id is not None
or dependent.in_progress_at is not None
)
if should_reset:
dependent.status = "inbox"
dependent.assigned_agent_id = None
dependent.in_progress_at = None
dependent.updated_at = utcnow()
session.add(dependent)
record_activity(
session,
event_type="task.status_changed",
task_id=dependent.id,
message=f"Task returned to inbox: dependency reopened ({dependency_task.title}).",
agent_id=actor_agent_id,
)
else:
record_activity(
session,
event_type="task.updated",
task_id=dependent.id,
message=f"Dependency completion changed: {dependency_task.title}.",
agent_id=actor_agent_id,
)
else:
record_activity(
session,
event_type="task.updated",
task_id=dependent.id,
message=f"Dependency completion changed: {dependency_task.title}.",
agent_id=actor_agent_id,
)
async def _fetch_task_events(
session: AsyncSession,
board_id: UUID,
@@ -144,12 +233,6 @@ async def _fetch_task_events(
return list(await session.exec(statement))
def _serialize_task(task: Task | None) -> dict[str, object] | None:
if task is None:
return None
return TaskRead.model_validate(task).model_dump(mode="json")
def _serialize_comment(event: ActivityEvent) -> dict[str, object]:
return TaskCommentRead.model_validate(event).model_dump(mode="json")
@@ -372,8 +455,30 @@ async def stream_tasks(
while True:
if await request.is_disconnected():
break
deps_map: dict[UUID, list[UUID]] = {}
dep_status: dict[UUID, str] = {}
async with async_session_maker() as session:
rows = await _fetch_task_events(session, board.id, last_seen)
task_ids = [
task.id
for event, task in rows
if task is not None and event.event_type != "task.comment"
]
if task_ids:
deps_map = await dependency_ids_by_task_id(
session,
board_id=board.id,
task_ids=list({*task_ids}),
)
dep_ids: list[UUID] = []
for value in deps_map.values():
dep_ids.extend(value)
if dep_ids:
dep_status = await dependency_status_by_id(
session,
board_id=board.id,
dependency_ids=list({*dep_ids}),
)
for event, task in rows:
if event.id in seen_ids:
continue
@@ -388,7 +493,27 @@ async def stream_tasks(
if event.event_type == "task.comment":
payload["comment"] = _serialize_comment(event)
else:
payload["task"] = _serialize_task(task)
if task is None:
payload["task"] = None
else:
dep_list = deps_map.get(task.id, [])
blocked_by = blocked_by_dependency_ids(
dependency_ids=dep_list,
status_by_id=dep_status,
)
if task.status == "done":
blocked_by = []
payload["task"] = (
TaskRead.model_validate(task, from_attributes=True)
.model_copy(
update={
"depends_on_task_ids": dep_list,
"blocked_by_task_ids": blocked_by,
"is_blocked": bool(blocked_by),
}
)
.model_dump(mode="json")
)
yield {"event": "task", "data": json.dumps(payload)}
await asyncio.sleep(2)
@@ -422,21 +547,80 @@ async def list_tasks(
if unassigned:
statement = statement.where(col(Task.assigned_agent_id).is_(None))
statement = statement.order_by(col(Task.created_at).desc())
return await paginate(session, statement)
async def _transform(items: Sequence[object]) -> Sequence[object]:
tasks = cast(Sequence[Task], items)
if not tasks:
return []
task_ids = [task.id for task in tasks]
deps_map = await dependency_ids_by_task_id(session, board_id=board.id, task_ids=task_ids)
dep_ids: list[UUID] = []
for value in deps_map.values():
dep_ids.extend(value)
dep_status = await dependency_status_by_id(
session,
board_id=board.id,
dependency_ids=list({*dep_ids}),
)
output: list[TaskRead] = []
for task in tasks:
dep_list = deps_map.get(task.id, [])
blocked_by = blocked_by_dependency_ids(dependency_ids=dep_list, status_by_id=dep_status)
if task.status == "done":
blocked_by = []
output.append(
TaskRead.model_validate(task, from_attributes=True).model_copy(
update={
"depends_on_task_ids": dep_list,
"blocked_by_task_ids": blocked_by,
"is_blocked": bool(blocked_by),
}
)
)
return output
return await paginate(session, statement, transformer=_transform)
@router.post("", response_model=TaskRead)
@router.post("", response_model=TaskRead, responses={409: {"model": BlockedTaskError}})
async def create_task(
payload: TaskCreate,
board: Board = Depends(get_board_or_404),
session: AsyncSession = Depends(get_session),
auth: AuthContext = Depends(require_admin_auth),
) -> Task:
task = Task.model_validate(payload)
) -> TaskRead:
data = payload.model_dump()
depends_on_task_ids = cast(list[UUID], data.pop("depends_on_task_ids", []) or [])
task = Task.model_validate(data)
task.board_id = board.id
if task.created_by_user_id is None and auth.user is not None:
task.created_by_user_id = auth.user.id
normalized_deps = await validate_dependency_update(
session,
board_id=board.id,
task_id=task.id,
depends_on_task_ids=depends_on_task_ids,
)
dep_status = await dependency_status_by_id(
session,
board_id=board.id,
dependency_ids=normalized_deps,
)
blocked_by = blocked_by_dependency_ids(dependency_ids=normalized_deps, status_by_id=dep_status)
if blocked_by and (task.assigned_agent_id is not None or task.status != "inbox"):
raise _blocked_task_error(blocked_by)
session.add(task)
for dep_id in normalized_deps:
session.add(
TaskDependency(
board_id=board.id,
task_id=task.id,
depends_on_task_id=dep_id,
)
)
await session.commit()
await session.refresh(task)
@@ -457,59 +641,128 @@ async def create_task(
task=task,
agent=assigned_agent,
)
return task
return TaskRead.model_validate(task, from_attributes=True).model_copy(
update={
"depends_on_task_ids": normalized_deps,
"blocked_by_task_ids": blocked_by,
"is_blocked": bool(blocked_by),
}
)
@router.patch("/{task_id}", response_model=TaskRead)
@router.patch(
"/{task_id}",
response_model=TaskRead,
responses={409: {"model": BlockedTaskError}},
)
async def update_task(
payload: TaskUpdate,
task: Task = Depends(get_task_or_404),
session: AsyncSession = Depends(get_session),
actor: ActorContext = Depends(require_admin_or_agent),
) -> Task:
) -> TaskRead:
if task.board_id is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Task board_id is required.",
)
board_id = task.board_id
previous_status = task.status
previous_assigned = task.assigned_agent_id
updates = payload.model_dump(exclude_unset=True)
comment = updates.pop("comment", None)
depends_on_task_ids = cast(list[UUID] | None, updates.pop("depends_on_task_ids", None))
requested_fields = set(updates)
if comment is not None:
requested_fields.add("comment")
if depends_on_task_ids is not None:
requested_fields.add("depends_on_task_ids")
async def _current_dep_ids() -> list[UUID]:
deps_map = await dependency_ids_by_task_id(session, board_id=board_id, task_ids=[task.id])
return deps_map.get(task.id, [])
async def _blocked_by(dep_ids: Sequence[UUID]) -> list[UUID]:
if not dep_ids:
return []
dep_status = await dependency_status_by_id(
session,
board_id=board_id,
dependency_ids=list(dep_ids),
)
return blocked_by_dependency_ids(dependency_ids=list(dep_ids), status_by_id=dep_status)
# Lead agent: delegation only (assign/unassign, resolve review, manage dependencies).
if actor.actor_type == "agent" and actor.agent and actor.agent.is_board_lead:
allowed_fields = {"assigned_agent_id", "status"}
if comment is not None or not set(updates).issubset(allowed_fields):
allowed_fields = {"assigned_agent_id", "status", "depends_on_task_ids"}
if comment is not None or not requested_fields.issubset(allowed_fields):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board leads can only assign or unassign tasks.",
detail=(
"Board leads can only assign/unassign tasks, update dependencies, or resolve review tasks."
),
)
if "assigned_agent_id" in updates:
assigned_id = updates["assigned_agent_id"]
if assigned_id:
agent = await session.get(Agent, assigned_id)
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if agent.is_board_lead:
normalized_deps: list[UUID] | None = None
if depends_on_task_ids is not None:
if task.status == "done":
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=("Cannot change task dependencies after a task is done."),
)
normalized_deps = await replace_task_dependencies(
session,
board_id=board_id,
task_id=task.id,
depends_on_task_ids=depends_on_task_ids,
)
effective_deps = (
normalized_deps if normalized_deps is not None else await _current_dep_ids()
)
blocked_by = await _blocked_by(effective_deps)
# Blocked tasks cannot be assigned or moved out of inbox (unless already done).
if blocked_by and task.status != "done":
task.status = "inbox"
task.assigned_agent_id = None
task.in_progress_at = None
else:
if "assigned_agent_id" in updates:
assigned_id = updates["assigned_agent_id"]
if assigned_id:
agent = await session.get(Agent, assigned_id)
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if agent.is_board_lead:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board leads cannot assign tasks to themselves.",
)
if agent.board_id and task.board_id and agent.board_id != task.board_id:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
task.assigned_agent_id = agent.id
else:
task.assigned_agent_id = None
if "status" in updates:
if task.status != "review":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board leads cannot assign tasks to themselves.",
detail="Board leads can only change status when a task is in review.",
)
if agent.board_id and task.board_id and agent.board_id != task.board_id:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
task.assigned_agent_id = agent.id
else:
task.assigned_agent_id = None
if "status" in updates:
if task.status != "review":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board leads can only change status when a task is in review.",
)
if updates["status"] not in {"done", "inbox"}:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board leads can only move review tasks to done or inbox.",
)
if updates["status"] == "inbox":
task.assigned_agent_id = None
task.in_progress_at = None
task.status = updates["status"]
if updates["status"] not in {"done", "inbox"}:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Board leads can only move review tasks to done or inbox.",
)
if updates["status"] == "inbox":
task.assigned_agent_id = None
task.in_progress_at = None
task.status = updates["status"]
task.updated_at = utcnow()
session.add(task)
if task.status != previous_status:
@@ -525,12 +778,17 @@ async def update_task(
message=message,
agent_id=actor.agent.id,
)
await _reconcile_dependents_for_dependency_toggle(
session,
board_id=board_id,
dependency_task=task,
previous_status=previous_status,
actor_agent_id=actor.agent.id,
)
await session.commit()
await session.refresh(task)
if task.assigned_agent_id and task.assigned_agent_id != previous_assigned:
if actor.actor_type == "agent" and actor.agent and task.assigned_agent_id == actor.agent.id:
return task
assigned_agent = await session.get(Agent, task.assigned_agent_id)
if assigned_agent:
board = await session.get(Board, task.board_id) if task.board_id else None
@@ -541,15 +799,33 @@ async def update_task(
task=task,
agent=assigned_agent,
)
return task
dep_ids = await _current_dep_ids()
blocked_ids = await _blocked_by(dep_ids)
if task.status == "done":
blocked_ids = []
return TaskRead.model_validate(task, from_attributes=True).model_copy(
update={
"depends_on_task_ids": dep_ids,
"blocked_by_task_ids": blocked_ids,
"is_blocked": bool(blocked_ids),
}
)
# Non-lead agent: can only change status + comment, and cannot start blocked tasks.
if actor.actor_type == "agent":
if actor.agent and actor.agent.board_id and task.board_id:
if actor.agent.board_id != task.board_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
allowed_fields = {"status", "comment"}
if not set(updates).issubset(allowed_fields):
if depends_on_task_ids is not None or not set(updates).issubset(allowed_fields):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
if "status" in updates:
if updates["status"] != "inbox":
dep_ids = await _current_dep_ids()
blocked_ids = await _blocked_by(dep_ids)
if blocked_ids:
raise _blocked_task_error(blocked_ids)
if updates["status"] == "inbox":
task.assigned_agent_id = None
task.in_progress_at = None
@@ -557,18 +833,51 @@ async def update_task(
task.assigned_agent_id = actor.agent.id if actor.agent else None
if updates["status"] == "in_progress":
task.in_progress_at = utcnow()
elif "status" in updates:
if updates["status"] == "inbox":
else:
# Admin user: dependencies can be edited until the task is done.
admin_normalized_deps: list[UUID] | None = None
if depends_on_task_ids is not None:
if task.status == "done":
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=("Cannot change task dependencies after a task is done."),
)
admin_normalized_deps = await replace_task_dependencies(
session,
board_id=board_id,
task_id=task.id,
depends_on_task_ids=depends_on_task_ids,
)
effective_deps = (
admin_normalized_deps if admin_normalized_deps is not None else await _current_dep_ids()
)
blocked_ids = await _blocked_by(effective_deps)
target_status = cast(str, updates.get("status", task.status))
if blocked_ids and not (task.status == "done" and target_status == "done"):
# Blocked tasks cannot be assigned or moved out of inbox. If the task is already in
# flight, force it back to inbox and unassign it.
task.status = "inbox"
task.assigned_agent_id = None
task.in_progress_at = None
elif updates["status"] == "in_progress":
task.in_progress_at = utcnow()
if "assigned_agent_id" in updates and updates["assigned_agent_id"]:
agent = await session.get(Agent, updates["assigned_agent_id"])
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if agent.board_id and task.board_id and agent.board_id != task.board_id:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
updates["status"] = "inbox"
updates["assigned_agent_id"] = None
if "status" in updates:
if updates["status"] == "inbox":
task.assigned_agent_id = None
task.in_progress_at = None
elif updates["status"] == "in_progress":
task.in_progress_at = utcnow()
if "assigned_agent_id" in updates and updates["assigned_agent_id"]:
agent = await session.get(Agent, updates["assigned_agent_id"])
if agent is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if agent.board_id and task.board_id and agent.board_id != task.board_id:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
for key, value in updates.items():
setattr(task, key, value)
task.updated_at = utcnow()
@@ -606,14 +915,23 @@ async def update_task(
else:
event_type = "task.updated"
message = f"Task updated: {task.title}."
actor_agent_id = actor.agent.id if actor.actor_type == "agent" and actor.agent else None
record_activity(
session,
event_type=event_type,
task_id=task.id,
message=message,
agent_id=actor.agent.id if actor.actor_type == "agent" and actor.agent else None,
agent_id=actor_agent_id,
)
await _reconcile_dependents_for_dependency_toggle(
session,
board_id=board_id,
dependency_task=task,
previous_status=previous_status,
actor_agent_id=actor_agent_id,
)
await session.commit()
if task.status == "inbox" and task.assigned_agent_id is None:
if previous_status != "inbox" or previous_assigned is not None:
board = await session.get(Board, task.board_id) if task.board_id else None
@@ -625,18 +943,31 @@ async def update_task(
)
if task.assigned_agent_id and task.assigned_agent_id != previous_assigned:
if actor.actor_type == "agent" and actor.agent and task.assigned_agent_id == actor.agent.id:
return task
assigned_agent = await session.get(Agent, task.assigned_agent_id)
if assigned_agent:
board = await session.get(Board, task.board_id) if task.board_id else None
if board:
await _notify_agent_on_task_assign(
session=session,
board=board,
task=task,
agent=assigned_agent,
)
return task
# Don't notify the actor about their own assignment.
pass
else:
assigned_agent = await session.get(Agent, task.assigned_agent_id)
if assigned_agent:
board = await session.get(Board, task.board_id) if task.board_id else None
if board:
await _notify_agent_on_task_assign(
session=session,
board=board,
task=task,
agent=assigned_agent,
)
dep_ids = await _current_dep_ids()
blocked_ids = await _blocked_by(dep_ids)
if task.status == "done":
blocked_ids = []
return TaskRead.model_validate(task, from_attributes=True).model_copy(
update={
"depends_on_task_ids": dep_ids,
"blocked_by_task_ids": blocked_ids,
"is_blocked": bool(blocked_ids),
}
)
@router.delete("/{task_id}", response_model=OkResponse)
@@ -648,6 +979,14 @@ async def delete_task(
await session.execute(delete(ActivityEvent).where(col(ActivityEvent.task_id) == task.id))
await session.execute(delete(TaskFingerprint).where(col(TaskFingerprint.task_id) == task.id))
await session.execute(delete(Approval).where(col(Approval.task_id) == task.id))
await session.execute(
delete(TaskDependency).where(
or_(
col(TaskDependency.task_id) == task.id,
col(TaskDependency.depends_on_task_id) == task.id,
)
)
)
await session.delete(task)
await session.commit()
return OkResponse()