Files
openclaw-mission-control/backend/app/services/webhooks/dispatch.py

335 lines
10 KiB
Python
Raw Normal View History

"""Webhook dispatch worker routines."""
from __future__ import annotations
import asyncio
import time
from sqlmodel.ext.asyncio.session import AsyncSession
from uuid import UUID
from app.core.config import settings
from app.core.logging import get_logger
from app.db.session import async_session_maker
from app.models.agents import Agent
from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.webhooks.queue import (
QueuedWebhookDelivery,
dequeue_webhook_delivery,
requeue_if_failed,
)
logger = get_logger(__name__)
_ROUTINE_GITHUB_EVENTS = frozenset({"check_run", "check_suite", "workflow_run"})
_SUCCESS_GITHUB_CONCLUSIONS = frozenset({None, "success", "neutral", "skipped"})
# Consider these actionable enough to page the lead / surface in task threads.
_ACTIONABLE_GITHUB_CONCLUSIONS = frozenset(
{
"failure",
"cancelled",
"timed_out",
"action_required",
"stale",
"startup_failure",
},
)
def _as_dict(value: object) -> dict[str, object] | None:
if isinstance(value, dict):
# Keep only string keys; payloads can include non-str keys in edge cases.
normalized: dict[str, object] = {}
for k, v in value.items():
if isinstance(k, str):
normalized[k] = v
return normalized
return None
def _str_or_none(value: object) -> str | None:
if value is None:
return None
if isinstance(value, str):
return value
return str(value)
def _extract_github_conclusion(payload: dict[str, object], *, key: str) -> str | None:
container = _as_dict(payload.get(key))
if not container:
return None
return _str_or_none(container.get("conclusion"))
def _extract_github_status(payload: dict[str, object], *, key: str) -> str | None:
container = _as_dict(payload.get(key))
if not container:
return None
return _str_or_none(container.get("status"))
def _should_suppress_routine_delivery(
*,
payload_event: str | None,
payload_value: object,
) -> bool:
"""Return True if this delivery is routine noise and should not notify leads.
This intentionally only targets high-volume GitHub CI telemetry events.
We still persist the webhook payload + board memory entry for audit/debug.
"""
if not settings.webhook_dispatch_suppress_routine_events:
return False
if payload_event not in _ROUTINE_GITHUB_EVENTS:
return False
payload = _as_dict(payload_value)
if payload is None:
return False
action = _str_or_none(payload.get("action"))
# If GitHub hasn't marked it completed, it's almost always noise.
if action and action != "completed":
return True
if payload_event == "workflow_run":
status = _extract_github_status(payload, key="workflow_run")
if status and status != "completed":
return True
conclusion = _extract_github_conclusion(payload, key="workflow_run")
elif payload_event == "check_run":
status = _extract_github_status(payload, key="check_run")
if status and status != "completed":
return True
conclusion = _extract_github_conclusion(payload, key="check_run")
else: # check_suite
status = _extract_github_status(payload, key="check_suite")
if status and status != "completed":
return True
conclusion = _extract_github_conclusion(payload, key="check_suite")
if conclusion in _SUCCESS_GITHUB_CONCLUSIONS:
return True
# Only page on explicitly non-success conclusions.
return conclusion not in _ACTIONABLE_GITHUB_CONCLUSIONS
def _build_payload_preview(payload_value: object) -> str:
if isinstance(payload_value, str):
return payload_value
try:
import json
return json.dumps(payload_value, indent=2, ensure_ascii=True)
except TypeError:
return str(payload_value)
def _payload_preview(payload_value: object) -> str:
preview = _build_payload_preview(payload_value)
if len(preview) <= 1600:
return preview
return f"{preview[:1597]}..."
def _webhook_message(
*,
board: Board,
webhook: BoardWebhook,
payload: BoardWebhookPayload,
) -> str:
preview = _payload_preview(payload.payload)
return (
"WEBHOOK EVENT RECEIVED\n"
f"Board: {board.name}\n"
f"Webhook ID: {webhook.id}\n"
f"Payload ID: {payload.id}\n"
f"Instruction: {webhook.description}\n\n"
"Take action:\n"
"1) Triage this payload against the webhook instruction.\n"
"2) Create/update tasks as needed.\n"
f"3) Reference payload ID {payload.id} in task descriptions.\n\n"
"Payload preview:\n"
f"{preview}\n\n"
"To inspect board memory entries:\n"
f"GET /api/v1/agent/boards/{board.id}/memory?is_chat=false"
)
async def _notify_lead(
*,
session: AsyncSession,
board: Board,
webhook: BoardWebhook,
payload: BoardWebhookPayload,
) -> None:
lead = await Agent.objects.filter_by(board_id=board.id, is_board_lead=True).first(session)
if lead is None or not lead.openclaw_session_id:
return
dispatch = GatewayDispatchService(session)
config = await dispatch.optional_gateway_config_for_board(board)
if config is None:
return
message = _webhook_message(board=board, webhook=webhook, payload=payload)
await dispatch.try_send_agent_message(
session_key=lead.openclaw_session_id,
config=config,
agent_name=lead.name,
message=message,
deliver=False,
)
async def _load_webhook_payload(
*,
session: AsyncSession,
payload_id: UUID,
webhook_id: UUID,
board_id: UUID,
) -> tuple[Board, BoardWebhook, BoardWebhookPayload] | None:
payload = await session.get(BoardWebhookPayload, payload_id)
if payload is None:
logger.warning(
"webhook.queue.payload_missing",
extra={
"payload_id": str(payload_id),
"webhook_id": str(webhook_id),
"board_id": str(board_id),
},
)
return None
if payload.board_id != board_id or payload.webhook_id != webhook_id:
logger.warning(
"webhook.queue.payload_mismatch",
extra={
"payload_id": str(payload_id),
"payload_webhook_id": str(payload.webhook_id),
"payload_board_id": str(payload.board_id),
},
)
return None
board = await Board.objects.by_id(board_id).first(session)
if board is None:
logger.warning(
"webhook.queue.board_missing",
extra={"board_id": str(board_id), "payload_id": str(payload_id)},
)
return None
webhook = await session.get(BoardWebhook, webhook_id)
if webhook is None:
logger.warning(
"webhook.queue.webhook_missing",
extra={"webhook_id": str(webhook_id), "board_id": str(board_id)},
)
return None
if webhook.board_id != board_id:
logger.warning(
"webhook.queue.webhook_board_mismatch",
extra={
"webhook_id": str(webhook_id),
"payload_board_id": str(payload.board_id),
"expected_board_id": str(board_id),
},
)
return None
return board, webhook, payload
async def _process_single_item(item: QueuedWebhookDelivery) -> None:
async with async_session_maker() as session:
loaded = await _load_webhook_payload(
session=session,
payload_id=item.payload_id,
webhook_id=item.webhook_id,
board_id=item.board_id,
)
if loaded is None:
return
board, webhook, payload = loaded
if _should_suppress_routine_delivery(
payload_event=item.payload_event,
payload_value=payload.payload,
):
logger.info(
"webhook.dispatch.suppressed_routine",
extra={
"payload_id": str(item.payload_id),
"webhook_id": str(item.webhook_id),
"board_id": str(item.board_id),
"event": item.payload_event,
"attempt": item.attempts,
},
)
return
await _notify_lead(session=session, board=board, webhook=webhook, payload=payload)
await session.commit()
async def flush_webhook_delivery_queue() -> None:
"""Consume queued webhook events and notify board leads in a throttled batch."""
processed = 0
while True:
try:
item = dequeue_webhook_delivery()
except Exception:
logger.exception("webhook.dispatch.dequeue_failed")
continue
if item is None:
break
try:
await _process_single_item(item)
processed += 1
logger.info(
"webhook.dispatch.success",
extra={
"payload_id": str(item.payload_id),
"webhook_id": str(item.webhook_id),
"board_id": str(item.board_id),
"attempt": item.attempts,
},
)
except Exception as exc:
logger.exception(
"webhook.dispatch.failed",
extra={
"payload_id": str(item.payload_id),
"webhook_id": str(item.webhook_id),
"board_id": str(item.board_id),
"attempt": item.attempts,
"error": str(exc),
},
)
requeue_if_failed(item)
time.sleep(settings.webhook_dispatch_throttle_seconds)
logger.info("webhook.dispatch.batch_complete", extra={"count": processed})
def run_flush_webhook_delivery_queue() -> None:
"""RQ entrypoint for running the async queue flush from worker jobs."""
logger.info(
"webhook.dispatch.batch_started",
extra={"throttle_seconds": settings.webhook_dispatch_throttle_seconds},
)
start = time.time()
asyncio.run(flush_webhook_delivery_queue())
elapsed_ms = int((time.time() - start) * 1000)
logger.info("webhook.dispatch.batch_finished", extra={"duration_ms": elapsed_ms})