feat: add RQ-based webhook dispatch queue and delayed worker

This commit is contained in:
Abhimanyu Saharan
2026-02-14 06:21:50 +00:00
parent 42a41f64bc
commit b987db58b8
12 changed files with 679 additions and 23 deletions

View File

@@ -0,0 +1,3 @@
"""Webhook queueing and dispatch utilities."""
__all__ = ["dispatch", "queue", "scheduler"]

View File

@@ -0,0 +1,213 @@
"""Webhook dispatch worker routines."""
from __future__ import annotations
import asyncio
import time
from sqlalchemy import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from uuid import UUID
from app.core.config import settings
from app.core.logging import get_logger
from app.db.session import async_session_maker
from app.models.agents import Agent
from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.webhooks.queue import QueuedWebhookDelivery, dequeue_webhook_delivery, requeue_if_failed
logger = get_logger(__name__)
def _build_payload_preview(payload_value: object) -> str:
if isinstance(payload_value, str):
return payload_value
try:
import json
return json.dumps(payload_value, indent=2, ensure_ascii=True)
except TypeError:
return str(payload_value)
def _payload_preview(payload_value: object) -> str:
    """Return the rendered payload, capped at 1600 characters.

    Longer previews are cut to 1597 characters plus a ``...`` marker.
    """
    rendered = _build_payload_preview(payload_value)
    if len(rendered) > 1600:
        return f"{rendered[:1597]}..."
    return rendered
def _webhook_message(
    *,
    board: Board,
    webhook: BoardWebhook,
    payload: BoardWebhookPayload,
) -> str:
    """Compose the instruction message delivered to the board lead agent."""
    preview = _payload_preview(payload.payload)
    segments = [
        "WEBHOOK EVENT RECEIVED",
        f"Board: {board.name}",
        f"Webhook ID: {webhook.id}",
        f"Payload ID: {payload.id}",
        f"Instruction: {webhook.description}",
        "",
        "Take action:",
        "1) Triage this payload against the webhook instruction.",
        "2) Create/update tasks as needed.",
        f"3) Reference payload ID {payload.id} in task descriptions.",
        "",
        "Payload preview:",
        preview,
        "",
        "To inspect board memory entries:",
        f"GET /api/v1/agent/boards/{board.id}/memory?is_chat=false",
    ]
    return "\n".join(segments)
async def _notify_lead(
    *,
    session: AsyncSession,
    board: Board,
    webhook: BoardWebhook,
    payload: BoardWebhookPayload,
) -> None:
    """Deliver the webhook event message to the board's lead agent.

    Returns without sending when the board has no lead with an OpenClaw
    session, or when no gateway config can be resolved for the board.
    """
    board_lead = (
        await Agent.objects.filter_by(board_id=board.id)
        .filter(col(Agent.is_board_lead).is_(True))
        .first(session)
    )
    if board_lead is None or not board_lead.openclaw_session_id:
        return
    gateway = GatewayDispatchService(session)
    gateway_config = await gateway.optional_gateway_config_for_board(board)
    if gateway_config is None:
        return
    await gateway.try_send_agent_message(
        session_key=board_lead.openclaw_session_id,
        config=gateway_config,
        agent_name=board_lead.name,
        message=_webhook_message(board=board, webhook=webhook, payload=payload),
        deliver=False,
    )
async def _load_webhook_payload(
    *,
    session: AsyncSession,
    payload_id: UUID,
    webhook_id: UUID,
    board_id: UUID,
) -> tuple[Board, BoardWebhook, BoardWebhookPayload] | None:
    """Resolve the (board, webhook, payload) rows for a queued delivery.

    Every lookup is scoped by the ids carried on the queue item; a
    warning is logged and ``None`` returned when any row is missing.
    """
    payload_row = (
        await session.exec(
            select(BoardWebhookPayload)
            .where(col(BoardWebhookPayload.id) == payload_id)
            .where(col(BoardWebhookPayload.board_id) == board_id)
            .where(col(BoardWebhookPayload.webhook_id) == webhook_id),
        )
    ).first()
    if payload_row is None:
        logger.warning(
            "webhook.queue.payload_missing",
            extra={
                "payload_id": str(payload_id),
                "webhook_id": str(webhook_id),
                "board_id": str(board_id),
            },
        )
        return None
    board_row = await Board.objects.by_id(board_id).first(session)
    if board_row is None:
        logger.warning(
            "webhook.queue.board_missing",
            extra={"board_id": str(board_id), "payload_id": str(payload_id)},
        )
        return None
    webhook_row = (
        await session.exec(
            select(BoardWebhook)
            .where(col(BoardWebhook.id) == webhook_id)
            .where(col(BoardWebhook.board_id) == board_id),
        )
    ).first()
    if webhook_row is None:
        logger.warning(
            "webhook.queue.webhook_missing",
            extra={"webhook_id": str(webhook_id), "board_id": str(board_id)},
        )
        return None
    return board_row, webhook_row, payload_row
async def _process_single_item(item: QueuedWebhookDelivery) -> None:
    """Process one queue item: load its rows, notify the lead, commit."""
    async with async_session_maker() as session:
        resolved = await _load_webhook_payload(
            session=session,
            payload_id=item.payload_id,
            webhook_id=item.webhook_id,
            board_id=item.board_id,
        )
        if resolved is None:
            # Missing rows were already logged by the loader; nothing to do.
            return
        board, webhook, payload = resolved
        await _notify_lead(
            session=session, board=board, webhook=webhook, payload=payload
        )
        await session.commit()
async def flush_webhook_delivery_queue() -> None:
    """Consume queued webhook events and notify board leads in a throttled batch.

    Items are popped until the queue is empty. A dispatch failure requeues
    the item (with capped retries) via ``requeue_if_failed``. Dequeue
    errors are tolerated a few times in a row — a malformed item has
    already been popped, so skipping it only drops that item — but
    persistent failures (e.g. Redis unreachable) abort the batch instead
    of spinning forever.
    """
    processed = 0
    consecutive_dequeue_failures = 0
    while True:
        try:
            item = dequeue_webhook_delivery()
        except Exception:
            logger.exception("webhook.dispatch.dequeue_failed")
            consecutive_dequeue_failures += 1
            if consecutive_dequeue_failures >= 3:
                # Repeated failures look like a connectivity problem rather
                # than a poison message; bail out instead of looping forever.
                logger.error("webhook.dispatch.dequeue_aborted")
                break
            continue
        consecutive_dequeue_failures = 0
        if item is None:
            break
        try:
            await _process_single_item(item)
            processed += 1
            logger.info(
                "webhook.dispatch.success",
                extra={
                    "payload_id": str(item.payload_id),
                    "webhook_id": str(item.webhook_id),
                    "board_id": str(item.board_id),
                    "attempt": item.attempts,
                },
            )
        except Exception as exc:
            logger.exception(
                "webhook.dispatch.failed",
                extra={
                    "payload_id": str(item.payload_id),
                    "webhook_id": str(item.webhook_id),
                    "board_id": str(item.board_id),
                    "attempt": item.attempts,
                    "error": str(exc),
                },
            )
            requeue_if_failed(item)
        # Throttle between items without blocking the event loop
        # (time.sleep here would stall every other task on this loop).
        await asyncio.sleep(settings.webhook_dispatch_throttle_seconds)
    logger.info("webhook.dispatch.batch_complete", extra={"count": processed})
def run_flush_webhook_delivery_queue() -> None:
    """RQ entrypoint for running the async queue flush from worker jobs."""
    logger.info(
        "webhook.dispatch.batch_started",
        extra={"throttle_seconds": settings.webhook_dispatch_throttle_seconds},
    )
    started_at = time.time()
    asyncio.run(flush_webhook_delivery_queue())
    # Report wall-clock duration in milliseconds for worker observability.
    duration_ms = int((time.time() - started_at) * 1000)
    logger.info(
        "webhook.dispatch.batch_finished", extra={"duration_ms": duration_ms}
    )

View File

@@ -0,0 +1,134 @@
"""Webhook queue persistence and delivery helpers."""
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from uuid import UUID
import redis
from app.core.config import settings
from app.core.logging import get_logger
logger = get_logger(__name__)
@dataclass(frozen=True)
class QueuedWebhookDelivery:
    """Payload metadata stored for deferred webhook lead dispatch."""

    # Identity of the payload row plus its parent webhook and board.
    board_id: UUID
    webhook_id: UUID
    payload_id: UUID
    # Optional event label attached to the payload (may be absent).
    payload_event: str | None
    # When the webhook payload was originally received.
    received_at: datetime
    # Delivery attempts made so far; incremented on each requeue.
    attempts: int = 0

    def to_json(self) -> str:
        """Serialize to the stable, key-sorted JSON stored in Redis."""
        document = {
            "attempts": self.attempts,
            "board_id": str(self.board_id),
            "payload_event": self.payload_event,
            "payload_id": str(self.payload_id),
            "received_at": self.received_at.isoformat(),
            "webhook_id": str(self.webhook_id),
        }
        return json.dumps(document, sort_keys=True)
def _redis_client() -> redis.Redis:
    """Build a Redis client for the configured webhook queue URL.

    NOTE(review): a new client object is constructed on every call; fine
    for low-volume queue operations, revisit if call frequency grows.
    """
    return redis.Redis.from_url(settings.webhook_redis_url)
def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
    """Persist webhook metadata in a Redis queue for batch dispatch.

    Best-effort: failures are logged and reported via the ``False``
    return value instead of raising.
    """
    log_fields = {
        "board_id": str(payload.board_id),
        "webhook_id": str(payload.webhook_id),
        "payload_id": str(payload.payload_id),
    }
    try:
        _redis_client().lpush(settings.webhook_queue_name, payload.to_json())
        logger.info(
            "webhook.queue.enqueued",
            extra={**log_fields, "attempt": payload.attempts},
        )
        return True
    except Exception as exc:
        logger.warning(
            "webhook.queue.enqueue_failed",
            extra={**log_fields, "error": str(exc)},
        )
        return False
def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None:
    """Pop one queued webhook delivery payload.

    Returns ``None`` when the queue is empty. A payload that cannot be
    parsed is logged and the error re-raised; note the raw item has
    already been popped from Redis at that point.
    """
    raw = _redis_client().rpop(settings.webhook_queue_name)
    if raw is None:
        return None
    text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
    try:
        document: dict[str, Any] = json.loads(text)
        raw_event = document.get("payload_event")
        return QueuedWebhookDelivery(
            board_id=UUID(document["board_id"]),
            webhook_id=UUID(document["webhook_id"]),
            payload_id=UUID(document["payload_id"]),
            payload_event=None if raw_event is None else str(raw_event),
            received_at=datetime.fromisoformat(document["received_at"]),
            attempts=int(document.get("attempts", 0)),
        )
    except Exception as exc:
        logger.error(
            "webhook.queue.dequeue_failed",
            extra={"raw_payload": str(text), "error": str(exc)},
        )
        raise
def _requeue_with_attempt(payload: QueuedWebhookDelivery) -> None:
    """Re-enqueue *payload* with its attempt counter incremented by one."""
    from dataclasses import replace

    # dataclasses.replace copies the frozen instance field-by-field,
    # replacing only `attempts` — no hand-rolled constructor call needed.
    enqueue_webhook_delivery(replace(payload, attempts=payload.attempts + 1))
def requeue_if_failed(payload: QueuedWebhookDelivery) -> bool:
    """Requeue payload delivery with capped retries.

    Returns True if requeued.
    """
    within_retry_budget = payload.attempts < settings.webhook_dispatch_max_retries
    if within_retry_budget:
        _requeue_with_attempt(payload)
        return True
    # Retry budget exhausted: drop the delivery and leave an audit trail.
    logger.warning(
        "webhook.queue.drop_failed_delivery",
        extra={
            "board_id": str(payload.board_id),
            "webhook_id": str(payload.webhook_id),
            "payload_id": str(payload.payload_id),
            "attempts": payload.attempts,
        },
    )
    return False

View File

@@ -0,0 +1,30 @@
"""Webhook dispatch scheduler bootstrap for rq-scheduler."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from redis import Redis
from rq_scheduler import Scheduler
from app.core.config import settings
from app.services.webhooks import dispatch
def bootstrap_webhook_dispatch_schedule(interval_seconds: int = 900) -> None:
    """Register a recurring queue-flush job and keep it idempotent."""
    redis_conn = Redis.from_url(settings.webhook_redis_url)
    scheduler = Scheduler(
        queue_name=settings.webhook_queue_name, connection=redis_conn
    )
    # Cancel any previously registered copy so re-running bootstrap never
    # stacks duplicate schedules.
    stale_jobs = [
        job
        for job in scheduler.get_jobs()
        if job.id == settings.webhook_dispatch_schedule_id
    ]
    for stale in stale_jobs:
        scheduler.cancel(stale)
    scheduler.schedule(
        datetime.now(tz=timezone.utc) + timedelta(seconds=5),
        func=dispatch.run_flush_webhook_delivery_queue,
        interval=interval_seconds,
        repeat=None,
        id=settings.webhook_dispatch_schedule_id,
        queue_name=settings.webhook_queue_name,
    )