feat(activity): Simplify query statements and improve code readability

This commit is contained in:
Abhimanyu Saharan
2026-02-05 00:21:33 +05:30
parent 79155e9067
commit 51313a9272
12 changed files with 65 additions and 151 deletions

View File

@@ -22,9 +22,5 @@ def list_activity(
statement = select(ActivityEvent)
if actor.actor_type == "agent" and actor.agent:
statement = statement.where(ActivityEvent.agent_id == actor.agent.id)
statement = (
statement.order_by(desc(col(ActivityEvent.created_at)))
.offset(offset)
.limit(limit)
)
statement = statement.order_by(desc(col(ActivityEvent.created_at))).offset(offset).limit(limit)
return list(session.exec(statement))

View File

@@ -5,22 +5,18 @@ from datetime import datetime, timedelta
from uuid import UUID, uuid4
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, col, select
from sqlalchemy import update
from sqlmodel import Session, col, select
from app.api.deps import ActorContext, require_admin_auth, require_admin_or_agent
from app.core.agent_tokens import generate_agent_token, hash_agent_token, verify_agent_token
from app.core.auth import AuthContext
from app.core.config import settings
from app.db.session import get_session
from app.integrations.openclaw_gateway import (
GatewayConfig as GatewayClientConfig,
OpenClawGatewayError,
ensure_session,
send_message,
)
from app.models.agents import Agent
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.schemas.agents import (
@@ -28,9 +24,9 @@ from app.schemas.agents import (
AgentDeleteConfirm,
AgentHeartbeat,
AgentHeartbeatCreate,
AgentProvisionConfirm,
AgentRead,
AgentUpdate,
AgentProvisionConfirm,
)
from app.services.activity_log import record_activity
from app.services.agent_provisioning import (
@@ -76,9 +72,7 @@ def _require_board(session: Session, board_id: UUID | str | None) -> Board:
return board
def _require_gateway(
session: Session, board: Board
) -> tuple[Gateway, GatewayClientConfig]:
def _require_gateway(session: Session, board: Board) -> tuple[Gateway, GatewayClientConfig]:
if not board.gateway_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
@@ -140,9 +134,7 @@ def _record_heartbeat(session: Session, agent: Agent) -> None:
)
def _record_instruction_failure(
session: Session, agent: Agent, error: str, action: str
) -> None:
def _record_instruction_failure(session: Session, agent: Agent, error: str, action: str) -> None:
action_label = action.replace("_", " ").capitalize()
record_activity(
session,
@@ -206,9 +198,7 @@ async def create_agent(
agent.provision_confirm_token_hash = hash_agent_token(provision_token)
agent.provision_requested_at = datetime.utcnow()
agent.provision_action = "provision"
session_key, session_error = await _ensure_gateway_session(
agent.name, client_config
)
session_key, session_error = await _ensure_gateway_session(agent.name, client_config)
agent.openclaw_session_id = session_key
session.add(agent)
session.commit()
@@ -315,9 +305,7 @@ async def update_agent(
session.commit()
session.refresh(agent)
try:
await send_update_message(
agent, board, gateway, raw_token, provision_token, auth.user
)
await send_update_message(agent, board, gateway, raw_token, provision_token, auth.user)
record_activity(
session,
event_type="agent.update.requested",
@@ -383,9 +371,7 @@ async def heartbeat_or_create_agent(
agent.provision_confirm_token_hash = hash_agent_token(provision_token)
agent.provision_requested_at = datetime.utcnow()
agent.provision_action = "provision"
session_key, session_error = await _ensure_gateway_session(
agent.name, client_config
)
session_key, session_error = await _ensure_gateway_session(agent.name, client_config)
agent.openclaw_session_id = session_key
session.add(agent)
session.commit()
@@ -456,9 +442,7 @@ async def heartbeat_or_create_agent(
elif not agent.openclaw_session_id:
board = _require_board(session, str(agent.board_id) if agent.board_id else None)
gateway, client_config = _require_gateway(session, board)
session_key, session_error = await _ensure_gateway_session(
agent.name, client_config
)
session_key, session_error = await _ensure_gateway_session(agent.name, client_config)
agent.openclaw_session_id = session_key
if session_error:
record_activity(
@@ -533,7 +517,7 @@ def delete_agent(
"2) Delete the agent session from the gateway.\n"
"3) Confirm deletion by calling:\n"
f" POST {base_url}/api/v1/agents/{agent.id}/delete/confirm\n"
" Body: {\"token\": \"" + raw_token + "\"}\n"
' Body: {"token": "' + raw_token + '"}\n'
"Reply NO_REPLY."
)
await ensure_session(main_session, config=client_config, label="Main Agent")
@@ -647,9 +631,7 @@ def confirm_delete_agent(
agent_id=None,
)
session.execute(
update(ActivityEvent)
.where(col(ActivityEvent.agent_id) == agent.id)
.values(agent_id=None)
update(ActivityEvent).where(col(ActivityEvent.agent_id) == agent.id).values(agent_id=None)
)
session.delete(agent)
session.commit()

View File

@@ -8,16 +8,11 @@ from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import delete
from sqlmodel import Session, col, select
from app.api.deps import (
ActorContext,
get_board_or_404,
require_admin_auth,
require_admin_or_agent,
)
from app.api.deps import ActorContext, get_board_or_404, require_admin_auth, require_admin_or_agent
from app.core.auth import AuthContext
from app.db.session import get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import (
GatewayConfig as GatewayClientConfig,
OpenClawGatewayError,
delete_session,
ensure_session,
@@ -184,9 +179,7 @@ def delete_board(
auth: AuthContext = Depends(require_admin_auth),
) -> dict[str, bool]:
agents = list(session.exec(select(Agent).where(Agent.board_id == board.id)))
task_ids = list(
session.exec(select(Task.id).where(Task.board_id == board.id))
)
task_ids = list(session.exec(select(Task.id).where(Task.board_id == board.id)))
config, client_config = _board_gateway(session, board)
if config and client_config:
@@ -200,14 +193,10 @@ def delete_board(
) from exc
if task_ids:
session.execute(
delete(ActivityEvent).where(col(ActivityEvent.task_id).in_(task_ids))
)
session.execute(delete(ActivityEvent).where(col(ActivityEvent.task_id).in_(task_ids)))
if agents:
agent_ids = [agent.id for agent in agents]
session.execute(
delete(ActivityEvent).where(col(ActivityEvent.agent_id).in_(agent_ids))
)
session.execute(delete(ActivityEvent).where(col(ActivityEvent.agent_id).in_(agent_ids)))
session.execute(delete(Agent).where(col(Agent.id).in_(agent_ids)))
session.execute(delete(Task).where(col(Task.board_id) == board.id))
session.delete(board)

View File

@@ -4,8 +4,9 @@ from fastapi import APIRouter, Body, Depends, HTTPException, Query, status
from sqlmodel import Session
from app.core.auth import AuthContext, get_auth_context
from app.db.session import get_session
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import (
GatewayConfig as GatewayClientConfig,
OpenClawGatewayError,
ensure_session,
get_chat_history,
@@ -17,7 +18,6 @@ from app.integrations.openclaw_gateway_protocol import (
GATEWAY_METHODS,
PROTOCOL_VERSION,
)
from app.db.session import get_session
from app.models.boards import Board
from app.models.gateways import Gateway
@@ -71,9 +71,7 @@ def _resolve_gateway(
def _require_gateway(
session: Session, board_id: str | None
) -> tuple[Board, GatewayClientConfig, str | None]:
board, config, main_session = _resolve_gateway(
session, board_id, None, None, None
)
board, config, main_session = _resolve_gateway(session, board_id, None, None, None)
if board is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
@@ -108,9 +106,7 @@ async def gateways_status(
main_session_error: str | None = None
if main_session:
try:
ensured = await ensure_session(
main_session, config=config, label="Main Agent"
)
ensured = await ensure_session(main_session, config=config, label="Main Agent")
if isinstance(ensured, dict):
main_session_entry = ensured.get("entry") or ensured
except OpenClawGatewayError as exc:
@@ -157,9 +153,7 @@ async def list_gateway_sessions(
main_session_entry: object | None = None
if main_session:
try:
ensured = await ensure_session(
main_session, config=config, label="Main Agent"
)
ensured = await ensure_session(main_session, config=config, label="Main Agent")
if isinstance(ensured, dict):
main_session_entry = ensured.get("entry") or ensured
except OpenClawGatewayError:
@@ -194,9 +188,7 @@ async def get_gateway_session(
sessions_list = list(sessions.get("sessions") or [])
else:
sessions_list = list(sessions or [])
if main_session and not any(
session.get("key") == main_session for session in sessions_list
):
if main_session and not any(session.get("key") == main_session for session in sessions_list):
try:
await ensure_session(main_session, config=config, label="Main Agent")
refreshed = await openclaw_call("sessions.list", config=config)
@@ -206,9 +198,7 @@ async def get_gateway_session(
sessions_list = list(refreshed or [])
except OpenClawGatewayError:
pass
session_entry = next(
(item for item in sessions_list if item.get("key") == session_id), None
)
session_entry = next((item for item in sessions_list if item.get("key") == session_id), None)
if session_entry is None and main_session and session_id == main_session:
try:
ensured = await ensure_session(main_session, config=config, label="Main Agent")

View File

@@ -7,12 +7,8 @@ from sqlmodel import Session, select
from app.core.auth import AuthContext, get_auth_context
from app.db.session import get_session
from app.integrations.openclaw_gateway import (
GatewayConfig as GatewayClientConfig,
OpenClawGatewayError,
ensure_session,
send_message,
)
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import OpenClawGatewayError, ensure_session, send_message
from app.models.gateways import Gateway
from app.schemas.gateways import GatewayCreate, GatewayRead, GatewayUpdate
@@ -237,9 +233,7 @@ async def _send_skyll_enable_message(gateway: Gateway) -> None:
if not gateway.main_session_key:
raise OpenClawGatewayError("gateway main_session_key is required")
client_config = GatewayClientConfig(url=gateway.url, token=gateway.token)
await ensure_session(
gateway.main_session_key, config=client_config, label="Main Agent"
)
await ensure_session(gateway.main_session_key, config=client_config, label="Main Agent")
await send_message(
SKYLL_ENABLE_MESSAGE,
session_key=gateway.main_session_key,
@@ -254,9 +248,7 @@ async def _send_skyll_disable_message(gateway: Gateway) -> None:
if not gateway.main_session_key:
raise OpenClawGatewayError("gateway main_session_key is required")
client_config = GatewayClientConfig(url=gateway.url, token=gateway.token)
await ensure_session(
gateway.main_session_key, config=client_config, label="Main Agent"
)
await ensure_session(gateway.main_session_key, config=client_config, label="Main Agent")
await send_message(
SKYLL_DISABLE_MESSAGE,
session_key=gateway.main_session_key,

View File

@@ -5,7 +5,7 @@ from datetime import datetime, timedelta
from typing import Literal
from fastapi import APIRouter, Depends, Query
from sqlalchemy import case, func
from sqlalchemy import DateTime, case, cast, func
from sqlmodel import Session, col, select
from app.api.deps import require_admin_auth
@@ -114,7 +114,7 @@ def _wip_series_from_mapping(
def _query_throughput(session: Session, range_spec: RangeSpec) -> DashboardRangeSeries:
bucket_col = func.date_trunc(range_spec.bucket, Task.updated_at).label("bucket")
statement = (
select(bucket_col, func.count(Task.id))
select(bucket_col, func.count())
.where(col(Task.status) == "review")
.where(col(Task.updated_at) >= range_spec.start)
.where(col(Task.updated_at) <= range_spec.end)
@@ -128,9 +128,8 @@ def _query_throughput(session: Session, range_spec: RangeSpec) -> DashboardRange
def _query_cycle_time(session: Session, range_spec: RangeSpec) -> DashboardRangeSeries:
bucket_col = func.date_trunc(range_spec.bucket, Task.updated_at).label("bucket")
duration_hours = func.extract(
"epoch", Task.updated_at - Task.in_progress_at
) / 3600.0
in_progress = cast(Task.in_progress_at, DateTime)
duration_hours = func.extract("epoch", Task.updated_at - in_progress) / 3600.0
statement = (
select(bucket_col, func.avg(duration_hours))
.where(col(Task.status) == "review")
@@ -146,9 +145,7 @@ def _query_cycle_time(session: Session, range_spec: RangeSpec) -> DashboardRange
def _query_error_rate(session: Session, range_spec: RangeSpec) -> DashboardRangeSeries:
bucket_col = func.date_trunc(range_spec.bucket, ActivityEvent.created_at).label(
"bucket"
)
bucket_col = func.date_trunc(range_spec.bucket, ActivityEvent.created_at).label("bucket")
error_case = case(
(
col(ActivityEvent.event_type).like(ERROR_EVENT_PATTERN),
@@ -157,7 +154,7 @@ def _query_error_rate(session: Session, range_spec: RangeSpec) -> DashboardRange
else_=0,
)
statement = (
select(bucket_col, func.sum(error_case), func.count(ActivityEvent.id))
select(bucket_col, func.sum(error_case), func.count())
.where(col(ActivityEvent.created_at) >= range_spec.start)
.where(col(ActivityEvent.created_at) <= range_spec.end)
.group_by(bucket_col)
@@ -204,9 +201,8 @@ def _query_wip(session: Session, range_spec: RangeSpec) -> DashboardWipRangeSeri
def _median_cycle_time_7d(session: Session) -> float | None:
now = datetime.utcnow()
start = now - timedelta(days=7)
duration_hours = func.extract(
"epoch", Task.updated_at - Task.in_progress_at
) / 3600.0
in_progress = cast(Task.in_progress_at, DateTime)
duration_hours = func.extract("epoch", Task.updated_at - in_progress) / 3600.0
statement = (
select(func.percentile_cont(0.5).within_group(duration_hours))
.where(col(Task.status) == "review")
@@ -233,7 +229,7 @@ def _error_rate_kpi(session: Session, range_spec: RangeSpec) -> float:
else_=0,
)
statement = (
select(func.sum(error_case), func.count(ActivityEvent.id))
select(func.sum(error_case), func.count())
.where(col(ActivityEvent.created_at) >= range_spec.start)
.where(col(ActivityEvent.created_at) <= range_spec.end)
)
@@ -248,7 +244,7 @@ def _error_rate_kpi(session: Session, range_spec: RangeSpec) -> float:
def _active_agents(session: Session) -> int:
threshold = datetime.utcnow() - OFFLINE_AFTER
statement = select(func.count(Agent.id)).where(
statement = select(func.count()).where(
col(Agent.last_seen_at).is_not(None),
col(Agent.last_seen_at) >= threshold,
)
@@ -257,7 +253,7 @@ def _active_agents(session: Session) -> int:
def _tasks_in_progress(session: Session) -> int:
statement = select(func.count(Task.id)).where(col(Task.status) == "in_progress")
statement = select(func.count()).where(col(Task.status) == "in_progress")
result = session.exec(statement).one()
return int(result)

View File

@@ -16,17 +16,11 @@ from app.api.deps import (
)
from app.core.auth import AuthContext
from app.db.session import get_session
from app.models.agents import Agent
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.boards import Board
from app.models.tasks import Task
from app.schemas.tasks import (
TaskCommentCreate,
TaskCommentRead,
TaskCreate,
TaskRead,
TaskUpdate,
)
from app.schemas.tasks import TaskCommentCreate, TaskCommentRead, TaskCreate, TaskRead, TaskUpdate
from app.services.activity_log import record_activity
router = APIRouter(prefix="/boards/{board_id}/tasks", tags=["tasks"])

View File

@@ -18,9 +18,7 @@ class AgentAuthContext:
def _find_agent_for_token(session: Session, token: str) -> Agent | None:
agents = list(
session.exec(select(Agent).where(col(Agent.agent_token_hash).is_not(None)))
)
agents = list(session.exec(select(Agent).where(col(Agent.agent_token_hash).is_not(None))))
for agent in agents:
if agent.agent_token_hash and verify_agent_token(token, agent.agent_token_hash):
return agent

View File

@@ -41,7 +41,5 @@ def verify_agent_token(token: str, stored_hash: str) -> bool:
return False
salt = _b64decode(salt_b64)
expected_digest = _b64decode(digest_b64)
candidate = hashlib.pbkdf2_hmac(
"sha256", token.encode("utf-8"), salt, iterations_int
)
candidate = hashlib.pbkdf2_hmac("sha256", token.encode("utf-8"), salt, iterations_int)
return hmac.compare_digest(candidate, expected_digest)

View File

@@ -66,9 +66,7 @@ class AppLogFilter(logging.Filter):
class JsonFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
payload: dict[str, Any] = {
"timestamp": datetime.fromtimestamp(
record.created, tz=timezone.utc
).isoformat(),
"timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),

View File

@@ -4,7 +4,7 @@ from datetime import datetime
from typing import Any
from uuid import UUID, uuid4
from sqlalchemy import Column, JSON, Text
from sqlalchemy import JSON, Column, Text
from sqlmodel import Field, SQLModel
@@ -17,9 +17,7 @@ class Agent(SQLModel, table=True):
status: str = Field(default="provisioning", index=True)
openclaw_session_id: str | None = Field(default=None, index=True)
agent_token_hash: str | None = Field(default=None, index=True)
heartbeat_config: dict[str, Any] | None = Field(
default=None, sa_column=Column(JSON)
)
heartbeat_config: dict[str, Any] | None = Field(default=None, sa_column=Column(JSON))
identity_template: str | None = Field(default=None, sa_column=Column(Text))
soul_template: str | None = Field(default=None, sa_column=Column(Text))
provision_requested_at: datetime | None = Field(default=None)

View File

@@ -9,11 +9,8 @@ from uuid import uuid4
from jinja2 import Environment, FileSystemLoader, StrictUndefined, select_autoescape
from app.core.config import settings
from app.integrations.openclaw_gateway import (
GatewayConfig as GatewayClientConfig,
ensure_session,
send_message,
)
from app.integrations.openclaw_gateway import GatewayConfig as GatewayClientConfig
from app.integrations.openclaw_gateway import ensure_session, send_message
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
@@ -130,12 +127,12 @@ def _build_context(
"auth_token": auth_token,
"main_session_key": main_session_key,
"workspace_root": workspace_root,
"user_name": user.name if user else "",
"user_preferred_name": user.preferred_name if user else "",
"user_pronouns": user.pronouns if user else "",
"user_timezone": user.timezone if user else "",
"user_notes": user.notes if user else "",
"user_context": user.context if user else "",
"user_name": (user.name or "") if user else "",
"user_preferred_name": (user.preferred_name or "") if user else "",
"user_pronouns": (user.pronouns or "") if user else "",
"user_timezone": (user.timezone or "") if user else "",
"user_notes": (user.notes or "") if user else "",
"user_context": (user.context or "") if user else "",
}
@@ -146,9 +143,7 @@ def _build_file_blocks(context: dict[str, str], agent: Agent) -> str:
if agent.soul_template:
overrides["SOUL.md"] = agent.soul_template
templates = _read_templates(context, overrides=overrides)
return "".join(
_render_file_block(name, templates.get(name, "")) for name in TEMPLATE_FILES
)
return "".join(_render_file_block(name, templates.get(name, "")) for name in TEMPLATE_FILES)
def build_provisioning_message(
@@ -198,7 +193,7 @@ def build_provisioning_message(
"run heartbeats.\n"
"7) After provisioning completes, confirm by calling:\n"
f" POST {context['base_url']}/api/v1/agents/{context['agent_id']}/provision/confirm\n"
f" Body: {{\"token\": \"{confirm_token}\", \"action\": \"provision\"}}\n\n"
f' Body: {{"token": "{confirm_token}", "action": "provision"}}\n\n'
"Files:" + file_blocks
)
@@ -248,7 +243,7 @@ def build_update_message(
"run heartbeats.\n"
"7) After the update completes (and only after files are written), confirm by calling:\n"
f" POST {context['base_url']}/api/v1/agents/{context['agent_id']}/provision/confirm\n"
f" Body: {{\"token\": \"{confirm_token}\", \"action\": \"update\"}}\n"
f' Body: {{"token": "{confirm_token}", "action": "update"}}\n'
" Mission Control will send the hello message only after this confirmation.\n\n"
"Files:" + file_blocks
)
@@ -267,16 +262,10 @@ async def send_provisioning_message(
if not gateway.main_session_key:
raise ValueError("gateway_main_session_key is required")
main_session = gateway.main_session_key
client_config = GatewayClientConfig(
url=gateway.url, token=gateway.token
)
client_config = GatewayClientConfig(url=gateway.url, token=gateway.token)
await ensure_session(main_session, config=client_config, label="Main Agent")
message = build_provisioning_message(
agent, board, gateway, auth_token, confirm_token, user
)
await send_message(
message, session_key=main_session, config=client_config, deliver=False
)
message = build_provisioning_message(agent, board, gateway, auth_token, confirm_token, user)
await send_message(message, session_key=main_session, config=client_config, deliver=False)
async def send_update_message(
@@ -292,13 +281,7 @@ async def send_update_message(
if not gateway.main_session_key:
raise ValueError("gateway_main_session_key is required")
main_session = gateway.main_session_key
client_config = GatewayClientConfig(
url=gateway.url, token=gateway.token
)
client_config = GatewayClientConfig(url=gateway.url, token=gateway.token)
await ensure_session(main_session, config=client_config, label="Main Agent")
message = build_update_message(
agent, board, gateway, auth_token, confirm_token, user
)
await send_message(
message, session_key=main_session, config=client_config, deliver=False
)
message = build_update_message(agent, board, gateway, auth_token, confirm_token, user)
await send_message(message, session_key=main_session, config=client_config, deliver=False)