Files
openclaw-mission-control/backend/app/api/metrics.py

485 lines
16 KiB
Python

"""Dashboard metric aggregation endpoints."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import DateTime, case
from sqlalchemy import cast as sql_cast
from sqlalchemy import func
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.api.deps import require_org_member
from app.core.time import utcnow
from app.db.session import get_session
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.boards import Board
from app.models.tasks import Task
from app.schemas.metrics import (
DashboardBucketKey,
DashboardKpis,
DashboardMetrics,
DashboardRangeKey,
DashboardRangeSeries,
DashboardSeriesPoint,
DashboardSeriesSet,
DashboardWipPoint,
DashboardWipRangeSeries,
DashboardWipSeriesSet,
)
from app.services.organizations import OrganizationContext, list_accessible_board_ids
router = APIRouter(prefix="/metrics", tags=["metrics"])
ERROR_EVENT_PATTERN = "%failed"
_RUNTIME_TYPE_REFERENCES = (UUID, AsyncSession)
RANGE_QUERY = Query(default="24h")
BOARD_ID_QUERY = Query(default=None)
GROUP_ID_QUERY = Query(default=None)
SESSION_DEP = Depends(get_session)
ORG_MEMBER_DEP = Depends(require_org_member)
@dataclass(frozen=True)
class RangeSpec:
"""Resolved time-range specification for metric aggregation."""
key: DashboardRangeKey
start: datetime
end: datetime
bucket: DashboardBucketKey
duration: timedelta
def _resolve_range(range_key: DashboardRangeKey) -> RangeSpec:
now = utcnow()
specs: dict[DashboardRangeKey, tuple[timedelta, DashboardBucketKey]] = {
"24h": (timedelta(hours=24), "hour"),
"3d": (timedelta(days=3), "day"),
"7d": (timedelta(days=7), "day"),
"14d": (timedelta(days=14), "day"),
"1m": (timedelta(days=30), "day"),
"3m": (timedelta(days=90), "week"),
"6m": (timedelta(days=180), "week"),
"1y": (timedelta(days=365), "month"),
}
duration, bucket = specs[range_key]
return RangeSpec(
key=range_key,
start=now - duration,
end=now,
bucket=bucket,
duration=duration,
)
def _comparison_range(range_spec: RangeSpec) -> RangeSpec:
return RangeSpec(
key=range_spec.key,
start=range_spec.start - range_spec.duration,
end=range_spec.end - range_spec.duration,
bucket=range_spec.bucket,
duration=range_spec.duration,
)
def _bucket_start(value: datetime, bucket: DashboardBucketKey) -> datetime:
normalized = value.replace(hour=0, minute=0, second=0, microsecond=0)
if bucket == "month":
return normalized.replace(day=1)
if bucket == "week":
return normalized - timedelta(days=normalized.weekday())
if bucket == "day":
return normalized
return value.replace(minute=0, second=0, microsecond=0)
def _next_bucket(cursor: datetime, bucket: DashboardBucketKey) -> datetime:
if bucket == "hour":
return cursor + timedelta(hours=1)
if bucket == "day":
return cursor + timedelta(days=1)
if bucket == "week":
return cursor + timedelta(days=7)
next_month = cursor.month + 1
next_year = cursor.year
if next_month > 12:
next_month = 1
next_year += 1
return cursor.replace(year=next_year, month=next_month, day=1)
def _build_buckets(range_spec: RangeSpec) -> list[datetime]:
cursor = _bucket_start(range_spec.start, range_spec.bucket)
buckets: list[datetime] = []
while cursor <= range_spec.end:
buckets.append(cursor)
cursor = _next_bucket(cursor, range_spec.bucket)
return buckets
def _series_from_mapping(
range_spec: RangeSpec,
mapping: dict[datetime, float],
) -> DashboardRangeSeries:
points = [
DashboardSeriesPoint(period=bucket, value=float(mapping.get(bucket, 0)))
for bucket in _build_buckets(range_spec)
]
return DashboardRangeSeries(
range=range_spec.key,
bucket=range_spec.bucket,
points=points,
)
def _wip_series_from_mapping(
range_spec: RangeSpec,
mapping: dict[datetime, dict[str, int]],
) -> DashboardWipRangeSeries:
points: list[DashboardWipPoint] = []
for bucket in _build_buckets(range_spec):
values = mapping.get(bucket, {})
points.append(
DashboardWipPoint(
period=bucket,
inbox=values.get("inbox", 0),
in_progress=values.get("in_progress", 0),
review=values.get("review", 0),
done=values.get("done", 0),
),
)
return DashboardWipRangeSeries(
range=range_spec.key,
bucket=range_spec.bucket,
points=points,
)
async def _query_throughput(
session: AsyncSession,
range_spec: RangeSpec,
board_ids: list[UUID],
) -> DashboardRangeSeries:
bucket_col = func.date_trunc(range_spec.bucket, Task.updated_at).label("bucket")
statement = (
select(bucket_col, func.count())
.where(col(Task.status) == "review")
.where(col(Task.updated_at) >= range_spec.start)
.where(col(Task.updated_at) <= range_spec.end)
)
if not board_ids:
return _series_from_mapping(range_spec, {})
statement = (
statement.where(col(Task.board_id).in_(board_ids)).group_by(bucket_col).order_by(bucket_col)
)
results = (await session.exec(statement)).all()
mapping = {row[0]: float(row[1]) for row in results}
return _series_from_mapping(range_spec, mapping)
async def _query_cycle_time(
session: AsyncSession,
range_spec: RangeSpec,
board_ids: list[UUID],
) -> DashboardRangeSeries:
bucket_col = func.date_trunc(range_spec.bucket, Task.updated_at).label("bucket")
in_progress = sql_cast(Task.in_progress_at, DateTime)
duration_hours = func.extract("epoch", Task.updated_at - in_progress) / 3600.0
statement = (
select(bucket_col, func.avg(duration_hours))
.where(col(Task.status) == "review")
.where(col(Task.in_progress_at).is_not(None))
.where(col(Task.updated_at) >= range_spec.start)
.where(col(Task.updated_at) <= range_spec.end)
)
if not board_ids:
return _series_from_mapping(range_spec, {})
statement = (
statement.where(col(Task.board_id).in_(board_ids)).group_by(bucket_col).order_by(bucket_col)
)
results = (await session.exec(statement)).all()
mapping = {row[0]: float(row[1] or 0) for row in results}
return _series_from_mapping(range_spec, mapping)
async def _query_error_rate(
session: AsyncSession,
range_spec: RangeSpec,
board_ids: list[UUID],
) -> DashboardRangeSeries:
bucket_col = func.date_trunc(
range_spec.bucket,
ActivityEvent.created_at,
).label("bucket")
error_case = case(
(
col(ActivityEvent.event_type).like(ERROR_EVENT_PATTERN),
1,
),
else_=0,
)
statement = (
select(bucket_col, func.sum(error_case), func.count())
.join(Task, col(ActivityEvent.task_id) == col(Task.id))
.where(col(ActivityEvent.created_at) >= range_spec.start)
.where(col(ActivityEvent.created_at) <= range_spec.end)
)
if not board_ids:
return _series_from_mapping(range_spec, {})
statement = (
statement.where(col(Task.board_id).in_(board_ids)).group_by(bucket_col).order_by(bucket_col)
)
results = (await session.exec(statement)).all()
mapping: dict[datetime, float] = {}
for bucket, errors, total in results:
total_count = float(total or 0)
error_count = float(errors or 0)
rate = (error_count / total_count) * 100 if total_count > 0 else 0.0
mapping[bucket] = rate
return _series_from_mapping(range_spec, mapping)
async def _query_wip(
session: AsyncSession,
range_spec: RangeSpec,
board_ids: list[UUID],
) -> DashboardWipRangeSeries:
if not board_ids:
return _wip_series_from_mapping(range_spec, {})
inbox_bucket_col = func.date_trunc(range_spec.bucket, Task.created_at).label("inbox_bucket")
inbox_statement = (
select(inbox_bucket_col, func.count())
.where(col(Task.status) == "inbox")
.where(col(Task.created_at) >= range_spec.start)
.where(col(Task.created_at) <= range_spec.end)
.where(col(Task.board_id).in_(board_ids))
.group_by(inbox_bucket_col)
.order_by(inbox_bucket_col)
)
inbox_results = (await session.exec(inbox_statement)).all()
status_bucket_col = func.date_trunc(range_spec.bucket, Task.updated_at).label("status_bucket")
progress_case = case((col(Task.status) == "in_progress", 1), else_=0)
review_case = case((col(Task.status) == "review", 1), else_=0)
done_case = case((col(Task.status) == "done", 1), else_=0)
status_statement = (
select(
status_bucket_col,
func.sum(progress_case),
func.sum(review_case),
func.sum(done_case),
)
.where(col(Task.updated_at) >= range_spec.start)
.where(col(Task.updated_at) <= range_spec.end)
.where(col(Task.board_id).in_(board_ids))
.group_by(status_bucket_col)
.order_by(status_bucket_col)
)
status_results = (await session.exec(status_statement)).all()
mapping: dict[datetime, dict[str, int]] = {}
for bucket, inbox in inbox_results:
values = mapping.setdefault(bucket, {})
values["inbox"] = int(inbox or 0)
for bucket, in_progress, review, done in status_results:
values = mapping.setdefault(bucket, {})
values["in_progress"] = int(in_progress or 0)
values["review"] = int(review or 0)
values["done"] = int(done or 0)
return _wip_series_from_mapping(range_spec, mapping)
async def _median_cycle_time_for_range(
session: AsyncSession,
range_spec: RangeSpec,
board_ids: list[UUID],
) -> float | None:
in_progress = sql_cast(Task.in_progress_at, DateTime)
duration_hours = func.extract("epoch", Task.updated_at - in_progress) / 3600.0
statement = (
select(func.percentile_cont(0.5).within_group(duration_hours))
.where(col(Task.status) == "review")
.where(col(Task.in_progress_at).is_not(None))
.where(col(Task.updated_at) >= range_spec.start)
.where(col(Task.updated_at) <= range_spec.end)
)
if not board_ids:
return None
statement = statement.where(col(Task.board_id).in_(board_ids))
value = (await session.exec(statement)).one_or_none()
if value is None:
return None
if isinstance(value, tuple):
value = value[0]
if value is None:
return None
return float(value)
async def _error_rate_kpi(
session: AsyncSession,
range_spec: RangeSpec,
board_ids: list[UUID],
) -> float:
error_case = case(
(
col(ActivityEvent.event_type).like(ERROR_EVENT_PATTERN),
1,
),
else_=0,
)
statement = (
select(func.sum(error_case), func.count())
.join(Task, col(ActivityEvent.task_id) == col(Task.id))
.where(col(ActivityEvent.created_at) >= range_spec.start)
.where(col(ActivityEvent.created_at) <= range_spec.end)
)
if not board_ids:
return 0.0
statement = statement.where(col(Task.board_id).in_(board_ids))
result = (await session.exec(statement)).one_or_none()
if result is None:
return 0.0
errors, total = result
total_count = float(total or 0)
error_count = float(errors or 0)
return (error_count / total_count) * 100 if total_count > 0 else 0.0
async def _active_agents(
session: AsyncSession,
range_spec: RangeSpec,
board_ids: list[UUID],
) -> int:
statement = select(func.count()).where(
col(Agent.last_seen_at).is_not(None),
col(Agent.last_seen_at) >= range_spec.start,
col(Agent.last_seen_at) <= range_spec.end,
)
if not board_ids:
return 0
statement = statement.where(col(Agent.board_id).in_(board_ids))
result = (await session.exec(statement)).one()
return int(result)
async def _tasks_in_progress(
session: AsyncSession,
range_spec: RangeSpec,
board_ids: list[UUID],
) -> int:
if not board_ids:
return 0
statement = (
select(func.count())
.where(col(Task.status) == "in_progress")
.where(col(Task.updated_at) >= range_spec.start)
.where(col(Task.updated_at) <= range_spec.end)
.where(col(Task.board_id).in_(board_ids))
)
result = (await session.exec(statement)).one()
return int(result)
async def _resolve_dashboard_board_ids(
session: AsyncSession,
*,
ctx: OrganizationContext,
board_id: UUID | None,
group_id: UUID | None,
) -> list[UUID]:
board_ids = await list_accessible_board_ids(session, member=ctx.member, write=False)
if not board_ids:
return []
allowed = set(board_ids)
if board_id is not None and board_id not in allowed:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
if group_id is None:
return [board_id] if board_id is not None else board_ids
group_board_ids = list(
await session.exec(
select(Board.id)
.where(col(Board.organization_id) == ctx.member.organization_id)
.where(col(Board.board_group_id) == group_id)
.where(col(Board.id).in_(board_ids)),
),
)
if board_id is not None:
return [board_id] if board_id in set(group_board_ids) else []
return group_board_ids
@router.get("/dashboard", response_model=DashboardMetrics)
async def dashboard_metrics(
range_key: DashboardRangeKey = RANGE_QUERY,
board_id: UUID | None = BOARD_ID_QUERY,
group_id: UUID | None = GROUP_ID_QUERY,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> DashboardMetrics:
"""Return dashboard KPIs and time-series data for accessible boards."""
primary = _resolve_range(range_key)
comparison = _comparison_range(primary)
board_ids = await _resolve_dashboard_board_ids(
session,
ctx=ctx,
board_id=board_id,
group_id=group_id,
)
throughput_primary = await _query_throughput(session, primary, board_ids)
throughput_comparison = await _query_throughput(session, comparison, board_ids)
throughput = DashboardSeriesSet(
primary=throughput_primary,
comparison=throughput_comparison,
)
cycle_time_primary = await _query_cycle_time(session, primary, board_ids)
cycle_time_comparison = await _query_cycle_time(session, comparison, board_ids)
cycle_time = DashboardSeriesSet(
primary=cycle_time_primary,
comparison=cycle_time_comparison,
)
error_rate_primary = await _query_error_rate(session, primary, board_ids)
error_rate_comparison = await _query_error_rate(session, comparison, board_ids)
error_rate = DashboardSeriesSet(
primary=error_rate_primary,
comparison=error_rate_comparison,
)
wip_primary = await _query_wip(session, primary, board_ids)
wip_comparison = await _query_wip(session, comparison, board_ids)
wip = DashboardWipSeriesSet(
primary=wip_primary,
comparison=wip_comparison,
)
kpis = DashboardKpis(
active_agents=await _active_agents(session, primary, board_ids),
tasks_in_progress=await _tasks_in_progress(session, primary, board_ids),
error_rate_pct=await _error_rate_kpi(session, primary, board_ids),
median_cycle_time_hours_7d=await _median_cycle_time_for_range(
session,
primary,
board_ids,
),
)
return DashboardMetrics(
range=primary.key,
generated_at=utcnow(),
kpis=kpis,
throughput=throughput,
cycle_time=cycle_time,
error_rate=error_rate,
wip=wip,
)