"""Admin dashboard metrics API: KPI and time-series endpoints."""
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from typing import Literal
|
|
|
|
from fastapi import APIRouter, Depends, Query
|
|
from sqlalchemy import case, func
|
|
from sqlmodel import Session, col, select
|
|
|
|
from app.api.deps import require_admin_auth
|
|
from app.core.auth import AuthContext
|
|
from app.db.session import get_session
|
|
from app.models.activity_events import ActivityEvent
|
|
from app.models.agents import Agent
|
|
from app.models.tasks import Task
|
|
from app.schemas.metrics import (
|
|
DashboardKpis,
|
|
DashboardMetrics,
|
|
DashboardRangeSeries,
|
|
DashboardSeriesPoint,
|
|
DashboardSeriesSet,
|
|
DashboardWipPoint,
|
|
DashboardWipRangeSeries,
|
|
DashboardWipSeriesSet,
|
|
)
|
|
|
|
# Router for the admin dashboard metrics endpoints; mounted under /metrics.
router = APIRouter(prefix="/metrics", tags=["metrics"])

# An agent whose last heartbeat is older than this window counts as offline.
OFFLINE_AFTER = timedelta(minutes=10)

# SQL LIKE pattern: matches activity event types that end in "failed"
# (case-sensitive LIKE; used to classify events as errors).
ERROR_EVENT_PATTERN = "%failed"
|
|
|
|
|
|
@dataclass(frozen=True)
class RangeSpec:
    """A concrete reporting window plus the bucket size used to group rows in it."""

    # Which preset window this is.
    key: Literal["24h", "7d"]
    # Inclusive window boundaries (naive UTC datetimes, per datetime.utcnow()).
    start: datetime
    end: datetime
    # Granularity passed to date_trunc when bucketing rows in this window.
    bucket: Literal["hour", "day"]
|
|
|
|
|
|
def _resolve_range(range_key: Literal["24h", "7d"]) -> RangeSpec:
    """Build the RangeSpec for the requested preset window, ending now.

    "7d" spans the trailing 7 days bucketed by day; anything else (i.e. "24h")
    spans the trailing 24 hours bucketed by hour. Times are naive UTC.
    """
    now = datetime.utcnow()
    if range_key == "7d":
        window = timedelta(days=7)
        bucket: Literal["hour", "day"] = "day"
    else:
        window = timedelta(hours=24)
        bucket = "hour"
    return RangeSpec(key=range_key, start=now - window, end=now, bucket=bucket)
|
|
|
|
|
|
def _comparison_range(range_key: Literal["24h", "7d"]) -> RangeSpec:
    """Return the spec of the *opposite* preset window, used as comparison data."""
    other: Literal["24h", "7d"] = "7d" if range_key == "24h" else "24h"
    return _resolve_range(other)
|
|
|
|
|
|
def _bucket_start(value: datetime, bucket: Literal["hour", "day"]) -> datetime:
|
|
if bucket == "day":
|
|
return value.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
return value.replace(minute=0, second=0, microsecond=0)
|
|
|
|
|
|
def _build_buckets(range_spec: RangeSpec) -> list[datetime]:
    """List every bucket boundary from floor(start) up to end, inclusive.

    Note the first bucket is the *floor* of the range start, so it may begin
    slightly before the window itself.
    """
    step = timedelta(days=1) if range_spec.bucket == "day" else timedelta(hours=1)
    boundaries: list[datetime] = []
    current = _bucket_start(range_spec.start, range_spec.bucket)
    while current <= range_spec.end:
        boundaries.append(current)
        current = current + step
    return boundaries
|
|
|
|
|
|
def _series_from_mapping(
    range_spec: RangeSpec, mapping: dict[datetime, float]
) -> DashboardRangeSeries:
    """Densify a bucket->value mapping into a series; missing buckets become 0."""
    return DashboardRangeSeries(
        range=range_spec.key,
        bucket=range_spec.bucket,
        points=[
            DashboardSeriesPoint(period=boundary, value=float(mapping.get(boundary, 0)))
            for boundary in _build_buckets(range_spec)
        ],
    )
|
|
|
|
|
|
def _wip_series_from_mapping(
    range_spec: RangeSpec, mapping: dict[datetime, dict[str, int]]
) -> DashboardWipRangeSeries:
    """Densify per-status WIP counts into a series; missing buckets become zeros."""
    empty: dict[str, int] = {}
    points = [
        DashboardWipPoint(
            period=boundary,
            inbox=mapping.get(boundary, empty).get("inbox", 0),
            in_progress=mapping.get(boundary, empty).get("in_progress", 0),
            review=mapping.get(boundary, empty).get("review", 0),
        )
        for boundary in _build_buckets(range_spec)
    ]
    return DashboardWipRangeSeries(
        range=range_spec.key,
        bucket=range_spec.bucket,
        points=points,
    )
|
|
|
|
|
|
def _query_throughput(session: Session, range_spec: RangeSpec) -> DashboardRangeSeries:
    """Count tasks in "review" status per bucket, keyed on updated_at.

    Uses date_trunc, so this presumably targets a PostgreSQL backend.
    """
    bucket_expr = func.date_trunc(range_spec.bucket, Task.updated_at).label("bucket")
    stmt = (
        select(bucket_expr, func.count(Task.id))
        .where(
            col(Task.status) == "review",
            col(Task.updated_at) >= range_spec.start,
            col(Task.updated_at) <= range_spec.end,
        )
        .group_by(bucket_expr)
        .order_by(bucket_expr)
    )
    rows = session.exec(stmt).all()
    return _series_from_mapping(range_spec, {bucket: float(n) for bucket, n in rows})
|
|
|
|
|
|
def _query_cycle_time(session: Session, range_spec: RangeSpec) -> DashboardRangeSeries:
    """Average hours from in_progress_at to updated_at for reviewed tasks, per bucket."""
    bucket_expr = func.date_trunc(range_spec.bucket, Task.updated_at).label("bucket")
    # Elapsed seconds converted to hours.
    hours_expr = func.extract("epoch", Task.updated_at - Task.in_progress_at) / 3600.0
    stmt = (
        select(bucket_expr, func.avg(hours_expr))
        .where(
            col(Task.status) == "review",
            col(Task.in_progress_at).is_not(None),
            col(Task.updated_at) >= range_spec.start,
            col(Task.updated_at) <= range_spec.end,
        )
        .group_by(bucket_expr)
        .order_by(bucket_expr)
    )
    rows = session.exec(stmt).all()
    # NULL averages (no matching tasks) collapse to 0.
    return _series_from_mapping(
        range_spec, {bucket: float(avg or 0) for bucket, avg in rows}
    )
|
|
|
|
|
|
def _query_error_rate(session: Session, range_spec: RangeSpec) -> DashboardRangeSeries:
    """Percentage of activity events matching ERROR_EVENT_PATTERN per bucket."""
    bucket_expr = func.date_trunc(range_spec.bucket, ActivityEvent.created_at).label(
        "bucket"
    )
    # 1 for an error event, 0 otherwise — summed to count errors per bucket.
    is_error = case(
        (col(ActivityEvent.event_type).like(ERROR_EVENT_PATTERN), 1),
        else_=0,
    )
    stmt = (
        select(bucket_expr, func.sum(is_error), func.count(ActivityEvent.id))
        .where(
            col(ActivityEvent.created_at) >= range_spec.start,
            col(ActivityEvent.created_at) <= range_spec.end,
        )
        .group_by(bucket_expr)
        .order_by(bucket_expr)
    )
    mapping: dict[datetime, float] = {}
    for bucket, errors, total in session.exec(stmt).all():
        denominator = float(total or 0)
        numerator = float(errors or 0)
        mapping[bucket] = (numerator / denominator) * 100 if denominator > 0 else 0.0
    return _series_from_mapping(range_spec, mapping)
|
|
|
|
|
|
def _query_wip(session: Session, range_spec: RangeSpec) -> DashboardWipRangeSeries:
    """Per-bucket counts of tasks in inbox / in_progress / review, keyed on updated_at."""
    bucket_expr = func.date_trunc(range_spec.bucket, Task.updated_at).label("bucket")
    # One 0/1 flag per status, summed into per-bucket counts.
    flags = {
        status: case((col(Task.status) == status, 1), else_=0)
        for status in ("inbox", "in_progress", "review")
    }
    stmt = (
        select(
            bucket_expr,
            func.sum(flags["inbox"]),
            func.sum(flags["in_progress"]),
            func.sum(flags["review"]),
        )
        .where(
            col(Task.updated_at) >= range_spec.start,
            col(Task.updated_at) <= range_spec.end,
        )
        .group_by(bucket_expr)
        .order_by(bucket_expr)
    )
    mapping: dict[datetime, dict[str, int]] = {
        bucket: {
            "inbox": int(inbox or 0),
            "in_progress": int(in_progress or 0),
            "review": int(review or 0),
        }
        for bucket, inbox, in_progress, review in session.exec(stmt).all()
    }
    return _wip_series_from_mapping(range_spec, mapping)
|
|
|
|
|
|
def _median_cycle_time_7d(session: Session) -> float | None:
    """Median hours from in_progress_at to updated_at over the trailing 7 days.

    Returns None when no reviewed task with an in_progress timestamp exists in
    the window. Uses percentile_cont, so this presumably targets PostgreSQL.
    """
    now = datetime.utcnow()
    window_start = now - timedelta(days=7)
    hours_expr = func.extract("epoch", Task.updated_at - Task.in_progress_at) / 3600.0
    stmt = select(func.percentile_cont(0.5).within_group(hours_expr)).where(
        col(Task.status) == "review",
        col(Task.in_progress_at).is_not(None),
        col(Task.updated_at) >= window_start,
        col(Task.updated_at) <= now,
    )
    raw = session.exec(stmt).one_or_none()
    # Depending on the driver, the result may arrive as a one-element row.
    if isinstance(raw, tuple):
        raw = raw[0]
    return None if raw is None else float(raw)
|
|
|
|
|
|
def _error_rate_kpi(session: Session, range_spec: RangeSpec) -> float:
    """Overall error-event percentage across the whole window (0.0 when empty)."""
    is_error = case(
        (col(ActivityEvent.event_type).like(ERROR_EVENT_PATTERN), 1),
        else_=0,
    )
    stmt = select(func.sum(is_error), func.count(ActivityEvent.id)).where(
        col(ActivityEvent.created_at) >= range_spec.start,
        col(ActivityEvent.created_at) <= range_spec.end,
    )
    row = session.exec(stmt).one_or_none()
    if row is None:
        return 0.0
    errors, total = row
    denominator = float(total or 0)
    if denominator <= 0:
        return 0.0
    return (float(errors or 0) / denominator) * 100
|
|
|
|
|
|
def _active_agents(session: Session) -> int:
    """Count agents whose last heartbeat falls within OFFLINE_AFTER of now."""
    cutoff = datetime.utcnow() - OFFLINE_AFTER
    stmt = select(func.count(Agent.id)).where(
        col(Agent.last_seen_at).is_not(None),
        col(Agent.last_seen_at) >= cutoff,
    )
    return int(session.exec(stmt).one())
|
|
|
|
|
|
def _tasks_in_progress(session: Session) -> int:
    """Count tasks currently in the "in_progress" status."""
    count = session.exec(
        select(func.count(Task.id)).where(col(Task.status) == "in_progress")
    ).one()
    return int(count)
|
|
|
|
|
|
@router.get("/dashboard", response_model=DashboardMetrics)
def dashboard_metrics(
    # `alias="range"` keeps the public query-parameter name while the Python
    # parameter no longer shadows the `range` builtin inside this function.
    range_key: Literal["24h", "7d"] = Query(default="24h", alias="range"),
    session: Session = Depends(get_session),
    # Unused on purpose: the dependency itself enforces admin authentication.
    auth: AuthContext = Depends(require_admin_auth),
) -> DashboardMetrics:
    """Aggregate all dashboard metrics for the requested window.

    Returns the four chart series (throughput, cycle time, error rate, WIP),
    each paired with the opposite preset window as a comparison series, plus
    the headline KPI values.
    """
    primary = _resolve_range(range_key)
    comparison = _comparison_range(range_key)

    throughput = DashboardSeriesSet(
        primary=_query_throughput(session, primary),
        comparison=_query_throughput(session, comparison),
    )
    cycle_time = DashboardSeriesSet(
        primary=_query_cycle_time(session, primary),
        comparison=_query_cycle_time(session, comparison),
    )
    error_rate = DashboardSeriesSet(
        primary=_query_error_rate(session, primary),
        comparison=_query_error_rate(session, comparison),
    )
    wip = DashboardWipSeriesSet(
        primary=_query_wip(session, primary),
        comparison=_query_wip(session, comparison),
    )

    kpis = DashboardKpis(
        active_agents=_active_agents(session),
        tasks_in_progress=_tasks_in_progress(session),
        error_rate_pct=_error_rate_kpi(session, primary),
        median_cycle_time_hours_7d=_median_cycle_time_7d(session),
    )

    return DashboardMetrics(
        range=primary.key,
        generated_at=datetime.utcnow(),
        kpis=kpis,
        throughput=throughput,
        cycle_time=cycle_time,
        error_rate=error_rate,
        wip=wip,
    )
|