feat(gateway): Add cron job provisioning
Create a mission control runner cron job via the gateway HTTP API\nand ensure it is present on startup. Adds cron helpers, job\nbuilder, and a startup hook.\n\nCo-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -4,9 +4,10 @@ import asyncio
|
|||||||
import json
|
import json
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from urllib.parse import urlencode, urlparse, urlunparse
|
from urllib.parse import quote, urlencode, urlparse, urlunparse
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
|
import httpx
|
||||||
import websockets
|
import websockets
|
||||||
|
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
@@ -31,6 +32,48 @@ def _build_gateway_url() -> str:
|
|||||||
return urlunparse(parsed._replace(query=query))
|
return urlunparse(parsed._replace(query=query))
|
||||||
|
|
||||||
|
|
||||||
|
def _build_gateway_http_url() -> str:
|
||||||
|
base_url = settings.openclaw_gateway_url or "ws://127.0.0.1:18789"
|
||||||
|
parsed = urlparse(base_url)
|
||||||
|
if parsed.scheme in {"http", "https"}:
|
||||||
|
scheme = parsed.scheme
|
||||||
|
elif parsed.scheme == "wss":
|
||||||
|
scheme = "https"
|
||||||
|
else:
|
||||||
|
scheme = "http"
|
||||||
|
return urlunparse(
|
||||||
|
parsed._replace(scheme=scheme, path="", params="", query="", fragment="")
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _gateway_headers() -> dict[str, str]:
|
||||||
|
headers: dict[str, str] = {}
|
||||||
|
if settings.openclaw_gateway_token:
|
||||||
|
headers["Authorization"] = f"Bearer {settings.openclaw_gateway_token}"
|
||||||
|
return headers
|
||||||
|
|
||||||
|
|
||||||
|
async def _http_request(method: str, path: str, payload: dict[str, Any] | None = None) -> Any:
|
||||||
|
base_url = _build_gateway_http_url().rstrip("/")
|
||||||
|
url = f"{base_url}{path}"
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=10) as client:
|
||||||
|
response = await client.request(
|
||||||
|
method, url, json=payload, headers=_gateway_headers()
|
||||||
|
)
|
||||||
|
if response.status_code >= 400:
|
||||||
|
raise OpenClawGatewayError(
|
||||||
|
f"{response.status_code}: {response.text or 'Gateway error'}"
|
||||||
|
)
|
||||||
|
if not response.content:
|
||||||
|
return None
|
||||||
|
return response.json()
|
||||||
|
except OpenClawGatewayError:
|
||||||
|
raise
|
||||||
|
except Exception as exc: # pragma: no cover - transport errors
|
||||||
|
raise OpenClawGatewayError(str(exc)) from exc
|
||||||
|
|
||||||
|
|
||||||
async def _await_response(ws: websockets.WebSocketClientProtocol, request_id: str) -> Any:
|
async def _await_response(ws: websockets.WebSocketClientProtocol, request_id: str) -> Any:
|
||||||
while True:
|
while True:
|
||||||
raw = await ws.recv()
|
raw = await ws.recv()
|
||||||
@@ -137,3 +180,16 @@ async def ensure_session(session_key: str, label: str | None = None) -> Any:
|
|||||||
if label:
|
if label:
|
||||||
params["label"] = label
|
params["label"] = label
|
||||||
return await openclaw_call("sessions.patch", params)
|
return await openclaw_call("sessions.patch", params)
|
||||||
|
|
||||||
|
|
||||||
|
async def list_cron_jobs() -> Any:
|
||||||
|
return await _http_request("GET", "/api/v1/cron/jobs")
|
||||||
|
|
||||||
|
|
||||||
|
async def upsert_cron_job(job: dict[str, Any]) -> Any:
|
||||||
|
return await _http_request("POST", "/api/v1/cron/jobs", payload=job)
|
||||||
|
|
||||||
|
|
||||||
|
async def delete_cron_job(name: str) -> Any:
|
||||||
|
safe_name = quote(name, safe="")
|
||||||
|
return await _http_request("DELETE", f"/api/v1/cron/jobs/{safe_name}")
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ from app.api.tasks import router as tasks_router
|
|||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
from app.core.logging import configure_logging
|
from app.core.logging import configure_logging
|
||||||
from app.db.session import init_db
|
from app.db.session import init_db
|
||||||
|
from app.services.cron_jobs import ensure_mission_control_cron_job
|
||||||
|
|
||||||
configure_logging()
|
configure_logging()
|
||||||
|
|
||||||
@@ -29,8 +30,9 @@ if origins:
|
|||||||
|
|
||||||
|
|
||||||
@app.on_event("startup")
|
@app.on_event("startup")
|
||||||
def on_startup() -> None:
|
async def on_startup() -> None:
|
||||||
init_db()
|
init_db()
|
||||||
|
await ensure_mission_control_cron_job()
|
||||||
|
|
||||||
|
|
||||||
@app.get("/health")
|
@app.get("/health")
|
||||||
|
|||||||
65
backend/app/services/cron_jobs.py
Normal file
65
backend/app/services/cron_jobs.py
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from app.integrations.openclaw_gateway import (
|
||||||
|
OpenClawGatewayError,
|
||||||
|
list_cron_jobs,
|
||||||
|
upsert_cron_job,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
MISSION_CONTROL_CRON_NAME = "mission-control-runner/10m"
|
||||||
|
|
||||||
|
|
||||||
|
def _mission_control_runner_message() -> str:
|
||||||
|
return (
|
||||||
|
"You are the Mission Control Runner agent.\n\n"
|
||||||
|
"On this scheduled tick:\n"
|
||||||
|
"- Run the HEARTBEAT.md procedure for Mission Control (check-in, list boards, "
|
||||||
|
"list tasks).\n"
|
||||||
|
"- If any task is already in_progress, stop (do not claim another).\n"
|
||||||
|
"- Otherwise, find the oldest inbox task across all boards, claim it by moving "
|
||||||
|
"to in_progress.\n"
|
||||||
|
"- Execute the task fully.\n"
|
||||||
|
"- When complete, move it to review.\n"
|
||||||
|
"- If no inbox tasks exist, do nothing.\n"
|
||||||
|
"Only update Mission Control (no chat messages)."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def build_mission_control_cron_job() -> dict[str, Any]:
|
||||||
|
return {
|
||||||
|
"name": MISSION_CONTROL_CRON_NAME,
|
||||||
|
"schedule": {"kind": "every", "everyMs": 600000},
|
||||||
|
"sessionTarget": "isolated",
|
||||||
|
"enabled": True,
|
||||||
|
"payload": {"kind": "agentTurn", "message": _mission_control_runner_message()},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def ensure_mission_control_cron_job() -> None:
|
||||||
|
try:
|
||||||
|
payload = await list_cron_jobs()
|
||||||
|
except OpenClawGatewayError as exc:
|
||||||
|
logger.warning("Gateway cron list failed: %s", exc)
|
||||||
|
return
|
||||||
|
|
||||||
|
jobs: list[dict[str, Any]] = []
|
||||||
|
if isinstance(payload, list):
|
||||||
|
jobs = payload
|
||||||
|
elif isinstance(payload, dict):
|
||||||
|
jobs = list(payload.get("jobs", []))
|
||||||
|
|
||||||
|
job = build_mission_control_cron_job()
|
||||||
|
if any(item.get("name") == job["name"] for item in jobs):
|
||||||
|
logger.info("Updating gateway cron job: %s", job["name"])
|
||||||
|
else:
|
||||||
|
logger.info("Creating gateway cron job: %s", job["name"])
|
||||||
|
|
||||||
|
try:
|
||||||
|
await upsert_cron_job(job)
|
||||||
|
except OpenClawGatewayError as exc:
|
||||||
|
logger.warning("Gateway cron upsert failed: %s", exc)
|
||||||
Reference in New Issue
Block a user