Merge branch 'master' into fix/agent-auth-accept-bearer-in-optional-dep

This commit is contained in:
Abhimanyu Saharan
2026-02-27 01:49:45 +05:30
committed by GitHub
58 changed files with 2905 additions and 502 deletions

View File

@@ -102,22 +102,18 @@ jobs:
- name: Run backend checks
env:
# Keep CI builds deterministic.
NEXT_TELEMETRY_DISABLED: "1"
AUTH_MODE: "clerk"
CLERK_SECRET_KEY: ${{ secrets.CLERK_SECRET_KEY }}
AUTH_MODE: "local"
LOCAL_AUTH_TOKEN: "ci-local-auth-token-0123456789-0123456789-0123456789x"
run: |
make backend-lint
make backend-typecheck
make backend-coverage
- name: Run frontend checks
env:
# Keep CI builds deterministic.
NEXT_TELEMETRY_DISABLED: "1"
NEXT_PUBLIC_API_URL: ${{ secrets.NEXT_PUBLIC_API_URL }}
NEXT_PUBLIC_AUTH_MODE: "clerk"
CLERK_SECRET_KEY: ${{ secrets.CLERK_SECRET_KEY }}
NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY: ${{ secrets.NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY }}
NEXT_PUBLIC_API_URL: "http://localhost:8000"
NEXT_PUBLIC_AUTH_MODE: "local"
run: |
make frontend-lint
make frontend-typecheck
@@ -125,7 +121,7 @@ jobs:
make frontend-build
- name: Docs quality gates (lint + relative link check)
- name: Docs quality gates
run: |
make docs-check
@@ -219,11 +215,9 @@ jobs:
- name: Start frontend (dev server)
env:
NEXT_PUBLIC_API_URL: ${{ secrets.NEXT_PUBLIC_API_URL }}
NEXT_PUBLIC_AUTH_MODE: "clerk"
NEXT_PUBLIC_API_URL: "http://localhost:8000"
NEXT_PUBLIC_AUTH_MODE: "local"
NEXT_TELEMETRY_DISABLED: "1"
CLERK_SECRET_KEY: ${{ secrets.CLERK_SECRET_KEY }}
NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY: ${{ secrets.NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY }}
run: |
cd frontend
npm run dev -- --hostname 0.0.0.0 --port 3000 &
@@ -236,13 +230,9 @@ jobs:
- name: Run Cypress E2E
env:
NEXT_PUBLIC_API_URL: ${{ secrets.NEXT_PUBLIC_API_URL }}
NEXT_PUBLIC_AUTH_MODE: "clerk"
NEXT_PUBLIC_API_URL: "http://localhost:8000"
NEXT_PUBLIC_AUTH_MODE: "local"
NEXT_TELEMETRY_DISABLED: "1"
# Clerk testing tokens (official @clerk/testing Cypress integration)
CLERK_SECRET_KEY: ${{ secrets.CLERK_SECRET_KEY }}
# Also set for the app itself.
NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY: ${{ secrets.NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY }}
run: |
cd frontend
npm run e2e -- --browser chrome

View File

@@ -55,10 +55,10 @@ frontend-format-check: frontend-tooling ## Check frontend formatting (prettier)
$(NODE_WRAP) --cwd $(FRONTEND_DIR) npx prettier --check "src/**/*.{ts,tsx,js,jsx,json,css,md}" "*.{ts,js,json,md,mdx}"
.PHONY: lint
lint: backend-lint frontend-lint ## Lint backend + frontend
lint: backend-lint frontend-lint docs-lint ## Lint backend + frontend + docs
.PHONY: backend-lint
backend-lint: ## Lint backend (flake8)
backend-lint: backend-format-check backend-typecheck ## Lint backend (isort/black checks + flake8 + mypy)
cd $(BACKEND_DIR) && uv run flake8 --config .flake8
.PHONY: frontend-lint

View File

@@ -7,6 +7,11 @@ REQUEST_LOG_INCLUDE_HEALTH=false
DATABASE_URL=postgresql+psycopg://postgres:postgres@localhost:5432/mission_control
CORS_ORIGINS=http://localhost:3000
BASE_URL=
# Security response headers (blank values disable each header).
SECURITY_HEADER_X_CONTENT_TYPE_OPTIONS=
SECURITY_HEADER_X_FRAME_OPTIONS=
SECURITY_HEADER_REFERRER_POLICY=
SECURITY_HEADER_PERMISSIONS_POLICY=
# Auth mode: clerk or local.
AUTH_MODE=local

38
backend/.env.test Normal file
View File

@@ -0,0 +1,38 @@
# Commit-safe backend test environment.
# Usage:
# cd backend
# uv run --env-file .env.test uvicorn app.main:app --reload --port 8000
ENVIRONMENT=dev
LOG_LEVEL=INFO
LOG_FORMAT=text
LOG_USE_UTC=false
REQUEST_LOG_SLOW_MS=1000
REQUEST_LOG_INCLUDE_HEALTH=false
# Local backend -> local Postgres (adjust host/port if needed)
DATABASE_URL=postgresql+psycopg://postgres:postgres@localhost:5432/mission_control_test
CORS_ORIGINS=http://localhost:3000
BASE_URL=
# Auth mode: local for test/dev
AUTH_MODE=local
# Must be non-placeholder and >= 50 chars
LOCAL_AUTH_TOKEN=test-local-token-0123456789-0123456789-0123456789x
# Clerk settings kept empty in local auth mode
CLERK_SECRET_KEY=
CLERK_API_URL=https://api.clerk.com
CLERK_VERIFY_IAT=true
CLERK_LEEWAY=10.0
# Database
DB_AUTO_MIGRATE=true
# Queue / dispatch
RQ_REDIS_URL=redis://localhost:6379/0
RQ_QUEUE_NAME=default
RQ_DISPATCH_THROTTLE_SECONDS=15.0
RQ_DISPATCH_MAX_RETRIES=3
GATEWAY_MIN_VERSION=2026.02.9

View File

@@ -101,17 +101,20 @@ Notes:
From repo root (recommended):
```bash
make backend-test
make backend-lint
make backend-typecheck
make backend-test
make backend-coverage
```
`make backend-lint` runs backend format checks (`isort`, `black`), lint (`flake8`), and typecheck (`mypy`) in one command.
Or from `backend/`:
```bash
cd backend
uv run pytest
uv run isort . --check-only --diff
uv run black . --check --diff
uv run flake8 --config .flake8
uv run mypy
```

View File

@@ -2,6 +2,8 @@
from __future__ import annotations
import json
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING, Literal, cast
from uuid import UUID
@@ -63,6 +65,43 @@ _ERR_GATEWAY_MAIN_AGENT_REQUIRED = (
)
def _format_board_field_value(value: object) -> str:
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, UUID):
return str(value)
if isinstance(value, dict):
return json.dumps(value, sort_keys=True, default=str)
if isinstance(value, bool):
return "true" if value else "false"
if value is None:
return "null"
return str(value)
def _board_update_message(
    *,
    board: Board,
    changed_fields: dict[str, tuple[object, object]],
) -> str:
    """Build the plain-text notification describing a board update.

    `changed_fields` maps field name -> (previous, current); fields are listed
    in alphabetical order with both values rendered via
    `_format_board_field_value`.
    """
    header = [
        "BOARD UPDATED",
        f"Board: {board.name}",
        f"Board ID: {board.id}",
        "",
        "Changed fields:",
    ]
    changes = [
        f"- {name}: {_format_board_field_value(previous)}"
        f" -> {_format_board_field_value(current)}"
        for name, (previous, current) in sorted(changed_fields.items())
    ]
    footer = [
        "",
        "Take action: review the board changes and adjust plan/assignments as needed.",
    ]
    return "\n".join(header + changes + footer)
async def _require_gateway_main_agent(session: AsyncSession, gateway: Gateway) -> None:
main_agent = (
await Agent.objects.filter_by(gateway_id=gateway.id)
@@ -366,6 +405,53 @@ async def _notify_agents_on_board_group_removal(
)
async def _notify_lead_on_board_update(
    *,
    session: AsyncSession,
    board: Board,
    changed_fields: dict[str, tuple[object, object]],
) -> None:
    """Send the board lead a summary message after board fields change.

    Silently returns when nothing changed, the board has no lead with an
    OpenClaw session, or no gateway config is resolvable for the board.
    Records a `board.lead_notified` or `board.lead_notify_failed` activity
    event depending on the dispatch outcome, then commits.
    """
    if not changed_fields:
        return
    lead = (
        await Agent.objects.filter_by(board_id=board.id)
        .filter(col(Agent.is_board_lead).is_(True))
        .first(session)
    )
    if lead is None or not lead.openclaw_session_id:
        return
    dispatch = GatewayDispatchService(session)
    config = await dispatch.optional_gateway_config_for_board(board)
    if config is None:
        return
    message = _board_update_message(
        board=board,
        changed_fields=changed_fields,
    )
    # NOTE(review): deliver=False — presumably queues the message without
    # forcing immediate delivery; confirm against GatewayDispatchService.
    error = await dispatch.try_send_agent_message(
        session_key=lead.openclaw_session_id,
        config=config,
        agent_name=lead.name,
        message=message,
        deliver=False,
    )
    if error is None:
        record_activity(
            session,
            event_type="board.lead_notified",
            message=f"Lead agent notified for board update: {board.name}.",
            agent_id=lead.id,
        )
    else:
        record_activity(
            session,
            event_type="board.lead_notify_failed",
            message=f"Lead board update notify failed for {board.name}: {error}",
            agent_id=lead.id,
        )
    await session.commit()
@router.get("", response_model=DefaultLimitOffsetPage[BoardRead])
async def list_boards(
gateway_id: UUID | None = GATEWAY_ID_QUERY,
@@ -450,8 +536,19 @@ async def update_board(
board: Board = BOARD_USER_WRITE_DEP,
) -> Board:
"""Update mutable board properties."""
requested_updates = payload.model_dump(exclude_unset=True)
previous_values = {
field_name: getattr(board, field_name)
for field_name in requested_updates
if hasattr(board, field_name)
}
previous_group_id = board.board_group_id
updated = await _apply_board_update(payload=payload, session=session, board=board)
changed_fields = {
field_name: (previous_value, getattr(updated, field_name))
for field_name, previous_value in previous_values.items()
if previous_value != getattr(updated, field_name)
}
new_group_id = updated.board_group_id
if previous_group_id is not None and previous_group_id != new_group_id:
previous_group = await crud.get_by_id(session, BoardGroup, previous_group_id)
@@ -483,6 +580,19 @@ async def update_board(
updated.id,
new_group_id,
)
if changed_fields:
try:
await _notify_lead_on_board_update(
session=session,
board=updated,
changed_fields=changed_fields,
)
except (OpenClawGatewayError, OSError, RuntimeError, ValueError):
logger.exception(
"board.update.notify_lead_unexpected board_id=%s changed_fields=%s",
updated.id,
sorted(changed_fields),
)
return updated

View File

@@ -259,6 +259,19 @@ async def _require_review_before_done_when_enabled(
raise _review_required_for_done_error()
async def _require_comment_for_review_when_enabled(
    session: AsyncSession,
    *,
    board_id: UUID,
) -> bool:
    """Return `True` when the board enables the comment-required-for-review rule.

    Reads only the `comment_required_for_review` flag for `board_id`.
    """
    requires_comment = (
        await session.exec(
            select(col(Board.comment_required_for_review)).where(col(Board.id) == board_id),
        )
    ).first()
    # `first()` yields None when the board row is absent; treat as disabled.
    return bool(requires_comment)
async def _require_no_pending_approval_for_status_change_when_enabled(
session: AsyncSession,
*,
@@ -318,22 +331,41 @@ async def has_valid_recent_comment(
def _parse_since(value: str | None) -> datetime | None:
"""Parse an optional ISO-8601 timestamp into a naive UTC `datetime`.
The API accepts either naive timestamps (treated as UTC) or timezone-aware values.
Returning naive UTC simplifies SQLModel comparisons against stored naive UTC values.
"""
if not value:
return None
normalized = value.strip()
if not normalized:
return None
# Allow common ISO-8601 `Z` suffix (UTC) even though `datetime.fromisoformat` expects `+00:00`.
normalized = normalized.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is not None:
return parsed.astimezone(UTC).replace(tzinfo=None)
# No tzinfo: interpret as UTC for consistency with other API timestamps.
return parsed
def _coerce_task_items(items: Sequence[object]) -> list[Task]:
"""Validate/convert paginated query results to a concrete `list[Task]`.
SQLModel pagination helpers return `Sequence[object]`; we validate types early so the
rest of the route logic can assume real `Task` instances.
"""
tasks: list[Task] = []
for item in items:
if not isinstance(item, Task):
@@ -346,6 +378,15 @@ def _coerce_task_items(items: Sequence[object]) -> list[Task]:
def _coerce_task_event_rows(
items: Sequence[object],
) -> list[tuple[ActivityEvent, Task | None]]:
"""Normalize DB rows into `(ActivityEvent, Task | None)` tuples.
Depending on the SQLAlchemy/SQLModel execution path, result rows may arrive as:
- real Python tuples, or
- row-like objects supporting `__len__` and `__getitem__`.
This helper centralizes validation so SSE/event-stream logic can assume a stable shape.
"""
rows: list[tuple[ActivityEvent, Task | None]] = []
for item in items:
first: object
@@ -382,6 +423,12 @@ async def _lead_was_mentioned(
task: Task,
lead: Agent,
) -> bool:
"""Return `True` if the lead agent is mentioned in any comment on the task.
This is used to avoid redundant lead pings (especially in auto-created tasks) while still
ensuring escalation happens when explicitly requested.
"""
statement = (
select(ActivityEvent.message)
.where(col(ActivityEvent.task_id) == task.id)
@@ -398,6 +445,8 @@ async def _lead_was_mentioned(
def _lead_created_task(task: Task, lead: Agent) -> bool:
"""Return `True` if `task` was auto-created by the lead agent."""
if not task.auto_created or not task.auto_reason:
return False
return task.auto_reason == f"lead_agent:{lead.id}"
@@ -411,6 +460,13 @@ async def _reconcile_dependents_for_dependency_toggle(
previous_status: str,
actor_agent_id: UUID | None,
) -> None:
"""Apply dependency side-effects when a dependency task toggles done/undone.
The UI models dependencies as a DAG: when a dependency is reopened, dependents that were
previously marked done may need to be reopened or flagged. This helper keeps dependent state
consistent with the dependency graph without duplicating logic across endpoints.
"""
done_toggled = (previous_status == "done") != (dependency_task.status == "done")
if not done_toggled:
return
@@ -533,6 +589,75 @@ async def _send_agent_task_message(
)
def _assignment_notification_message(*, board: Board, task: Task, agent: Agent) -> str:
    """Compose the message sent to an agent when a task is assigned to them.

    A board lead receiving a task already in `review` gets a review-specific
    call to action; every other assignment gets the standard work prompt.
    """
    detail_lines = [
        f"Board: {board.name}",
        f"Task: {task.title}",
        f"Task ID: {task.id}",
        f"Status: {task.status}",
    ]
    snippet = _truncate_snippet(task.description or "")
    if snippet:
        detail_lines.append(f"Description: {snippet}")
    body = "\n".join(detail_lines)
    if agent.is_board_lead and task.status == "review":
        action = (
            "Take action: review the deliverables now. "
            "Approve by moving to done or return to inbox with clear feedback."
        )
        return "TASK READY FOR LEAD REVIEW\n" + body + f"\n\n{action}"
    return (
        "TASK ASSIGNED\n"
        + body
        + "\n\nTake action: open the task and begin work. Post updates as task comments."
    )
def _rework_notification_message(
    *,
    board: Board,
    task: Task,
    feedback: str | None,
) -> str:
    """Compose the message sent to an assignee when the lead requests changes.

    `feedback` is the lead's latest comment text; when absent or blank, the
    assignee is pointed at the task's comment history instead.
    """
    detail_lines = [
        f"Board: {board.name}",
        f"Task: {task.title}",
        f"Task ID: {task.id}",
        f"Status: {task.status}",
    ]
    snippet = _truncate_snippet(task.description or "")
    if snippet:
        detail_lines.append(f"Description: {snippet}")
    if feedback and feedback.strip():
        requested_changes = _truncate_snippet(feedback)
    else:
        requested_changes = (
            "Lead requested changes. Review latest task comments for exact required updates."
        )
    parts = [
        "CHANGES REQUESTED\n",
        "\n".join(detail_lines),
        "\n\nRequested changes:\n",
        requested_changes,
        "\n\nTake action: address the requested changes, then move the task back to review.",
    ]
    return "".join(parts)
async def _latest_task_comment_by_agent(
    session: AsyncSession,
    *,
    task_id: UUID,
    agent_id: UUID,
) -> str | None:
    """Return the newest `task.comment` message by `agent_id` on `task_id`.

    Returns `None` when the agent has not commented on the task. Used to
    surface the lead's latest feedback in rework notifications.
    """
    statement = (
        select(col(ActivityEvent.message))
        .where(col(ActivityEvent.task_id) == task_id)
        .where(col(ActivityEvent.event_type) == "task.comment")
        .where(col(ActivityEvent.agent_id) == agent_id)
        .order_by(desc(col(ActivityEvent.created_at)))
        .limit(1)
    )
    return (await session.exec(statement)).first()
async def _notify_agent_on_task_assign(
*,
session: AsyncSession,
@@ -546,20 +671,7 @@ async def _notify_agent_on_task_assign(
config = await dispatch.optional_gateway_config_for_board(board)
if config is None:
return
description = _truncate_snippet(task.description or "")
details = [
f"Board: {board.name}",
f"Task: {task.title}",
f"Task ID: {task.id}",
f"Status: {task.status}",
]
if description:
details.append(f"Description: {description}")
message = (
"TASK ASSIGNED\n"
+ "\n".join(details)
+ ("\n\nTake action: open the task and begin work. " "Post updates as task comments.")
)
message = _assignment_notification_message(board=board, task=task, agent=agent)
error = await _send_agent_task_message(
dispatch=dispatch,
session_key=agent.openclaw_session_id,
@@ -587,6 +699,57 @@ async def _notify_agent_on_task_assign(
await session.commit()
async def _notify_agent_on_task_rework(
    *,
    session: AsyncSession,
    board: Board,
    task: Task,
    agent: Agent,
    lead: Agent,
) -> None:
    """Notify the assignee that the lead has sent the task back for changes.

    Silently returns when the agent has no OpenClaw session or no gateway
    config is resolvable for the board. The lead's latest task comment (if any)
    is included as the requested-changes feedback. Records a success or failure
    activity event and commits in either case.
    """
    if not agent.openclaw_session_id:
        return
    dispatch = GatewayDispatchService(session)
    config = await dispatch.optional_gateway_config_for_board(board)
    if config is None:
        return
    # Prefer the lead's own latest comment as concrete feedback for the assignee.
    feedback = await _latest_task_comment_by_agent(
        session,
        task_id=task.id,
        agent_id=lead.id,
    )
    message = _rework_notification_message(
        board=board,
        task=task,
        feedback=feedback,
    )
    error = await _send_agent_task_message(
        dispatch=dispatch,
        session_key=agent.openclaw_session_id,
        config=config,
        agent_name=agent.name,
        message=message,
    )
    if error is None:
        record_activity(
            session,
            event_type="task.rework_notified",
            message=f"Assignee notified about requested changes: {agent.name}.",
            agent_id=agent.id,
            task_id=task.id,
        )
        await session.commit()
    else:
        record_activity(
            session,
            event_type="task.rework_notify_failed",
            message=f"Rework notify failed: {error}",
            agent_id=agent.id,
            task_id=task.id,
        )
        await session.commit()
async def notify_agent_on_task_assign(
*,
session: AsyncSession,
@@ -1905,7 +2068,42 @@ async def _lead_apply_assignment(
update.task.assigned_agent_id = agent.id
def _lead_apply_status(update: _TaskUpdateInput) -> None:
async def _last_worker_who_moved_task_to_review(
    session: AsyncSession,
    *,
    task_id: UUID,
    board_id: UUID,
    lead_agent_id: UUID,
) -> UUID | None:
    """Find the most recent non-lead board agent who moved the task to review.

    Scans `task.status_changed` activity events (newest first) whose message
    matches the "Task moved to review:%" pattern, skipping the lead and any
    agent that no longer belongs to `board_id` or has since become lead.
    Returns the matching agent id, or `None` when no suitable candidate exists.

    NOTE(review): candidates are re-fetched one-by-one, so this is one DB
    round-trip per review cycle in the task's history; acceptable for short
    histories, but worth revisiting if review churn grows.
    """
    statement = (
        select(col(ActivityEvent.agent_id))
        .where(col(ActivityEvent.task_id) == task_id)
        .where(col(ActivityEvent.event_type) == "task.status_changed")
        .where(col(ActivityEvent.message).like("Task moved to review:%"))
        .where(col(ActivityEvent.agent_id).is_not(None))
        .order_by(desc(col(ActivityEvent.created_at)))
    )
    candidate_ids = list(await session.exec(statement))
    for candidate_id in candidate_ids:
        # Defensive: the SQL filter already excludes NULL agent ids.
        if candidate_id is None or candidate_id == lead_agent_id:
            continue
        candidate = await Agent.objects.by_id(candidate_id).first(session)
        if candidate is None:
            continue
        # The agent may have changed boards or been promoted since the event.
        if candidate.board_id != board_id or candidate.is_board_lead:
            continue
        return candidate.id
    return None
async def _lead_apply_status(
session: AsyncSession,
*,
update: _TaskUpdateInput,
) -> None:
if update.actor.actor_type != "agent" or update.actor.agent is None:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
lead_agent = update.actor.agent
if "status" not in update.updates:
return
if update.task.status != "review":
@@ -1926,7 +2124,12 @@ def _lead_apply_status(update: _TaskUpdateInput) -> None:
),
)
if target_status == "inbox":
update.task.assigned_agent_id = None
update.task.assigned_agent_id = await _last_worker_who_moved_task_to_review(
session,
task_id=update.task.id,
board_id=update.board_id,
lead_agent_id=lead_agent.id,
)
update.task.in_progress_at = None
update.task.status = target_status
@@ -1958,6 +2161,21 @@ async def _lead_notify_new_assignee(
else None
)
if board:
if (
update.previous_status == "review"
and update.task.status == "inbox"
and update.actor.actor_type == "agent"
and update.actor.agent
and update.actor.agent.is_board_lead
):
await _notify_agent_on_task_rework(
session=session,
board=board,
task=update.task,
agent=assigned_agent,
lead=update.actor.agent,
)
return
await _notify_agent_on_task_assign(
session=session,
board=board,
@@ -1994,7 +2212,7 @@ async def _apply_lead_task_update(
raise _blocked_task_error(blocked_by)
await _lead_apply_assignment(session, update=update)
_lead_apply_status(update)
await _lead_apply_status(session, update=update)
await _require_no_pending_approval_for_status_change_when_enabled(
session,
board_id=update.board_id,
@@ -2263,6 +2481,23 @@ async def _record_task_update_activity(
await session.commit()
async def _assign_review_task_to_lead(
    session: AsyncSession,
    *,
    update: _TaskUpdateInput,
) -> None:
    """Reassign a task to the board lead when it newly enters `review`.

    No-op when the task is not transitioning into review (already in review or
    moving elsewhere) or when the board has no lead agent. Mutates
    `update.task.assigned_agent_id` in place; no commit happens here.
    """
    if update.task.status != "review" or update.previous_status == "review":
        return
    lead = (
        await Agent.objects.filter_by(board_id=update.board_id)
        .filter(col(Agent.is_board_lead).is_(True))
        .first(session)
    )
    if lead is None:
        return
    update.task.assigned_agent_id = lead.id
async def _notify_task_update_assignment_changes(
session: AsyncSession,
*,
@@ -2290,12 +2525,6 @@ async def _notify_task_update_assignment_changes(
or update.task.assigned_agent_id == update.previous_assigned
):
return
if (
update.actor.actor_type == "agent"
and update.actor.agent
and update.task.assigned_agent_id == update.actor.agent.id
):
return
assigned_agent = await Agent.objects.by_id(update.task.assigned_agent_id).first(
session,
)
@@ -2306,6 +2535,28 @@ async def _notify_task_update_assignment_changes(
if update.task.board_id
else None
)
if (
update.previous_status == "review"
and update.task.status == "inbox"
and update.actor.actor_type == "agent"
and update.actor.agent
and update.actor.agent.is_board_lead
):
if board:
await _notify_agent_on_task_rework(
session=session,
board=board,
task=update.task,
agent=assigned_agent,
lead=update.actor.agent,
)
return
if (
update.actor.actor_type == "agent"
and update.actor.agent
and update.task.assigned_agent_id == update.actor.agent.id
):
return
if board:
await _notify_agent_on_task_assign(
session=session,
@@ -2346,9 +2597,12 @@ async def _finalize_updated_task(
update.task.updated_at = utcnow()
status_raw = update.updates.get("status")
# Entering review requires either a new comment or a valid recent one to
# ensure reviewers get context on readiness.
if status_raw == "review":
# Entering review can require a new comment or valid recent context when
# the board-level rule is enabled.
if status_raw == "review" and await _require_comment_for_review_when_enabled(
session,
board_id=update.board_id,
):
comment_text = (update.comment or "").strip()
review_comment_author = update.task.assigned_agent_id or update.previous_assigned
review_comment_since = (
@@ -2363,6 +2617,7 @@ async def _finalize_updated_task(
review_comment_since,
):
raise _comment_validation_error()
await _assign_review_task_to_lead(session, update=update)
if update.tag_ids is not None:
normalized = (

View File

@@ -66,6 +66,13 @@ class AuthContext:
def _extract_bearer_token(authorization: str | None) -> str | None:
"""Extract the bearer token from an `Authorization` header.
Returns `None` for missing/empty headers or non-bearer schemes.
Note: we do *not* validate the token here; this helper is only responsible for parsing.
"""
if not authorization:
return None
value = authorization.strip()
@@ -92,6 +99,14 @@ def _normalize_email(value: object) -> str | None:
def _extract_claim_email(claims: dict[str, object]) -> str | None:
"""Best-effort extraction of an email address from Clerk/JWT-like claims.
Clerk payloads vary depending on token type and SDK version. We try common flat keys first,
then fall back to an `email_addresses` list (either strings or dict-like entries).
Returns a normalized lowercase email or `None`.
"""
for key in ("email", "email_address", "primary_email_address"):
email = _normalize_email(claims.get(key))
if email:
@@ -119,10 +134,13 @@ def _extract_claim_email(claims: dict[str, object]) -> str | None:
return candidate
if fallback_email is None:
fallback_email = candidate
return fallback_email
def _extract_claim_name(claims: dict[str, object]) -> str | None:
"""Best-effort extraction of a display name from Clerk/JWT-like claims."""
for key in ("name", "full_name"):
text = _non_empty_str(claims.get(key))
if text:
@@ -137,6 +155,17 @@ def _extract_claim_name(claims: dict[str, object]) -> str | None:
def _extract_clerk_profile(profile: ClerkUser | None) -> tuple[str | None, str | None]:
"""Extract `(email, name)` from a Clerk user profile.
The Clerk SDK surface is not perfectly consistent across environments:
- some fields may be absent,
- email addresses may be represented as strings or objects,
- the "primary" email may be identified by id.
This helper implements a defensive, best-effort extraction strategy and returns `(None, None)`
when the profile is unavailable.
"""
if profile is None:
return None, None

View File

@@ -49,6 +49,11 @@ class Settings(BaseSettings):
cors_origins: str = ""
base_url: str = ""
# Security response headers (blank disables header injection)
security_header_x_content_type_options: str = ""
security_header_x_frame_options: str = ""
security_header_referrer_policy: str = ""
security_header_permissions_policy: str = ""
# Database lifecycle
db_auto_migrate: bool = False

View File

@@ -0,0 +1,81 @@
"""ASGI middleware for configurable security response headers."""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING: # pragma: no cover
from starlette.types import ASGIApp, Message, Receive, Scope, Send
class SecurityHeadersMiddleware:
    """Inject configured security headers into every HTTP response.

    Each supported header is driven by a constructor keyword argument; a blank
    (or whitespace-only) value disables that header entirely. Headers that the
    downstream app already set are never overwritten.
    """

    _X_CONTENT_TYPE_OPTIONS = b"x-content-type-options"
    _X_FRAME_OPTIONS = b"x-frame-options"
    _REFERRER_POLICY = b"referrer-policy"
    _PERMISSIONS_POLICY = b"permissions-policy"

    def __init__(
        self,
        app: ASGIApp,
        *,
        x_content_type_options: str = "",
        x_frame_options: str = "",
        referrer_policy: str = "",
        permissions_policy: str = "",
    ) -> None:
        self._app = app
        # Precompute (lowercased-name, name, value) triples once at startup.
        self._configured_headers = self._build_configured_headers(
            x_content_type_options=x_content_type_options,
            x_frame_options=x_frame_options,
            referrer_policy=referrer_policy,
            permissions_policy=permissions_policy,
        )

    @classmethod
    def _build_configured_headers(
        cls,
        *,
        x_content_type_options: str,
        x_frame_options: str,
        referrer_policy: str,
        permissions_policy: str,
    ) -> tuple[tuple[bytes, bytes, bytes], ...]:
        """Normalize configured values into raw-header triples, dropping blanks."""
        candidates = (
            (cls._X_CONTENT_TYPE_OPTIONS, x_content_type_options),
            (cls._X_FRAME_OPTIONS, x_frame_options),
            (cls._REFERRER_POLICY, referrer_policy),
            (cls._PERMISSIONS_POLICY, permissions_policy),
        )
        return tuple(
            (name.lower(), name, trimmed.encode("latin-1"))
            for name, raw in candidates
            if (trimmed := raw.strip())
        )

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Append configured security headers unless already present."""
        if scope["type"] != "http" or not self._configured_headers:
            # Non-HTTP scope (lifespan, websocket) or nothing configured: pass through.
            await self._app(scope, receive, send)
            return

        async def wrapped_send(message: Message) -> None:
            if message["type"] == "http.response.start":
                # ASGI raw headers are a list of (name, value) byte pairs.
                raw_headers: list[tuple[bytes, bytes]] = message.setdefault("headers", [])
                present = {name.lower() for name, _ in raw_headers}
                for lowered, name, value in self._configured_headers:
                    if lowered in present:
                        continue
                    raw_headers.append((name, value))
                    present.add(lowered)
            await send(message)

        await self._app(scope, receive, wrapped_send)

View File

@@ -34,6 +34,7 @@ from app.api.users import router as users_router
from app.core.config import settings
from app.core.error_handling import install_error_handling
from app.core.logging import configure_logging, get_logger
from app.core.security_headers import SecurityHeadersMiddleware
from app.db.session import init_db
from app.schemas.health import HealthStatusResponse
@@ -464,6 +465,13 @@ if origins:
else:
logger.info("app.cors.disabled")
app.add_middleware(
SecurityHeadersMiddleware,
x_content_type_options=settings.security_header_x_content_type_options,
x_frame_options=settings.security_header_x_frame_options,
referrer_policy=settings.security_header_referrer_policy,
permissions_policy=settings.security_header_permissions_policy,
)
install_error_handling(app)

View File

@@ -43,6 +43,11 @@ class Agent(QueryModel, table=True):
delete_requested_at: datetime | None = Field(default=None)
delete_confirm_token_hash: str | None = Field(default=None, index=True)
last_seen_at: datetime | None = Field(default=None)
lifecycle_generation: int = Field(default=0)
wake_attempts: int = Field(default=0)
last_wake_sent_at: datetime | None = Field(default=None)
checkin_deadline_at: datetime | None = Field(default=None)
last_provision_error: str | None = Field(default=None, sa_column=Column(Text))
is_board_lead: bool = Field(default=False, index=True)
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)

View File

@@ -41,6 +41,7 @@ class Board(TenantScoped, table=True):
goal_source: str | None = None
require_approval_for_done: bool = Field(default=True)
require_review_before_done: bool = Field(default=False)
comment_required_for_review: bool = Field(default=False)
block_status_changes_with_pending_approval: bool = Field(default=False)
only_lead_can_change_status: bool = Field(default=False)
max_agents: int = Field(default=1)

View File

@@ -31,6 +31,7 @@ class BoardBase(SQLModel):
goal_source: str | None = None
require_approval_for_done: bool = True
require_review_before_done: bool = False
comment_required_for_review: bool = False
block_status_changes_with_pending_approval: bool = False
only_lead_can_change_status: bool = False
max_agents: int = Field(default=1, ge=0)
@@ -75,6 +76,7 @@ class BoardUpdate(SQLModel):
goal_source: str | None = None
require_approval_for_done: bool | None = None
require_review_before_done: bool | None = None
comment_required_for_review: bool | None = None
block_status_changes_with_pending_approval: bool | None = None
only_lead_can_change_status: bool | None = None
max_agents: int | None = Field(default=None, ge=0)

View File

@@ -21,24 +21,18 @@ from app.models.gateways import Gateway
from app.models.tasks import Task
from app.schemas.gateways import GatewayTemplatesSyncResult
from app.services.openclaw.constants import DEFAULT_HEARTBEAT_CONFIG
from app.services.openclaw.db_agent_state import (
mark_provision_complete,
mark_provision_requested,
mint_agent_token,
)
from app.services.openclaw.db_service import OpenClawDBService
from app.services.openclaw.error_messages import normalize_gateway_error_message
from app.services.openclaw.gateway_compat import check_gateway_version_compatibility
from app.services.openclaw.gateway_rpc import GatewayConfig as GatewayClientConfig
from app.services.openclaw.gateway_rpc import OpenClawGatewayError, openclaw_call
from app.services.openclaw.provisioning import OpenClawGatewayProvisioner
from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator
from app.services.openclaw.provisioning_db import (
GatewayTemplateSyncOptions,
OpenClawProvisioningService,
)
from app.services.openclaw.session_service import GatewayTemplateSyncQuery
from app.services.openclaw.shared import GatewayAgentIdentity
from app.services.organizations import get_org_owner_user
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -222,69 +216,38 @@ class GatewayAdminLifecycleService(OpenClawDBService):
action: str,
notify: bool,
) -> Agent:
template_user = user or await get_org_owner_user(
self.session,
organization_id=gateway.organization_id,
)
if template_user is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail="Organization owner not found (required for gateway agent USER.md rendering).",
)
raw_token = mint_agent_token(agent)
mark_provision_requested(
agent,
action=action,
status="updating" if action == "update" else "provisioning",
)
await self.add_commit_refresh(agent)
if not gateway.url:
return agent
orchestrator = AgentLifecycleOrchestrator(self.session)
try:
await OpenClawGatewayProvisioner().apply_agent_lifecycle(
agent=agent,
provisioned = await orchestrator.run_lifecycle(
gateway=gateway,
agent_id=agent.id,
board=None,
auth_token=raw_token,
user=template_user,
user=user,
action=action,
auth_token=None,
force_bootstrap=False,
reset_session=False,
wake=notify,
deliver_wakeup=True,
wakeup_verb=None,
clear_confirm_token=False,
raise_gateway_errors=True,
)
except OpenClawGatewayError as exc:
except HTTPException:
self.logger.error(
"gateway.main_agent.provision_failed_gateway gateway_id=%s agent_id=%s error=%s",
"gateway.main_agent.provision_failed gateway_id=%s agent_id=%s action=%s",
gateway.id,
agent.id,
str(exc),
action,
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway {action} failed: {exc}",
) from exc
except (OSError, RuntimeError, ValueError) as exc:
self.logger.error(
"gateway.main_agent.provision_failed gateway_id=%s agent_id=%s error=%s",
gateway.id,
agent.id,
str(exc),
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Unexpected error {action}ing gateway provisioning.",
) from exc
mark_provision_complete(agent, status="online")
await self.add_commit_refresh(agent)
raise
self.logger.info(
"gateway.main_agent.provision_success gateway_id=%s agent_id=%s action=%s",
gateway.id,
agent.id,
provisioned.id,
action,
)
return agent
return provisioned
async def ensure_main_agent(
self,

View File

@@ -18,6 +18,11 @@ DEFAULT_HEARTBEAT_CONFIG: dict[str, Any] = {
}
OFFLINE_AFTER = timedelta(minutes=10)
# Provisioning convergence policy:
# - require first heartbeat/check-in within 30s of wake
# - allow up to 3 wake attempts before giving up
CHECKIN_DEADLINE_AFTER_WAKE = timedelta(seconds=30)
MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN = 3
AGENT_SESSION_PREFIX = "agent"
DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY: dict[str, bool] = {

View File

@@ -0,0 +1,167 @@
"""Unified agent lifecycle orchestration.
This module centralizes DB-backed lifecycle transitions so call sites do not
duplicate provisioning/wake/state logic.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from fastapi import HTTPException, status
from sqlmodel import col, select
from app.core.time import utcnow
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.services.openclaw.constants import CHECKIN_DEADLINE_AFTER_WAKE
from app.services.openclaw.db_agent_state import (
mark_provision_complete,
mark_provision_requested,
mint_agent_token,
)
from app.services.openclaw.db_service import OpenClawDBService
from app.services.openclaw.gateway_rpc import OpenClawGatewayError
from app.services.openclaw.lifecycle_queue import (
QueuedAgentLifecycleReconcile,
enqueue_lifecycle_reconcile,
)
from app.services.openclaw.provisioning import OpenClawGatewayProvisioner
from app.services.organizations import get_org_owner_user
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
from app.models.users import User
class AgentLifecycleOrchestrator(OpenClawDBService):
    """Single lifecycle writer for agent provision/update transitions."""

    def __init__(self, session: AsyncSession) -> None:
        # No state beyond the base DB service; kept explicit so the expected
        # constructor signature is visible at the class site.
        super().__init__(session)

    async def _lock_agent(self, *, agent_id: UUID) -> Agent:
        # SELECT ... FOR UPDATE: serializes lifecycle transitions per agent so
        # two concurrent provision/update calls cannot interleave their writes.
        statement = select(Agent).where(col(Agent.id) == agent_id).with_for_update()
        agent = (await self.session.exec(statement)).first()
        if agent is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Agent not found")
        return agent

    async def run_lifecycle(
        self,
        *,
        gateway: Gateway,
        agent_id: UUID,
        board: Board | None,
        user: User | None,
        action: str,
        auth_token: str | None = None,
        force_bootstrap: bool = False,
        reset_session: bool = False,
        wake: bool = True,
        deliver_wakeup: bool = True,
        wakeup_verb: str | None = None,
        clear_confirm_token: bool = False,
        raise_gateway_errors: bool = True,
    ) -> Agent:
        """Provision or update any agent under a per-agent lock.

        Args:
            gateway: Gateway the agent is provisioned against.
            agent_id: Agent row to lock and transition.
            board: Owning board, or ``None`` for gateway-scoped agents.
            user: Template user. When both ``board`` and ``user`` are ``None``,
                the organization owner is looked up as the template user.
            action: Lifecycle verb (e.g. ``"provision"`` / ``"update"``); also
                interpolated into status and error messages.
            auth_token: Pre-minted agent token; a fresh one is minted when omitted.
            force_bootstrap: Forwarded to the gateway provisioner.
            reset_session: Forwarded to the gateway provisioner.
            wake: When true, a wake is attempted and a check-in deadline is set.
            deliver_wakeup: Forwarded to the gateway provisioner.
            wakeup_verb: Forwarded to the gateway provisioner.
            clear_confirm_token: Forwarded to ``mark_provision_complete`` on success.
            raise_gateway_errors: When ``False``, provisioning failures are only
                recorded on the row (``last_provision_error``) and the agent is
                returned instead of raising.

        Returns:
            The refreshed ``Agent`` row after the transition has been committed.

        Raises:
            HTTPException: 404 when the agent does not exist; 422 when no org
                owner can be found for gateway agent USER.md rendering; 502/500
                on gateway/unexpected provisioning errors when
                ``raise_gateway_errors`` is true.
        """
        locked = await self._lock_agent(agent_id=agent_id)
        template_user = user
        if board is None and template_user is None:
            # Gateway-scoped agents render USER.md from the org owner when no
            # explicit user context is supplied.
            template_user = await get_org_owner_user(
                self.session,
                organization_id=gateway.organization_id,
            )
            if template_user is None:
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
                    detail=(
                        "Organization owner not found "
                        "(required for gateway agent USER.md rendering)."
                    ),
                )
        raw_token = auth_token or mint_agent_token(locked)
        mark_provision_requested(
            locked,
            action=action,
            status="updating" if action == "update" else "provisioning",
        )
        # Bump the generation first: any reconcile task queued for an older
        # generation becomes stale and is ignored by the worker.
        locked.lifecycle_generation += 1
        locked.last_provision_error = None
        locked.checkin_deadline_at = utcnow() + CHECKIN_DEADLINE_AFTER_WAKE if wake else None
        if wake:
            locked.wake_attempts += 1
            locked.last_wake_sent_at = utcnow()
        self.session.add(locked)
        await self.session.flush()
        if not gateway.url:
            # No reachable gateway: persist the requested state and stop; a
            # later call can complete provisioning once a URL is configured.
            await self.session.commit()
            await self.session.refresh(locked)
            return locked
        try:
            await OpenClawGatewayProvisioner().apply_agent_lifecycle(
                agent=locked,
                gateway=gateway,
                board=board,
                auth_token=raw_token,
                user=template_user,
                action=action,
                force_bootstrap=force_bootstrap,
                reset_session=reset_session,
                wake=wake,
                deliver_wakeup=deliver_wakeup,
                wakeup_verb=wakeup_verb,
            )
        except OpenClawGatewayError as exc:
            # Gateway-side failure: record it on the row before (optionally)
            # surfacing a 502 to the caller.
            locked.last_provision_error = str(exc)
            locked.updated_at = utcnow()
            self.session.add(locked)
            await self.session.commit()
            await self.session.refresh(locked)
            if raise_gateway_errors:
                raise HTTPException(
                    status_code=status.HTTP_502_BAD_GATEWAY,
                    detail=f"Gateway {action} failed: {exc}",
                ) from exc
            return locked
        except (OSError, RuntimeError, ValueError) as exc:
            # Local/unexpected failure: same persistence, reported as a 500.
            locked.last_provision_error = str(exc)
            locked.updated_at = utcnow()
            self.session.add(locked)
            await self.session.commit()
            await self.session.refresh(locked)
            if raise_gateway_errors:
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail=f"Unexpected error {action}ing gateway provisioning.",
                ) from exc
            return locked
        mark_provision_complete(
            locked,
            status="online",
            clear_confirm_token=clear_confirm_token,
        )
        locked.last_provision_error = None
        # Refresh the deadline after the (possibly slow) gateway call so the
        # check-in window starts from completion, not from the initial request.
        locked.checkin_deadline_at = utcnow() + CHECKIN_DEADLINE_AFTER_WAKE if wake else None
        self.session.add(locked)
        await self.session.commit()
        await self.session.refresh(locked)
        if wake and locked.checkin_deadline_at is not None:
            # Schedule the stuck-agent check keyed to this generation/deadline.
            enqueue_lifecycle_reconcile(
                QueuedAgentLifecycleReconcile(
                    agent_id=locked.id,
                    gateway_id=locked.gateway_id,
                    board_id=locked.board_id,
                    generation=locked.lifecycle_generation,
                    checkin_deadline_at=locked.checkin_deadline_at,
                )
            )
        return locked

View File

@@ -0,0 +1,122 @@
"""Queue payload helpers for stuck-agent lifecycle reconciliation."""
from __future__ import annotations

from dataclasses import dataclass, replace
from datetime import datetime
from typing import Any
from uuid import UUID

from app.core.config import settings
from app.core.logging import get_logger
from app.core.time import utcnow
from app.services.queue import QueuedTask, enqueue_task_with_delay
from app.services.queue import requeue_if_failed as generic_requeue_if_failed
logger = get_logger(__name__)
TASK_TYPE = "agent_lifecycle_reconcile"
@dataclass(frozen=True)
class QueuedAgentLifecycleReconcile:
    """Queued payload metadata for lifecycle reconciliation checks."""

    # Agent whose post-wake check-in is being verified.
    agent_id: UUID
    # Gateway the agent is provisioned against.
    gateway_id: UUID
    # Owning board, or None when the agent has no board (e.g. gateway agents).
    board_id: UUID | None
    # Lifecycle generation captured at enqueue time; the worker ignores tasks
    # whose generation no longer matches the agent row.
    generation: int
    # Deadline by which the agent was expected to check in after wake.
    checkin_deadline_at: datetime
    # Delivery attempt counter carried through requeues/deferrals.
    attempts: int = 0
def _task_from_payload(payload: QueuedAgentLifecycleReconcile) -> QueuedTask:
    """Serialize a reconcile payload into a generic queued task.

    UUIDs and the deadline are stringified so the payload is JSON-friendly;
    the attempt counter rides on the task envelope itself.
    """
    board_id = None if payload.board_id is None else str(payload.board_id)
    body: dict[str, Any] = {
        "agent_id": str(payload.agent_id),
        "gateway_id": str(payload.gateway_id),
        "board_id": board_id,
        "generation": payload.generation,
        "checkin_deadline_at": payload.checkin_deadline_at.isoformat(),
    }
    return QueuedTask(
        task_type=TASK_TYPE,
        payload=body,
        created_at=utcnow(),
        attempts=payload.attempts,
    )
def decode_lifecycle_task(task: QueuedTask) -> QueuedAgentLifecycleReconcile:
    """Decode a queued task back into a typed reconcile payload.

    Raises:
        ValueError: On an unexpected task type or a missing check-in deadline.
    """
    if task.task_type != TASK_TYPE and task.task_type != "legacy":
        raise ValueError(f"Unexpected task_type={task.task_type!r}; expected {TASK_TYPE!r}")
    payload: dict[str, Any] = task.payload

    board_id = None
    raw_board_id = payload.get("board_id")
    if isinstance(raw_board_id, str) and raw_board_id:
        board_id = UUID(raw_board_id)

    raw_deadline = payload.get("checkin_deadline_at")
    if not isinstance(raw_deadline, str):
        raise ValueError("checkin_deadline_at is required")

    return QueuedAgentLifecycleReconcile(
        agent_id=UUID(str(payload["agent_id"])),
        gateway_id=UUID(str(payload["gateway_id"])),
        board_id=board_id,
        generation=int(payload["generation"]),
        checkin_deadline_at=datetime.fromisoformat(raw_deadline),
        # The payload dict normally carries no attempts; fall back to the
        # task envelope's counter.
        attempts=int(payload.get("attempts", task.attempts)),
    )
def enqueue_lifecycle_reconcile(payload: QueuedAgentLifecycleReconcile) -> bool:
    """Enqueue a delayed reconcile check keyed to the expected check-in deadline.

    The delay is the time remaining until the payload's deadline, clamped at
    zero so past-due deadlines are enqueued for immediate processing.
    """
    remaining = (payload.checkin_deadline_at - utcnow()).total_seconds()
    delay_seconds = remaining if remaining > 0 else 0.0
    accepted = enqueue_task_with_delay(
        _task_from_payload(payload),
        settings.rq_queue_name,
        delay_seconds=delay_seconds,
        redis_url=settings.rq_redis_url,
    )
    if not accepted:
        return False
    logger.info(
        "lifecycle.queue.enqueued",
        extra={
            "agent_id": str(payload.agent_id),
            "generation": payload.generation,
            "delay_seconds": delay_seconds,
            "attempt": payload.attempts,
        },
    )
    return True
def defer_lifecycle_reconcile(
    task: QueuedTask,
    *,
    delay_seconds: float,
) -> bool:
    """Defer a reconcile task without incrementing retry attempts.

    The payload is round-tripped through the dataclass for validation, and
    ``attempts`` is pinned to the task envelope's current counter so a
    deferral is never counted as a retry.

    Args:
        task: The queued reconcile task to push back.
        delay_seconds: How long to wait before redelivery (clamped at zero).

    Returns:
        True if the deferred task was accepted by the queue backend.
    """
    payload = decode_lifecycle_task(task)
    # dataclasses.replace copies every field while overriding only `attempts`,
    # instead of enumerating each field by hand (the manual copy silently
    # drifts whenever the dataclass gains a field).
    deferred = replace(payload, attempts=task.attempts)
    return enqueue_task_with_delay(
        _task_from_payload(deferred),
        settings.rq_queue_name,
        delay_seconds=max(0.0, delay_seconds),
        redis_url=settings.rq_redis_url,
    )
def requeue_lifecycle_queue_task(task: QueuedTask, *, delay_seconds: float = 0) -> bool:
    """Requeue a failed lifecycle task with capped retries.

    Delegates to the generic queue requeue helper; the retry cap and queue
    target come from application settings so lifecycle tasks share the global
    dispatch retry budget.
    """
    clamped_delay = max(0.0, delay_seconds)
    return generic_requeue_if_failed(
        task,
        settings.rq_queue_name,
        max_retries=settings.rq_dispatch_max_retries,
        redis_url=settings.rq_redis_url,
        delay_seconds=clamped_delay,
    )

View File

@@ -0,0 +1,140 @@
"""Worker handlers for lifecycle reconciliation tasks."""
from __future__ import annotations
import asyncio
from app.core.logging import get_logger
from app.core.time import utcnow
from app.db.session import async_session_maker
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
from app.services.openclaw.constants import MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN
from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator
from app.services.openclaw.lifecycle_queue import decode_lifecycle_task, defer_lifecycle_reconcile
from app.services.queue import QueuedTask
logger = get_logger(__name__)
_RECONCILE_TIMEOUT_SECONDS = 60.0
def _has_checked_in_since_wake(agent: Agent) -> bool:
if agent.last_seen_at is None:
return False
if agent.last_wake_sent_at is None:
return True
return agent.last_seen_at >= agent.last_wake_sent_at
async def process_lifecycle_queue_task(task: QueuedTask) -> None:
    """Re-run lifecycle provisioning when an agent misses post-provision check-in.

    Decision order:
      1. Skip when the agent row is missing, the queued generation is stale,
         the agent already checked in after its last wake, or it is deleting.
      2. Defer (without counting a retry) while the deadline is still ahead.
      3. Mark the agent offline once the wake-attempt budget is exhausted.
      4. Otherwise re-run the lifecycle through the orchestrator with a wake.

    Raises:
        RuntimeError: If deferring the task back onto the queue fails.
    """
    payload = decode_lifecycle_task(task)
    now = utcnow()
    async with async_session_maker() as session:
        agent = await Agent.objects.by_id(payload.agent_id).first(session)
        if agent is None:
            logger.info(
                "lifecycle.reconcile.skip_missing_agent",
                extra={"agent_id": str(payload.agent_id)},
            )
            return
        # Ignore stale queue messages after a newer lifecycle generation.
        if agent.lifecycle_generation != payload.generation:
            logger.info(
                "lifecycle.reconcile.skip_stale_generation",
                extra={
                    "agent_id": str(agent.id),
                    "queued_generation": payload.generation,
                    "current_generation": agent.lifecycle_generation,
                },
            )
            return
        # A check-in at/after the last wake means the agent is not stuck.
        if _has_checked_in_since_wake(agent):
            logger.info(
                "lifecycle.reconcile.skip_not_stuck",
                extra={"agent_id": str(agent.id), "status": agent.status},
            )
            return
        # Prefer the live row's deadline; fall back to the queued snapshot.
        deadline = agent.checkin_deadline_at or payload.checkin_deadline_at
        if agent.status == "deleting":
            logger.info(
                "lifecycle.reconcile.skip_deleting",
                extra={"agent_id": str(agent.id)},
            )
            return
        if now < deadline:
            # Woke up early (e.g. deadline moved forward): push the task back
            # until the deadline has actually passed.
            delay = max(0.0, (deadline - now).total_seconds())
            if not defer_lifecycle_reconcile(task, delay_seconds=delay):
                msg = "Failed to defer lifecycle reconcile task"
                raise RuntimeError(msg)
            logger.info(
                "lifecycle.reconcile.deferred",
                extra={"agent_id": str(agent.id), "delay_seconds": delay},
            )
            return
        if agent.wake_attempts >= MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN:
            # Budget exhausted: record the failure and stop escalating.
            agent.status = "offline"
            agent.checkin_deadline_at = None
            agent.last_provision_error = (
                "Agent did not check in after wake; max wake attempts reached"
            )
            agent.updated_at = utcnow()
            session.add(agent)
            await session.commit()
            logger.warning(
                "lifecycle.reconcile.max_attempts_reached",
                extra={
                    "agent_id": str(agent.id),
                    "wake_attempts": agent.wake_attempts,
                    "max_attempts": MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN,
                },
            )
            return
        gateway = await Gateway.objects.by_id(agent.gateway_id).first(session)
        if gateway is None:
            logger.warning(
                "lifecycle.reconcile.skip_missing_gateway",
                extra={"agent_id": str(agent.id), "gateway_id": str(agent.gateway_id)},
            )
            return
        board: Board | None = None
        if agent.board_id is not None:
            board = await Board.objects.by_id(agent.board_id).first(session)
            if board is None:
                logger.warning(
                    "lifecycle.reconcile.skip_missing_board",
                    extra={"agent_id": str(agent.id), "board_id": str(agent.board_id)},
                )
                return
        orchestrator = AgentLifecycleOrchestrator(session)
        # Bound the re-provision so a hung gateway call cannot wedge the worker.
        await asyncio.wait_for(
            orchestrator.run_lifecycle(
                gateway=gateway,
                agent_id=agent.id,
                board=board,
                user=None,
                action="update",
                auth_token=None,
                force_bootstrap=False,
                reset_session=True,
                wake=True,
                deliver_wakeup=True,
                wakeup_verb="updated",
                clear_confirm_token=True,
                raise_gateway_errors=True,
            ),
            timeout=_RECONCILE_TIMEOUT_SECONDS,
        )
        logger.info(
            "lifecycle.reconcile.retriggered",
            extra={"agent_id": str(agent.id), "generation": payload.generation},
        )

View File

@@ -110,27 +110,42 @@ def _heartbeat_config(agent: Agent) -> dict[str, Any]:
def _channel_heartbeat_visibility_patch(config_data: dict[str, Any]) -> dict[str, Any] | None:
    """Build a minimal patch ensuring channel default heartbeat visibility is configured.

    Gateways may have existing channel config; we only want to fill missing keys
    rather than overwrite operator intent. Returns ``None`` if no change is needed,
    otherwise returns a shallow patch dict suitable for a config merge.
    """
    channels = config_data.get("channels")
    defaults = channels.get("defaults") if isinstance(channels, dict) else None
    heartbeat = defaults.get("heartbeat") if isinstance(defaults, dict) else None
    if not isinstance(heartbeat, dict):
        # No usable existing config at any level: patch in the full default set.
        return {"defaults": {"heartbeat": DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY.copy()}}
    missing = {
        key: value
        for key, value in DEFAULT_CHANNEL_HEARTBEAT_VISIBILITY.items()
        if key not in heartbeat
    }
    if not missing:
        return None
    # Existing keys win; only absent defaults are layered in.
    return {"defaults": {"heartbeat": {**heartbeat, **missing}}}
def _template_env() -> Environment:
"""Create the Jinja environment used for gateway template rendering.
Note: we intentionally disable auto-escaping so markdown/plaintext templates render verbatim.
"""
return Environment(
loader=FileSystemLoader(_templates_root()),
# Render markdown verbatim (HTML escaping makes it harder for agents to read).
@@ -145,19 +160,34 @@ def _heartbeat_template_name(agent: Agent) -> str:
def _workspace_path(agent: Agent, workspace_root: str) -> str:
    """Return the absolute on-disk workspace directory for an agent.

    The folder name is derived from a stable *agent key* (rooted in ids and
    session keys) rather than display names, so agents with the same display
    name (e.g. multiple "Lead Agent"s) never collide. A historical gateway-main
    naming quirk is preserved so existing directories aren't moved. This path
    is later interpolated into template files (TOOLS.md, etc.) that agents
    treat as the source of truth for where to read/write.
    """
    if not workspace_root:
        msg = "gateway_workspace_root is required"
        raise ValueError(msg)
    agent_key = _agent_key(agent)
    # Backwards-compat: gateway-main agents historically used session keys that
    # encoded "gateway-<id>" while the gateway agent id is "mc-gateway-<id>";
    # stripping the prefix keeps already-provisioned directories stable.
    if agent_key.startswith("mc-gateway-"):
        agent_key = agent_key.removeprefix("mc-")
    base = workspace_root.rstrip("/")
    return f"{base}/workspace-{slugify(agent_key)}"
@@ -333,6 +363,7 @@ def _build_context(
"board_goal_confirmed": str(board.goal_confirmed).lower(),
"board_rule_require_approval_for_done": str(board.require_approval_for_done).lower(),
"board_rule_require_review_before_done": str(board.require_review_before_done).lower(),
"board_rule_comment_required_for_review": str(board.comment_required_for_review).lower(),
"board_rule_block_status_changes_with_pending_approval": str(
board.block_status_changes_with_pending_approval
).lower(),

View File

@@ -52,8 +52,6 @@ from app.services.openclaw.constants import (
OFFLINE_AFTER,
)
from app.services.openclaw.db_agent_state import (
mark_provision_complete,
mark_provision_requested,
mint_agent_token,
)
from app.services.openclaw.db_service import OpenClawDBService
@@ -74,6 +72,7 @@ from app.services.openclaw.internal.session_keys import (
board_agent_session_key,
board_lead_session_key,
)
from app.services.openclaw.lifecycle_orchestrator import AgentLifecycleOrchestrator
from app.services.openclaw.policies import OpenClawAuthorizationPolicy
from app.services.openclaw.provisioning import (
OpenClawGatewayControlPlane,
@@ -143,7 +142,6 @@ class OpenClawProvisioningService(OpenClawDBService):
def __init__(self, session: AsyncSession) -> None:
super().__init__(session)
self._gateway = OpenClawGatewayProvisioner()
@staticmethod
def lead_session_key(board: Board) -> str:
@@ -213,25 +211,25 @@ class OpenClawProvisioningService(OpenClawDBService):
openclaw_session_id=self.lead_session_key(board),
)
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action=config_options.action, status="provisioning")
await self.add_commit_refresh(agent)
# Strict behavior: provisioning errors surface to the caller. The DB row exists
# so a later retry can succeed with the same deterministic identity/session key.
await self._gateway.apply_agent_lifecycle(
agent=agent,
agent = await AgentLifecycleOrchestrator(self.session).run_lifecycle(
gateway=request.gateway,
agent_id=agent.id,
board=board,
auth_token=raw_token,
user=request.user,
action=config_options.action,
auth_token=raw_token,
force_bootstrap=False,
reset_session=False,
wake=True,
deliver_wakeup=True,
wakeup_verb=None,
clear_confirm_token=False,
raise_gateway_errors=True,
)
mark_provision_complete(agent, status="online")
await self.add_commit_refresh(agent)
return agent, True
async def sync_gateway_templates(
@@ -298,7 +296,6 @@ class OpenClawProvisioningService(OpenClawDBService):
control_plane=control_plane,
backoff=GatewayBackoff(timeout_s=10 * 60, timeout_context="template sync"),
options=options,
provisioner=self._gateway,
)
if not await _ping_gateway(ctx, result):
return result
@@ -352,7 +349,6 @@ class _SyncContext:
control_plane: OpenClawGatewayControlPlane
backoff: GatewayBackoff
options: GatewayTemplateSyncOptions
provisioner: OpenClawGatewayProvisioner
def _parse_tools_md(content: str) -> dict[str, str]:
@@ -584,18 +580,26 @@ async def _sync_one_agent(
try:
async def _do_provision() -> bool:
await ctx.provisioner.apply_agent_lifecycle(
agent=agent,
gateway=ctx.gateway,
board=board,
auth_token=auth_token,
user=ctx.options.user,
action="update",
force_bootstrap=ctx.options.force_bootstrap,
overwrite=ctx.options.overwrite,
reset_session=ctx.options.reset_sessions,
wake=False,
)
try:
await AgentLifecycleOrchestrator(ctx.session).run_lifecycle(
gateway=ctx.gateway,
agent_id=agent.id,
board=board,
user=ctx.options.user,
action="update",
auth_token=auth_token,
force_bootstrap=ctx.options.force_bootstrap,
reset_session=ctx.options.reset_sessions,
wake=False,
deliver_wakeup=False,
wakeup_verb="updated",
clear_confirm_token=False,
raise_gateway_errors=True,
)
except HTTPException as exc:
if exc.status_code == status.HTTP_502_BAD_GATEWAY:
raise OpenClawGatewayError(str(exc.detail)) from exc
raise
return True
await ctx.backoff.run(_do_provision)
@@ -613,6 +617,15 @@ async def _sync_one_agent(
message=f"Failed to sync templates: {exc}",
)
return False
except HTTPException as exc:
result.agents_skipped += 1
_append_sync_error(
result,
agent=agent,
board=board,
message=f"Failed to sync templates: {exc.detail}",
)
return False
else:
return False
@@ -655,18 +668,26 @@ async def _sync_main_agent(
try:
async def _do_provision_main() -> bool:
await ctx.provisioner.apply_agent_lifecycle(
agent=main_agent,
gateway=ctx.gateway,
board=None,
auth_token=token,
user=ctx.options.user,
action="update",
force_bootstrap=ctx.options.force_bootstrap,
overwrite=ctx.options.overwrite,
reset_session=ctx.options.reset_sessions,
wake=False,
)
try:
await AgentLifecycleOrchestrator(ctx.session).run_lifecycle(
gateway=ctx.gateway,
agent_id=main_agent.id,
board=None,
user=ctx.options.user,
action="update",
auth_token=token,
force_bootstrap=ctx.options.force_bootstrap,
reset_session=ctx.options.reset_sessions,
wake=False,
deliver_wakeup=False,
wakeup_verb="updated",
clear_confirm_token=False,
raise_gateway_errors=True,
)
except HTTPException as exc:
if exc.status_code == status.HTTP_502_BAD_GATEWAY:
raise OpenClawGatewayError(str(exc.detail)) from exc
raise
return True
await ctx.backoff.run(_do_provision_main)
@@ -679,6 +700,12 @@ async def _sync_main_agent(
agent=main_agent,
message=f"Failed to sync gateway agent templates: {exc}",
)
except HTTPException as exc:
_append_sync_error(
result,
agent=main_agent,
message=f"Failed to sync gateway agent templates: {exc.detail}",
)
else:
result.main_updated = True
return stop_sync
@@ -1038,7 +1065,6 @@ class AgentLifecycleService(OpenClawDBService):
) -> tuple[Agent, str]:
agent = Agent.model_validate(data)
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action="provision", status="provisioning")
agent.openclaw_session_id = self.resolve_session_key(agent)
await self.add_commit_refresh(agent)
return agent, raw_token
@@ -1068,92 +1094,63 @@ class AgentLifecycleService(OpenClawDBService):
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="board is required for non-main agent provisioning",
)
template_user = user
if target.is_main_agent and template_user is None:
template_user = await get_org_owner_user(
self.session,
organization_id=target.gateway.organization_id,
)
if template_user is None:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
detail=(
"User context is required to provision the gateway main agent "
"(org owner not found)."
),
)
await OpenClawGatewayProvisioner().apply_agent_lifecycle(
agent=agent,
provisioned = await AgentLifecycleOrchestrator(self.session).run_lifecycle(
gateway=target.gateway,
agent_id=agent.id,
board=target.board if not target.is_main_agent else None,
auth_token=auth_token,
user=template_user,
user=user,
action=action,
auth_token=auth_token,
force_bootstrap=force_bootstrap,
reset_session=True,
wake=True,
deliver_wakeup=True,
wakeup_verb=wakeup_verb,
clear_confirm_token=True,
raise_gateway_errors=raise_gateway_errors,
)
mark_provision_complete(agent, status="online", clear_confirm_token=True)
self.session.add(agent)
await self.session.commit()
record_activity(
self.session,
event_type=f"agent.{action}.direct",
message=f"{action.capitalize()}d directly for {agent.name}.",
agent_id=agent.id,
message=f"{action.capitalize()}d directly for {provisioned.name}.",
agent_id=provisioned.id,
)
record_activity(
self.session,
event_type="agent.wakeup.sent",
message=f"Wakeup message sent to {agent.name}.",
agent_id=agent.id,
message=f"Wakeup message sent to {provisioned.name}.",
agent_id=provisioned.id,
)
await self.session.commit()
self.logger.info(
"agent.provision.success action=%s agent_id=%s",
action,
agent.id,
provisioned.id,
)
except OpenClawGatewayError as exc:
except HTTPException as exc:
self.record_instruction_failure(
self.session,
agent,
str(exc),
str(exc.detail),
action,
)
await self.session.commit()
self.logger.error(
"agent.provision.gateway_error action=%s agent_id=%s error=%s",
action,
agent.id,
str(exc),
)
if exc.status_code == status.HTTP_502_BAD_GATEWAY:
self.logger.error(
"agent.provision.gateway_error action=%s agent_id=%s error=%s",
action,
agent.id,
str(exc.detail),
)
else:
self.logger.critical(
"agent.provision.runtime_error action=%s agent_id=%s error=%s",
action,
agent.id,
str(exc.detail),
)
if raise_gateway_errors:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Gateway {action} failed: {exc}",
) from exc
except (OSError, RuntimeError, ValueError) as exc: # pragma: no cover
self.record_instruction_failure(
self.session,
agent,
str(exc),
action,
)
await self.session.commit()
self.logger.critical(
"agent.provision.runtime_error action=%s agent_id=%s error=%s",
action,
agent.id,
str(exc),
)
if raise_gateway_errors:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Unexpected error {action}ing agent provisioning.",
) from exc
raise
async def provision_new_agent(
self,
@@ -1315,7 +1312,6 @@ class AgentLifecycleService(OpenClawDBService):
@staticmethod
def mark_agent_update_pending(agent: Agent) -> str:
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action="update", status="updating")
return raw_token
async def provision_updated_agent(
@@ -1390,7 +1386,6 @@ class AgentLifecycleService(OpenClawDBService):
return
raw_token = mint_agent_token(agent)
mark_provision_requested(agent, action="provision", status="provisioning")
await self.add_commit_refresh(agent)
board = await self.require_board(
str(agent.board_id) if agent.board_id else None,
@@ -1436,6 +1431,10 @@ class AgentLifecycleService(OpenClawDBService):
elif agent.status == "provisioning":
agent.status = "online"
agent.last_seen_at = utcnow()
# Successful check-in ends the current wake escalation cycle.
agent.wake_attempts = 0
agent.checkin_deadline_at = None
agent.last_provision_error = None
agent.updated_at = utcnow()
self.record_heartbeat(self.session, agent)
self.session.add(agent)

View File

@@ -150,6 +150,32 @@ def enqueue_task(
return False
def enqueue_task_with_delay(
    task: QueuedTask,
    queue_name: str,
    *,
    delay_seconds: float,
    redis_url: str | None = None,
) -> bool:
    """Enqueue a task immediately or schedule it for delayed delivery.

    Non-positive delays degrade to a plain immediate enqueue. Scheduling
    failures are not raised: they are logged and surfaced as ``False`` so the
    caller decides whether a dropped schedule is fatal.
    """
    wait = float(delay_seconds)
    if wait <= 0:
        return enqueue_task(task, queue_name, redis_url=redis_url)
    try:
        return _schedule_for_later(task, queue_name, wait, redis_url=redis_url)
    except Exception as exc:
        # Best-effort by design: a broken scheduler backend must not crash callers.
        logger.warning(
            "rq.queue.schedule_failed",
            extra={
                "task_type": task.task_type,
                "queue_name": queue_name,
                "delay_seconds": wait,
                "error": str(exc),
            },
        )
        return False
def _coerce_datetime(raw: object | None) -> datetime:
if raw is None:
return datetime.now(UTC)

View File

@@ -9,6 +9,11 @@ from dataclasses import dataclass
from app.core.config import settings
from app.core.logging import get_logger
from app.services.openclaw.lifecycle_queue import TASK_TYPE as LIFECYCLE_RECONCILE_TASK_TYPE
from app.services.openclaw.lifecycle_queue import (
requeue_lifecycle_queue_task,
)
from app.services.openclaw.lifecycle_reconcile import process_lifecycle_queue_task
from app.services.queue import QueuedTask, dequeue_task
from app.services.webhooks.dispatch import (
process_webhook_queue_task,
@@ -17,6 +22,7 @@ from app.services.webhooks.dispatch import (
from app.services.webhooks.queue import TASK_TYPE as WEBHOOK_TASK_TYPE
logger = get_logger(__name__)
_WORKER_BLOCK_TIMEOUT_SECONDS = 5.0
@dataclass(frozen=True)
@@ -27,6 +33,14 @@ class _TaskHandler:
_TASK_HANDLERS: dict[str, _TaskHandler] = {
LIFECYCLE_RECONCILE_TASK_TYPE: _TaskHandler(
handler=process_lifecycle_queue_task,
attempts_to_delay=lambda attempts: min(
settings.rq_dispatch_retry_base_seconds * (2 ** max(0, attempts)),
settings.rq_dispatch_retry_max_seconds,
),
requeue=lambda task, delay: requeue_lifecycle_queue_task(task, delay_seconds=delay),
),
WEBHOOK_TASK_TYPE: _TaskHandler(
handler=process_webhook_queue_task,
attempts_to_delay=lambda attempts: min(
@@ -115,7 +129,8 @@ async def _run_worker_loop() -> None:
try:
await flush_queue(
block=True,
block_timeout=0,
# Keep a finite timeout so scheduled tasks are periodically drained.
block_timeout=_WORKER_BLOCK_TIMEOUT_SECONDS,
)
except Exception:
logger.exception(

View File

@@ -0,0 +1,45 @@
"""Add agent lifecycle metadata columns.
Revision ID: e3a1b2c4d5f6
Revises: b497b348ebb4
Create Date: 2026-02-24 00:00:00.000000
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "e3a1b2c4d5f6"
down_revision = "b497b348ebb4"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Add lifecycle generation, wake tracking, and failure metadata."""
    # Counters get a temporary server default so existing rows backfill to 0.
    counter_columns = ("lifecycle_generation", "wake_attempts")
    for name in counter_columns:
        op.add_column(
            "agents",
            sa.Column(name, sa.Integer(), nullable=False, server_default="0"),
        )
    # Nullable metadata columns need no backfill.
    op.add_column("agents", sa.Column("last_wake_sent_at", sa.DateTime(), nullable=True))
    op.add_column("agents", sa.Column("checkin_deadline_at", sa.DateTime(), nullable=True))
    op.add_column("agents", sa.Column("last_provision_error", sa.Text(), nullable=True))
    # Drop the defaults so application code owns these values going forward.
    for name in counter_columns:
        op.alter_column("agents", name, server_default=None)
def downgrade() -> None:
    """Remove lifecycle generation, wake tracking, and failure metadata."""
    # Drop in reverse order of creation.
    for name in (
        "last_provision_error",
        "checkin_deadline_at",
        "last_wake_sent_at",
        "wake_attempts",
        "lifecycle_generation",
    ):
        op.drop_column("agents", name)

View File

@@ -0,0 +1,43 @@
"""add comment-required-for-review board rule
Revision ID: f1b2c3d4e5a6
Revises: e3a1b2c4d5f6
Create Date: 2026-02-25 00:00:00.000000
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "f1b2c3d4e5a6"
down_revision = "e3a1b2c4d5f6"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Add boards.comment_required_for_review, guarding against pre-existing columns."""
    inspector = sa.inspect(op.get_bind())
    existing = {column["name"] for column in inspector.get_columns("boards")}
    # Idempotent guard: skip environments where the column already exists.
    if "comment_required_for_review" in existing:
        return
    op.add_column(
        "boards",
        sa.Column(
            "comment_required_for_review",
            sa.Boolean(),
            nullable=False,
            server_default=sa.false(),
        ),
    )
def downgrade() -> None:
    """Drop boards.comment_required_for_review if it is present."""
    inspector = sa.inspect(op.get_bind())
    existing = {column["name"] for column in inspector.get_columns("boards")}
    # Mirror the guarded upgrade: only drop the column when it exists.
    if "comment_required_for_review" in existing:
        op.drop_column("boards", "comment_required_for_review")

View File

@@ -35,12 +35,19 @@ curl -fsS "{{ base_url }}/healthz" >/dev/null
5) Ensure today's daily file exists: `memory/YYYY-MM-DD.md`.
{% if is_lead %}
6) Initialize current delivery status in `MEMORY.md`:
6) Immediately check in to Mission Control (do this before any task orchestration):
```bash
curl -s -X POST "{{ base_url }}/api/v1/agent/heartbeat" \
-H "X-Agent-Token: {{ auth_token }}"
```
7) Initialize current delivery status in `MEMORY.md`:
- set objective if missing
- set state to `Working` (or `Waiting` if external dependency exists)
- set one concrete next step
7) Add one line to `MEMORY.md` noting bootstrap completion date.
8) Add one line to `MEMORY.md` noting bootstrap completion date.
{% else %}
6) If any fields are blank, leave them blank. Do not invent values.

View File

@@ -35,12 +35,20 @@ jq -r '
## Schedule
- If a heartbeat schedule is configured, send a lightweight check-in only.
- On first cycle after wake/bootstrap, run heartbeat check-in immediately (do not wait for cadence).
- Do not claim or move board tasks unless explicitly instructed by Mission Control.
- If you have any pending `LEAD REQUEST: ASK USER` messages in OpenClaw chat, handle them promptly (see AGENTS.md).
## Heartbeat checklist
1) Check in:
1) Check in immediately:
- Use the `agent-main` heartbeat endpoint (`POST /api/v1/agent/heartbeat`).
- Startup check-in example:
```bash
curl -s -X POST "{{ base_url }}/api/v1/agent/heartbeat" \
-H "X-Agent-Token: {{ auth_token }}"
```
- If check-in fails due to 5xx/network, stop and retry next heartbeat.
- During that failure window, do **not** write memory updates (`MEMORY.md`, daily memory files).
@@ -117,6 +125,7 @@ jq -r '
## Schedule
- Heartbeat cadence is controlled by gateway heartbeat config.
- On first cycle after wake/bootstrap, run heartbeat check-in immediately (do not wait for cadence).
- Keep cadence conservative unless there is a clear latency need.
## Non-Negotiable Rules
@@ -153,6 +162,7 @@ Before execution:
### Board Rule Snapshot
- `require_review_before_done`: `{{ board_rule_require_review_before_done }}`
- `require_approval_for_done`: `{{ board_rule_require_approval_for_done }}`
- `comment_required_for_review`: `{{ board_rule_comment_required_for_review }}`
- `block_status_changes_with_pending_approval`: `{{ board_rule_block_status_changes_with_pending_approval }}`
- `only_lead_can_change_status`: `{{ board_rule_only_lead_can_change_status }}`
- `max_agents`: `{{ board_rule_max_agents }}`

View File

@@ -133,6 +133,7 @@ This avoids relying on startup hooks to populate `api/openapi.json`.
- `workspace_path`
- `board_rule_require_approval_for_done`
- `board_rule_require_review_before_done`
- `board_rule_comment_required_for_review`
- `board_rule_block_status_changes_with_pending_approval`
- `board_rule_only_lead_can_change_status`
- `board_rule_max_agents`

View File

@@ -69,11 +69,15 @@ async def test_update_board_notifies_agents_when_added_to_group(
async def _fake_notify(**_kwargs: Any) -> None:
calls["notify"] += 1
async def _fake_lead_notify(**_kwargs: Any) -> None:
return None
async def _fake_get_by_id(*_args: Any, **_kwargs: Any) -> BoardGroup:
return group
monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update)
monkeypatch.setattr(boards, "_notify_agents_on_board_group_addition", _fake_notify)
monkeypatch.setattr(boards, "_notify_lead_on_board_update", _fake_lead_notify)
monkeypatch.setattr(boards.crud, "get_by_id", _fake_get_by_id)
updated = await boards.update_board(
@@ -108,12 +112,16 @@ async def test_update_board_notifies_agents_when_removed_from_group(
async def _fake_leave(**_kwargs: Any) -> None:
calls["leave"] += 1
async def _fake_lead_notify(**_kwargs: Any) -> None:
return None
async def _fake_get_by_id(*_args: Any, **_kwargs: Any) -> BoardGroup:
return group
monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update)
monkeypatch.setattr(boards, "_notify_agents_on_board_group_addition", _fake_join)
monkeypatch.setattr(boards, "_notify_agents_on_board_group_removal", _fake_leave)
monkeypatch.setattr(boards, "_notify_lead_on_board_update", _fake_lead_notify)
monkeypatch.setattr(boards.crud, "get_by_id", _fake_get_by_id)
updated = await boards.update_board(
@@ -151,6 +159,9 @@ async def test_update_board_notifies_agents_when_moved_between_groups(
async def _fake_leave(**_kwargs: Any) -> None:
calls["leave"] += 1
async def _fake_lead_notify(**_kwargs: Any) -> None:
return None
async def _fake_get_by_id(_session: Any, _model: Any, obj_id: UUID) -> BoardGroup | None:
if obj_id == old_group_id:
return old_group
@@ -161,6 +172,7 @@ async def test_update_board_notifies_agents_when_moved_between_groups(
monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update)
monkeypatch.setattr(boards, "_notify_agents_on_board_group_addition", _fake_join)
monkeypatch.setattr(boards, "_notify_agents_on_board_group_removal", _fake_leave)
monkeypatch.setattr(boards, "_notify_lead_on_board_update", _fake_lead_notify)
monkeypatch.setattr(boards.crud, "get_by_id", _fake_get_by_id)
updated = await boards.update_board(
@@ -192,9 +204,13 @@ async def test_update_board_does_not_notify_when_group_unchanged(
async def _fake_notify(**_kwargs: Any) -> None:
calls["notify"] += 1
async def _fake_lead_notify(**_kwargs: Any) -> None:
return None
monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update)
monkeypatch.setattr(boards, "_notify_agents_on_board_group_addition", _fake_notify)
monkeypatch.setattr(boards, "_notify_agents_on_board_group_removal", _fake_notify)
monkeypatch.setattr(boards, "_notify_lead_on_board_update", _fake_lead_notify)
updated = await boards.update_board(
payload=payload,
@@ -206,6 +222,66 @@ async def test_update_board_does_not_notify_when_group_unchanged(
assert calls["notify"] == 0
@pytest.mark.asyncio
async def test_update_board_notifies_lead_when_fields_change(
monkeypatch: pytest.MonkeyPatch,
) -> None:
board = _board(board_group_id=None)
session = _FakeSession()
payload = BoardUpdate(name="Platform X")
calls: dict[str, object] = {"count": 0, "changes": {}}
async def _fake_apply_board_update(**kwargs: Any) -> Board:
target: Board = kwargs["board"]
target.name = "Platform X"
return target
async def _fake_lead_notify(**kwargs: Any) -> None:
calls["count"] = int(calls["count"]) + 1
calls["changes"] = kwargs["changed_fields"]
monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update)
monkeypatch.setattr(boards, "_notify_lead_on_board_update", _fake_lead_notify)
updated = await boards.update_board(
payload=payload,
session=session, # type: ignore[arg-type]
board=board,
)
assert updated.name == "Platform X"
assert calls["count"] == 1
assert calls["changes"] == {"name": ("Platform", "Platform X")}
@pytest.mark.asyncio
async def test_update_board_skips_lead_notify_when_no_effective_change(
monkeypatch: pytest.MonkeyPatch,
) -> None:
board = _board(board_group_id=None)
session = _FakeSession()
payload = BoardUpdate(name="Platform")
calls = {"lead_notify": 0}
async def _fake_apply_board_update(**kwargs: Any) -> Board:
return kwargs["board"]
async def _fake_lead_notify(**_kwargs: Any) -> None:
calls["lead_notify"] += 1
monkeypatch.setattr(boards, "_apply_board_update", _fake_apply_board_update)
monkeypatch.setattr(boards, "_notify_lead_on_board_update", _fake_lead_notify)
updated = await boards.update_board(
payload=payload,
session=session, # type: ignore[arg-type]
board=board,
)
assert updated.name == "Platform"
assert calls["lead_notify"] == 0
@pytest.mark.asyncio
async def test_notify_agents_on_board_group_addition_fanout_and_records_results(
monkeypatch: pytest.MonkeyPatch,

View File

@@ -86,6 +86,7 @@ def test_board_rule_toggles_have_expected_defaults() -> None:
)
assert created.require_approval_for_done is True
assert created.require_review_before_done is False
assert created.comment_required_for_review is False
assert created.block_status_changes_with_pending_approval is False
assert created.only_lead_can_change_status is False
assert created.max_agents == 1
@@ -93,12 +94,14 @@ def test_board_rule_toggles_have_expected_defaults() -> None:
updated = BoardUpdate(
require_approval_for_done=False,
require_review_before_done=True,
comment_required_for_review=True,
block_status_changes_with_pending_approval=True,
only_lead_can_change_status=True,
max_agents=3,
)
assert updated.require_approval_for_done is False
assert updated.require_review_before_done is True
assert updated.comment_required_for_review is True
assert updated.block_status_changes_with_pending_approval is True
assert updated.only_lead_can_change_status is True
assert updated.max_agents == 3

View File

@@ -0,0 +1,126 @@
# ruff: noqa: INP001
"""Queue payload helpers for lifecycle reconcile tasks."""
from __future__ import annotations
from datetime import timedelta
from uuid import uuid4
import pytest
from app.core.time import utcnow
from app.services.openclaw.lifecycle_queue import (
QueuedAgentLifecycleReconcile,
decode_lifecycle_task,
defer_lifecycle_reconcile,
enqueue_lifecycle_reconcile,
)
from app.services.queue import QueuedTask
def test_enqueue_lifecycle_reconcile_uses_delayed_enqueue(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """enqueue_lifecycle_reconcile must delegate to the delayed-enqueue helper."""
    recorded: dict[str, object] = {}

    def _record_delayed_enqueue(
        task: QueuedTask,
        queue_name: str,
        *,
        delay_seconds: float,
        redis_url: str | None = None,
    ) -> bool:
        # Capture every argument so the assertions below can inspect the call.
        recorded["task"] = task
        recorded["queue_name"] = queue_name
        recorded["delay_seconds"] = delay_seconds
        recorded["redis_url"] = redis_url
        return True

    monkeypatch.setattr(
        "app.services.openclaw.lifecycle_queue.enqueue_task_with_delay",
        _record_delayed_enqueue,
    )

    payload = QueuedAgentLifecycleReconcile(
        agent_id=uuid4(),
        gateway_id=uuid4(),
        board_id=uuid4(),
        generation=7,
        checkin_deadline_at=utcnow() + timedelta(seconds=30),
        attempts=0,
    )
    assert enqueue_lifecycle_reconcile(payload) is True

    queued = recorded["task"]
    assert isinstance(queued, QueuedTask)
    assert queued.task_type == "agent_lifecycle_reconcile"
    # The deadline is 30s in the future, so a positive delay must be requested.
    assert float(recorded["delay_seconds"]) > 0
def test_defer_lifecycle_reconcile_keeps_attempt_count(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Deferring a reconcile re-enqueues it without bumping the attempt counter."""
    recorded: dict[str, object] = {}

    def _record_delayed_enqueue(
        task: QueuedTask,
        queue_name: str,
        *,
        delay_seconds: float,
        redis_url: str | None = None,
    ) -> bool:
        # Capture every argument so the assertions below can inspect the call.
        recorded["task"] = task
        recorded["queue_name"] = queue_name
        recorded["delay_seconds"] = delay_seconds
        recorded["redis_url"] = redis_url
        return True

    monkeypatch.setattr(
        "app.services.openclaw.lifecycle_queue.enqueue_task_with_delay",
        _record_delayed_enqueue,
    )

    deadline = utcnow() + timedelta(minutes=1)
    task = QueuedTask(
        task_type="agent_lifecycle_reconcile",
        payload={
            "agent_id": str(uuid4()),
            "gateway_id": str(uuid4()),
            "board_id": None,
            "generation": 3,
            "checkin_deadline_at": deadline.isoformat(),
        },
        created_at=utcnow(),
        attempts=2,
    )
    assert defer_lifecycle_reconcile(task, delay_seconds=12) is True

    deferred = recorded["task"]
    assert isinstance(deferred, QueuedTask)
    # A deferral is not a retry: the attempt count must carry over unchanged.
    assert deferred.attempts == 2
    assert float(recorded["delay_seconds"]) == 12
def test_decode_lifecycle_task_roundtrip() -> None:
    """An encoded queue payload decodes back to the same ids, generation and deadline."""
    deadline = utcnow() + timedelta(minutes=3)
    agent_id = uuid4()
    gateway_id = uuid4()
    board_id = uuid4()

    payload = {
        "agent_id": str(agent_id),
        "gateway_id": str(gateway_id),
        "board_id": str(board_id),
        "generation": 5,
        "checkin_deadline_at": deadline.isoformat(),
    }
    encoded = QueuedTask(
        task_type="agent_lifecycle_reconcile",
        payload=payload,
        created_at=utcnow(),
        attempts=1,
    )

    decoded = decode_lifecycle_task(encoded)
    assert decoded.agent_id == agent_id
    assert decoded.gateway_id == gateway_id
    assert decoded.board_id == board_id
    assert decoded.generation == 5
    assert decoded.checkin_deadline_at == deadline
    assert decoded.attempts == 1

View File

@@ -0,0 +1,53 @@
# ruff: noqa: INP001
"""Lifecycle reconcile state helpers."""
from __future__ import annotations
from datetime import timedelta
from uuid import uuid4
from app.core.time import utcnow
from app.models.agents import Agent
from app.services.openclaw.constants import (
CHECKIN_DEADLINE_AFTER_WAKE,
MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN,
)
from app.services.openclaw.lifecycle_reconcile import _has_checked_in_since_wake
def _agent(*, last_seen_offset_s: int | None, last_wake_offset_s: int | None) -> Agent:
    """Build a test Agent whose timestamps are offset (in seconds) from now.

    Passing ``None`` for an offset leaves the corresponding timestamp unset.
    """
    now = utcnow()

    def _at(offset_s: int | None):
        # None means "timestamp never recorded" for the field in question.
        return now + timedelta(seconds=offset_s) if offset_s is not None else None

    return Agent(
        name="reconcile-test",
        gateway_id=uuid4(),
        last_seen_at=_at(last_seen_offset_s),
        last_wake_sent_at=_at(last_wake_offset_s),
    )
def test_checked_in_since_wake_when_last_seen_after_wake() -> None:
    """A last_seen_at after last_wake_sent_at counts as a post-wake check-in."""
    probe = _agent(last_seen_offset_s=5, last_wake_offset_s=0)
    assert _has_checked_in_since_wake(probe) is True
def test_not_checked_in_since_wake_when_last_seen_before_wake() -> None:
    """A last_seen_at older than the wake does not count as a check-in."""
    probe = _agent(last_seen_offset_s=-5, last_wake_offset_s=0)
    assert _has_checked_in_since_wake(probe) is False
def test_not_checked_in_since_wake_when_missing_last_seen() -> None:
    """With no last_seen_at at all, the agent has not checked in since wake."""
    probe = _agent(last_seen_offset_s=None, last_wake_offset_s=0)
    assert _has_checked_in_since_wake(probe) is False
def test_lifecycle_convergence_policy_constants() -> None:
    # Pin the fast-convergence policy values (30s check-in deadline, 3 wake
    # attempts) so changing them is a deliberate, reviewed decision.
    assert CHECKIN_DEADLINE_AFTER_WAKE == timedelta(seconds=30)
    assert MAX_WAKE_ATTEMPTS_WITHOUT_CHECKIN == 3

View File

@@ -0,0 +1,11 @@
# ruff: noqa: INP001
"""Queue worker registration tests for lifecycle reconcile tasks."""
from __future__ import annotations
from app.services.openclaw.lifecycle_queue import TASK_TYPE as LIFECYCLE_TASK_TYPE
from app.services.queue_worker import _TASK_HANDLERS
def test_worker_registers_lifecycle_reconcile_handler() -> None:
    # The queue worker must have a handler registered for lifecycle reconcile
    # tasks, otherwise enqueued reconciles would never be processed.
    assert LIFECYCLE_TASK_TYPE in _TASK_HANDLERS

View File

@@ -0,0 +1,147 @@
from __future__ import annotations
import pytest
from fastapi import FastAPI, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.testclient import TestClient
from app.core.security_headers import SecurityHeadersMiddleware
@pytest.mark.asyncio
async def test_security_headers_middleware_passes_through_non_http_scope() -> None:
    """Non-HTTP ASGI scopes (here: websocket) must be forwarded to the inner app untouched."""
    called = False

    async def app(scope, receive, send):  # type: ignore[no-untyped-def]
        _ = receive
        _ = send
        nonlocal called
        # Record that the inner app received the websocket scope unchanged.
        called = scope["type"] == "websocket"

    middleware = SecurityHeadersMiddleware(app, x_frame_options="SAMEORIGIN")
    # receive/send are never exercised on a pass-through scope, so bare stubs suffice.
    await middleware({"type": "websocket", "headers": []}, lambda: None, lambda _: None)
    assert called is True
@pytest.mark.asyncio
async def test_security_headers_middleware_appends_lowercase_raw_header_names() -> None:
    """Injected headers are appended to the raw ASGI header list as lowercase byte names."""
    sent_messages: list[dict[str, object]] = []

    async def app(scope, receive, send):  # type: ignore[no-untyped-def]
        _ = scope
        _ = receive
        # Minimal ASGI app: an empty 200 response with no headers of its own.
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": b"", "more_body": False})

    async def capture(message):  # type: ignore[no-untyped-def]
        sent_messages.append(message)

    middleware = SecurityHeadersMiddleware(app, x_frame_options="SAMEORIGIN")
    await middleware(
        {"type": "http", "method": "GET", "path": "/", "headers": []}, lambda: None, capture
    )
    response_start = next(
        message for message in sent_messages if message.get("type") == "http.response.start"
    )
    headers = response_start.get("headers")
    assert isinstance(headers, list)
    header_names = {name for name, _value in headers}
    # Raw ASGI header names are expected lowercase; the canonical-case form
    # must not leak into the raw header list.
    assert b"x-frame-options" in header_names
    assert b"X-Frame-Options" not in header_names
def test_security_headers_middleware_injects_configured_headers() -> None:
    """Every configured security header appears on a plain 200 response."""
    app = FastAPI()
    app.add_middleware(
        SecurityHeadersMiddleware,
        x_content_type_options="nosniff",
        x_frame_options="SAMEORIGIN",
        referrer_policy="strict-origin-when-cross-origin",
        permissions_policy="camera=(), microphone=(), geolocation=()",
    )

    @app.get("/ok")
    def ok() -> dict[str, bool]:
        return {"ok": True}

    response = TestClient(app).get("/ok")
    assert response.status_code == 200

    expected = {
        "x-content-type-options": "nosniff",
        "x-frame-options": "SAMEORIGIN",
        "referrer-policy": "strict-origin-when-cross-origin",
        "permissions-policy": "camera=(), microphone=(), geolocation=()",
    }
    for header_name, header_value in expected.items():
        assert response.headers[header_name] == header_value
def test_security_headers_middleware_does_not_override_existing_values() -> None:
    """Headers set explicitly by the endpoint win over the middleware defaults."""
    app = FastAPI()
    app.add_middleware(
        SecurityHeadersMiddleware,
        x_content_type_options="nosniff",
        x_frame_options="SAMEORIGIN",
        referrer_policy="strict-origin-when-cross-origin",
        permissions_policy="camera=(), microphone=(), geolocation=()",
    )

    @app.get("/already-set")
    def already_set(response: Response) -> dict[str, bool]:
        # Endpoint overrides two of the configured headers.
        response.headers["X-Frame-Options"] = "ALLOWALL"
        response.headers["Referrer-Policy"] = "unsafe-url"
        return {"ok": True}

    client = TestClient(app)
    result = client.get("/already-set")
    assert result.status_code == 200
    # Headers the endpoint did not touch fall back to the configured values...
    assert result.headers["x-content-type-options"] == "nosniff"
    assert result.headers["permissions-policy"] == "camera=(), microphone=(), geolocation=()"
    # ...while endpoint-provided values are preserved verbatim.
    assert result.headers["x-frame-options"] == "ALLOWALL"
    assert result.headers["referrer-policy"] == "unsafe-url"
def test_security_headers_middleware_includes_headers_on_cors_preflight() -> None:
    """Security headers are present even on CORS preflight (OPTIONS) responses."""
    app = FastAPI()
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["https://example.com"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    app.add_middleware(
        SecurityHeadersMiddleware,
        x_content_type_options="nosniff",
    )

    @app.get("/ok")
    def ok() -> dict[str, bool]:
        return {"ok": True}

    # A preflight request is identified by Origin + Access-Control-Request-Method.
    preflight_headers = {
        "Origin": "https://example.com",
        "Access-Control-Request-Method": "GET",
    }
    client = TestClient(app)
    response = client.options("/ok", headers=preflight_headers)
    assert response.status_code == 200
    assert response.headers["x-content-type-options"] == "nosniff"
def test_security_headers_middleware_skips_blank_config_values() -> None:
    """With no header values configured, the middleware injects nothing at all."""
    app = FastAPI()
    app.add_middleware(SecurityHeadersMiddleware)

    @app.get("/ok")
    def ok() -> dict[str, bool]:
        return {"ok": True}

    response = TestClient(app).get("/ok")
    assert response.status_code == 200
    for header_name in (
        "x-content-type-options",
        "x-frame-options",
        "referrer-policy",
        "permissions-policy",
    ):
        assert response.headers.get(header_name) is None

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
from typing import Any
from uuid import uuid4
import pytest
@@ -11,6 +12,7 @@ from sqlmodel.ext.asyncio.session import AsyncSession
from app.api import tasks as tasks_api
from app.api.deps import ActorContext
from app.core.time import utcnow
from app.models.activity_events import ActivityEvent
from app.models.agents import Agent
from app.models.boards import Board
from app.models.gateways import Gateway
@@ -326,7 +328,7 @@ async def test_non_lead_agent_forbidden_for_lead_only_patch_fields() -> None:
@pytest.mark.asyncio
async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None:
async def test_non_lead_agent_moves_task_to_review_and_reassigns_to_lead() -> None:
engine = await _make_engine()
try:
async with await _make_session(engine) as session:
@@ -334,6 +336,7 @@ async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None:
board_id = uuid4()
gateway_id = uuid4()
worker_id = uuid4()
lead_id = uuid4()
task_id = uuid4()
in_progress_at = utcnow()
@@ -365,6 +368,16 @@ async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None:
status="online",
),
)
session.add(
Agent(
id=lead_id,
name="Lead Agent",
board_id=board_id,
gateway_id=gateway_id,
status="online",
is_board_lead=True,
),
)
session.add(
Task(
id=task_id,
@@ -391,7 +404,7 @@ async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None:
)
assert updated.status == "review"
assert updated.assigned_agent_id is None
assert updated.assigned_agent_id == lead_id
assert updated.in_progress_at is None
refreshed_task = (
@@ -399,6 +412,268 @@ async def test_non_lead_agent_moves_task_to_review_and_task_unassigns() -> None:
).first()
assert refreshed_task is not None
assert refreshed_task.previous_in_progress_at == in_progress_at
assert refreshed_task.assigned_agent_id == lead_id
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_non_lead_agent_move_to_review_reassigns_to_lead_and_sends_review_message(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Worker moves a task to review: it is reassigned to the board lead and a
    review-request message is sent to the lead's OpenClaw session."""
    engine = await _make_engine()
    try:
        async with await _make_session(engine) as session:
            org_id = uuid4()
            board_id = uuid4()
            gateway_id = uuid4()
            worker_id = uuid4()
            lead_id = uuid4()
            task_id = uuid4()
            session.add(Organization(id=org_id, name="org"))
            session.add(
                Gateway(
                    id=gateway_id,
                    organization_id=org_id,
                    name="gateway",
                    url="https://gateway.local",
                    workspace_root="/tmp/workspace",
                ),
            )
            session.add(
                Board(
                    id=board_id,
                    organization_id=org_id,
                    name="board",
                    slug="board",
                    gateway_id=gateway_id,
                ),
            )
            session.add(
                Agent(
                    id=worker_id,
                    name="worker",
                    board_id=board_id,
                    gateway_id=gateway_id,
                    status="online",
                ),
            )
            # The lead has an OpenClaw session so the review message can be routed.
            session.add(
                Agent(
                    id=lead_id,
                    name="Lead Agent",
                    board_id=board_id,
                    gateway_id=gateway_id,
                    status="online",
                    is_board_lead=True,
                    openclaw_session_id="lead-session",
                ),
            )
            session.add(
                Task(
                    id=task_id,
                    board_id=board_id,
                    title="assigned task",
                    description="done and ready",
                    status="in_progress",
                    assigned_agent_id=worker_id,
                    in_progress_at=utcnow(),
                ),
            )
            await session.commit()

            # Captures the single outbound message so assertions can inspect it.
            sent: dict[str, str] = {}

            class _FakeDispatch:
                # Stand-in for GatewayDispatchService; only the config lookup is used here.
                def __init__(self, _session: AsyncSession) -> None:
                    pass

                async def optional_gateway_config_for_board(self, _board: Board) -> object:
                    return object()

            async def _fake_send_agent_task_message(
                *,
                dispatch: Any,
                session_key: str,
                config: Any,
                agent_name: str,
                message: str,
            ) -> None:
                _ = dispatch, config
                sent["session_key"] = session_key
                sent["agent_name"] = agent_name
                sent["message"] = message
                return None

            monkeypatch.setattr(tasks_api, "GatewayDispatchService", _FakeDispatch)
            monkeypatch.setattr(
                tasks_api, "_send_agent_task_message", _fake_send_agent_task_message
            )

            task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first()
            assert task is not None
            actor = (await session.exec(select(Agent).where(col(Agent.id) == worker_id))).first()
            assert actor is not None

            updated = await tasks_api.update_task(
                payload=TaskUpdate(status="review", comment="Moving to review."),
                task=task,
                session=session,
                actor=ActorContext(actor_type="agent", agent=actor),
            )
            assert updated.status == "review"
            assert updated.assigned_agent_id == lead_id
            # The message must target the lead's session, not the worker's.
            assert sent["session_key"] == "lead-session"
            assert sent["agent_name"] == "Lead Agent"
            assert "TASK READY FOR LEAD REVIEW" in sent["message"]
            assert "review the deliverables" in sent["message"]
    finally:
        await engine.dispose()
@pytest.mark.asyncio
async def test_lead_moves_review_task_to_inbox_and_reassigns_last_worker_with_rework_message(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Lead rejects a review: the task returns to inbox, is reassigned to the
    previous worker, and the worker's session receives a rework message that
    includes the lead's latest review comment."""
    engine = await _make_engine()
    try:
        async with await _make_session(engine) as session:
            org_id = uuid4()
            board_id = uuid4()
            gateway_id = uuid4()
            worker_id = uuid4()
            lead_id = uuid4()
            task_id = uuid4()
            session.add(Organization(id=org_id, name="org"))
            session.add(
                Gateway(
                    id=gateway_id,
                    organization_id=org_id,
                    name="gateway",
                    url="https://gateway.local",
                    workspace_root="/tmp/workspace",
                ),
            )
            session.add(
                Board(
                    id=board_id,
                    organization_id=org_id,
                    name="board",
                    slug="board",
                    gateway_id=gateway_id,
                ),
            )
            # Both agents have OpenClaw sessions so messages can be routed to each.
            session.add(
                Agent(
                    id=worker_id,
                    name="worker",
                    board_id=board_id,
                    gateway_id=gateway_id,
                    status="online",
                    openclaw_session_id="worker-session",
                ),
            )
            session.add(
                Agent(
                    id=lead_id,
                    name="Lead Agent",
                    board_id=board_id,
                    gateway_id=gateway_id,
                    status="online",
                    is_board_lead=True,
                    openclaw_session_id="lead-session",
                ),
            )
            session.add(
                Task(
                    id=task_id,
                    board_id=board_id,
                    title="assigned task",
                    description="ready",
                    status="in_progress",
                    assigned_agent_id=worker_id,
                    in_progress_at=utcnow(),
                ),
            )
            await session.commit()

            # Records every outbound agent message; the rework message is the
            # last one sent to the worker's session.
            sent: list[dict[str, str]] = []

            class _FakeDispatch:
                # Stand-in for GatewayDispatchService; only the config lookup is used here.
                def __init__(self, _session: AsyncSession) -> None:
                    pass

                async def optional_gateway_config_for_board(self, _board: Board) -> object:
                    return object()

            async def _fake_send_agent_task_message(
                *,
                dispatch: Any,
                session_key: str,
                config: Any,
                agent_name: str,
                message: str,
            ) -> None:
                _ = dispatch, config
                sent.append(
                    {
                        "session_key": session_key,
                        "agent_name": agent_name,
                        "message": message,
                    },
                )
                return None

            monkeypatch.setattr(tasks_api, "GatewayDispatchService", _FakeDispatch)
            monkeypatch.setattr(
                tasks_api, "_send_agent_task_message", _fake_send_agent_task_message
            )

            task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first()
            assert task is not None
            worker = (await session.exec(select(Agent).where(col(Agent.id) == worker_id))).first()
            assert worker is not None
            lead = (await session.exec(select(Agent).where(col(Agent.id) == lead_id))).first()
            assert lead is not None

            # Step 1: worker submits the task for review; it moves to the lead.
            moved_to_review = await tasks_api.update_task(
                payload=TaskUpdate(status="review", comment="Ready for review."),
                task=task,
                session=session,
                actor=ActorContext(actor_type="agent", agent=worker),
            )
            assert moved_to_review.status == "review"
            assert moved_to_review.assigned_agent_id == lead_id

            # Step 2: the lead leaves a review comment before rejecting.
            session.add(
                ActivityEvent(
                    event_type="task.comment",
                    task_id=task_id,
                    agent_id=lead_id,
                    message="Please update error handling and add tests for edge cases.",
                ),
            )
            await session.commit()

            review_task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first()
            assert review_task is not None

            # Step 3: lead moves the task back to inbox (changes requested).
            reverted = await tasks_api.update_task(
                payload=TaskUpdate(status="inbox"),
                task=review_task,
                session=session,
                actor=ActorContext(actor_type="agent", agent=lead),
            )
            assert reverted.status == "inbox"
            assert reverted.assigned_agent_id == worker_id

            worker_messages = [item for item in sent if item["session_key"] == "worker-session"]
            assert worker_messages
            final_message = worker_messages[-1]["message"]
            assert "CHANGES REQUESTED" in final_message
            assert "Please update error handling and add tests for edge cases." in final_message
    finally:
        await engine.dispose()
@@ -485,7 +760,91 @@ async def test_non_lead_agent_comment_in_review_without_status_does_not_reassign
@pytest.mark.asyncio
async def test_non_lead_agent_moves_to_review_without_comment_or_recent_comment_fails() -> None:
async def test_non_lead_agent_moves_to_review_without_comment_when_rule_disabled() -> None:
engine = await _make_engine()
try:
async with await _make_session(engine) as session:
org_id = uuid4()
board_id = uuid4()
gateway_id = uuid4()
worker_id = uuid4()
lead_id = uuid4()
task_id = uuid4()
session.add(Organization(id=org_id, name="org"))
session.add(
Gateway(
id=gateway_id,
organization_id=org_id,
name="gateway",
url="https://gateway.local",
workspace_root="/tmp/workspace",
),
)
session.add(
Board(
id=board_id,
organization_id=org_id,
name="board",
slug="board",
gateway_id=gateway_id,
comment_required_for_review=False,
),
)
session.add(
Agent(
id=worker_id,
name="worker",
board_id=board_id,
gateway_id=gateway_id,
status="online",
),
)
session.add(
Agent(
id=lead_id,
name="Lead Agent",
board_id=board_id,
gateway_id=gateway_id,
status="online",
is_board_lead=True,
),
)
session.add(
Task(
id=task_id,
board_id=board_id,
title="assigned task",
description="",
status="in_progress",
assigned_agent_id=worker_id,
in_progress_at=utcnow(),
),
)
await session.commit()
task = (await session.exec(select(Task).where(col(Task.id) == task_id))).first()
assert task is not None
actor = (await session.exec(select(Agent).where(col(Agent.id) == worker_id))).first()
assert actor is not None
updated = await tasks_api.update_task(
payload=TaskUpdate(status="review"),
task=task,
session=session,
actor=ActorContext(actor_type="agent", agent=actor),
)
assert updated.status == "review"
assert updated.assigned_agent_id == lead_id
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_non_lead_agent_moves_to_review_without_comment_or_recent_comment_fails_when_rule_enabled() -> (
None
):
engine = await _make_engine()
try:
async with await _make_session(engine) as session:
@@ -512,6 +871,7 @@ async def test_non_lead_agent_moves_to_review_without_comment_or_recent_comment_
name="board",
slug="board",
gateway_id=gateway_id,
comment_required_for_review=True,
),
)
session.add(

View File

@@ -21,6 +21,11 @@ services:
image: redis:7-alpine
ports:
- "${REDIS_PORT:-6379}:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 3s
retries: 5
backend:
build:
@@ -42,7 +47,7 @@ services:
db:
condition: service_healthy
redis:
condition: service_started
condition: service_healthy
ports:
- "${BACKEND_PORT:-8000}:8000"
@@ -75,7 +80,7 @@ services:
- ./backend/.env.example
depends_on:
redis:
condition: service_started
condition: service_healthy
db:
condition: service_healthy
environment:

View File

@@ -10,6 +10,7 @@ This folder is the starting point for Mission Control documentation.
- [Deployment](./deployment/README.md)
- [Production notes](./production/README.md)
- [Troubleshooting](./troubleshooting/README.md)
- [Gateway agent provisioning and check-in troubleshooting](./troubleshooting/gateway-agent-provisioning.md)
- [Gateway WebSocket protocol](./openclaw_gateway_ws.md)
- [OpenClaw baseline configuration](./openclaw_baseline_config.md)

View File

@@ -1,3 +1,3 @@
# Troubleshooting
Placeholder.
- [Gateway agent provisioning and check-in](./gateway-agent-provisioning.md)

View File

@@ -0,0 +1,106 @@
# Gateway Agent Provisioning and Check-In Troubleshooting
This guide explains how agent provisioning converges to a healthy state, and how to debug when an agent appears stuck.
## Fast Convergence Policy
Mission Control uses a fast convergence policy for wake/check-in:
- Check-in deadline after each wake: **30 seconds**
- Maximum wake attempts without check-in: **3**
- If no check-in after the third attempt: agent is marked **offline** and provisioning escalation stops
This applies to both gateway-main and board agents.
## Expected Lifecycle
1. Mission Control provisions/updates the agent and sends wake.
2. A delayed reconcile task is queued for the check-in deadline.
3. Agent should call heartbeat quickly after startup/bootstrap.
4. If heartbeat arrives:
- `last_seen_at` is updated
- wake escalation state is reset (`wake_attempts=0`, check-in deadline cleared)
5. If heartbeat does not arrive by deadline:
- reconcile re-runs lifecycle (wake again)
- up to 3 total wake attempts
6. If still no heartbeat after 3 attempts:
- agent status becomes `offline`
- `last_provision_error` is set
## Startup Check-In Behavior
The agent templates explicitly require an immediate first-cycle check-in:
- Main agent heartbeat instructions require immediate check-in after wake/bootstrap.
- Board lead bootstrap requires heartbeat check-in before orchestration.
- Board worker bootstrap already included immediate check-in.
If a gateway still has older templates, run template sync and reprovision/wake.
## What You Should See in Logs
Healthy flow usually includes:
- `lifecycle.queue.enqueued`
- `queue.worker.success` (for lifecycle tasks)
- `lifecycle.reconcile.skip_not_stuck` (after heartbeat lands)
If agent is not checking in:
- `lifecycle.reconcile.deferred` (before deadline)
- `lifecycle.reconcile.retriggered` (retry wake)
- `lifecycle.reconcile.max_attempts_reached` (final fail-safe at attempt 3)
If you do not see lifecycle events at all, verify queue worker health first.
## Common Failure Modes
### Wake was sent, but no check-in arrived
Possible causes:
- Agent process never started or crashed during bootstrap
- Agent ignored startup instructions due to stale templates
- Heartbeat call failed (network/auth/base URL mismatch)
Actions:
1. Confirm current templates were synced to gateway.
2. Re-run provisioning/update to trigger a fresh wake.
3. Verify agent can reach Mission Control API and send heartbeat with `X-Agent-Token`.
### Agent stays provisioning/updating with no retries
Possible causes:
- Queue worker not running
- Queue/Redis mismatch between API process and worker process
Actions:
1. Verify worker process is running continuously.
2. Verify `rq_redis_url` and `rq_queue_name` are identical for API and worker.
3. Check worker logs for dequeue/handler errors.
### Agent ended offline quickly
This is expected when no check-in is received after 3 wake attempts. The system fails fast by design.
Actions:
1. Fix check-in path first (startup, network, token, API reachability).
2. Re-run provisioning/update to start a new attempt cycle.
## Operator Recovery Checklist
1. Ensure queue worker is running.
2. Sync templates for the gateway.
3. Trigger agent update/provision from Mission Control.
4. Watch logs for:
- `lifecycle.queue.enqueued`
- `lifecycle.reconcile.retriggered` (if needed)
- heartbeat activity / `skip_not_stuck`
5. If still failing, capture:
- gateway logs around bootstrap
- worker logs around lifecycle events
- agent `last_provision_error`, `wake_attempts`, `last_seen_at`

View File

@@ -1,28 +1,14 @@
import { defineConfig } from "cypress";
import { clerkSetup } from "@clerk/testing/cypress";
export default defineConfig({
env: {
NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY:
process.env.NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY,
// Optional overrides.
CLERK_ORIGIN: process.env.CYPRESS_CLERK_ORIGIN,
CLERK_TEST_EMAIL: process.env.CYPRESS_CLERK_TEST_EMAIL,
CLERK_TEST_OTP: process.env.CYPRESS_CLERK_TEST_OTP,
},
e2e: {
baseUrl: "http://localhost:3000",
specPattern: "cypress/e2e/**/*.cy.{js,jsx,ts,tsx}",
supportFile: "cypress/support/e2e.ts",
// Clerk helpers perform async work inside `cy.then()`. CI can be slow enough
// that Cypress' 4s default command timeout flakes.
defaultCommandTimeout: 20_000,
retries: {
runMode: 2,
openMode: 0,
},
setupNodeEvents(on, config) {
return clerkSetup({ config });
},
},
});

View File

@@ -1,22 +1,11 @@
/// <reference types="cypress" />
// Clerk/Next.js occasionally triggers a hydration mismatch on the SignIn route in CI.
// This is non-deterministic UI noise for these tests; ignore it so assertions can proceed.
Cypress.on("uncaught:exception", (err) => {
if (err.message?.includes("Hydration failed")) {
return false;
}
return true;
});
describe("/activity feed", () => {
const apiBase = "**/api/v1";
const email = Cypress.env("CLERK_TEST_EMAIL") || "jane+clerk_test@example.com";
const originalDefaultCommandTimeout = Cypress.config("defaultCommandTimeout");
beforeEach(() => {
// Clerk's Cypress helpers perform async work inside `cy.then()`.
// CI can be slow enough that the default 4s command timeout flakes.
Cypress.config("defaultCommandTimeout", 20_000);
});
@@ -49,6 +38,30 @@ describe("/activity feed", () => {
function stubBoardBootstrap() {
// Some app bootstraps happen before we get to the /activity call.
// Keep these stable so the page always reaches the activity request.
cy.intercept("GET", `${apiBase}/users/me*`, {
statusCode: 200,
body: {
id: "u1",
clerk_user_id: "local-auth-user",
email: "local@example.com",
name: "Local User",
preferred_name: "Local User",
timezone: "UTC",
},
}).as("usersMe");
cy.intercept("GET", `${apiBase}/organizations/me/list*`, {
statusCode: 200,
body: [
{
id: "org1",
name: "Testing Org",
is_active: true,
role: "owner",
},
],
}).as("orgsList");
cy.intercept("GET", `${apiBase}/organizations/me/member*`, {
statusCode: 200,
body: { organization_id: "org1", role: "owner" },
@@ -77,10 +90,11 @@ describe("/activity feed", () => {
cy.contains(/live feed/i).should("be.visible");
}
it("auth negative: signed-out user is redirected to sign-in", () => {
// SignedOutPanel runs in redirect mode on this page.
it("auth negative: signed-out user sees auth prompt", () => {
cy.visit("/activity");
cy.location("pathname", { timeout: 20_000 }).should("match", /\/sign-in/);
cy.contains(/sign in to view the feed|local authentication/i, {
timeout: 20_000,
}).should("be.visible");
});
it("happy path: renders task comment cards", () => {
@@ -107,10 +121,7 @@ describe("/activity feed", () => {
stubStreamsEmpty();
cy.visit("/sign-in");
cy.clerkLoaded();
cy.clerkSignIn({ strategy: "email_code", identifier: email });
cy.loginWithLocalAuth();
cy.visit("/activity");
assertSignedInAndLanded();
cy.wait("@activityList", { timeout: 20_000 });
@@ -131,10 +142,7 @@ describe("/activity feed", () => {
stubStreamsEmpty();
cy.visit("/sign-in");
cy.clerkLoaded();
cy.clerkSignIn({ strategy: "email_code", identifier: email });
cy.loginWithLocalAuth();
cy.visit("/activity");
assertSignedInAndLanded();
cy.wait("@activityList", { timeout: 20_000 });
@@ -152,10 +160,7 @@ describe("/activity feed", () => {
stubStreamsEmpty();
cy.visit("/sign-in");
cy.clerkLoaded();
cy.clerkSignIn({ strategy: "email_code", identifier: email });
cy.loginWithLocalAuth();
cy.visit("/activity");
assertSignedInAndLanded();
cy.wait("@activityList", { timeout: 20_000 });

View File

@@ -1,6 +1,8 @@
describe("/activity page", () => {
it("signed-out user is redirected to sign-in", () => {
it("signed-out user sees an auth prompt", () => {
cy.visit("/activity");
cy.location("pathname", { timeout: 20_000 }).should("match", /\/sign-in/);
cy.contains(/local authentication|sign in to mission control/i, {
timeout: 20_000,
}).should("be.visible");
});
});

View File

@@ -0,0 +1,304 @@
/// <reference types="cypress" />
describe("/boards/:id task board", () => {
const apiBase = "**/api/v1";
const email = "local-auth-user@example.com";
const originalDefaultCommandTimeout = Cypress.config("defaultCommandTimeout");
beforeEach(() => {
Cypress.config("defaultCommandTimeout", 20_000);
});
afterEach(() => {
Cypress.config("defaultCommandTimeout", originalDefaultCommandTimeout);
});
function stubEmptySse() {
  // Silence every board-related SSE endpoint so live streaming can never
  // interfere with these stubbed-API tests. Each route replies with an
  // empty (but well-formed) event stream.
  const emptyStreamResponse = {
    statusCode: 200,
    headers: { "content-type": "text/event-stream" },
    body: "",
  };
  const sseRoutes: Array<[string, string]> = [
    [`${apiBase}/boards/*/tasks/stream*`, "tasksStream"],
    [`${apiBase}/boards/*/approvals/stream*`, "approvalsStream"],
    [`${apiBase}/boards/*/memory/stream*`, "memoryStream"],
    [`${apiBase}/agents/stream*`, "agentsStream"],
  ];
  for (const [pattern, alias] of sseRoutes) {
    cy.intercept("GET", pattern, emptyStreamResponse).as(alias);
  }
}
function openEditTaskDialog() {
  // Click the edit button on the task detail panel, then wait for the edit
  // dialog to become visible before the caller interacts with its contents.
  const editButtonSelector = 'button[title="Edit task"]';
  const editDialogSelector = '[aria-label="Edit task"]';
  const slowTimeout = { timeout: 20_000 };
  cy.get(editButtonSelector, slowTimeout)
    .should("be.visible")
    .and("not.be.disabled")
    .click();
  cy.get(editDialogSelector, slowTimeout).should("be.visible");
}
// Negative path: with no seeded local-auth token the board page must fall
// back to the local authentication login screen instead of rendering tasks.
it("auth negative: signed-out user is shown local auth login", () => {
  cy.visit("/boards/b1");
  const loginHeading = cy.contains("h1", /local authentication/i, {
    timeout: 30_000,
  });
  loginHeading.should("be.visible");
});
it("happy path: renders tasks from snapshot and supports create + status update + delete (stubbed)", () => {
  // Full board lifecycle against stubbed APIs: load a one-task snapshot,
  // create a second task, change the first task's status, then delete it.
  stubEmptySse();
  // --- Bootstrap stubs: membership / user / org data loaded before the board.
  cy.intercept("GET", `${apiBase}/organizations/me/member*`, {
    statusCode: 200,
    body: {
      id: "m1",
      organization_id: "o1",
      user_id: "u1",
      role: "owner",
      // Full read/write access so every task action is enabled in the UI.
      all_boards_read: true,
      all_boards_write: true,
      created_at: "2026-02-11T00:00:00Z",
      updated_at: "2026-02-11T00:00:00Z",
      board_access: [{ board_id: "b1", can_read: true, can_write: true }],
    },
  }).as("membership");
  cy.intercept("GET", `${apiBase}/users/me*`, {
    statusCode: 200,
    body: {
      id: "u1",
      clerk_user_id: "clerk_u1",
      email,
      name: "Jane Test",
      preferred_name: "Jane",
      timezone: "America/New_York",
      is_super_admin: false,
    },
  }).as("me");
  cy.intercept("GET", `${apiBase}/organizations/me/list*`, {
    statusCode: 200,
    body: [
      { id: "o1", name: "Personal", role: "owner", is_active: true },
    ],
  }).as("organizations");
  cy.intercept("GET", `${apiBase}/tags*`, {
    statusCode: 200,
    body: { items: [], total: 0, limit: 200, offset: 0 },
  }).as("tags");
  cy.intercept("GET", `${apiBase}/organizations/me/custom-fields*`, {
    statusCode: 200,
    body: [],
  }).as("customFields");
  // --- Board snapshot: exactly one inbox task ("t1") on board "b1".
  cy.intercept("GET", `${apiBase}/boards/b1/snapshot*`, {
    statusCode: 200,
    body: {
      board: {
        id: "b1",
        name: "Demo Board",
        slug: "demo-board",
        description: "Demo",
        gateway_id: "g1",
        board_group_id: null,
        board_type: "general",
        objective: null,
        success_metrics: null,
        target_date: null,
        goal_confirmed: true,
        goal_source: "test",
        organization_id: "o1",
        created_at: "2026-02-11T00:00:00Z",
        updated_at: "2026-02-11T00:00:00Z",
      },
      tasks: [
        {
          id: "t1",
          board_id: "b1",
          title: "Inbox task",
          description: "",
          status: "inbox",
          priority: "medium",
          due_at: null,
          assigned_agent_id: null,
          depends_on_task_ids: [],
          created_by_user_id: null,
          in_progress_at: null,
          created_at: "2026-02-11T00:00:00Z",
          updated_at: "2026-02-11T00:00:00Z",
          blocked_by_task_ids: [],
          is_blocked: false,
          assignee: null,
          approvals_count: 0,
          approvals_pending_count: 0,
        },
      ],
      agents: [],
      approvals: [],
      chat_messages: [],
      pending_approvals_count: 0,
    },
  }).as("snapshot");
  cy.intercept("GET", `${apiBase}/boards/b1/group-snapshot*`, {
    statusCode: 200,
    body: { group: null, boards: [] },
  }).as("groupSnapshot");
  // --- Mutation stubs: echo the UI's payload back so optimistic state sticks.
  cy.intercept("POST", `${apiBase}/boards/b1/tasks`, (req) => {
    // Minimal assertion the UI sends expected fields.
    expect(req.body).to.have.property("title");
    req.reply({
      statusCode: 200,
      body: {
        id: "t2",
        board_id: "b1",
        title: req.body.title,
        description: req.body.description ?? "",
        status: "inbox",
        priority: req.body.priority ?? "medium",
        due_at: null,
        assigned_agent_id: null,
        depends_on_task_ids: [],
        created_by_user_id: null,
        in_progress_at: null,
        created_at: "2026-02-11T00:00:00Z",
        updated_at: "2026-02-11T00:00:00Z",
        blocked_by_task_ids: [],
        is_blocked: false,
        assignee: null,
        approvals_count: 0,
        approvals_pending_count: 0,
      },
    });
  }).as("createTask");
  cy.intercept("PATCH", `${apiBase}/boards/b1/tasks/t1`, (req) => {
    expect(req.body).to.have.property("status");
    req.reply({
      statusCode: 200,
      body: {
        id: "t1",
        board_id: "b1",
        title: "Inbox task",
        description: "",
        // Reflect the requested status so the UI shows the transition.
        status: req.body.status,
        priority: "medium",
        due_at: null,
        assigned_agent_id: null,
        depends_on_task_ids: [],
        created_by_user_id: null,
        in_progress_at: null,
        created_at: "2026-02-11T00:00:00Z",
        updated_at: "2026-02-11T00:00:01Z",
        blocked_by_task_ids: [],
        is_blocked: false,
        assignee: null,
        approvals_count: 0,
        approvals_pending_count: 0,
      },
    });
  }).as("updateTask");
  cy.intercept("DELETE", `${apiBase}/boards/b1/tasks/t1`, {
    statusCode: 200,
    body: { ok: true },
  }).as("deleteTask");
  cy.intercept("GET", `${apiBase}/boards/b1/tasks/t1/comments*`, {
    statusCode: 200,
    body: { items: [], total: 0, limit: 200, offset: 0 },
  }).as("taskComments");
  // --- Sign in (seeds the session-storage token) and open the board.
  cy.loginWithLocalAuth();
  cy.visit("/boards/b1");
  cy.waitForAppLoaded();
  cy.wait([
    "@snapshot",
    "@groupSnapshot",
    "@membership",
    "@me",
    "@organizations",
    "@tags",
    "@customFields",
  ]);
  // Existing task visible.
  cy.contains("Inbox task").should("be.visible");
  // Open create task flow.
  // Board page uses an icon-only button with aria-label="New task".
  cy.get('button[aria-label="New task"]')
    .should("be.visible")
    .and("not.be.disabled")
    .click();
  cy.contains('[role="dialog"]', "New task")
    .should("be.visible")
    .within(() => {
      cy.contains("label", "Title").parent().find("input").type("New task");
      cy.contains("button", /^Create task$/)
        .should("be.visible")
        .and("not.be.disabled")
        .click();
    });
  cy.wait(["@createTask"]);
  cy.contains("New task").should("be.visible");
  // Open edit task dialog.
  cy.contains("Inbox task").scrollIntoView().should("be.visible").click();
  cy.wait(["@taskComments"]);
  cy.contains(/task detail/i).should("be.visible");
  openEditTaskDialog();
  // Change status via Status select.
  cy.get('[aria-label="Edit task"]').within(() => {
    cy.contains("label", "Status")
      .parent()
      .within(() => {
        cy.get('[role="combobox"]').first().should("be.visible").click();
      });
  });
  // The select popover renders outside the dialog, so click from the top level.
  cy.contains("In progress").should("be.visible").click();
  cy.contains("button", /save changes/i)
    .should("be.visible")
    .and("not.be.disabled")
    .click();
  cy.wait(["@updateTask"]);
  cy.get('[aria-label="Edit task"]').should("not.exist");
  // Save closes the edit dialog; reopen it from task detail.
  cy.contains(/task detail/i).should("be.visible");
  openEditTaskDialog();
  // Delete task via delete dialog.
  cy.get('[aria-label="Edit task"]').within(() => {
    cy.contains("button", /^Delete task$/)
      .scrollIntoView()
      .should("be.visible")
      .and("not.be.disabled")
      .click();
  });
  // Confirm in the dedicated delete dialog (second "Delete task" button).
  cy.get('[aria-label="Delete task"]').should("be.visible");
  cy.get('[aria-label="Delete task"]').within(() => {
    cy.contains("button", /^Delete task$/)
      .scrollIntoView()
      .should("be.visible")
      .and("not.be.disabled")
      .click();
  });
  cy.wait(["@deleteTask"]);
  cy.contains("Inbox task").should("not.exist");
});
});

View File

@@ -0,0 +1,102 @@
/// <reference types="cypress" />
// E2E coverage for the boards index page: a signed-out visitor sees the
// local-auth login, and a signed-in user (stubbed APIs) sees the boards list.
describe("/boards", () => {
  const apiBase = "**/api/v1";
  const email = "local-auth-user@example.com";
  // Captured so afterEach can restore the override applied in beforeEach.
  const originalDefaultCommandTimeout = Cypress.config("defaultCommandTimeout");
  beforeEach(() => {
    // CI can be slow enough that the 4s default command timeout flakes.
    Cypress.config("defaultCommandTimeout", 20_000);
  });
  afterEach(() => {
    Cypress.config("defaultCommandTimeout", originalDefaultCommandTimeout);
  });
  it("auth negative: signed-out user is shown local auth login", () => {
    cy.visit("/boards");
    cy.contains("h1", /local authentication/i, { timeout: 30_000 }).should(
      "be.visible",
    );
  });
  it("happy path: signed-in user sees boards list", () => {
    // --- Bootstrap stubs: membership / user / org data loaded on sign-in.
    cy.intercept("GET", `${apiBase}/organizations/me/member*`, {
      statusCode: 200,
      body: {
        id: "m1",
        organization_id: "o1",
        user_id: "u1",
        role: "owner",
        all_boards_read: true,
        all_boards_write: true,
        created_at: "2026-02-11T00:00:00Z",
        updated_at: "2026-02-11T00:00:00Z",
        board_access: [],
      },
    }).as("membership");
    cy.intercept("GET", `${apiBase}/users/me*`, {
      statusCode: 200,
      body: {
        id: "u1",
        clerk_user_id: "clerk_u1",
        email,
        name: "Jane Test",
        preferred_name: "Jane",
        timezone: "America/New_York",
        is_super_admin: false,
      },
    }).as("me");
    cy.intercept("GET", `${apiBase}/organizations/me/list*`, {
      statusCode: 200,
      body: [
        { id: "o1", name: "Personal", role: "owner", is_active: true },
      ],
    }).as("organizations");
    // --- Page data: one board and no board groups.
    cy.intercept("GET", `${apiBase}/boards*`, {
      statusCode: 200,
      body: {
        items: [
          {
            id: "b1",
            name: "Demo Board",
            slug: "demo-board",
            description: "Demo",
            gateway_id: "g1",
            board_group_id: null,
            board_type: "general",
            objective: null,
            success_metrics: null,
            target_date: null,
            goal_confirmed: true,
            goal_source: "test",
            organization_id: "o1",
            created_at: "2026-02-11T00:00:00Z",
            updated_at: "2026-02-11T00:00:00Z",
          },
        ],
        total: 1,
        limit: 200,
        offset: 0,
      },
    }).as("boards");
    cy.intercept("GET", `${apiBase}/board-groups*`, {
      statusCode: 200,
      body: { items: [], total: 0, limit: 200, offset: 0 },
    }).as("boardGroups");
    // Seed the local-auth token, then assert the stubbed board renders.
    cy.loginWithLocalAuth();
    cy.visit("/boards");
    cy.waitForAppLoaded();
    cy.wait(["@membership", "@me", "@organizations", "@boards", "@boardGroups"]);
    cy.contains(/boards/i).should("be.visible");
    cy.contains("Demo Board").should("be.visible");
  });
});

View File

@@ -1,16 +0,0 @@
describe("Clerk login", () => {
it("user can sign in via Clerk testing commands", () => {
const email = Cypress.env("CLERK_TEST_EMAIL") || "jane+clerk_test@example.com";
// Prereq per Clerk docs: visit a non-protected page that loads Clerk.
cy.visit("/sign-in");
cy.clerkLoaded();
cy.clerkSignIn({ strategy: "email_code", identifier: email });
// After login, user should be able to access protected route.
cy.visit("/activity");
cy.waitForAppLoaded();
cy.contains(/live feed/i).should("be.visible");
});
});

View File

@@ -0,0 +1,49 @@
// Smoke test for local-auth mode: after seeding the session-storage token,
// a protected route (/activity) must be reachable with all APIs stubbed.
describe("Local auth login", () => {
  it("user with local auth token can access protected route", () => {
    // Table of GET stubs, registered in order so the app bootstrap never
    // touches a live backend.
    const stubs: Array<{ pattern: string; body: unknown; alias: string }> = [
      {
        pattern: "**/api/v1/users/me*",
        body: {
          id: "u1",
          clerk_user_id: "local-auth-user",
          email: "local@example.com",
          name: "Local User",
          preferred_name: "Local User",
          timezone: "UTC",
        },
        alias: "usersMe",
      },
      {
        pattern: "**/api/v1/organizations/me/list*",
        body: [
          {
            id: "org1",
            name: "Testing Org",
            is_active: true,
            role: "owner",
          },
        ],
        alias: "orgsList",
      },
      {
        pattern: "**/api/v1/organizations/me/member*",
        body: { organization_id: "org1", role: "owner" },
        alias: "orgMeMember",
      },
      {
        pattern: "**/api/v1/boards*",
        body: {
          items: [{ id: "b1", name: "Testing", updated_at: "2026-02-07T00:00:00Z" }],
        },
        alias: "boardsList",
      },
      {
        pattern: "**/api/v1/boards/b1/snapshot*",
        body: { tasks: [], agents: [], approvals: [], chat_messages: [] },
        alias: "boardSnapshot",
      },
    ];
    for (const { pattern, body, alias } of stubs) {
      cy.intercept("GET", pattern, { statusCode: 200, body }).as(alias);
    }
    // Seed the token, then confirm the protected activity feed renders.
    cy.loginWithLocalAuth();
    cy.visit("/activity");
    cy.waitForAppLoaded();
    cy.contains(/live feed/i).should("be.visible");
  });
});

View File

@@ -1,36 +1,88 @@
describe("Organizations (PR #61)", () => {
const email = Cypress.env("CLERK_TEST_EMAIL") || "jane+clerk_test@example.com";
const apiBase = "**/api/v1";
it("negative: signed-out user is redirected to sign-in when opening /organization", () => {
function stubOrganizationApis() {
cy.intercept("GET", `${apiBase}/users/me*`, {
statusCode: 200,
body: {
id: "u1",
clerk_user_id: "local-auth-user",
email: "local@example.com",
name: "Local User",
preferred_name: "Local User",
timezone: "UTC",
},
}).as("usersMe");
cy.intercept("GET", `${apiBase}/organizations/me/list*`, {
statusCode: 200,
body: [
{
id: "org1",
name: "Testing Org",
is_active: true,
role: "member",
},
],
}).as("orgsList");
cy.intercept("GET", `${apiBase}/organizations/me/member*`, {
statusCode: 200,
body: {
id: "membership-1",
user_id: "u1",
organization_id: "org1",
role: "member",
},
}).as("orgMembership");
cy.intercept("GET", `${apiBase}/organizations/me`, {
statusCode: 200,
body: { id: "org1", name: "Testing Org" },
}).as("orgMe");
cy.intercept("GET", `${apiBase}/organizations/me/members*`, {
statusCode: 200,
body: {
items: [
{
id: "membership-1",
user_id: "u1",
role: "member",
user: {
id: "u1",
email: "local@example.com",
name: "Local User",
preferred_name: "Local User",
},
},
],
},
}).as("orgMembers");
cy.intercept("GET", `${apiBase}/boards*`, {
statusCode: 200,
body: { items: [] },
}).as("boardsList");
}
it("negative: signed-out user sees auth prompt when opening /organization", () => {
cy.visit("/organization");
cy.location("pathname", { timeout: 30_000 }).should("match", /\/sign-in/);
cy.contains(/sign in to manage your organization|local authentication/i, {
timeout: 30_000,
}).should("be.visible");
});
it("positive: signed-in user can view /organization and sees correct invite permissions", () => {
// Story (positive): a signed-in user can reach the organization page.
// Story (negative within flow): non-admin users cannot invite members.
cy.visit("/sign-in");
cy.clerkLoaded();
cy.clerkSignIn({ strategy: "email_code", identifier: email });
stubOrganizationApis();
cy.loginWithLocalAuth();
cy.visit("/organization");
cy.waitForAppLoaded();
cy.contains(/members\s*&\s*invites/i).should("be.visible");
// Deterministic assertion across roles:
// - if user is admin: invite button enabled
// - else: invite button disabled with the correct tooltip
cy.contains("button", /invite member/i)
.should("be.visible")
.then(($btn) => {
const isDisabled = $btn.is(":disabled");
if (isDisabled) {
cy.wrap($btn)
.should("have.attr", "title")
.and("match", /only organization admins can invite/i);
} else {
cy.wrap($btn).should("not.be.disabled");
}
});
.should("be.disabled")
.and("have.attr", "title")
.and("match", /only organization admins can invite/i);
});
});

View File

@@ -1,46 +1,9 @@
/// <reference types="cypress" />
type ClerkOtpLoginOptions = {
clerkOrigin: string;
email: string;
otp: string;
};
const APP_LOAD_TIMEOUT_MS = 30_000;
function getEnv(name: string, fallback?: string): string {
const value = Cypress.env(name) as string | undefined;
if (value) return value;
if (fallback !== undefined) return fallback;
throw new Error(
`Missing Cypress env var ${name}. ` +
`Set it via CYPRESS_${name}=... in CI/local before running Clerk login tests.`,
);
}
function clerkOriginFromPublishableKey(): string {
const key = getEnv("NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY");
// pk_test_<base64(domain$)> OR pk_live_<...>
const m = /^pk_(?:test|live)_(.+)$/.exec(key);
if (!m) throw new Error(`Unexpected Clerk publishable key format: ${key}`);
const decoded = atob(m[1]); // e.g. beloved-ghost-73.clerk.accounts.dev$
const domain = decoded.replace(/\$$/, "");
// Some flows redirect to *.accounts.dev (no clerk. subdomain)
const normalized = domain.replace(".clerk.accounts.dev", ".accounts.dev");
return `https://${normalized}`;
}
function normalizeOrigin(value: string): string {
try {
const url = new URL(value);
return url.origin;
} catch {
return value.replace(/\/$/, "");
}
}
const LOCAL_AUTH_STORAGE_KEY = "mc_local_auth_token";
const DEFAULT_LOCAL_AUTH_TOKEN =
"cypress-local-auth-token-0123456789-0123456789-0123456789x";
Cypress.Commands.add("waitForAppLoaded", () => {
cy.get("[data-cy='route-loader']", {
@@ -52,153 +15,19 @@ Cypress.Commands.add("waitForAppLoaded", () => {
}).should("have.attr", "aria-hidden", "true");
});
Cypress.Commands.add("loginWithClerkOtp", () => {
const clerkOrigin = normalizeOrigin(
getEnv("CLERK_ORIGIN", clerkOriginFromPublishableKey()),
);
const email = getEnv("CLERK_TEST_EMAIL", "jane+clerk_test@example.com");
const otp = getEnv("CLERK_TEST_OTP", "424242");
Cypress.Commands.add("loginWithLocalAuth", (token = DEFAULT_LOCAL_AUTH_TOKEN) => {
cy.visit("/", {
onBeforeLoad(win) {
win.sessionStorage.setItem(LOCAL_AUTH_STORAGE_KEY, token);
},
});
});
const opts: ClerkOtpLoginOptions = { clerkOrigin, email, otp };
// Navigate to a dedicated sign-in route that renders Clerk SignIn top-level.
// Cypress cannot reliably drive Clerk modal/iframe flows.
cy.visit("/sign-in");
const emailSelector =
'input[type="email"], input[name="identifier"], input[autocomplete="email"]';
const otpSelector =
'input[autocomplete="one-time-code"], input[name*="code"], input[name^="code"], input[name^="code."], input[inputmode="numeric"]';
const continueSelector = 'button[type="submit"], button';
const methodSelector = /email|code|otp|send code|verification|verify|use email/i;
const fillEmailStep = (email: string) => {
cy.get(emailSelector, { timeout: 20_000 })
.first()
.clear()
.type(email, { delay: 10 });
cy.contains(continueSelector, /continue|sign in|send|next/i, { timeout: 20_000 })
.should("be.visible")
.click({ force: true });
};
const maybeSelectEmailCodeMethod = () => {
cy.get("body").then(($body) => {
const hasOtp = $body.find(otpSelector).length > 0;
if (hasOtp) return;
const candidates = $body
.find("button,a")
.toArray()
.filter((el) => methodSelector.test((el.textContent || "").trim()));
if (candidates.length > 0) {
cy.wrap(candidates[0]).click({ force: true });
}
});
};
const waitForOtpOrMethod = () => {
cy.get("body", { timeout: 60_000 }).should(($body) => {
const hasOtp = $body.find(otpSelector).length > 0;
const hasMethod = $body
.find("button,a")
.toArray()
.some((el) => methodSelector.test((el.textContent || "").trim()));
expect(
hasOtp || hasMethod,
"waiting for OTP input or verification method UI",
).to.equal(true);
});
};
const fillOtpAndSubmit = (otp: string) => {
waitForOtpOrMethod();
maybeSelectEmailCodeMethod();
cy.get(otpSelector, { timeout: 60_000 }).first().clear().type(otp, { delay: 10 });
cy.get("body").then(($body) => {
const hasSubmit = $body
.find(continueSelector)
.toArray()
.some((el) => /verify|continue|sign in|confirm/i.test(el.textContent || ""));
if (hasSubmit) {
cy.contains(continueSelector, /verify|continue|sign in|confirm/i, { timeout: 20_000 })
.should("be.visible")
.click({ force: true });
}
});
};
// Clerk SignIn can start on our app origin and then redirect to Clerk-hosted UI.
// Do email step first, then decide where the OTP step lives based on the *current* origin.
fillEmailStep(opts.email);
cy.location("origin", { timeout: 60_000 }).then((origin) => {
const current = normalizeOrigin(origin);
if (current === opts.clerkOrigin) {
cy.origin(
opts.clerkOrigin,
{ args: { otp: opts.otp } },
({ otp }) => {
const otpSelector =
'input[autocomplete="one-time-code"], input[name*="code"], input[name^="code"], input[name^="code."], input[inputmode="numeric"]';
const continueSelector = 'button[type="submit"], button';
const methodSelector = /email|code|otp|send code|verification|verify|use email/i;
const maybeSelectEmailCodeMethod = () => {
cy.get("body").then(($body) => {
const hasOtp = $body.find(otpSelector).length > 0;
if (hasOtp) return;
const candidates = $body
.find("button,a")
.toArray()
.filter((el) => methodSelector.test((el.textContent || "").trim()));
if (candidates.length > 0) {
cy.wrap(candidates[0]).click({ force: true });
}
});
};
const waitForOtpOrMethod = () => {
cy.get("body", { timeout: 60_000 }).should(($body) => {
const hasOtp = $body.find(otpSelector).length > 0;
const hasMethod = $body
.find("button,a")
.toArray()
.some((el) => methodSelector.test((el.textContent || "").trim()));
expect(
hasOtp || hasMethod,
"waiting for OTP input or verification method UI",
).to.equal(true);
});
};
waitForOtpOrMethod();
maybeSelectEmailCodeMethod();
cy.get(otpSelector, { timeout: 60_000 }).first().clear().type(otp, { delay: 10 });
cy.get("body").then(($body) => {
const hasSubmit = $body
.find(continueSelector)
.toArray()
.some((el) => /verify|continue|sign in|confirm/i.test(el.textContent || ""));
if (hasSubmit) {
cy.contains(continueSelector, /verify|continue|sign in|confirm/i, { timeout: 20_000 })
.should("be.visible")
.click({ force: true });
}
});
},
);
} else {
fillOtpAndSubmit(opts.otp);
}
Cypress.Commands.add("logoutLocalAuth", () => {
cy.visit("/", {
onBeforeLoad(win) {
win.sessionStorage.removeItem(LOCAL_AUTH_STORAGE_KEY);
},
});
});
@@ -212,15 +41,14 @@ declare global {
waitForAppLoaded(): Chainable<void>;
/**
* Logs in via the real Clerk SignIn page using deterministic OTP credentials.
*
* Optional env vars (CYPRESS_*):
* - CLERK_ORIGIN (e.g. https://<subdomain>.accounts.dev)
* - NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY (used to derive origin when CLERK_ORIGIN not set)
* - CLERK_TEST_EMAIL (default: jane+clerk_test@example.com)
* - CLERK_TEST_OTP (default: 424242)
* Seeds session storage with a local auth token for local-auth mode.
*/
loginWithClerkOtp(): Chainable<void>;
loginWithLocalAuth(token?: string): Chainable<void>;
/**
* Clears local auth token from session storage.
*/
logoutLocalAuth(): Chainable<void>;
}
}
}

View File

@@ -5,6 +5,15 @@
import { addClerkCommands } from "@clerk/testing/cypress";
// Clerk/Next.js occasionally throws a non-deterministic hydration mismatch
// on /sign-in. Ignore this known UI noise so E2E assertions can proceed.
Cypress.on("uncaught:exception", (err) => {
if (err?.message?.includes("Hydration failed")) {
return false;
}
return true;
});
addClerkCommands({ Cypress, cy });
import "./commands";

View File

@@ -23,6 +23,7 @@ export interface BoardCreate {
goal_source?: string | null;
require_approval_for_done?: boolean;
require_review_before_done?: boolean;
comment_required_for_review?: boolean;
block_status_changes_with_pending_approval?: boolean;
only_lead_can_change_status?: boolean;
/** @minimum 0 */

View File

@@ -23,6 +23,7 @@ export interface BoardRead {
goal_source?: string | null;
require_approval_for_done?: boolean;
require_review_before_done?: boolean;
comment_required_for_review?: boolean;
block_status_changes_with_pending_approval?: boolean;
only_lead_can_change_status?: boolean;
/** @minimum 0 */

View File

@@ -23,6 +23,7 @@ export interface BoardUpdate {
goal_source?: string | null;
require_approval_for_done?: boolean | null;
require_review_before_done?: boolean | null;
comment_required_for_review?: boolean | null;
block_status_changes_with_pending_approval?: boolean | null;
only_lead_can_change_status?: boolean | null;
max_agents?: number | null;

View File

@@ -62,19 +62,34 @@ vi.mock("@clerk/nextjs", () => {
describe("/activity auth boundary", () => {
it("renders without ClerkProvider runtime errors when publishable key is a placeholder", () => {
const previousAuthMode = process.env.NEXT_PUBLIC_AUTH_MODE;
const previousPublishableKey =
process.env.NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY;
// Simulate CI/secretless env where an arbitrary placeholder value may be present.
// AuthProvider should treat this as disabled, and the auth wrappers must not render
// Clerk SignedOut/SignedIn components.
process.env.NEXT_PUBLIC_AUTH_MODE = "local";
process.env.NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY = "placeholder";
window.sessionStorage.clear();
render(
<AuthProvider>
<QueryProvider>
<ActivityPage />
</QueryProvider>
</AuthProvider>,
);
try {
render(
<AuthProvider>
<QueryProvider>
<ActivityPage />
</QueryProvider>
</AuthProvider>,
);
expect(screen.getByText(/sign in to view the feed/i)).toBeInTheDocument();
expect(
screen.getByRole("heading", { name: /local authentication/i }),
).toBeInTheDocument();
expect(screen.getByLabelText(/access token/i)).toBeInTheDocument();
} finally {
process.env.NEXT_PUBLIC_AUTH_MODE = previousAuthMode;
process.env.NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY = previousPublishableKey;
window.sessionStorage.clear();
}
});
});

View File

@@ -59,16 +59,31 @@ vi.mock("@clerk/nextjs", () => {
describe("/approvals auth boundary", () => {
it("renders without ClerkProvider runtime errors when publishable key is a placeholder", () => {
const previousAuthMode = process.env.NEXT_PUBLIC_AUTH_MODE;
const previousPublishableKey =
process.env.NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY;
process.env.NEXT_PUBLIC_AUTH_MODE = "local";
process.env.NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY = "placeholder";
window.sessionStorage.clear();
render(
<AuthProvider>
<QueryProvider>
<GlobalApprovalsPage />
</QueryProvider>
</AuthProvider>,
);
try {
render(
<AuthProvider>
<QueryProvider>
<GlobalApprovalsPage />
</QueryProvider>
</AuthProvider>,
);
expect(screen.getByText(/sign in to view approvals/i)).toBeInTheDocument();
expect(
screen.getByRole("heading", { name: /local authentication/i }),
).toBeInTheDocument();
expect(screen.getByLabelText(/access token/i)).toBeInTheDocument();
} finally {
process.env.NEXT_PUBLIC_AUTH_MODE = previousAuthMode;
process.env.NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY = previousPublishableKey;
window.sessionStorage.clear();
}
});
});

View File

@@ -291,6 +291,9 @@ export default function EditBoardPage() {
const [requireReviewBeforeDone, setRequireReviewBeforeDone] = useState<
boolean | undefined
>(undefined);
const [commentRequiredForReview, setCommentRequiredForReview] = useState<
boolean | undefined
>(undefined);
const [
blockStatusChangesWithPendingApproval,
setBlockStatusChangesWithPendingApproval,
@@ -504,6 +507,8 @@ export default function EditBoardPage() {
requireApprovalForDone ?? baseBoard?.require_approval_for_done ?? true;
const resolvedRequireReviewBeforeDone =
requireReviewBeforeDone ?? baseBoard?.require_review_before_done ?? false;
const resolvedCommentRequiredForReview =
commentRequiredForReview ?? baseBoard?.comment_required_for_review ?? false;
const resolvedBlockStatusChangesWithPendingApproval =
blockStatusChangesWithPendingApproval ??
baseBoard?.block_status_changes_with_pending_approval ??
@@ -588,6 +593,7 @@ export default function EditBoardPage() {
setObjective(updated.objective ?? "");
setRequireApprovalForDone(updated.require_approval_for_done ?? true);
setRequireReviewBeforeDone(updated.require_review_before_done ?? false);
setCommentRequiredForReview(updated.comment_required_for_review ?? false);
setBlockStatusChangesWithPendingApproval(
updated.block_status_changes_with_pending_approval ?? false,
);
@@ -656,6 +662,7 @@ export default function EditBoardPage() {
: resolvedObjective.trim() || null,
require_approval_for_done: resolvedRequireApprovalForDone,
require_review_before_done: resolvedRequireReviewBeforeDone,
comment_required_for_review: resolvedCommentRequiredForReview,
block_status_changes_with_pending_approval:
resolvedBlockStatusChangesWithPendingApproval,
only_lead_can_change_status: resolvedOnlyLeadCanChangeStatus,
@@ -1016,6 +1023,42 @@ export default function EditBoardPage() {
</span>
</span>
</div>
<div className="flex items-start gap-3 rounded-lg border border-slate-200 px-3 py-3">
<button
type="button"
role="switch"
aria-checked={resolvedCommentRequiredForReview}
aria-label="Require comment for review"
onClick={() =>
setCommentRequiredForReview(
!resolvedCommentRequiredForReview,
)
}
disabled={isLoading}
className={`mt-0.5 inline-flex h-6 w-11 shrink-0 items-center rounded-full border transition ${
resolvedCommentRequiredForReview
? "border-emerald-600 bg-emerald-600"
: "border-slate-300 bg-slate-200"
} ${isLoading ? "cursor-not-allowed opacity-60" : "cursor-pointer"}`}
>
<span
className={`inline-block h-5 w-5 rounded-full bg-white shadow-sm transition ${
resolvedCommentRequiredForReview
? "translate-x-5"
: "translate-x-0.5"
}`}
/>
</button>
<span className="space-y-1">
<span className="block text-sm font-medium text-slate-900">
Require comment for review
</span>
<span className="block text-xs text-slate-600">
Require a task comment when moving status to{" "}
<code>review</code>.
</span>
</span>
</div>
<div className="flex items-start gap-3 rounded-lg border border-slate-200 px-3 py-3">
<button
type="button"

View File

@@ -3,10 +3,17 @@
import { useSearchParams } from "next/navigation";
import { SignIn } from "@clerk/nextjs";
import { isLocalAuthMode } from "@/auth/localAuth";
import { resolveSignInRedirectUrl } from "@/auth/redirects";
import { LocalAuthLogin } from "@/components/organisms/LocalAuthLogin";
export default function SignInPage() {
const searchParams = useSearchParams();
if (isLocalAuthMode()) {
return <LocalAuthLogin />;
}
const forceRedirectUrl = resolveSignInRedirectUrl(
searchParams.get("redirect_url"),
);

View File

@@ -86,6 +86,9 @@ describe("BoardOnboardingChat polling", () => {
);
await screen.findByText("What should we prioritize?");
await waitFor(() => {
expect(screen.getByRole("button", { name: "Option A" })).toBeEnabled();
});
const callsBeforeWait = getOnboardingMock.mock.calls.length;
await act(async () => {

View File

@@ -46,7 +46,9 @@ describe("validateGatewayUrl", () => {
});
it("accepts userinfo URLs with explicit port", () => {
expect(validateGatewayUrl("ws://user:pass@gateway.example.com:8080")).toBeNull();
expect(
validateGatewayUrl("ws://user:pass@gateway.example.com:8080"),
).toBeNull();
});
it("accepts userinfo URLs with IPv6 host and explicit port", () => {