From adad72373c723991786a97a1acd19bfec2686a2c Mon Sep 17 00:00:00 2001 From: Abhimanyu Saharan Date: Wed, 25 Feb 2026 18:29:59 +0530 Subject: [PATCH] feat(tasks): add notification messages for task assignment and rework --- backend/app/api/tasks.py | 239 ++++++++++++++-- backend/tests/test_task_agent_permissions.py | 275 ++++++++++++++++++- 2 files changed, 489 insertions(+), 25 deletions(-) diff --git a/backend/app/api/tasks.py b/backend/app/api/tasks.py index 61b51c18..d846b8f8 100644 --- a/backend/app/api/tasks.py +++ b/backend/app/api/tasks.py @@ -576,6 +576,75 @@ async def _send_agent_task_message( ) +def _assignment_notification_message(*, board: Board, task: Task, agent: Agent) -> str: + description = _truncate_snippet(task.description or "") + details = [ + f"Board: {board.name}", + f"Task: {task.title}", + f"Task ID: {task.id}", + f"Status: {task.status}", + ] + if description: + details.append(f"Description: {description}") + if task.status == "review" and agent.is_board_lead: + action = ( + "Take action: review the deliverables now. " + "Approve by moving to done or return to inbox with clear feedback." + ) + return "TASK READY FOR LEAD REVIEW\n" + "\n".join(details) + f"\n\n{action}" + return ( + "TASK ASSIGNED\n" + + "\n".join(details) + + ("\n\nTake action: open the task and begin work. " "Post updates as task comments.") + ) + + +def _rework_notification_message( + *, + board: Board, + task: Task, + feedback: str | None, +) -> str: + description = _truncate_snippet(task.description or "") + details = [ + f"Board: {board.name}", + f"Task: {task.title}", + f"Task ID: {task.id}", + f"Status: {task.status}", + ] + if description: + details.append(f"Description: {description}") + requested_changes = ( + _truncate_snippet(feedback) + if feedback and feedback.strip() + else "Lead requested changes. Review latest task comments for exact required updates." + ) + return ( + "CHANGES REQUESTED\n" + + "\n".join(details) + + "\n\nRequested changes:\n" + + requested_changes + + "\n\nTake action: address the requested changes, then move the task back to review." + ) + + +async def _latest_task_comment_by_agent( + session: AsyncSession, + *, + task_id: UUID, + agent_id: UUID, +) -> str | None: + statement = ( + select(col(ActivityEvent.message)) + .where(col(ActivityEvent.task_id) == task_id) + .where(col(ActivityEvent.event_type) == "task.comment") + .where(col(ActivityEvent.agent_id) == agent_id) + .order_by(desc(col(ActivityEvent.created_at))) + .limit(1) + ) + return (await session.exec(statement)).first() + + async def _notify_agent_on_task_assign( *, session: AsyncSession, @@ -589,20 +658,7 @@ async def _notify_agent_on_task_assign( config = await dispatch.optional_gateway_config_for_board(board) if config is None: return - description = _truncate_snippet(task.description or "") - details = [ - f"Board: {board.name}", - f"Task: {task.title}", - f"Task ID: {task.id}", - f"Status: {task.status}", - ] - if description: - details.append(f"Description: {description}") - message = ( - "TASK ASSIGNED\n" - + "\n".join(details) - + ("\n\nTake action: open the task and begin work. " "Post updates as task comments.") - ) + message = _assignment_notification_message(board=board, task=task, agent=agent) error = await _send_agent_task_message( dispatch=dispatch, session_key=agent.openclaw_session_id, @@ -630,6 +686,57 @@ async def _notify_agent_on_task_assign( await session.commit() +async def _notify_agent_on_task_rework( + *, + session: AsyncSession, + board: Board, + task: Task, + agent: Agent, + lead: Agent, +) -> None: + if not agent.openclaw_session_id: + return + dispatch = GatewayDispatchService(session) + config = await dispatch.optional_gateway_config_for_board(board) + if config is None: + return + feedback = await _latest_task_comment_by_agent( + session, + task_id=task.id, + agent_id=lead.id, + ) + message = _rework_notification_message( + board=board, + task=task, + feedback=feedback, + ) + error = await _send_agent_task_message( + dispatch=dispatch, + session_key=agent.openclaw_session_id, + config=config, + agent_name=agent.name, + message=message, + ) + if error is None: + record_activity( + session, + event_type="task.rework_notified", + message=f"Assignee notified about requested changes: {agent.name}.", + agent_id=agent.id, + task_id=task.id, + ) + await session.commit() + else: + record_activity( + session, + event_type="task.rework_notify_failed", + message=f"Rework notify failed: {error}", + agent_id=agent.id, + task_id=task.id, + ) + await session.commit() + + async def notify_agent_on_task_assign( *, session: AsyncSession, @@ -1948,7 +2055,39 @@ async def _lead_apply_assignment( update.task.assigned_agent_id = agent.id -def _lead_apply_status(update: _TaskUpdateInput) -> None: +async def _last_worker_who_moved_task_to_review( + session: AsyncSession, + *, + task_id: UUID, + board_id: UUID, + lead_agent_id: UUID, +) -> UUID | None: + statement = ( + select(col(ActivityEvent.agent_id)) + .where(col(ActivityEvent.task_id) == task_id) + .where(col(ActivityEvent.event_type) == "task.status_changed") + .where(col(ActivityEvent.message).like("Task moved to review:%")) + .where(col(ActivityEvent.agent_id).is_not(None)) + .order_by(desc(col(ActivityEvent.created_at))) + ) + candidate_ids = list(await session.exec(statement)) + for candidate_id in candidate_ids: + if candidate_id is None or candidate_id == lead_agent_id: + continue + candidate = await Agent.objects.by_id(candidate_id).first(session) + if candidate is None: + continue + if candidate.board_id != board_id or candidate.is_board_lead: + continue + return candidate.id + return None + + +async def _lead_apply_status( + session: AsyncSession, + *, + update: _TaskUpdateInput, +) -> None: if "status" not in update.updates: return if update.task.status != "review": @@ -1969,7 +2108,12 @@ def _lead_apply_status(update: _TaskUpdateInput) -> None: ), ) if target_status == "inbox": - update.task.assigned_agent_id = None + update.task.assigned_agent_id = await _last_worker_who_moved_task_to_review( + session, + task_id=update.task.id, + board_id=update.board_id, + lead_agent_id=update.actor.agent.id, + ) update.task.in_progress_at = None update.task.status = target_status @@ -2001,6 +2145,21 @@ async def _lead_notify_new_assignee( else None ) if board: + if ( + update.previous_status == "review" + and update.task.status == "inbox" + and update.actor.actor_type == "agent" + and update.actor.agent + and update.actor.agent.is_board_lead + ): + await _notify_agent_on_task_rework( + session=session, + board=board, + task=update.task, + agent=assigned_agent, + lead=update.actor.agent, + ) + return await _notify_agent_on_task_assign( session=session, board=board, @@ -2037,7 +2196,7 @@ async def _apply_lead_task_update( raise _blocked_task_error(blocked_by) await _lead_apply_assignment(session, update=update) - _lead_apply_status(update) + await _lead_apply_status(session, update=update) await _require_no_pending_approval_for_status_change_when_enabled( session, board_id=update.board_id, @@ -2306,6 +2465,23 @@ async def _record_task_update_activity( await session.commit() +async def _assign_review_task_to_lead( + session: AsyncSession, + *, + update: _TaskUpdateInput, +) -> None: + if update.task.status != "review" or update.previous_status == "review": + return + lead = ( + await Agent.objects.filter_by(board_id=update.board_id) + .filter(col(Agent.is_board_lead).is_(True)) + .first(session) + ) + if lead is None: + return + update.task.assigned_agent_id = lead.id + + async def _notify_task_update_assignment_changes( session: AsyncSession, *, @@ -2333,12 +2509,6 @@ async def _notify_task_update_assignment_changes( or update.task.assigned_agent_id == update.previous_assigned ): return - if ( - update.actor.actor_type == "agent" - and update.actor.agent - and update.task.assigned_agent_id == update.actor.agent.id - ): - return assigned_agent = await Agent.objects.by_id(update.task.assigned_agent_id).first( session, ) @@ -2349,6 +2519,28 @@ async def _notify_task_update_assignment_changes( if update.task.board_id else None ) + if ( + update.previous_status == "review" + and update.task.status == "inbox" + and update.actor.actor_type == "agent" + and update.actor.agent + and update.actor.agent.is_board_lead + ): + if board: + await _notify_agent_on_task_rework( + session=session, + board=board, + task=update.task, + agent=assigned_agent, + lead=update.actor.agent, + ) + return + if ( + update.actor.actor_type == "agent" + and update.actor.agent + and update.task.assigned_agent_id == update.actor.agent.id + ): + return if board: await _notify_agent_on_task_assign( session=session, @@ -2406,6 +2598,7 @@ async def _finalize_updated_task( review_comment_since, ): raise _comment_validation_error() + await _assign_review_task_to_lead(session, update=update) if update.tag_ids is not None: normalized = ( diff --git a/backend/tests/test_task_agent_permissions.py b/backend/tests/test_task_agent_permissions.py index 44b3b6e2..60811f5f 100644 --- a/backend/tests/test_task_agent_permissions.py +++ b/backend/tests/test_task_agent_permissions.py @@ -1,5 +1,6 @@ from __future__ import annotations +from typing import Any from uuid import uuid4 import pytest @@ -11,6 +12,7 @@ from sqlmodel.ext.asyncio.session import AsyncSession from app.api import tasks as tasks_api from app.api.deps import ActorContext from app.core.time import utcnow +from app.models.activity_events import ActivityEvent from app.models.agents import Agent from app.models.boards import Board from app.models.gateways import Gateway @@ -326,7 +328,7 @@ async def test_non_lead_agent_forbidden_for_lead_only_patch_fields() -> None: @pytest.mark.asyncio -async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None: +async def test_non_lead_agent_moves_task_to_review_and_reassigns_to_lead() -> None: engine = await _make_engine() try: async with await _make_session(engine) as session: @@ -334,6 +336,7 @@ async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None: board_id = uuid4() gateway_id = uuid4() worker_id = uuid4() + lead_id = uuid4() task_id = uuid4() in_progress_at = utcnow() @@ -365,6 +368,16 @@ async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None: status="online", ), ) + session.add( + Agent( + id=lead_id, + name="Lead Agent", + board_id=board_id, + gateway_id=gateway_id, + status="online", + is_board_lead=True, + ), + ) session.add( Task( id=task_id, @@ -391,7 +404,7 @@ async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None: ) assert updated.status == "review" - assert updated.assigned_agent_id is None + assert updated.assigned_agent_id == lead_id assert updated.in_progress_at is None refreshed_task = ( @@ -399,6 +412,264 @@ async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None: ).first() assert refreshed_task is not None assert refreshed_task.previous_in_progress_at == in_progress_at + assert refreshed_task.assigned_agent_id == lead_id + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_non_lead_agent_move_to_review_reassigns_to_lead_and_sends_review_message( + monkeypatch: pytest.MonkeyPatch, +) -> None: + engine = await _make_engine() + try: + async with await _make_session(engine) as session: + org_id = uuid4() + board_id = uuid4() + gateway_id = uuid4() + worker_id = uuid4() + lead_id = uuid4() + task_id = uuid4() + + session.add(Organization(id=org_id, name="org")) + session.add( + Gateway( + id=gateway_id, + organization_id=org_id, + name="gateway", + url="https://gateway.local", + workspace_root="/tmp/workspace", + ), + ) + session.add( + Board( + id=board_id, + organization_id=org_id, + name="board", + slug="board", + gateway_id=gateway_id, + ), + ) + session.add( + Agent( + id=worker_id, + name="worker", + board_id=board_id, + gateway_id=gateway_id, + status="online", + ), + ) + session.add( + Agent( + id=lead_id, + name="Lead Agent", + board_id=board_id, + gateway_id=gateway_id, + status="online", + is_board_lead=True, + openclaw_session_id="lead-session", + ), + ) + session.add( + Task( + id=task_id, + board_id=board_id, + title="assigned task", + description="done and ready", + status="in_progress", + assigned_agent_id=worker_id, + in_progress_at=utcnow(), + ), + ) + await session.commit() + + sent: dict[str, str] = {} + + class _FakeDispatch: + def __init__(self, _session: AsyncSession) -> None: + pass + + async def optional_gateway_config_for_board(self, _board: Board) -> object: + return object() + + async def _fake_send_agent_task_message( + *, + dispatch: Any, + session_key: str, + config: Any, + agent_name: str, + message: str, + ) -> None: + _ = dispatch, config + sent["session_key"] = session_key + sent["agent_name"] = agent_name + sent["message"] = message + return None + + monkeypatch.setattr(tasks_api, "GatewayDispatchService", _FakeDispatch) + monkeypatch.setattr(tasks_api, "_send_agent_task_message", _fake_send_agent_task_message) + + task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first() + assert task is not None + actor = (await session.exec(select(Agent).where(col(Agent.id) == worker_id))).first() + assert actor is not None + + updated = await tasks_api.update_task( + payload=TaskUpdate(status="review", comment="Moving to review."), + task=task, + session=session, + actor=ActorContext(actor_type="agent", agent=actor), + ) + + assert updated.status == "review" + assert updated.assigned_agent_id == lead_id + assert sent["session_key"] == "lead-session" + assert sent["agent_name"] == "Lead Agent" + assert "TASK READY FOR LEAD REVIEW" in sent["message"] + assert "review the deliverables" in sent["message"] + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_lead_moves_review_task_to_inbox_and_reassigns_last_worker_with_rework_message( + monkeypatch: pytest.MonkeyPatch, +) -> None: + engine = await _make_engine() + try: + async with await _make_session(engine) as session: + org_id = uuid4() + board_id = uuid4() + gateway_id = uuid4() + worker_id = uuid4() + lead_id = uuid4() + task_id = uuid4() + + session.add(Organization(id=org_id, name="org")) + session.add( + Gateway( + id=gateway_id, + organization_id=org_id, + name="gateway", + url="https://gateway.local", + workspace_root="/tmp/workspace", + ), + ) + session.add( + Board( + id=board_id, + organization_id=org_id, + name="board", + slug="board", + gateway_id=gateway_id, + ), + ) + session.add( + Agent( + id=worker_id, + name="worker", + board_id=board_id, + gateway_id=gateway_id, + status="online", + openclaw_session_id="worker-session", + ), + ) + session.add( + Agent( + id=lead_id, + name="Lead Agent", + board_id=board_id, + gateway_id=gateway_id, + status="online", + is_board_lead=True, + openclaw_session_id="lead-session", + ), + ) + session.add( + Task( + id=task_id, + board_id=board_id, + title="assigned task", + description="ready", + status="in_progress", + assigned_agent_id=worker_id, + in_progress_at=utcnow(), + ), + ) + await session.commit() + + sent: list[dict[str, str]] = [] + + class _FakeDispatch: + def __init__(self, _session: AsyncSession) -> None: + pass + + async def optional_gateway_config_for_board(self, _board: Board) -> object: + return object() + + async def _fake_send_agent_task_message( + *, + dispatch: Any, + session_key: str, + config: Any, + agent_name: str, + message: str, + ) -> None: + _ = dispatch, config + sent.append( + { + "session_key": session_key, + "agent_name": agent_name, + "message": message, + }, + ) + return None + + monkeypatch.setattr(tasks_api, "GatewayDispatchService", _FakeDispatch) + monkeypatch.setattr(tasks_api, "_send_agent_task_message", _fake_send_agent_task_message) + + task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first() + assert task is not None + worker = (await session.exec(select(Agent).where(col(Agent.id) == worker_id))).first() + assert worker is not None + lead = (await session.exec(select(Agent).where(col(Agent.id) == lead_id))).first() + assert lead is not None + + moved_to_review = await tasks_api.update_task( + payload=TaskUpdate(status="review", comment="Ready for review."), + task=task, + session=session, + actor=ActorContext(actor_type="agent", agent=worker), + ) + assert moved_to_review.status == "review" + assert moved_to_review.assigned_agent_id == lead_id + + session.add( + ActivityEvent( + event_type="task.comment", + task_id=task_id, + agent_id=lead_id, + message="Please update error handling and add tests for edge cases.", + ), + ) + await session.commit() + + review_task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first() + assert review_task is not None + reverted = await tasks_api.update_task( + payload=TaskUpdate(status="inbox"), + task=review_task, + session=session, + actor=ActorContext(actor_type="agent", agent=lead), + ) + + assert reverted.status == "inbox" + assert reverted.assigned_agent_id == worker_id + worker_messages = [item for item in sent if item["session_key"] == "worker-session"] + assert worker_messages + final_message = worker_messages[-1]["message"] + assert "CHANGES REQUESTED" in final_message + assert "Please update error handling and add tests for edge cases." in final_message finally: await engine.dispose()