redesigned dashboard page

This commit is contained in:
Abhimanyu Saharan
2026-03-04 16:01:56 +05:30
parent a30e59de60
commit bdc9fc3f01
142 changed files with 8908 additions and 7773 deletions

View File

@@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import asc, desc, func
from sqlalchemy import and_, asc, desc, func, or_
from sqlmodel import col, select
from sse_starlette.sse import EventSourceResponse
@@ -78,6 +78,46 @@ def _agent_role(agent: Agent | None) -> str | None:
return None
def _build_activity_route(
*,
event: ActivityEvent,
board_id: UUID | None,
) -> tuple[str, dict[str, str]]:
if board_id is not None:
board_id_str = str(board_id)
board_params = {"boardId": board_id_str}
if event.event_type == "task.comment" and event.task_id is not None:
return (
"board",
{
**board_params,
"taskId": str(event.task_id),
"commentId": str(event.id),
},
)
if event.event_type.startswith("approval."):
return ("board.approvals", board_params)
if event.event_type.startswith("board."):
return ("board", {**board_params, "panel": "chat"})
if event.task_id is not None:
return ("board", {**board_params, "taskId": str(event.task_id)})
return ("board", board_params)
fallback_params = {
"eventId": str(event.id),
"eventType": event.event_type,
"createdAt": event.created_at.isoformat(),
}
if event.task_id is not None:
fallback_params["taskId"] = str(event.task_id)
return ("activity", fallback_params)
def _feed_item(
event: ActivityEvent,
task: Task,
@@ -141,6 +181,46 @@ def _coerce_task_comment_rows(
return rows
def _coerce_activity_rows(
items: Sequence[Any],
) -> list[tuple[ActivityEvent, UUID | None, UUID | None]]:
rows: list[tuple[ActivityEvent, UUID | None, UUID | None]] = []
for item in items:
first: Any
second: Any
third: Any
if isinstance(item, tuple):
if len(item) != 3:
msg = "Expected (ActivityEvent, event_board_id, task_board_id) rows"
raise TypeError(msg)
first, second, third = item
else:
try:
row_len = len(item)
first = item[0]
second = item[1]
third = item[2]
except (IndexError, KeyError, TypeError):
msg = "Expected (ActivityEvent, event_board_id, task_board_id) rows"
raise TypeError(msg) from None
if row_len != 3:
msg = "Expected (ActivityEvent, event_board_id, task_board_id) rows"
raise TypeError(msg)
if not isinstance(first, ActivityEvent):
msg = "Expected (ActivityEvent, event_board_id, task_board_id) rows"
raise TypeError(msg)
if second is not None and not isinstance(second, UUID):
msg = "Expected (ActivityEvent, event_board_id, task_board_id) rows"
raise TypeError(msg)
if third is not None and not isinstance(third, UUID):
msg = "Expected (ActivityEvent, event_board_id, task_board_id) rows"
raise TypeError(msg)
rows.append((first, second, third))
return rows
async def _fetch_task_comment_events(
session: AsyncSession,
since: datetime,
@@ -168,9 +248,16 @@ async def list_activity(
actor: ActorContext = ACTOR_DEP,
) -> LimitOffsetPage[ActivityEventRead]:
"""List activity events visible to the calling actor."""
statement = select(ActivityEvent)
statement: Any = (
select(
ActivityEvent,
col(ActivityEvent.board_id).label("event_board_id"),
col(Task.board_id).label("task_board_id"),
)
.outerjoin(Task, col(ActivityEvent.task_id) == col(Task.id))
)
if actor.actor_type == "agent" and actor.agent:
statement = statement.where(ActivityEvent.agent_id == actor.agent.id)
statement = statement.where(col(ActivityEvent.agent_id) == actor.agent.id)
elif actor.actor_type == "user" and actor.user:
member = await get_active_membership(session, actor.user)
if member is None:
@@ -179,12 +266,34 @@ async def list_activity(
if not board_ids:
statement = statement.where(col(ActivityEvent.id).is_(None))
else:
statement = statement.join(
Task,
col(ActivityEvent.task_id) == col(Task.id),
).where(col(Task.board_id).in_(board_ids))
statement = statement.where(
or_(
col(ActivityEvent.board_id).in_(board_ids),
and_(
col(ActivityEvent.board_id).is_(None),
col(Task.board_id).in_(board_ids),
),
),
)
statement = statement.order_by(desc(col(ActivityEvent.created_at)))
return await paginate(session, statement)
def _transform(items: Sequence[Any]) -> Sequence[Any]:
rows = _coerce_activity_rows(items)
events: list[ActivityEventRead] = []
for event, event_board_id, task_board_id in rows:
payload = ActivityEventRead.model_validate(event, from_attributes=True)
resolved_board_id = event_board_id or task_board_id
payload.board_id = resolved_board_id
route_name, route_params = _build_activity_route(
event=event,
board_id=resolved_board_id,
)
payload.route_name = route_name
payload.route_params = route_params
events.append(payload)
return events
return await paginate(session, statement, transformer=_transform)
@router.get(

View File

@@ -742,6 +742,7 @@ async def create_task(
task_id=task.id,
message=f"Task created by lead: {task.title}.",
agent_id=agent_ctx.agent.id,
board_id=task.board_id,
)
await session.commit()
if task.assigned_agent_id:

View File

@@ -266,6 +266,7 @@ async def _notify_lead_on_approval_resolution(
message=f"Lead agent notified for {approval.status} approval {approval.id}.",
agent_id=lead.id,
task_id=approval.task_id,
board_id=approval.board_id,
)
else:
record_activity(
@@ -274,6 +275,7 @@ async def _notify_lead_on_approval_resolution(
message=f"Lead notify failed for approval {approval.id}: {error}",
agent_id=lead.id,
task_id=approval.task_id,
board_id=approval.board_id,
)
await session.commit()

View File

@@ -345,6 +345,7 @@ async def _notify_agents_on_board_group_change(
f"{recipient_board.name} related to {board.name} and {group.name}."
),
agent_id=agent.id,
board_id=recipient_board.id,
)
else:
failed += 1
@@ -356,6 +357,7 @@ async def _notify_agents_on_board_group_change(
f"{recipient_board.name}: {error}"
),
agent_id=agent.id,
board_id=recipient_board.id,
)
if notified or failed:
@@ -441,6 +443,7 @@ async def _notify_lead_on_board_update(
event_type="board.lead_notified",
message=f"Lead agent notified for board update: {board.name}.",
agent_id=lead.id,
board_id=board.id,
)
else:
record_activity(
@@ -448,6 +451,7 @@ async def _notify_lead_on_board_update(
event_type="board.lead_notify_failed",
message=f"Lead board update notify failed for {board.name}: {error}",
agent_id=lead.id,
board_id=board.id,
)
await session.commit()

View File

@@ -18,12 +18,15 @@ from app.core.time import utcnow
from app.db.session import get_session
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.approvals import Approval
from app.models.boards import Board
from app.models.tasks import Task
from app.schemas.metrics import (
DashboardBucketKey,
DashboardKpis,
DashboardMetrics,
DashboardPendingApproval,
DashboardPendingApprovals,
DashboardRangeKey,
DashboardRangeSeries,
DashboardSeriesPoint,
@@ -169,7 +172,7 @@ async def _query_throughput(
bucket_col = func.date_trunc(range_spec.bucket, Task.updated_at).label("bucket")
statement = (
select(bucket_col, func.count())
.where(col(Task.status) == "review")
.where(col(Task.status) == "done")
.where(col(Task.updated_at) >= range_spec.start)
.where(col(Task.updated_at) <= range_spec.end)
)
@@ -370,22 +373,87 @@ async def _active_agents(
return int(result)
async def _tasks_in_progress(
async def _task_status_counts(
session: AsyncSession,
range_spec: RangeSpec,
board_ids: list[UUID],
) -> int:
) -> dict[str, int]:
if not board_ids:
return 0
return {
"inbox": 0,
"in_progress": 0,
"review": 0,
"done": 0,
}
statement = (
select(func.count())
.where(col(Task.status) == "in_progress")
.where(col(Task.updated_at) >= range_spec.start)
.where(col(Task.updated_at) <= range_spec.end)
select(col(Task.status), func.count())
.where(col(Task.board_id).in_(board_ids))
.group_by(col(Task.status))
)
result = (await session.exec(statement)).one()
return int(result)
results = (await session.exec(statement)).all()
counts = {
"inbox": 0,
"in_progress": 0,
"review": 0,
"done": 0,
}
for status_value, total in results:
key = str(status_value)
if key in counts:
counts[key] = int(total or 0)
return counts
async def _pending_approvals_snapshot(
session: AsyncSession,
board_ids: list[UUID],
*,
limit: int = 10,
) -> DashboardPendingApprovals:
if not board_ids:
return DashboardPendingApprovals(total=0, items=[])
total_statement = (
select(func.count(col(Approval.id)))
.where(col(Approval.board_id).in_(board_ids))
.where(col(Approval.status) == "pending")
)
total = int((await session.exec(total_statement)).one() or 0)
if total == 0:
return DashboardPendingApprovals(total=0, items=[])
rows = (
await session.exec(
select(
col(Approval.id),
col(Approval.board_id),
col(Board.name),
col(Approval.action_type),
col(Approval.confidence),
col(Approval.created_at),
col(Task.title),
)
.join(Board, col(Board.id) == col(Approval.board_id))
.outerjoin(Task, col(Task.id) == col(Approval.task_id))
.where(col(Approval.board_id).in_(board_ids))
.where(col(Approval.status) == "pending")
.order_by(col(Approval.created_at).desc())
.limit(limit)
)
).all()
items = [
DashboardPendingApproval(
approval_id=approval_id,
board_id=board_id,
board_name=board_name,
action_type=action_type,
confidence=float(confidence),
created_at=created_at,
task_title=task_title,
)
for approval_id, board_id, board_name, action_type, confidence, created_at, task_title in rows
]
return DashboardPendingApprovals(total=total, items=items)
async def _resolve_dashboard_board_ids(
@@ -461,10 +529,16 @@ async def dashboard_metrics(
primary=wip_primary,
comparison=wip_comparison,
)
task_status_counts = await _task_status_counts(session, board_ids)
pending_approvals = await _pending_approvals_snapshot(session, board_ids, limit=10)
kpis = DashboardKpis(
active_agents=await _active_agents(session, primary, board_ids),
tasks_in_progress=await _tasks_in_progress(session, primary, board_ids),
tasks_in_progress=task_status_counts["in_progress"],
inbox_tasks=task_status_counts["inbox"],
in_progress_tasks=task_status_counts["in_progress"],
review_tasks=task_status_counts["review"],
done_tasks=task_status_counts["done"],
error_rate_pct=await _error_rate_kpi(session, primary, board_ids),
median_cycle_time_hours_7d=await _median_cycle_time_for_range(
session,
@@ -481,4 +555,5 @@ async def dashboard_metrics(
cycle_time=cycle_time,
error_rate=error_rate,
wip=wip,
pending_approvals=pending_approvals,
)

View File

@@ -511,6 +511,7 @@ async def _reconcile_dependents_for_dependency_toggle(
"Task returned to inbox: dependency reopened " f"({dependency_task.title})."
),
agent_id=actor_agent_id,
board_id=dependent.board_id,
)
else:
record_activity(
@@ -519,6 +520,7 @@ async def _reconcile_dependents_for_dependency_toggle(
task_id=dependent.id,
message=f"Dependency completion changed: {dependency_task.title}.",
agent_id=actor_agent_id,
board_id=dependent.board_id,
)
else:
record_activity(
@@ -527,6 +529,7 @@ async def _reconcile_dependents_for_dependency_toggle(
task_id=dependent.id,
message=f"Dependency completion changed: {dependency_task.title}.",
agent_id=actor_agent_id,
board_id=dependent.board_id,
)
@@ -686,6 +689,7 @@ async def _notify_agent_on_task_assign(
message=f"Agent notified for assignment: {agent.name}.",
agent_id=agent.id,
task_id=task.id,
board_id=board.id,
)
await session.commit()
else:
@@ -695,6 +699,7 @@ async def _notify_agent_on_task_assign(
message=f"Assignee notify failed: {error}",
agent_id=agent.id,
task_id=task.id,
board_id=board.id,
)
await session.commit()
@@ -737,6 +742,7 @@ async def _notify_agent_on_task_rework(
message=f"Assignee notified about requested changes: {agent.name}.",
agent_id=agent.id,
task_id=task.id,
board_id=board.id,
)
await session.commit()
else:
@@ -746,6 +752,7 @@ async def _notify_agent_on_task_rework(
message=f"Rework notify failed: {error}",
agent_id=agent.id,
task_id=task.id,
board_id=board.id,
)
await session.commit()
@@ -810,6 +817,7 @@ async def _notify_lead_on_task_create(
message=f"Lead agent notified for task: {task.title}.",
agent_id=lead.id,
task_id=task.id,
board_id=board.id,
)
await session.commit()
else:
@@ -819,6 +827,7 @@ async def _notify_lead_on_task_create(
message=f"Lead notify failed: {error}",
agent_id=lead.id,
task_id=task.id,
board_id=board.id,
)
await session.commit()
@@ -867,6 +876,7 @@ async def _notify_lead_on_task_unassigned(
message=f"Lead notified task returned to inbox: {task.title}.",
agent_id=lead.id,
task_id=task.id,
board_id=board.id,
)
await session.commit()
else:
@@ -876,6 +886,7 @@ async def _notify_lead_on_task_unassigned(
message=f"Lead notify failed: {error}",
agent_id=lead.id,
task_id=task.id,
board_id=board.id,
)
await session.commit()
@@ -1300,7 +1311,10 @@ def _task_event_payload(
resolved_custom_field_values_by_task_id = custom_field_values_by_task_id or {}
payload: dict[str, object] = {
"type": event.event_type,
"activity": ActivityEventRead.model_validate(event).model_dump(mode="json"),
"activity": ActivityEventRead.model_validate(event).model_dump(
mode="json",
exclude={"board_id", "route_name", "route_params"},
),
}
if event.event_type == "task.comment":
payload["comment"] = _serialize_comment(event)
@@ -1500,6 +1514,7 @@ async def create_task(
event_type="task.created",
task_id=task.id,
message=f"Task created: {task.title}.",
board_id=board.id,
)
await session.commit()
await _notify_lead_on_task_create(session=session, board=board, task=task)
@@ -2258,6 +2273,7 @@ async def _apply_lead_task_update(
task_id=update.task.id,
message=message,
agent_id=update.actor.agent.id,
board_id=update.board_id,
)
await _reconcile_dependents_for_dependency_toggle(
session,
@@ -2443,6 +2459,7 @@ async def _record_task_comment_from_update(
event_type="task.comment",
message=update.comment,
task_id=update.task.id,
board_id=update.task.board_id,
agent_id=(
update.actor.agent.id
if update.actor.actor_type == "agent" and update.actor.agent
@@ -2470,6 +2487,7 @@ async def _record_task_update_activity(
task_id=update.task.id,
message=message,
agent_id=actor_agent_id,
board_id=update.board_id,
)
await _reconcile_dependents_for_dependency_toggle(
session,
@@ -2669,6 +2687,7 @@ async def create_task_comment(
event_type="task.comment",
message=payload.message,
task_id=task.id,
board_id=task.board_id,
agent_id=_comment_actor_id(actor),
)
session.add(event)

View File

@@ -14,7 +14,7 @@ RUNTIME_ANNOTATION_TYPES = (datetime,)
class ActivityEvent(QueryModel, table=True):
"""Discrete activity event tied to tasks and agents."""
"""Discrete activity event tied to board/task/agent context."""
__tablename__ = "activity_events" # pyright: ignore[reportAssignmentType]
@@ -23,4 +23,5 @@ class ActivityEvent(QueryModel, table=True):
message: str | None = None
agent_id: UUID | None = Field(default=None, foreign_key="agents.id", index=True)
task_id: UUID | None = Field(default=None, foreign_key="tasks.id", index=True)
board_id: UUID | None = Field(default=None, foreign_key="boards.id", index=True)
created_at: datetime = Field(default_factory=utcnow)

View File

@@ -18,6 +18,9 @@ class ActivityEventRead(SQLModel):
message: str | None
agent_id: UUID | None
task_id: UUID | None
board_id: UUID | None = None
route_name: str | None = None
route_params: dict[str, str] | None = None
created_at: datetime

View File

@@ -4,10 +4,11 @@ from __future__ import annotations
from datetime import datetime
from typing import Literal
from uuid import UUID
from sqlmodel import SQLModel
RUNTIME_ANNOTATION_TYPES = (datetime,)
RUNTIME_ANNOTATION_TYPES = (datetime, UUID)
DashboardRangeKey = Literal["24h", "3d", "7d", "14d", "1m", "3m", "6m", "1y"]
DashboardBucketKey = Literal["hour", "day", "week", "month"]
@@ -64,10 +65,33 @@ class DashboardKpis(SQLModel):
active_agents: int
tasks_in_progress: int
inbox_tasks: int
in_progress_tasks: int
review_tasks: int
done_tasks: int
error_rate_pct: float
median_cycle_time_hours_7d: float | None
class DashboardPendingApproval(SQLModel):
"""Single pending approval item for cross-board dashboard listing."""
approval_id: UUID
board_id: UUID
board_name: str
action_type: str
confidence: float
created_at: datetime
task_title: str | None = None
class DashboardPendingApprovals(SQLModel):
"""Pending approval snapshot used on the dashboard."""
total: int
items: list[DashboardPendingApproval]
class DashboardMetrics(SQLModel):
"""Complete dashboard metrics response payload."""
@@ -78,3 +102,4 @@ class DashboardMetrics(SQLModel):
cycle_time: DashboardSeriesSet
error_rate: DashboardSeriesSet
wip: DashboardWipSeriesSet
pending_approvals: DashboardPendingApprovals

View File

@@ -19,6 +19,7 @@ def record_activity(
message: str,
agent_id: UUID | None = None,
task_id: UUID | None = None,
board_id: UUID | None = None,
) -> ActivityEvent:
"""Create and attach an activity event row to the current DB session."""
event = ActivityEvent(
@@ -26,6 +27,7 @@ def record_activity(
message=message,
agent_id=agent_id,
task_id=task_id,
board_id=board_id,
)
session.add(event)
return event

View File

@@ -91,6 +91,12 @@ async def delete_board(session: AsyncSession, *, board: Board) -> OkResponse:
col(TaskCustomFieldValue.task_id).in_(task_ids),
commit=False,
)
await crud.delete_where(
session,
ActivityEvent,
col(ActivityEvent.board_id) == board.id,
commit=False,
)
# Keep teardown ordered around FK/reference chains so dependent rows are gone
# before deleting their parent task/agent/board records.
await crud.delete_where(

View File

@@ -204,6 +204,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
event_type="agent.nudge.failed",
message=f"Nudge failed for {target.name}: {exc}",
agent_id=actor_agent.id,
board_id=board.id,
)
await self.session.commit()
self.logger.error(
@@ -233,6 +234,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
event_type="agent.nudge.sent",
message=f"Nudge sent to {target.name}.",
agent_id=actor_agent.id,
board_id=board.id,
)
await self.session.commit()
self.logger.info(
@@ -397,6 +399,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
event_type="agent.soul.updated",
message=note,
agent_id=actor_agent_id,
board_id=board.id,
)
await self.session.commit()
self.logger.info(
@@ -470,6 +473,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
event_type="gateway.lead.ask_user.failed",
message=f"Lead user question failed for {board.name}: {exc}",
agent_id=actor_agent.id,
board_id=board.id,
)
await self.session.commit()
self.logger.error(
@@ -501,6 +505,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
event_type="gateway.lead.ask_user.sent",
message=f"Lead requested user info via gateway agent for board: {board.name}.",
agent_id=actor_agent.id,
board_id=board.id,
)
main_agent = await Agent.objects.filter_by(gateway_id=gateway.id, board_id=None).first(
self.session,
@@ -595,6 +600,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
event_type="gateway.main.lead_message.failed",
message=f"Lead message failed for {board.name}: {exc}",
agent_id=actor_agent.id,
board_id=board.id,
)
await self.session.commit()
self.logger.error(
@@ -626,6 +632,7 @@ class GatewayCoordinationService(AbstractGatewayMessagingService):
event_type="gateway.main.lead_message.sent",
message=f"Sent {payload.kind} to lead for board: {board.name}.",
agent_id=actor_agent.id,
board_id=board.id,
)
await self.session.commit()
self.logger.info(

View File

@@ -942,6 +942,7 @@ class AgentLifecycleService(OpenClawDBService):
event_type="agent.heartbeat",
message=f"Heartbeat received from {agent.name}.",
agent_id=agent.id,
board_id=agent.board_id,
)
@staticmethod
@@ -957,6 +958,7 @@ class AgentLifecycleService(OpenClawDBService):
event_type=f"agent.{action}.failed",
message=f"{action_label} message failed: {error}",
agent_id=agent.id,
board_id=agent.board_id,
)
async def coerce_agent_create_payload(
@@ -1114,12 +1116,14 @@ class AgentLifecycleService(OpenClawDBService):
event_type=f"agent.{action}.direct",
message=f"{action.capitalize()}d directly for {provisioned.name}.",
agent_id=provisioned.id,
board_id=provisioned.board_id,
)
record_activity(
self.session,
event_type="agent.wakeup.sent",
message=f"Wakeup message sent to {provisioned.name}.",
agent_id=provisioned.id,
board_id=provisioned.board_id,
)
await self.session.commit()
self.logger.info(
@@ -1818,6 +1822,7 @@ class AgentLifecycleService(OpenClawDBService):
event_type="agent.delete.direct",
message=f"Deleted agent {agent.name}.",
agent_id=None,
board_id=agent.board_id,
)
now = utcnow()
await crud.update_where(

View File

@@ -0,0 +1,66 @@
"""add board_id to activity_events
Revision ID: a9b1c2d3e4f7
Revises: f1b2c3d4e5a6
Create Date: 2026-03-04 18:20:00.000000
"""
from __future__ import annotations
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "a9b1c2d3e4f7"
down_revision = "f1b2c3d4e5a6"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column("activity_events", sa.Column("board_id", sa.Uuid(), nullable=True))
op.execute(
"""
UPDATE activity_events AS ae
SET board_id = t.board_id
FROM tasks AS t
WHERE ae.task_id = t.id
AND ae.board_id IS NULL
"""
)
op.execute(
"""
UPDATE activity_events AS ae
SET board_id = a.board_id
FROM agents AS a
WHERE ae.agent_id = a.id
AND ae.board_id IS NULL
AND a.board_id IS NOT NULL
"""
)
op.create_foreign_key(
"fk_activity_events_board_id_boards",
"activity_events",
"boards",
["board_id"],
["id"],
ondelete="CASCADE",
)
op.create_index(
op.f("ix_activity_events_board_id"),
"activity_events",
["board_id"],
unique=False,
)
def downgrade() -> None:
op.drop_index(op.f("ix_activity_events_board_id"), table_name="activity_events")
op.drop_constraint(
"fk_activity_events_board_id_boards",
"activity_events",
type_="foreignkey",
)
op.drop_column("activity_events", "board_id")

View File

@@ -5,7 +5,7 @@ from uuid import uuid4
import pytest
from app.api.activity import _coerce_task_comment_rows
from app.api.activity import _build_activity_route, _coerce_activity_rows, _coerce_task_comment_rows
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.boards import Board
@@ -34,6 +34,25 @@ class _FakeSqlRow4:
raise IndexError(index)
@dataclass
class _FakeSqlRow3:
first: object
second: object
third: object
def __len__(self) -> int:
return 3
def __getitem__(self, index: int) -> object:
if index == 0:
return self.first
if index == 1:
return self.second
if index == 2:
return self.third
raise IndexError(index)
def _make_event() -> ActivityEvent:
return ActivityEvent(event_type="task.comment", message="hello")
@@ -87,3 +106,71 @@ def test_coerce_task_comment_rows_rejects_invalid_values():
match="Expected \\(ActivityEvent, Task, Board, Agent \\| None\\) rows",
):
_coerce_task_comment_rows([(uuid4(), task, board, None)])
def test_coerce_activity_rows_accepts_plain_tuple():
board_id = uuid4()
event = _make_event()
rows = _coerce_activity_rows([(event, board_id, None)])
assert rows == [(event, board_id, None)]
def test_coerce_activity_rows_accepts_row_like_values():
board_id = uuid4()
event = _make_event()
row = _FakeSqlRow3(event, board_id, None)
rows = _coerce_activity_rows([row])
assert rows == [(event, board_id, None)]
def test_coerce_activity_rows_rejects_invalid_values():
event = _make_event()
with pytest.raises(
TypeError,
match=(
"Expected \\(ActivityEvent, event_board_id, task_board_id\\) rows"
),
):
_coerce_activity_rows([(event, "bad", None)])
def test_build_activity_route_board_comment():
board_id = uuid4()
task_id = uuid4()
event = ActivityEvent(
event_type="task.comment",
task_id=task_id,
message="hello",
)
route_name, route_params = _build_activity_route(event=event, board_id=board_id)
assert route_name == "board"
assert route_params == {
"boardId": str(board_id),
"taskId": str(task_id),
"commentId": str(event.id),
}
def test_build_activity_route_board_approvals():
board_id = uuid4()
event = ActivityEvent(
event_type="approval.lead_notified",
message="hello",
)
route_name, route_params = _build_activity_route(event=event, board_id=board_id)
assert route_name == "board.approvals"
assert route_params == {"boardId": str(board_id)}
def test_build_activity_route_global_fallback():
event = ActivityEvent(
event_type="gateway.main.lead_broadcast.sent",
message="hello",
)
route_name, route_params = _build_activity_route(event=event, board_id=None)
assert route_name == "activity"
assert route_params["eventId"] == str(event.id)
assert route_params["eventType"] == event.event_type
assert route_params["createdAt"] == event.created_at.isoformat()

View File

@@ -62,6 +62,7 @@ async def test_delete_board_cleans_org_board_access_rows() -> None:
)
deleted_table_names = [statement.table.name for statement in session.executed]
assert "activity_events" in deleted_table_names
assert "organization_board_access" in deleted_table_names
assert "organization_invite_board_access" in deleted_table_names
assert "board_task_custom_fields" in deleted_table_names

View File

@@ -0,0 +1,130 @@
from __future__ import annotations
from datetime import datetime
from uuid import uuid4
import pytest
from app.api import metrics as metrics_api
class _ExecResult:
def __init__(self, rows: list[tuple[str, int]]) -> None:
self._rows = rows
def all(self) -> list[tuple[str, int]]:
return self._rows
class _FakeSession:
def __init__(self, rows: list[tuple[str, int]]) -> None:
self._rows = rows
async def exec(self, _statement: object) -> _ExecResult:
return _ExecResult(self._rows)
class _ExecOneResult:
def __init__(self, value: int) -> None:
self._value = value
def one(self) -> int:
return self._value
class _ExecAllResult:
def __init__(self, rows: list[tuple[object, ...]]) -> None:
self._rows = rows
def all(self) -> list[tuple[object, ...]]:
return self._rows
class _SequentialSession:
def __init__(self, responses: list[object]) -> None:
self._responses = responses
self._index = 0
async def exec(self, _statement: object) -> object:
response = self._responses[self._index]
self._index += 1
return response
@pytest.mark.asyncio
async def test_task_status_counts_returns_zeroes_for_empty_board_scope() -> None:
counts = await metrics_api._task_status_counts(_FakeSession([]), [])
assert counts == {
"inbox": 0,
"in_progress": 0,
"review": 0,
"done": 0,
}
@pytest.mark.asyncio
async def test_task_status_counts_maps_known_statuses() -> None:
session = _FakeSession(
[
("inbox", 4),
("in_progress", 3),
("review", 2),
("done", 7),
("blocked", 99),
],
)
counts = await metrics_api._task_status_counts(session, [uuid4()])
assert counts == {
"inbox": 4,
"in_progress": 3,
"review": 2,
"done": 7,
}
@pytest.mark.asyncio
async def test_pending_approvals_snapshot_returns_empty_for_empty_scope() -> None:
snapshot = await metrics_api._pending_approvals_snapshot(_SequentialSession([]), [])
assert snapshot.total == 0
assert snapshot.items == []
@pytest.mark.asyncio
async def test_pending_approvals_snapshot_maps_rows() -> None:
approval_id = uuid4()
board_id = uuid4()
created_at = datetime(2026, 3, 4, 12, 0, 0)
rows: list[tuple[object, ...]] = [
(
approval_id,
board_id,
"Operations Board",
"approve_task",
87.0,
created_at,
"Validate rollout checklist",
)
]
session = _SequentialSession(
[
_ExecOneResult(3),
_ExecAllResult(rows),
]
)
snapshot = await metrics_api._pending_approvals_snapshot(session, [board_id], limit=10)
assert snapshot.total == 3
assert len(snapshot.items) == 1
item = snapshot.items[0]
assert item.approval_id == approval_id
assert item.board_id == board_id
assert item.board_name == "Operations Board"
assert item.action_type == "approve_task"
assert item.confidence == 87.0
assert item.created_at == created_at
assert item.task_title == "Validate rollout checklist"