Merge pull request #95 from abhi1693/docs/backend-doc-pass

docs(backend): docstrings + rationale comments for tasks, provisioning, auth
This commit is contained in:
Abhimanyu Saharan
2026-02-25 03:37:14 +05:30
committed by GitHub
3 changed files with 102 additions and 0 deletions

View File

@@ -318,22 +318,41 @@ async def has_valid_recent_comment(
def _parse_since(value: str | None) -> datetime | None:
"""Parse an optional ISO-8601 timestamp into a naive UTC `datetime`.
The API accepts either naive timestamps (treated as UTC) or timezone-aware values.
Returning naive UTC simplifies SQLModel comparisons against stored naive UTC values.
"""
if not value:
return None
normalized = value.strip()
if not normalized:
return None
# Allow common ISO-8601 `Z` suffix (UTC) even though `datetime.fromisoformat` expects `+00:00`.
normalized = normalized.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return parsed.astimezone(UTC).replace(tzinfo=None)
# No tzinfo: interpret as UTC for consistency with other API timestamps.
return parsed
def _coerce_task_items(items: Sequence[object]) -> list[Task]:
"""Validate/convert paginated query results to a concrete `list[Task]`.
SQLModel pagination helpers return `Sequence[object]`; we validate types early so the
rest of the route logic can assume real `Task` instances.
"""
tasks: list[Task] = []
for item in items:
if not isinstance(item, Task):
@@ -346,6 +365,15 @@ def _coerce_task_items(items: Sequence[object]) -> list[Task]:
def _coerce_task_event_rows(
items: Sequence[object],
) -> list[tuple[ActivityEvent, Task | None]]:
"""Normalize DB rows into `(ActivityEvent, Task | None)` tuples.
Depending on the SQLAlchemy/SQLModel execution path, result rows may arrive as:
- real Python tuples, or
- row-like objects supporting `__len__` and `__getitem__`.
This helper centralizes validation so SSE/event-stream logic can assume a stable shape.
"""
rows: list[tuple[ActivityEvent, Task | None]] = []
for item in items:
first: object
@@ -382,6 +410,12 @@ async def _lead_was_mentioned(
task: Task,
lead: Agent,
) -> bool:
"""Return `True` if the lead agent is mentioned in any comment on the task.
This is used to avoid redundant lead pings (especially in auto-created tasks) while still
ensuring escalation happens when explicitly requested.
"""
statement = (
select(ActivityEvent.message)
.where(col(ActivityEvent.task_id) == task.id)
@@ -398,6 +432,8 @@ async def _lead_was_mentioned(
def _lead_created_task(task: Task, lead: Agent) -> bool:
"""Return `True` if `task` was auto-created by the lead agent."""
if not task.auto_created or not task.auto_reason:
return False
return task.auto_reason == f"lead_agent:{lead.id}"
@@ -411,6 +447,13 @@ async def _reconcile_dependents_for_dependency_toggle(
previous_status: str,
actor_agent_id: UUID | None,
) -> None:
"""Apply dependency side-effects when a dependency task toggles done/undone.
The UI models dependencies as a DAG: when a dependency is reopened, dependents that were
previously marked done may need to be reopened or flagged. This helper keeps dependent state
consistent with the dependency graph without duplicating logic across endpoints.
"""
done_toggled = (previous_status == "done") != (dependency_task.status == "done")
if not done_toggled:
return

View File

@@ -66,6 +66,13 @@ class AuthContext:
def _extract_bearer_token(authorization: str | None) -> str | None:
"""Extract the bearer token from an `Authorization` header.
Returns `None` for missing/empty headers or non-bearer schemes.
Note: we do *not* validate the token here; this helper is only responsible for parsing.
"""
if not authorization:
return None
value = authorization.strip()
@@ -92,6 +99,14 @@ def _normalize_email(value: object) -> str | None:
def _extract_claim_email(claims: dict[str, object]) -> str | None:
"""Best-effort extraction of an email address from Clerk/JWT-like claims.
Clerk payloads vary depending on token type and SDK version. We try common flat keys first,
then fall back to an `email_addresses` list (either strings or dict-like entries).
Returns a normalized lowercase email or `None`.
"""
for key in ("email", "email_address", "primary_email_address"):
email = _normalize_email(claims.get(key))
if email:
@@ -119,10 +134,13 @@ def _extract_claim_email(claims: dict[str, object]) -> str | None:
return candidate
if fallback_email is None:
fallback_email = candidate
return fallback_email
def _extract_claim_name(claims: dict[str, object]) -> str | None:
"""Best-effort extraction of a display name from Clerk/JWT-like claims."""
for key in ("name", "full_name"):
text = _non_empty_str(claims.get(key))
if text:
@@ -137,6 +155,17 @@ def _extract_claim_name(claims: dict[str, object]) -> str | None:
def _extract_clerk_profile(profile: ClerkUser | None) -> tuple[str | None, str | None]:
"""Extract `(email, name)` from a Clerk user profile.
The Clerk SDK surface is not perfectly consistent across environments:
- some fields may be absent,
- email addresses may be represented as strings or objects,
- the "primary" email may be identified by id.
This helper implements a defensive, best-effort extraction strategy and returns `(None, None)`
when the profile is unavailable.
"""
if profile is None:
return None, None

View File

@@ -110,27 +110,42 @@ def _heartbeat_config(agent: Agent) -> dict[str, Any]:
def _channel_heartbeat_visibility_patch(config_data: dict[str, Any]) -> dict[str, Any] | None:
"""Build a minimal patch ensuring channel default heartbeat visibility is configured.
Gateways may have existing channel config; we only want to fill missing keys rather than
overwrite operator intent. Returns `None` if no change is needed, otherwise returns a shallow
patch dict suitable for a config merge."""
channels = config_data.get("channels")
if not isinstance(channels, dict):
return {"defaults": {"heartbeat": DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY.copy()}}
defaults = channels.get("defaults")
if not isinstance(defaults, dict):
return {"defaults": {"heartbeat": DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY.copy()}}
heartbeat = defaults.get("heartbeat")
if not isinstance(heartbeat, dict):
return {"defaults": {"heartbeat": DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY.copy()}}
merged = dict(heartbeat)
changed = False
for key, value in DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY.items():
if key not in merged:
merged[key] = value
changed = True
if not changed:
return None
return {"defaults": {"heartbeat": merged}}
def _template_env() -> Environment:
"""Create the Jinja environment used for gateway template rendering.
Note: we intentionally disable auto-escaping so markdown/plaintext templates render verbatim.
"""
return Environment(
loader=FileSystemLoader(_templates_root()),
# Render markdown verbatim (HTML escaping makes it harder for agents to read).
@@ -145,19 +160,34 @@ def _heartbeat_template_name(agent: Agent) -> str:
def _workspace_path(agent: Agent, workspace_root: str) -> str:
"""Return the absolute on-disk workspace directory for an agent.
Why this exists:
- We derive the folder name from a stable *agent key* (ultimately rooted in ids/session keys)
rather than display names to avoid collisions.
- We preserve a historical gateway-main naming quirk to avoid moving existing directories.
This path is later interpolated into template files (TOOLS.md, etc.) that agents treat as the
source of truth for where to read/write.
"""
if not workspace_root:
msg = "gateway_workspace_root is required"
raise ValueError(msg)
root = workspace_root.rstrip("/")
# Use agent key derived from session key when possible. This prevents collisions for
# lead agents (session key includes board id) even if multiple boards share the same
# display name (e.g. "Lead Agent").
key = _agent_key(agent)
# Backwards-compat: gateway-main agents historically used session keys that encoded
# "gateway-<id>" while the gateway agent id is "mc-gateway-<id>".
# Keep the on-disk workspace path stable so existing provisioned files aren't moved.
if key.startswith("mc-gateway-"):
key = key.removeprefix("mc-")
return f"{root}/workspace-{slugify(key)}"