Harden non-lead task patch auth and add regression tests

This commit is contained in:
Abhimanyu Saharan
2026-02-14 06:03:35 +00:00
parent 42a41f64bc
commit 4fe8db9649
3 changed files with 367 additions and 3 deletions

View File

@@ -126,6 +126,16 @@ def _comment_validation_error() -> HTTPException:
)
def _task_update_forbidden_error(*, code: str, message: str) -> HTTPException:
return HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={
"message": message,
"code": code,
},
)
def _blocked_task_error(blocked_by_task_ids: Sequence[UUID]) -> HTTPException:
# NOTE: Keep this payload machine-readable; UI and automation rely on it.
return HTTPException(
@@ -2051,7 +2061,29 @@ async def _apply_non_lead_agent_task_rules(
and update.task.board_id
and update.actor.agent.board_id != update.task.board_id
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
raise _task_update_forbidden_error(
code="task_board_mismatch",
message="Agent can only update tasks for their assigned board.",
)
if (
update.actor.agent
and "status" in update.updates
and (update.task.assigned_agent_id is None)
):
raise _task_update_forbidden_error(
code="task_assignee_required",
message="Agents can only change status on tasks assigned to them.",
)
if (
update.actor.agent
and update.task.assigned_agent_id is not None
and update.task.assigned_agent_id != update.actor.agent.id
and "status" in update.updates
):
raise _task_update_forbidden_error(
code="task_assignee_mismatch",
message="Agents can only change status on tasks assigned to them.",
)
# Agents are limited to status/comment updates, and non-inbox status moves
# must pass dependency checks before they can proceed.
allowed_fields = {"status", "comment", "custom_field_values"}
@@ -2062,7 +2094,10 @@ async def _apply_non_lead_agent_task_rules(
allowed_fields,
)
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
raise _task_update_forbidden_error(
code="task_update_field_forbidden",
message="Agents may only update status, comment, and custom field values.",
)
if "status" in update.updates:
only_lead_can_change_status = (
await session.exec(

View File

@@ -0,0 +1,323 @@
from __future__ import annotations
from uuid import uuid4
import pytest
from fastapi import HTTPException
from sqlmodel import col, select, SQLModel
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlmodel.ext.asyncio.session import AsyncSession
from app.api import tasks as tasks_api
from app.api.deps import ActorContext
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.organizations import Organization
from app.models.tasks import Task
from app.schemas.tasks import TaskUpdate
async def _make_engine() -> AsyncEngine:
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.connect() as conn, conn.begin():
await conn.run_sync(SQLModel.metadata.create_all)
return engine
async def _make_session(engine: AsyncEngine) -> AsyncSession:
return AsyncSession(engine, expire_on_commit=False)
@pytest.mark.asyncio
async def test_non_lead_agent_can_update_status_for_assigned_task() -> None:
engine = await _make_engine()
try:
async with await _make_session(engine) as session:
org_id = uuid4()
board_id = uuid4()
gateway_id = uuid4()
worker_id = uuid4()
task_id = uuid4()
session.add(Organization(id=org_id, name="org"))
session.add(
Gateway(
id=gateway_id,
organization_id=org_id,
name="gateway",
url="https://gateway.local",
workspace_root="/tmp/workspace",
),
)
session.add(
Board(
id=board_id,
organization_id=org_id,
name="board",
slug="board",
gateway_id=gateway_id,
),
)
session.add(
Agent(
id=worker_id,
name="worker",
board_id=board_id,
gateway_id=gateway_id,
status="online",
),
)
session.add(
Task(
id=task_id,
board_id=board_id,
title="assigned task",
description="",
status="inbox",
assigned_agent_id=worker_id,
),
)
await session.commit()
task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first()
assert task is not None
actor = (await session.exec(select(Agent).where(col(Agent.id) == worker_id))).first()
assert actor is not None
updated = await tasks_api.update_task(
payload=TaskUpdate(status="in_progress"),
task=task,
session=session,
actor=ActorContext(actor_type="agent", agent=actor),
)
assert updated.status == "in_progress"
assert updated.assigned_agent_id == worker_id
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_non_lead_agent_forbidden_without_assigned_task() -> None:
engine = await _make_engine()
try:
async with await _make_session(engine) as session:
org_id = uuid4()
board_id = uuid4()
gateway_id = uuid4()
actor_id = uuid4()
task_id = uuid4()
session.add(Organization(id=org_id, name="org"))
session.add(
Gateway(
id=gateway_id,
organization_id=org_id,
name="gateway",
url="https://gateway.local",
workspace_root="/tmp/workspace",
),
)
session.add(
Board(
id=board_id,
organization_id=org_id,
name="board",
slug="board",
gateway_id=gateway_id,
),
)
session.add(
Agent(
id=actor_id,
name="actor",
board_id=board_id,
gateway_id=gateway_id,
status="online",
),
)
session.add(
Task(
id=task_id,
board_id=board_id,
title="unassigned task",
description="",
status="inbox",
assigned_agent_id=None,
),
)
await session.commit()
task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first()
assert task is not None
actor = (await session.exec(select(Agent).where(col(Agent.id) == actor_id))).first()
assert actor is not None
with pytest.raises(HTTPException) as exc:
await tasks_api.update_task(
payload=TaskUpdate(status="in_progress"),
task=task,
session=session,
actor=ActorContext(actor_type="agent", agent=actor),
)
assert exc.value.status_code == 403
assert isinstance(exc.value.detail, dict)
assert exc.value.detail["code"] == "task_assignee_required"
assert exc.value.detail["message"] == "Agents can only change status on tasks assigned to them."
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_non_lead_agent_forbidden_when_task_assigned_to_other_agent() -> None:
engine = await _make_engine()
try:
async with await _make_session(engine) as session:
org_id = uuid4()
board_id = uuid4()
gateway_id = uuid4()
actor_id = uuid4()
assignee_id = uuid4()
task_id = uuid4()
session.add(Organization(id=org_id, name="org"))
session.add(
Gateway(
id=gateway_id,
organization_id=org_id,
name="gateway",
url="https://gateway.local",
workspace_root="/tmp/workspace",
),
)
session.add(
Board(
id=board_id,
organization_id=org_id,
name="board",
slug="board",
gateway_id=gateway_id,
),
)
session.add(
Agent(
id=actor_id,
name="actor",
board_id=board_id,
gateway_id=gateway_id,
status="online",
),
)
session.add(
Agent(
id=assignee_id,
name="other",
board_id=board_id,
gateway_id=gateway_id,
status="online",
),
)
session.add(
Task(
id=task_id,
board_id=board_id,
title="other owner task",
description="",
status="inbox",
assigned_agent_id=assignee_id,
),
)
await session.commit()
task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first()
assert task is not None
actor = (await session.exec(select(Agent).where(col(Agent.id) == actor_id))).first()
assert actor is not None
with pytest.raises(HTTPException) as exc:
await tasks_api.update_task(
payload=TaskUpdate(status="in_progress"),
task=task,
session=session,
actor=ActorContext(actor_type="agent", agent=actor),
)
assert exc.value.status_code == 403
assert isinstance(exc.value.detail, dict)
assert exc.value.detail["code"] == "task_assignee_mismatch"
assert exc.value.detail["message"] == "Agents can only change status on tasks assigned to them."
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_non_lead_agent_forbidden_for_lead_only_patch_fields() -> None:
engine = await _make_engine()
try:
async with await _make_session(engine) as session:
org_id = uuid4()
board_id = uuid4()
gateway_id = uuid4()
actor_id = uuid4()
task_id = uuid4()
session.add(Organization(id=org_id, name="org"))
session.add(
Gateway(
id=gateway_id,
organization_id=org_id,
name="gateway",
url="https://gateway.local",
workspace_root="/tmp/workspace",
),
)
session.add(
Board(
id=board_id,
organization_id=org_id,
name="board",
slug="board",
gateway_id=gateway_id,
),
)
session.add(
Agent(
id=actor_id,
name="actor",
board_id=board_id,
gateway_id=gateway_id,
status="online",
),
)
session.add(
Task(
id=task_id,
board_id=board_id,
title="owned task",
description="",
status="inbox",
assigned_agent_id=actor_id,
),
)
await session.commit()
task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first()
assert task is not None
actor = (await session.exec(select(Agent).where(col(Agent.id) == actor_id))).first()
assert actor is not None
with pytest.raises(HTTPException) as exc:
await tasks_api.update_task(
payload=TaskUpdate(assigned_agent_id=actor_id),
task=task,
session=session,
actor=ActorContext(actor_type="agent", agent=actor),
)
assert exc.value.status_code == 403
assert isinstance(exc.value.detail, dict)
assert exc.value.detail["code"] == "task_update_field_forbidden"
finally:
await engine.dispose()

View File

@@ -61,7 +61,6 @@ async def _seed_board_task_and_agent(
block_status_changes_with_pending_approval=block_status_changes_with_pending_approval,
only_lead_can_change_status=only_lead_can_change_status,
)
task = Task(id=uuid4(), board_id=board.id, title="Task", status=task_status)
agent = Agent(
id=uuid4(),
board_id=board.id,
@@ -70,6 +69,13 @@ async def _seed_board_task_and_agent(
status="online",
is_board_lead=agent_is_board_lead,
)
task = Task(
id=uuid4(),
board_id=board.id,
title="Task",
status=task_status,
assigned_agent_id=agent.id,
)
session.add(Organization(id=organization_id, name=f"org-{organization_id}"))
session.add(gateway)