feat(metrics): Implement dashboard metrics API and integrate metrics chart components
This commit is contained in:
306
backend/app/api/metrics.py
Normal file
306
backend/app/api/metrics.py
Normal file
@@ -0,0 +1,306 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Literal
|
||||
|
||||
from fastapi import APIRouter, Depends, Query
|
||||
from sqlalchemy import case, func
|
||||
from sqlmodel import Session, col, select
|
||||
|
||||
from app.api.deps import require_admin_auth
|
||||
from app.core.auth import AuthContext
|
||||
from app.db.session import get_session
|
||||
from app.models.activity_events import ActivityEvent
|
||||
from app.models.agents import Agent
|
||||
from app.models.tasks import Task
|
||||
from app.schemas.metrics import (
|
||||
DashboardKpis,
|
||||
DashboardMetrics,
|
||||
DashboardRangeSeries,
|
||||
DashboardSeriesPoint,
|
||||
DashboardSeriesSet,
|
||||
DashboardWipPoint,
|
||||
DashboardWipRangeSeries,
|
||||
DashboardWipSeriesSet,
|
||||
)
|
||||
|
||||
# Router for all dashboard metric endpoints, mounted under /metrics.
router = APIRouter(prefix="/metrics", tags=["metrics"])

# An agent with no heartbeat within this window is considered offline.
OFFLINE_AFTER = timedelta(minutes=10)
# SQL LIKE pattern for activity event types that represent failures.
# NOTE(review): matches any event_type ending in "failed" (e.g. "task_failed")
# — confirm this covers the full event naming convention.
ERROR_EVENT_PATTERN = "%failed"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class RangeSpec:
    """Concrete time window plus bucketing granularity for a dashboard range."""

    # Which preset range this spec represents.
    key: Literal["24h", "7d"]
    # Inclusive window bounds (naive UTC datetimes, from datetime.utcnow()).
    start: datetime
    end: datetime
    # date_trunc granularity used when bucketing rows.
    bucket: Literal["hour", "day"]
|
||||
|
||||
|
||||
def _resolve_range(range_key: Literal["24h", "7d"]) -> RangeSpec:
    """Translate a preset range key into a concrete window ending now.

    "7d" yields a 7-day window bucketed by day; anything else ("24h")
    yields a 24-hour window bucketed by hour.
    """
    now = datetime.utcnow()
    if range_key == "7d":
        window, granularity, key = timedelta(days=7), "day", "7d"
    else:
        window, granularity, key = timedelta(hours=24), "hour", "24h"
    return RangeSpec(key=key, start=now - window, end=now, bucket=granularity)
|
||||
|
||||
|
||||
def _comparison_range(range_key: Literal["24h", "7d"]) -> RangeSpec:
    """Return the spec for the *other* preset range, used as the comparison
    series alongside the primary one."""
    if range_key == "24h":
        return _resolve_range("7d")
    return _resolve_range("24h")
|
||||
|
||||
|
||||
def _bucket_start(value: datetime, bucket: Literal["hour", "day"]) -> datetime:
|
||||
if bucket == "day":
|
||||
return value.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
return value.replace(minute=0, second=0, microsecond=0)
|
||||
|
||||
|
||||
def _build_buckets(range_spec: RangeSpec) -> list[datetime]:
    """Enumerate every bucket start time covering the range, in order.

    Starts at the bucket containing range_spec.start and includes every
    bucket boundary up to and including range_spec.end.
    """
    step = timedelta(days=1) if range_spec.bucket == "day" else timedelta(hours=1)
    buckets: list[datetime] = []
    current = _bucket_start(range_spec.start, range_spec.bucket)
    while not current > range_spec.end:
        buckets.append(current)
        current = current + step
    return buckets
|
||||
|
||||
|
||||
def _series_from_mapping(
    range_spec: RangeSpec, mapping: dict[datetime, float]
) -> DashboardRangeSeries:
    """Expand a sparse bucket->value mapping into a dense, ordered series.

    Buckets absent from *mapping* are emitted with a value of 0.
    """
    points: list[DashboardSeriesPoint] = []
    for bucket in _build_buckets(range_spec):
        points.append(
            DashboardSeriesPoint(period=bucket, value=float(mapping.get(bucket, 0)))
        )
    return DashboardRangeSeries(
        range=range_spec.key,
        bucket=range_spec.bucket,
        points=points,
    )
|
||||
|
||||
|
||||
def _wip_series_from_mapping(
    range_spec: RangeSpec, mapping: dict[datetime, dict[str, int]]
) -> DashboardWipRangeSeries:
    """Expand sparse per-bucket status counts into a dense, ordered WIP series.

    Buckets absent from *mapping* (and statuses absent from a bucket's dict)
    default to 0.
    """
    points = [
        DashboardWipPoint(
            period=bucket,
            inbox=counts.get("inbox", 0),
            in_progress=counts.get("in_progress", 0),
            review=counts.get("review", 0),
        )
        for bucket, counts in (
            (b, mapping.get(b, {})) for b in _build_buckets(range_spec)
        )
    ]
    return DashboardWipRangeSeries(
        range=range_spec.key,
        bucket=range_spec.bucket,
        points=points,
    )
|
||||
|
||||
|
||||
def _query_throughput(session: Session, range_spec: RangeSpec) -> DashboardRangeSeries:
    """Per-bucket count of tasks in "review" whose updated_at falls in range.

    updated_at is bucketed with date_trunc (PostgreSQL) at the range's
    granularity.
    """
    bucket_col = func.date_trunc(range_spec.bucket, Task.updated_at).label("bucket")
    statement = (
        select(bucket_col, func.count(Task.id))
        .where(
            col(Task.status) == "review",
            col(Task.updated_at) >= range_spec.start,
            col(Task.updated_at) <= range_spec.end,
        )
        .group_by(bucket_col)
        .order_by(bucket_col)
    )
    counts = {bucket: float(count) for bucket, count in session.exec(statement).all()}
    return _series_from_mapping(range_spec, counts)
|
||||
|
||||
|
||||
def _query_cycle_time(session: Session, range_spec: RangeSpec) -> DashboardRangeSeries:
    """Per-bucket average cycle time (hours) for tasks in "review".

    Cycle time is (updated_at - in_progress_at) in hours; updated_at is
    presumably the moment the task entered review — confirm that status
    transitions touch updated_at.
    """
    bucket_col = func.date_trunc(range_spec.bucket, Task.updated_at).label("bucket")
    hours_spent = (
        func.extract("epoch", Task.updated_at - Task.in_progress_at) / 3600.0
    )
    statement = (
        select(bucket_col, func.avg(hours_spent))
        .where(
            col(Task.status) == "review",
            col(Task.in_progress_at).is_not(None),
            col(Task.updated_at) >= range_spec.start,
            col(Task.updated_at) <= range_spec.end,
        )
        .group_by(bucket_col)
        .order_by(bucket_col)
    )
    # avg() can be NULL for an empty group; coalesce to 0 client-side.
    averages = {
        bucket: float(avg or 0) for bucket, avg in session.exec(statement).all()
    }
    return _series_from_mapping(range_spec, averages)
|
||||
|
||||
|
||||
def _query_error_rate(session: Session, range_spec: RangeSpec) -> DashboardRangeSeries:
    """Per-bucket percentage (0-100) of activity events matching the failure
    pattern, out of all events created in that bucket."""
    bucket_col = func.date_trunc(range_spec.bucket, ActivityEvent.created_at).label(
        "bucket"
    )
    error_case = case(
        (col(ActivityEvent.event_type).like(ERROR_EVENT_PATTERN), 1),
        else_=0,
    )
    statement = (
        select(bucket_col, func.sum(error_case), func.count(ActivityEvent.id))
        .where(
            col(ActivityEvent.created_at) >= range_spec.start,
            col(ActivityEvent.created_at) <= range_spec.end,
        )
        .group_by(bucket_col)
        .order_by(bucket_col)
    )
    rates: dict[datetime, float] = {}
    for bucket, errors, total in session.exec(statement).all():
        denominator = float(total or 0)
        # Guard against empty groups so we never divide by zero.
        if denominator:
            rates[bucket] = (float(errors or 0) / denominator) * 100
        else:
            rates[bucket] = 0.0
    return _series_from_mapping(range_spec, rates)
|
||||
|
||||
|
||||
def _query_wip(session: Session, range_spec: RangeSpec) -> DashboardWipRangeSeries:
    """Per-bucket counts of tasks in inbox / in_progress / review status.

    Tasks are attributed to the bucket of their updated_at timestamp, so
    this reflects when each task was last touched, not a point-in-time
    snapshot of WIP.
    """
    bucket_col = func.date_trunc(range_spec.bucket, Task.updated_at).label("bucket")
    # One conditional-sum column per tracked status, in a fixed order.
    tracked_statuses = ("inbox", "in_progress", "review")
    status_sums = [
        func.sum(case((col(Task.status) == status, 1), else_=0))
        for status in tracked_statuses
    ]
    statement = (
        select(bucket_col, *status_sums)
        .where(
            col(Task.updated_at) >= range_spec.start,
            col(Task.updated_at) <= range_spec.end,
        )
        .group_by(bucket_col)
        .order_by(bucket_col)
    )
    counts: dict[datetime, dict[str, int]] = {}
    for bucket, *sums in session.exec(statement).all():
        counts[bucket] = {
            status: int(value or 0)
            for status, value in zip(tracked_statuses, sums)
        }
    return _wip_series_from_mapping(range_spec, counts)
|
||||
|
||||
|
||||
def _median_cycle_time_7d(session: Session) -> float | None:
    """Median (p50) cycle time in hours over the trailing 7 days.

    Returns None when no task completed a cycle in the window.  Uses
    percentile_cont, so a PostgreSQL backend is required.
    """
    now = datetime.utcnow()
    hours_spent = (
        func.extract("epoch", Task.updated_at - Task.in_progress_at) / 3600.0
    )
    statement = (
        select(func.percentile_cont(0.5).within_group(hours_spent))
        .where(
            col(Task.status) == "review",
            col(Task.in_progress_at).is_not(None),
            col(Task.updated_at) >= now - timedelta(days=7),
            col(Task.updated_at) <= now,
        )
    )
    raw = session.exec(statement).one_or_none()
    # Depending on the driver/statement shape, exec() may hand back either a
    # bare scalar or a 1-tuple; unwrap before converting.
    if isinstance(raw, tuple):
        raw = raw[0]
    return None if raw is None else float(raw)
|
||||
|
||||
|
||||
def _error_rate_kpi(session: Session, range_spec: RangeSpec) -> float:
    """Overall failure-event percentage (0-100) across the whole window.

    Same definition as _query_error_rate, but aggregated over the full
    range instead of per bucket.
    """
    error_case = case(
        (col(ActivityEvent.event_type).like(ERROR_EVENT_PATTERN), 1),
        else_=0,
    )
    statement = select(func.sum(error_case), func.count(ActivityEvent.id)).where(
        col(ActivityEvent.created_at) >= range_spec.start,
        col(ActivityEvent.created_at) <= range_spec.end,
    )
    row = session.exec(statement).one_or_none()
    if row is None:
        return 0.0
    errors, total = row
    denominator = float(total or 0)
    if not denominator:
        # No events at all in the window: report a clean zero rate.
        return 0.0
    return (float(errors or 0) / denominator) * 100
|
||||
|
||||
|
||||
def _active_agents(session: Session) -> int:
    """Count agents whose last_seen_at falls within the OFFLINE_AFTER window."""
    cutoff = datetime.utcnow() - OFFLINE_AFTER
    count = session.exec(
        select(func.count(Agent.id)).where(
            col(Agent.last_seen_at).is_not(None),
            col(Agent.last_seen_at) >= cutoff,
        )
    ).one()
    return int(count)
|
||||
|
||||
|
||||
def _tasks_in_progress(session: Session) -> int:
    """Count tasks currently in the "in_progress" status."""
    query = select(func.count(Task.id)).where(col(Task.status) == "in_progress")
    return int(session.exec(query).one())
|
||||
|
||||
|
||||
@router.get("/dashboard", response_model=DashboardMetrics)
def dashboard_metrics(
    range: Literal["24h", "7d"] = Query(default="24h"),
    session: Session = Depends(get_session),
    auth: AuthContext = Depends(require_admin_auth),
) -> DashboardMetrics:
    """Aggregate KPI and time-series metrics for the admin dashboard.

    The requested ``range`` drives the primary series; the opposite preset
    range is always computed as the comparison series.  The ``auth``
    dependency enforces admin access and is otherwise unused.
    """
    primary_spec = _resolve_range(range)
    comparison_spec = _comparison_range(range)

    def paired(query) -> DashboardSeriesSet:
        # Run the same query over both windows to build a series pair.
        return DashboardSeriesSet(
            primary=query(session, primary_spec),
            comparison=query(session, comparison_spec),
        )

    throughput = paired(_query_throughput)
    cycle_time = paired(_query_cycle_time)
    error_rate = paired(_query_error_rate)
    wip = DashboardWipSeriesSet(
        primary=_query_wip(session, primary_spec),
        comparison=_query_wip(session, comparison_spec),
    )

    kpis = DashboardKpis(
        active_agents=_active_agents(session),
        tasks_in_progress=_tasks_in_progress(session),
        error_rate_pct=_error_rate_kpi(session, primary_spec),
        median_cycle_time_hours_7d=_median_cycle_time_7d(session),
    )

    return DashboardMetrics(
        range=primary_spec.key,
        generated_at=datetime.utcnow(),
        kpis=kpis,
        throughput=throughput,
        cycle_time=cycle_time,
        error_rate=error_rate,
        wip=wip,
    )
|
||||
@@ -8,6 +8,7 @@ from app.api.agents import router as agents_router
|
||||
from app.api.auth import router as auth_router
|
||||
from app.api.boards import router as boards_router
|
||||
from app.api.gateway import router as gateway_router
|
||||
from app.api.metrics import router as metrics_router
|
||||
from app.api.tasks import router as tasks_router
|
||||
from app.api.users import router as users_router
|
||||
from app.core.config import settings
|
||||
@@ -54,6 +55,7 @@ api_v1.include_router(auth_router)
|
||||
api_v1.include_router(agents_router)
|
||||
api_v1.include_router(activity_router)
|
||||
api_v1.include_router(gateway_router)
|
||||
api_v1.include_router(metrics_router)
|
||||
api_v1.include_router(boards_router)
|
||||
api_v1.include_router(tasks_router)
|
||||
api_v1.include_router(users_router)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from app.schemas.activity_events import ActivityEventRead
|
||||
from app.schemas.agents import AgentCreate, AgentRead, AgentUpdate
|
||||
from app.schemas.boards import BoardCreate, BoardRead, BoardUpdate
|
||||
from app.schemas.metrics import DashboardMetrics
|
||||
from app.schemas.tasks import TaskCreate, TaskRead, TaskUpdate
|
||||
from app.schemas.users import UserCreate, UserRead, UserUpdate
|
||||
|
||||
@@ -12,6 +13,7 @@ __all__ = [
|
||||
"BoardCreate",
|
||||
"BoardRead",
|
||||
"BoardUpdate",
|
||||
"DashboardMetrics",
|
||||
"TaskCreate",
|
||||
"TaskRead",
|
||||
"TaskUpdate",
|
||||
|
||||
57
backend/app/schemas/metrics.py
Normal file
57
backend/app/schemas/metrics.py
Normal file
@@ -0,0 +1,57 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Literal
|
||||
|
||||
from sqlmodel import SQLModel
|
||||
|
||||
|
||||
class DashboardSeriesPoint(SQLModel):
    """A single (timestamp, value) sample in a dashboard time series."""

    period: datetime  # bucket start time
    value: float  # metric value for this bucket
|
||||
|
||||
|
||||
class DashboardWipPoint(SQLModel):
    """Per-bucket work-in-progress counts broken down by task status."""

    period: datetime  # bucket start time
    inbox: int  # tasks counted in "inbox"
    in_progress: int  # tasks counted in "in_progress"
    review: int  # tasks counted in "review"
|
||||
|
||||
|
||||
class DashboardRangeSeries(SQLModel):
    """A dense, ordered series of points covering one preset range."""

    range: Literal["24h", "7d"]  # which preset window the points cover
    bucket: Literal["hour", "day"]  # granularity of each point
    points: list[DashboardSeriesPoint]  # one point per bucket, in order
|
||||
|
||||
|
||||
class DashboardWipRangeSeries(SQLModel):
    """A dense, ordered WIP series (per-status counts) for one preset range."""

    range: Literal["24h", "7d"]  # which preset window the points cover
    bucket: Literal["hour", "day"]  # granularity of each point
    points: list[DashboardWipPoint]  # one point per bucket, in order
|
||||
|
||||
|
||||
class DashboardSeriesSet(SQLModel):
    """A metric series for the requested range plus its comparison range."""

    primary: DashboardRangeSeries  # series for the requested range
    comparison: DashboardRangeSeries  # series for the opposite preset range
|
||||
|
||||
|
||||
class DashboardWipSeriesSet(SQLModel):
    """A WIP series for the requested range plus its comparison range."""

    primary: DashboardWipRangeSeries  # series for the requested range
    comparison: DashboardWipRangeSeries  # series for the opposite preset range
|
||||
|
||||
|
||||
class DashboardKpis(SQLModel):
    """Headline numbers shown at the top of the dashboard."""

    active_agents: int  # agents seen within the recent-activity window
    tasks_in_progress: int  # tasks currently in "in_progress"
    error_rate_pct: float  # failure-event percentage, 0-100
    median_cycle_time_hours_7d: float | None  # None when no data in window
|
||||
|
||||
|
||||
class DashboardMetrics(SQLModel):
    """Full payload returned by the dashboard metrics endpoint."""

    range: Literal["24h", "7d"]  # the requested primary range
    generated_at: datetime  # server time the payload was built
    kpis: DashboardKpis
    throughput: DashboardSeriesSet
    cycle_time: DashboardSeriesSet
    error_rate: DashboardSeriesSet
    wip: DashboardWipSeriesSet
|
||||
Reference in New Issue
Block a user