feat(tasks): add notification messages for task assignment and rework
This commit is contained in:
@@ -576,6 +576,75 @@ async def _send_agent_task_message(
|
||||
)
|
||||
|
||||
|
||||
def _assignment_notification_message(*, board: Board, task: Task, agent: Agent) -> str:
|
||||
description = _truncate_snippet(task.description or "")
|
||||
details = [
|
||||
f"Board: {board.name}",
|
||||
f"Task: {task.title}",
|
||||
f"Task ID: {task.id}",
|
||||
f"Status: {task.status}",
|
||||
]
|
||||
if description:
|
||||
details.append(f"Description: {description}")
|
||||
if task.status == "review" and agent.is_board_lead:
|
||||
action = (
|
||||
"Take action: review the deliverables now. "
|
||||
"Approve by moving to done or return to inbox with clear feedback."
|
||||
)
|
||||
return "TASK READY FOR LEAD REVIEW\n" + "\n".join(details) + f"\n\n{action}"
|
||||
return (
|
||||
"TASK ASSIGNED\n"
|
||||
+ "\n".join(details)
|
||||
+ ("\n\nTake action: open the task and begin work. " "Post updates as task comments.")
|
||||
)
|
||||
|
||||
|
||||
def _rework_notification_message(
|
||||
*,
|
||||
board: Board,
|
||||
task: Task,
|
||||
feedback: str | None,
|
||||
) -> str:
|
||||
description = _truncate_snippet(task.description or "")
|
||||
details = [
|
||||
f"Board: {board.name}",
|
||||
f"Task: {task.title}",
|
||||
f"Task ID: {task.id}",
|
||||
f"Status: {task.status}",
|
||||
]
|
||||
if description:
|
||||
details.append(f"Description: {description}")
|
||||
requested_changes = (
|
||||
_truncate_snippet(feedback)
|
||||
if feedback and feedback.strip()
|
||||
else "Lead requested changes. Review latest task comments for exact required updates."
|
||||
)
|
||||
return (
|
||||
"CHANGES REQUESTED\n"
|
||||
+ "\n".join(details)
|
||||
+ "\n\nRequested changes:\n"
|
||||
+ requested_changes
|
||||
+ "\n\nTake action: address the requested changes, then move the task back to review."
|
||||
)
|
||||
|
||||
|
||||
async def _latest_task_comment_by_agent(
|
||||
session: AsyncSession,
|
||||
*,
|
||||
task_id: UUID,
|
||||
agent_id: UUID,
|
||||
) -> str | None:
|
||||
statement = (
|
||||
select(col(ActivityEvent.message))
|
||||
.where(col(ActivityEvent.task_id) == task_id)
|
||||
.where(col(ActivityEvent.event_type) == "task.comment")
|
||||
.where(col(ActivityEvent.agent_id) == agent_id)
|
||||
.order_by(desc(col(ActivityEvent.created_at)))
|
||||
.limit(1)
|
||||
)
|
||||
return (await session.exec(statement)).first()
|
||||
|
||||
|
||||
async def _notify_agent_on_task_assign(
|
||||
*,
|
||||
session: AsyncSession,
|
||||
@@ -589,20 +658,7 @@ async def _notify_agent_on_task_assign(
|
||||
config = await dispatch.optional_gateway_config_for_board(board)
|
||||
if config is None:
|
||||
return
|
||||
description = _truncate_snippet(task.description or "")
|
||||
details = [
|
||||
f"Board: {board.name}",
|
||||
f"Task: {task.title}",
|
||||
f"Task ID: {task.id}",
|
||||
f"Status: {task.status}",
|
||||
]
|
||||
if description:
|
||||
details.append(f"Description: {description}")
|
||||
message = (
|
||||
"TASK ASSIGNED\n"
|
||||
+ "\n".join(details)
|
||||
+ ("\n\nTake action: open the task and begin work. " "Post updates as task comments.")
|
||||
)
|
||||
message = _assignment_notification_message(board=board, task=task, agent=agent)
|
||||
error = await _send_agent_task_message(
|
||||
dispatch=dispatch,
|
||||
session_key=agent.openclaw_session_id,
|
||||
@@ -630,6 +686,57 @@ async def _notify_agent_on_task_assign(
|
||||
await session.commit()
|
||||
|
||||
|
||||
async def _notify_agent_on_task_rework(
|
||||
*,
|
||||
session: AsyncSession,
|
||||
board: Board,
|
||||
task: Task,
|
||||
agent: Agent,
|
||||
lead: Agent,
|
||||
) -> None:
|
||||
if not agent.openclaw_session_id:
|
||||
return
|
||||
dispatch = GatewayDispatchService(session)
|
||||
config = await dispatch.optional_gateway_config_for_board(board)
|
||||
if config is None:
|
||||
return
|
||||
feedback = await _latest_task_comment_by_agent(
|
||||
session,
|
||||
task_id=task.id,
|
||||
agent_id=lead.id,
|
||||
)
|
||||
message = _rework_notification_message(
|
||||
board=board,
|
||||
task=task,
|
||||
feedback=feedback,
|
||||
)
|
||||
error = await _send_agent_task_message(
|
||||
dispatch=dispatch,
|
||||
session_key=agent.openclaw_session_id,
|
||||
config=config,
|
||||
agent_name=agent.name,
|
||||
message=message,
|
||||
)
|
||||
if error is None:
|
||||
record_activity(
|
||||
session,
|
||||
event_type="task.rework_notified",
|
||||
message=f"Assignee notified about requested changes: {agent.name}.",
|
||||
agent_id=agent.id,
|
||||
task_id=task.id,
|
||||
)
|
||||
await session.commit()
|
||||
else:
|
||||
record_activity(
|
||||
session,
|
||||
event_type="task.rework_notify_failed",
|
||||
message=f"Rework notify failed: {error}",
|
||||
agent_id=agent.id,
|
||||
task_id=task.id,
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
|
||||
async def notify_agent_on_task_assign(
|
||||
*,
|
||||
session: AsyncSession,
|
||||
@@ -1948,7 +2055,39 @@ async def _lead_apply_assignment(
|
||||
update.task.assigned_agent_id = agent.id
|
||||
|
||||
|
||||
def _lead_apply_status(update: _TaskUpdateInput) -> None:
|
||||
async def _last_worker_who_moved_task_to_review(
|
||||
session: AsyncSession,
|
||||
*,
|
||||
task_id: UUID,
|
||||
board_id: UUID,
|
||||
lead_agent_id: UUID,
|
||||
) -> UUID | None:
|
||||
statement = (
|
||||
select(col(ActivityEvent.agent_id))
|
||||
.where(col(ActivityEvent.task_id) == task_id)
|
||||
.where(col(ActivityEvent.event_type) == "task.status_changed")
|
||||
.where(col(ActivityEvent.message).like("Task moved to review:%"))
|
||||
.where(col(ActivityEvent.agent_id).is_not(None))
|
||||
.order_by(desc(col(ActivityEvent.created_at)))
|
||||
)
|
||||
candidate_ids = list(await session.exec(statement))
|
||||
for candidate_id in candidate_ids:
|
||||
if candidate_id is None or candidate_id == lead_agent_id:
|
||||
continue
|
||||
candidate = await Agent.objects.by_id(candidate_id).first(session)
|
||||
if candidate is None:
|
||||
continue
|
||||
if candidate.board_id != board_id or candidate.is_board_lead:
|
||||
continue
|
||||
return candidate.id
|
||||
return None
|
||||
|
||||
|
||||
async def _lead_apply_status(
|
||||
session: AsyncSession,
|
||||
*,
|
||||
update: _TaskUpdateInput,
|
||||
) -> None:
|
||||
if "status" not in update.updates:
|
||||
return
|
||||
if update.task.status != "review":
|
||||
@@ -1969,7 +2108,12 @@ def _lead_apply_status(update: _TaskUpdateInput) -> None:
|
||||
),
|
||||
)
|
||||
if target_status == "inbox":
|
||||
update.task.assigned_agent_id = None
|
||||
update.task.assigned_agent_id = await _last_worker_who_moved_task_to_review(
|
||||
session,
|
||||
task_id=update.task.id,
|
||||
board_id=update.board_id,
|
||||
lead_agent_id=update.actor.agent.id,
|
||||
)
|
||||
update.task.in_progress_at = None
|
||||
update.task.status = target_status
|
||||
|
||||
@@ -2001,6 +2145,21 @@ async def _lead_notify_new_assignee(
|
||||
else None
|
||||
)
|
||||
if board:
|
||||
if (
|
||||
update.previous_status == "review"
|
||||
and update.task.status == "inbox"
|
||||
and update.actor.actor_type == "agent"
|
||||
and update.actor.agent
|
||||
and update.actor.agent.is_board_lead
|
||||
):
|
||||
await _notify_agent_on_task_rework(
|
||||
session=session,
|
||||
board=board,
|
||||
task=update.task,
|
||||
agent=assigned_agent,
|
||||
lead=update.actor.agent,
|
||||
)
|
||||
return
|
||||
await _notify_agent_on_task_assign(
|
||||
session=session,
|
||||
board=board,
|
||||
@@ -2037,7 +2196,7 @@ async def _apply_lead_task_update(
|
||||
raise _blocked_task_error(blocked_by)
|
||||
|
||||
await _lead_apply_assignment(session, update=update)
|
||||
_lead_apply_status(update)
|
||||
await _lead_apply_status(session, update=update)
|
||||
await _require_no_pending_approval_for_status_change_when_enabled(
|
||||
session,
|
||||
board_id=update.board_id,
|
||||
@@ -2306,6 +2465,23 @@ async def _record_task_update_activity(
|
||||
await session.commit()
|
||||
|
||||
|
||||
async def _assign_review_task_to_lead(
|
||||
session: AsyncSession,
|
||||
*,
|
||||
update: _TaskUpdateInput,
|
||||
) -> None:
|
||||
if update.task.status != "review" or update.previous_status == "review":
|
||||
return
|
||||
lead = (
|
||||
await Agent.objects.filter_by(board_id=update.board_id)
|
||||
.filter(col(Agent.is_board_lead).is_(True))
|
||||
.first(session)
|
||||
)
|
||||
if lead is None:
|
||||
return
|
||||
update.task.assigned_agent_id = lead.id
|
||||
|
||||
|
||||
async def _notify_task_update_assignment_changes(
|
||||
session: AsyncSession,
|
||||
*,
|
||||
@@ -2333,12 +2509,6 @@ async def _notify_task_update_assignment_changes(
|
||||
or update.task.assigned_agent_id == update.previous_assigned
|
||||
):
|
||||
return
|
||||
if (
|
||||
update.actor.actor_type == "agent"
|
||||
and update.actor.agent
|
||||
and update.task.assigned_agent_id == update.actor.agent.id
|
||||
):
|
||||
return
|
||||
assigned_agent = await Agent.objects.by_id(update.task.assigned_agent_id).first(
|
||||
session,
|
||||
)
|
||||
@@ -2349,6 +2519,28 @@ async def _notify_task_update_assignment_changes(
|
||||
if update.task.board_id
|
||||
else None
|
||||
)
|
||||
if (
|
||||
update.previous_status == "review"
|
||||
and update.task.status == "inbox"
|
||||
and update.actor.actor_type == "agent"
|
||||
and update.actor.agent
|
||||
and update.actor.agent.is_board_lead
|
||||
):
|
||||
if board:
|
||||
await _notify_agent_on_task_rework(
|
||||
session=session,
|
||||
board=board,
|
||||
task=update.task,
|
||||
agent=assigned_agent,
|
||||
lead=update.actor.agent,
|
||||
)
|
||||
return
|
||||
if (
|
||||
update.actor.actor_type == "agent"
|
||||
and update.actor.agent
|
||||
and update.task.assigned_agent_id == update.actor.agent.id
|
||||
):
|
||||
return
|
||||
if board:
|
||||
await _notify_agent_on_task_assign(
|
||||
session=session,
|
||||
@@ -2406,6 +2598,7 @@ async def _finalize_updated_task(
|
||||
review_comment_since,
|
||||
):
|
||||
raise _comment_validation_error()
|
||||
await _assign_review_task_to_lead(session, update=update)
|
||||
|
||||
if update.tag_ids is not None:
|
||||
normalized = (
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
@@ -11,6 +12,7 @@ from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
from app.api import tasks as tasks_api
|
||||
from app.api.deps import ActorContext
|
||||
from app.core.time import utcnow
|
||||
from app.models.activity_events import ActivityEvent
|
||||
from app.models.agents import Agent
|
||||
from app.models.boards import Board
|
||||
from app.models.gateways import Gateway
|
||||
@@ -326,7 +328,7 @@ async def test_non_lead_agent_forbidden_for_lead_only_patch_fields() -> None:
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None:
|
||||
async def test_non_lead_agent_moves_task_to_review_and_reassigns_to_lead() -> None:
|
||||
engine = await _make_engine()
|
||||
try:
|
||||
async with await _make_session(engine) as session:
|
||||
@@ -334,6 +336,7 @@ async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None:
|
||||
board_id = uuid4()
|
||||
gateway_id = uuid4()
|
||||
worker_id = uuid4()
|
||||
lead_id = uuid4()
|
||||
task_id = uuid4()
|
||||
in_progress_at = utcnow()
|
||||
|
||||
@@ -365,6 +368,16 @@ async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None:
|
||||
status="online",
|
||||
),
|
||||
)
|
||||
session.add(
|
||||
Agent(
|
||||
id=lead_id,
|
||||
name="Lead Agent",
|
||||
board_id=board_id,
|
||||
gateway_id=gateway_id,
|
||||
status="online",
|
||||
is_board_lead=True,
|
||||
),
|
||||
)
|
||||
session.add(
|
||||
Task(
|
||||
id=task_id,
|
||||
@@ -391,7 +404,7 @@ async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None:
|
||||
)
|
||||
|
||||
assert updated.status == "review"
|
||||
assert updated.assigned_agent_id is None
|
||||
assert updated.assigned_agent_id == lead_id
|
||||
assert updated.in_progress_at is None
|
||||
|
||||
refreshed_task = (
|
||||
@@ -399,6 +412,264 @@ async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None:
|
||||
).first()
|
||||
assert refreshed_task is not None
|
||||
assert refreshed_task.previous_in_progress_at == in_progress_at
|
||||
assert refreshed_task.assigned_agent_id == lead_id
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_non_lead_agent_move_to_review_reassigns_to_lead_and_sends_review_message(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
engine = await _make_engine()
|
||||
try:
|
||||
async with await _make_session(engine) as session:
|
||||
org_id = uuid4()
|
||||
board_id = uuid4()
|
||||
gateway_id = uuid4()
|
||||
worker_id = uuid4()
|
||||
lead_id = uuid4()
|
||||
task_id = uuid4()
|
||||
|
||||
session.add(Organization(id=org_id, name="org"))
|
||||
session.add(
|
||||
Gateway(
|
||||
id=gateway_id,
|
||||
organization_id=org_id,
|
||||
name="gateway",
|
||||
url="https://gateway.local",
|
||||
workspace_root="/tmp/workspace",
|
||||
),
|
||||
)
|
||||
session.add(
|
||||
Board(
|
||||
id=board_id,
|
||||
organization_id=org_id,
|
||||
name="board",
|
||||
slug="board",
|
||||
gateway_id=gateway_id,
|
||||
),
|
||||
)
|
||||
session.add(
|
||||
Agent(
|
||||
id=worker_id,
|
||||
name="worker",
|
||||
board_id=board_id,
|
||||
gateway_id=gateway_id,
|
||||
status="online",
|
||||
),
|
||||
)
|
||||
session.add(
|
||||
Agent(
|
||||
id=lead_id,
|
||||
name="Lead Agent",
|
||||
board_id=board_id,
|
||||
gateway_id=gateway_id,
|
||||
status="online",
|
||||
is_board_lead=True,
|
||||
openclaw_session_id="lead-session",
|
||||
),
|
||||
)
|
||||
session.add(
|
||||
Task(
|
||||
id=task_id,
|
||||
board_id=board_id,
|
||||
title="assigned task",
|
||||
description="done and ready",
|
||||
status="in_progress",
|
||||
assigned_agent_id=worker_id,
|
||||
in_progress_at=utcnow(),
|
||||
),
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
sent: dict[str, str] = {}
|
||||
|
||||
class _FakeDispatch:
|
||||
def __init__(self, _session: AsyncSession) -> None:
|
||||
pass
|
||||
|
||||
async def optional_gateway_config_for_board(self, _board: Board) -> object:
|
||||
return object()
|
||||
|
||||
async def _fake_send_agent_task_message(
|
||||
*,
|
||||
dispatch: Any,
|
||||
session_key: str,
|
||||
config: Any,
|
||||
agent_name: str,
|
||||
message: str,
|
||||
) -> None:
|
||||
_ = dispatch, config
|
||||
sent["session_key"] = session_key
|
||||
sent["agent_name"] = agent_name
|
||||
sent["message"] = message
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(tasks_api, "GatewayDispatchService", _FakeDispatch)
|
||||
monkeypatch.setattr(tasks_api, "_send_agent_task_message", _fake_send_agent_task_message)
|
||||
|
||||
task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first()
|
||||
assert task is not None
|
||||
actor = (await session.exec(select(Agent).where(col(Agent.id) == worker_id))).first()
|
||||
assert actor is not None
|
||||
|
||||
updated = await tasks_api.update_task(
|
||||
payload=TaskUpdate(status="review", comment="Moving to review."),
|
||||
task=task,
|
||||
session=session,
|
||||
actor=ActorContext(actor_type="agent", agent=actor),
|
||||
)
|
||||
|
||||
assert updated.status == "review"
|
||||
assert updated.assigned_agent_id == lead_id
|
||||
assert sent["session_key"] == "lead-session"
|
||||
assert sent["agent_name"] == "Lead Agent"
|
||||
assert "TASK READY FOR LEAD REVIEW" in sent["message"]
|
||||
assert "review the deliverables" in sent["message"]
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_lead_moves_review_task_to_inbox_and_reassigns_last_worker_with_rework_message(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
engine = await _make_engine()
|
||||
try:
|
||||
async with await _make_session(engine) as session:
|
||||
org_id = uuid4()
|
||||
board_id = uuid4()
|
||||
gateway_id = uuid4()
|
||||
worker_id = uuid4()
|
||||
lead_id = uuid4()
|
||||
task_id = uuid4()
|
||||
|
||||
session.add(Organization(id=org_id, name="org"))
|
||||
session.add(
|
||||
Gateway(
|
||||
id=gateway_id,
|
||||
organization_id=org_id,
|
||||
name="gateway",
|
||||
url="https://gateway.local",
|
||||
workspace_root="/tmp/workspace",
|
||||
),
|
||||
)
|
||||
session.add(
|
||||
Board(
|
||||
id=board_id,
|
||||
organization_id=org_id,
|
||||
name="board",
|
||||
slug="board",
|
||||
gateway_id=gateway_id,
|
||||
),
|
||||
)
|
||||
session.add(
|
||||
Agent(
|
||||
id=worker_id,
|
||||
name="worker",
|
||||
board_id=board_id,
|
||||
gateway_id=gateway_id,
|
||||
status="online",
|
||||
openclaw_session_id="worker-session",
|
||||
),
|
||||
)
|
||||
session.add(
|
||||
Agent(
|
||||
id=lead_id,
|
||||
name="Lead Agent",
|
||||
board_id=board_id,
|
||||
gateway_id=gateway_id,
|
||||
status="online",
|
||||
is_board_lead=True,
|
||||
openclaw_session_id="lead-session",
|
||||
),
|
||||
)
|
||||
session.add(
|
||||
Task(
|
||||
id=task_id,
|
||||
board_id=board_id,
|
||||
title="assigned task",
|
||||
description="ready",
|
||||
status="in_progress",
|
||||
assigned_agent_id=worker_id,
|
||||
in_progress_at=utcnow(),
|
||||
),
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
sent: list[dict[str, str]] = []
|
||||
|
||||
class _FakeDispatch:
|
||||
def __init__(self, _session: AsyncSession) -> None:
|
||||
pass
|
||||
|
||||
async def optional_gateway_config_for_board(self, _board: Board) -> object:
|
||||
return object()
|
||||
|
||||
async def _fake_send_agent_task_message(
|
||||
*,
|
||||
dispatch: Any,
|
||||
session_key: str,
|
||||
config: Any,
|
||||
agent_name: str,
|
||||
message: str,
|
||||
) -> None:
|
||||
_ = dispatch, config
|
||||
sent.append(
|
||||
{
|
||||
"session_key": session_key,
|
||||
"agent_name": agent_name,
|
||||
"message": message,
|
||||
},
|
||||
)
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(tasks_api, "GatewayDispatchService", _FakeDispatch)
|
||||
monkeypatch.setattr(tasks_api, "_send_agent_task_message", _fake_send_agent_task_message)
|
||||
|
||||
task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first()
|
||||
assert task is not None
|
||||
worker = (await session.exec(select(Agent).where(col(Agent.id) == worker_id))).first()
|
||||
assert worker is not None
|
||||
lead = (await session.exec(select(Agent).where(col(Agent.id) == lead_id))).first()
|
||||
assert lead is not None
|
||||
|
||||
moved_to_review = await tasks_api.update_task(
|
||||
payload=TaskUpdate(status="review", comment="Ready for review."),
|
||||
task=task,
|
||||
session=session,
|
||||
actor=ActorContext(actor_type="agent", agent=worker),
|
||||
)
|
||||
assert moved_to_review.status == "review"
|
||||
assert moved_to_review.assigned_agent_id == lead_id
|
||||
|
||||
session.add(
|
||||
ActivityEvent(
|
||||
event_type="task.comment",
|
||||
task_id=task_id,
|
||||
agent_id=lead_id,
|
||||
message="Please update error handling and add tests for edge cases.",
|
||||
),
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
review_task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first()
|
||||
assert review_task is not None
|
||||
reverted = await tasks_api.update_task(
|
||||
payload=TaskUpdate(status="inbox"),
|
||||
task=review_task,
|
||||
session=session,
|
||||
actor=ActorContext(actor_type="agent", agent=lead),
|
||||
)
|
||||
|
||||
assert reverted.status == "inbox"
|
||||
assert reverted.assigned_agent_id == worker_id
|
||||
worker_messages = [item for item in sent if item["session_key"] == "worker-session"]
|
||||
assert worker_messages
|
||||
final_message = worker_messages[-1]["message"]
|
||||
assert "CHANGES REQUESTED" in final_message
|
||||
assert "Please update error handling and add tests for edge cases." in final_message
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user