From b36f75547050f7f8ada7ee64a75bf2e2023afc1d Mon Sep 17 00:00:00 2001 From: Abhimanyu Saharan Date: Wed, 4 Feb 2026 15:22:52 +0530 Subject: [PATCH] feat(gateway): Add cron job provisioning Create a mission control runner cron job via the gateway HTTP API\nand ensure it is present on startup. Adds cron helpers, job\nbuilder, and a startup hook.\n\nCo-Authored-By: Claude --- backend/app/integrations/openclaw_gateway.py | 58 ++++++++++++++++- backend/app/main.py | 4 +- backend/app/services/cron_jobs.py | 65 ++++++++++++++++++++ 3 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 backend/app/services/cron_jobs.py diff --git a/backend/app/integrations/openclaw_gateway.py b/backend/app/integrations/openclaw_gateway.py index 1793a61d..e52878cc 100644 --- a/backend/app/integrations/openclaw_gateway.py +++ b/backend/app/integrations/openclaw_gateway.py @@ -4,9 +4,10 @@ import asyncio import json from dataclasses import dataclass from typing import Any -from urllib.parse import urlencode, urlparse, urlunparse +from urllib.parse import quote, urlencode, urlparse, urlunparse from uuid import uuid4 +import httpx import websockets from app.core.config import settings @@ -31,6 +32,48 @@ def _build_gateway_url() -> str: return urlunparse(parsed._replace(query=query)) +def _build_gateway_http_url() -> str: + base_url = settings.openclaw_gateway_url or "ws://127.0.0.1:18789" + parsed = urlparse(base_url) + if parsed.scheme in {"http", "https"}: + scheme = parsed.scheme + elif parsed.scheme == "wss": + scheme = "https" + else: + scheme = "http" + return urlunparse( + parsed._replace(scheme=scheme, path="", params="", query="", fragment="") + ) + + +def _gateway_headers() -> dict[str, str]: + headers: dict[str, str] = {} + if settings.openclaw_gateway_token: + headers["Authorization"] = f"Bearer {settings.openclaw_gateway_token}" + return headers + + +async def _http_request(method: str, path: str, payload: dict[str, Any] | None = None) -> Any: + base_url = _build_gateway_http_url().rstrip("/") + url = f"{base_url}{path}" + try: + async with httpx.AsyncClient(timeout=10) as client: + response = await client.request( + method, url, json=payload, headers=_gateway_headers() + ) + if response.status_code >= 400: + raise OpenClawGatewayError( + f"{response.status_code}: {response.text or 'Gateway error'}" + ) + if not response.content: + return None + return response.json() + except OpenClawGatewayError: + raise + except Exception as exc: # pragma: no cover - transport errors + raise OpenClawGatewayError(str(exc)) from exc + + async def _await_response(ws: websockets.WebSocketClientProtocol, request_id: str) -> Any: while True: raw = await ws.recv() @@ -137,3 +180,16 @@ async def ensure_session(session_key: str, label: str | None = None) -> Any: if label: params["label"] = label return await openclaw_call("sessions.patch", params) + + +async def list_cron_jobs() -> Any: + return await _http_request("GET", "/api/v1/cron/jobs") + + +async def upsert_cron_job(job: dict[str, Any]) -> Any: + return await _http_request("POST", "/api/v1/cron/jobs", payload=job) + + +async def delete_cron_job(name: str) -> Any: + safe_name = quote(name, safe="") + return await _http_request("DELETE", f"/api/v1/cron/jobs/{safe_name}") diff --git a/backend/app/main.py b/backend/app/main.py index e7e6bca2..8f7b1e5c 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -12,6 +12,7 @@ from app.api.tasks import router as tasks_router from app.core.config import settings from app.core.logging import configure_logging from app.db.session import init_db +from app.services.cron_jobs import ensure_mission_control_cron_job configure_logging() @@ -29,8 +30,9 @@ if origins: @app.on_event("startup") -def on_startup() -> None: +async def on_startup() -> None: init_db() + await ensure_mission_control_cron_job() @app.get("/health") diff --git a/backend/app/services/cron_jobs.py b/backend/app/services/cron_jobs.py new file mode 100644 index 00000000..05295edd --- /dev/null +++ b/backend/app/services/cron_jobs.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import logging +from typing import Any + +from app.integrations.openclaw_gateway import ( + OpenClawGatewayError, + list_cron_jobs, + upsert_cron_job, +) + +logger = logging.getLogger(__name__) + +MISSION_CONTROL_CRON_NAME = "mission-control-runner/10m" + + +def _mission_control_runner_message() -> str: + return ( + "You are the Mission Control Runner agent.\n\n" + "On this scheduled tick:\n" + "- Run the HEARTBEAT.md procedure for Mission Control (check-in, list boards, " + "list tasks).\n" + "- If any task is already in_progress, stop (do not claim another).\n" + "- Otherwise, find the oldest inbox task across all boards, claim it by moving " + "to in_progress.\n" + "- Execute the task fully.\n" + "- When complete, move it to review.\n" + "- If no inbox tasks exist, do nothing.\n" + "Only update Mission Control (no chat messages)." + ) + + +def build_mission_control_cron_job() -> dict[str, Any]: + return { + "name": MISSION_CONTROL_CRON_NAME, + "schedule": {"kind": "every", "everyMs": 600000}, + "sessionTarget": "isolated", + "enabled": True, + "payload": {"kind": "agentTurn", "message": _mission_control_runner_message()}, + } + + +async def ensure_mission_control_cron_job() -> None: + try: + payload = await list_cron_jobs() + except OpenClawGatewayError as exc: + logger.warning("Gateway cron list failed: %s", exc) + return + + jobs: list[dict[str, Any]] = [] + if isinstance(payload, list): + jobs = payload + elif isinstance(payload, dict): + jobs = list(payload.get("jobs", [])) + + job = build_mission_control_cron_job() + if any(item.get("name") == job["name"] for item in jobs): + logger.info("Updating gateway cron job: %s", job["name"]) + else: + logger.info("Creating gateway cron job: %s", job["name"]) + + try: + await upsert_cron_job(job) + except OpenClawGatewayError as exc: + logger.warning("Gateway cron upsert failed: %s", exc)