feat(metrics): extend time range options and update related metrics handling

This commit is contained in:
Abhimanyu Saharan
2026-02-12 19:29:58 +05:30
parent a2ab8b43b3
commit 8bd606a8dc
11 changed files with 368 additions and 127 deletions

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Literal
from uuid import UUID
from fastapi import APIRouter, Depends, Query
@@ -21,8 +20,10 @@ from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.tasks import Task
from app.schemas.metrics import (
DashboardBucketKey,
DashboardKpis,
DashboardMetrics,
DashboardRangeKey,
DashboardRangeSeries,
DashboardSeriesPoint,
DashboardSeriesSet,
@@ -34,7 +35,6 @@ from app.services.organizations import OrganizationContext, list_accessible_boar
router = APIRouter(prefix="/metrics", tags=["metrics"])
OFFLINE_AFTER = timedelta(minutes=10)
ERROR_EVENT_PATTERN = "%failed"
_RUNTIME_TYPE_REFERENCES = (UUID, AsyncSession)
RANGE_QUERY = Query(default="24h")
@@ -46,46 +46,77 @@ ORG_MEMBER_DEP = Depends(require_org_member)
class RangeSpec:
"""Resolved time-range specification for metric aggregation."""
key: Literal["24h", "7d"]
key: DashboardRangeKey
start: datetime
end: datetime
bucket: Literal["hour", "day"]
bucket: DashboardBucketKey
duration: timedelta
def _resolve_range(range_key: Literal["24h", "7d"]) -> RangeSpec:
def _resolve_range(range_key: DashboardRangeKey) -> RangeSpec:
now = utcnow()
if range_key == "7d":
return RangeSpec(
key="7d",
start=now - timedelta(days=7),
end=now,
bucket="day",
)
specs: dict[DashboardRangeKey, tuple[timedelta, DashboardBucketKey]] = {
"24h": (timedelta(hours=24), "hour"),
"3d": (timedelta(days=3), "day"),
"7d": (timedelta(days=7), "day"),
"14d": (timedelta(days=14), "day"),
"1m": (timedelta(days=30), "day"),
"3m": (timedelta(days=90), "week"),
"6m": (timedelta(days=180), "week"),
"1y": (timedelta(days=365), "month"),
}
duration, bucket = specs[range_key]
return RangeSpec(
key="24h",
start=now - timedelta(hours=24),
key=range_key,
start=now - duration,
end=now,
bucket="hour",
bucket=bucket,
duration=duration,
)
def _comparison_range(range_key: Literal["24h", "7d"]) -> RangeSpec:
return _resolve_range("7d" if range_key == "24h" else "24h")
def _comparison_range(range_spec: RangeSpec) -> RangeSpec:
return RangeSpec(
key=range_spec.key,
start=range_spec.start - range_spec.duration,
end=range_spec.end - range_spec.duration,
bucket=range_spec.bucket,
duration=range_spec.duration,
)
def _bucket_start(value: datetime, bucket: Literal["hour", "day"]) -> datetime:
def _bucket_start(value: datetime, bucket: DashboardBucketKey) -> datetime:
normalized = value.replace(hour=0, minute=0, second=0, microsecond=0)
if bucket == "month":
return normalized.replace(day=1)
if bucket == "week":
return normalized - timedelta(days=normalized.weekday())
if bucket == "day":
return value.replace(hour=0, minute=0, second=0, microsecond=0)
return normalized
return value.replace(minute=0, second=0, microsecond=0)
def _next_bucket(cursor: datetime, bucket: DashboardBucketKey) -> datetime:
if bucket == "hour":
return cursor + timedelta(hours=1)
if bucket == "day":
return cursor + timedelta(days=1)
if bucket == "week":
return cursor + timedelta(days=7)
next_month = cursor.month + 1
next_year = cursor.year
if next_month > 12:
next_month = 1
next_year += 1
return cursor.replace(year=next_year, month=next_month, day=1)
def _build_buckets(range_spec: RangeSpec) -> list[datetime]:
cursor = _bucket_start(range_spec.start, range_spec.bucket)
step = timedelta(days=1) if range_spec.bucket == "day" else timedelta(hours=1)
buckets: list[datetime] = []
while cursor <= range_spec.end:
buckets.append(cursor)
cursor += step
cursor = _next_bucket(cursor, range_spec.bucket)
return buckets
@@ -117,6 +148,7 @@ def _wip_series_from_mapping(
inbox=values.get("inbox", 0),
in_progress=values.get("in_progress", 0),
review=values.get("review", 0),
done=values.get("done", 0),
),
)
return DashboardWipRangeSeries(
@@ -215,50 +247,69 @@ async def _query_wip(
range_spec: RangeSpec,
board_ids: list[UUID],
) -> DashboardWipRangeSeries:
bucket_col = func.date_trunc(range_spec.bucket, Task.updated_at).label("bucket")
inbox_case = case((col(Task.status) == "inbox", 1), else_=0)
if not board_ids:
return _wip_series_from_mapping(range_spec, {})
inbox_bucket_col = func.date_trunc(range_spec.bucket, Task.created_at).label(
"inbox_bucket"
)
inbox_statement = (
select(inbox_bucket_col, func.count())
.where(col(Task.status) == "inbox")
.where(col(Task.created_at) >= range_spec.start)
.where(col(Task.created_at) <= range_spec.end)
.where(col(Task.board_id).in_(board_ids))
.group_by(inbox_bucket_col)
.order_by(inbox_bucket_col)
)
inbox_results = (await session.exec(inbox_statement)).all()
status_bucket_col = func.date_trunc(range_spec.bucket, Task.updated_at).label(
"status_bucket"
)
progress_case = case((col(Task.status) == "in_progress", 1), else_=0)
review_case = case((col(Task.status) == "review", 1), else_=0)
statement = (
done_case = case((col(Task.status) == "done", 1), else_=0)
status_statement = (
select(
bucket_col,
func.sum(inbox_case),
status_bucket_col,
func.sum(progress_case),
func.sum(review_case),
func.sum(done_case),
)
.where(col(Task.updated_at) >= range_spec.start)
.where(col(Task.updated_at) <= range_spec.end)
.where(col(Task.board_id).in_(board_ids))
.group_by(status_bucket_col)
.order_by(status_bucket_col)
)
if not board_ids:
return _wip_series_from_mapping(range_spec, {})
statement = (
statement.where(col(Task.board_id).in_(board_ids)).group_by(bucket_col).order_by(bucket_col)
)
results = (await session.exec(statement)).all()
status_results = (await session.exec(status_statement)).all()
mapping: dict[datetime, dict[str, int]] = {}
for bucket, inbox, in_progress, review in results:
mapping[bucket] = {
"inbox": int(inbox or 0),
"in_progress": int(in_progress or 0),
"review": int(review or 0),
}
for bucket, inbox in inbox_results:
values = mapping.setdefault(bucket, {})
values["inbox"] = int(inbox or 0)
for bucket, in_progress, review, done in status_results:
values = mapping.setdefault(bucket, {})
values["in_progress"] = int(in_progress or 0)
values["review"] = int(review or 0)
values["done"] = int(done or 0)
return _wip_series_from_mapping(range_spec, mapping)
async def _median_cycle_time_7d(
async def _median_cycle_time_for_range(
session: AsyncSession,
range_spec: RangeSpec,
board_ids: list[UUID],
) -> float | None:
now = utcnow()
start = now - timedelta(days=7)
in_progress = sql_cast(Task.in_progress_at, DateTime)
duration_hours = func.extract("epoch", Task.updated_at - in_progress) / 3600.0
statement = (
select(func.percentile_cont(0.5).within_group(duration_hours))
.where(col(Task.status) == "review")
.where(col(Task.in_progress_at).is_not(None))
.where(col(Task.updated_at) >= start)
.where(col(Task.updated_at) <= now)
.where(col(Task.updated_at) >= range_spec.start)
.where(col(Task.updated_at) <= range_spec.end)
)
if not board_ids:
return None
@@ -303,11 +354,15 @@ async def _error_rate_kpi(
return (error_count / total_count) * 100 if total_count > 0 else 0.0
async def _active_agents(session: AsyncSession, board_ids: list[UUID]) -> int:
threshold = utcnow() - OFFLINE_AFTER
async def _active_agents(
session: AsyncSession,
range_spec: RangeSpec,
board_ids: list[UUID],
) -> int:
statement = select(func.count()).where(
col(Agent.last_seen_at).is_not(None),
col(Agent.last_seen_at) >= threshold,
col(Agent.last_seen_at) >= range_spec.start,
col(Agent.last_seen_at) <= range_spec.end,
)
if not board_ids:
return 0
@@ -316,12 +371,18 @@ async def _active_agents(session: AsyncSession, board_ids: list[UUID]) -> int:
return int(result)
async def _tasks_in_progress(session: AsyncSession, board_ids: list[UUID]) -> int:
async def _tasks_in_progress(
session: AsyncSession,
range_spec: RangeSpec,
board_ids: list[UUID],
) -> int:
if not board_ids:
return 0
statement = (
select(func.count())
.where(col(Task.status) == "in_progress")
.where(col(Task.updated_at) >= range_spec.start)
.where(col(Task.updated_at) <= range_spec.end)
.where(col(Task.board_id).in_(board_ids))
)
result = (await session.exec(statement)).one()
@@ -330,13 +391,13 @@ async def _tasks_in_progress(session: AsyncSession, board_ids: list[UUID]) -> in
@router.get("/dashboard", response_model=DashboardMetrics)
async def dashboard_metrics(
range_key: Literal["24h", "7d"] = RANGE_QUERY,
range_key: DashboardRangeKey = RANGE_QUERY,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> DashboardMetrics:
"""Return dashboard KPIs and time-series data for accessible boards."""
primary = _resolve_range(range_key)
comparison = _comparison_range(range_key)
comparison = _comparison_range(primary)
board_ids = await list_accessible_board_ids(session, member=ctx.member, write=False)
throughput_primary = await _query_throughput(session, primary, board_ids)
@@ -365,10 +426,14 @@ async def dashboard_metrics(
)
kpis = DashboardKpis(
active_agents=await _active_agents(session, board_ids),
tasks_in_progress=await _tasks_in_progress(session, board_ids),
active_agents=await _active_agents(session, primary, board_ids),
tasks_in_progress=await _tasks_in_progress(session, primary, board_ids),
error_rate_pct=await _error_rate_kpi(session, primary, board_ids),
median_cycle_time_hours_7d=await _median_cycle_time_7d(session, board_ids),
median_cycle_time_hours_7d=await _median_cycle_time_for_range(
session,
primary,
board_ids,
),
)
return DashboardMetrics(