7 Commits

Author SHA1 Message Date
Abhimanyu Saharan
ad6ff33d16 Fix webhook dispatch throttle + split queue/list config 2026-02-14 17:58:50 +00:00
Abhimanyu Saharan
4557fcc8ae test: add webhook dispatch worker coverage for process loop 2026-02-14 06:38:19 +00:00
Abhimanyu Saharan
554b73184a chore: decouple webhook-worker from backend service dependency 2026-02-14 06:36:46 +00:00
Abhimanyu Saharan
912387bd1c chore: centralize webhook scheduler interval config 2026-02-14 06:36:09 +00:00
Abhimanyu Saharan
d01365abfb fix: resolve mypy typing issues in webhook queue dispatch 2026-02-14 06:33:35 +00:00
Abhimanyu Saharan
e535f377ff fix: remove unused UUID import in webhook dispatch test 2026-02-14 06:30:11 +00:00
Abhimanyu Saharan
b987db58b8 feat: add RQ-based webhook dispatch queue and delayed worker 2026-02-14 06:23:55 +00:00
12 changed files with 869 additions and 23 deletions

View File

@@ -12,12 +12,27 @@ BASE_URL=
AUTH_MODE=local
# REQUIRED when AUTH_MODE=local (must be non-placeholder and at least 50 chars).
LOCAL_AUTH_TOKEN=
# Clerk (auth only; used when AUTH_MODE=clerk)
CLERK_SECRET_KEY=
CLERK_API_URL=https://api.clerk.com
CLERK_VERIFY_IAT=true
CLERK_LEEWAY=10.0
# Database
DB_AUTO_MIGRATE=false
# Webhook queue / worker
WEBHOOK_REDIS_URL=redis://localhost:6379/0
# RQ queue that runs the batch dispatch job.
WEBHOOK_LEADS_RQ_QUEUE_NAME=webhook-dispatch
# Redis list key that stores queued webhook deliveries for batching.
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY=webhook-dispatch
# Backwards compat (deprecated): if set, used as both queue name + list key unless the
# split settings above are explicitly set.
WEBHOOK_QUEUE_NAME=
WEBHOOK_DISPATCH_THROTTLE_SECONDS=2.0
WEBHOOK_DISPATCH_SCHEDULE_ID=webhook-dispatch-batch
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS=900
WEBHOOK_DISPATCH_MAX_RETRIES=3

View File

@@ -29,6 +29,7 @@ from app.schemas.board_webhooks import (
from app.schemas.common import OkResponse
from app.schemas.pagination import DefaultLimitOffsetPage
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.webhooks.queue import QueuedWebhookDelivery, enqueue_webhook_delivery
if TYPE_CHECKING:
from collections.abc import Sequence
@@ -166,6 +167,12 @@ def _captured_headers(request: Request) -> dict[str, str] | None:
return captured or None
def _extract_webhook_event(headers: dict[str, str] | None) -> str | None:
if not headers:
return None
return headers.get("x-github-event") or headers.get("x-event-type")
def _payload_preview(
value: dict[str, object] | list[object] | str | int | float | bool | None,
) -> str:
@@ -412,6 +419,7 @@ async def ingest_board_webhook(
)
content_type = request.headers.get("content-type")
headers = _captured_headers(request)
payload_value = _decode_payload(
await request.body(),
content_type=content_type,
@@ -420,7 +428,7 @@ async def ingest_board_webhook(
board_id=board.id,
webhook_id=webhook.id,
payload=payload_value,
headers=_captured_headers(request),
headers=headers,
source_ip=request.client.host if request.client else None,
content_type=content_type,
)
@@ -438,12 +446,25 @@ async def ingest_board_webhook(
)
session.add(memory)
await session.commit()
await _notify_lead_on_webhook_payload(
session=session,
board=board,
webhook=webhook,
payload=payload,
enqueued = enqueue_webhook_delivery(
QueuedWebhookDelivery(
board_id=board.id,
webhook_id=webhook.id,
payload_id=payload.id,
payload_event=_extract_webhook_event(headers),
received_at=payload.received_at,
),
)
if not enqueued:
# Preserve historical behavior by still notifying synchronously if queueing fails.
await _notify_lead_on_webhook_payload(
session=session,
board=board,
webhook=webhook,
payload=payload,
)
return BoardWebhookIngestResponse(
board_id=board.id,
webhook_id=webhook.id,

View File

@@ -53,6 +53,24 @@ class Settings(BaseSettings):
# Database lifecycle
db_auto_migrate: bool = False
# Webhook queueing / dispatch
webhook_redis_url: str = "redis://localhost:6379/0"
# NOTE: Deprecated. Historically used for both the Redis list key *and* the RQ queue name.
# Prefer `webhook_leads_batch_redis_list_key` + `webhook_leads_rq_queue_name`.
webhook_queue_name: str = "webhook-dispatch"
# RQ queue that runs the batch dispatch job.
webhook_leads_rq_queue_name: str = "webhook-dispatch"
# Redis list key that stores queued webhook deliveries for batching.
webhook_leads_batch_redis_list_key: str = "webhook-dispatch"
webhook_dispatch_schedule_id: str = "webhook-dispatch-batch"
webhook_dispatch_throttle_seconds: float = 2.0
webhook_dispatch_schedule_interval_seconds: int = 900
webhook_dispatch_max_retries: int = 3
# Logging
log_level: str = "INFO"
log_format: str = "text"
@@ -62,6 +80,13 @@ class Settings(BaseSettings):
@model_validator(mode="after")
def _defaults(self) -> Self:
# Backwards compatibility: If WEBHOOK_QUEUE_NAME was set (legacy), and the
# newer split settings were not explicitly set, mirror it.
if "webhook_queue_name" in self.model_fields_set:
if "webhook_leads_rq_queue_name" not in self.model_fields_set:
self.webhook_leads_rq_queue_name = self.webhook_queue_name
if "webhook_leads_batch_redis_list_key" not in self.model_fields_set:
self.webhook_leads_batch_redis_list_key = self.webhook_queue_name
if self.auth_mode == AuthMode.CLERK:
if not self.clerk_secret_key.strip():
raise ValueError(

View File

@@ -0,0 +1,3 @@
"""Webhook queueing and dispatch utilities."""
__all__ = ["dispatch", "queue", "scheduler"]

View File

@@ -0,0 +1,222 @@
"""Webhook dispatch worker routines."""
from __future__ import annotations
import asyncio
import time
from sqlmodel.ext.asyncio.session import AsyncSession
from uuid import UUID
from app.core.config import settings
from app.core.logging import get_logger
from app.db.session import async_session_maker
from app.models.agents import Agent
from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.webhooks.queue import (
QueuedWebhookDelivery,
dequeue_webhook_delivery,
requeue_if_failed,
)
logger = get_logger(__name__)
def _build_payload_preview(payload_value: object) -> str:
if isinstance(payload_value, str):
return payload_value
try:
import json
return json.dumps(payload_value, indent=2, ensure_ascii=True)
except TypeError:
return str(payload_value)
def _payload_preview(payload_value: object) -> str:
    """Render *payload_value* as preview text, truncated to 1600 characters."""
    rendered = _build_payload_preview(payload_value)
    if len(rendered) > 1600:
        # Keep the total at exactly 1600 chars including the ellipsis.
        return f"{rendered[:1597]}..."
    return rendered
def _webhook_message(
    *,
    board: Board,
    webhook: BoardWebhook,
    payload: BoardWebhookPayload,
) -> str:
    """Compose the instruction message delivered to the board lead agent."""
    preview = _payload_preview(payload.payload)
    lines = [
        "WEBHOOK EVENT RECEIVED",
        f"Board: {board.name}",
        f"Webhook ID: {webhook.id}",
        f"Payload ID: {payload.id}",
        f"Instruction: {webhook.description}",
        "",
        "Take action:",
        "1) Triage this payload against the webhook instruction.",
        "2) Create/update tasks as needed.",
        f"3) Reference payload ID {payload.id} in task descriptions.",
        "",
        "Payload preview:",
        preview,
        "",
        "To inspect board memory entries:",
        f"GET /api/v1/agent/boards/{board.id}/memory?is_chat=false",
    ]
    return "\n".join(lines)
async def _notify_lead(
    *,
    session: AsyncSession,
    board: Board,
    webhook: BoardWebhook,
    payload: BoardWebhookPayload,
) -> None:
    """Send the webhook-event message to the board lead's gateway session.

    Best-effort: silently returns when the board has no lead agent with an
    active openclaw session, or when no gateway config resolves for the board.
    """
    lead = await Agent.objects.filter_by(board_id=board.id, is_board_lead=True).first(session)
    if lead is None or not lead.openclaw_session_id:
        return
    dispatch = GatewayDispatchService(session)
    config = await dispatch.optional_gateway_config_for_board(board)
    if config is None:
        return
    message = _webhook_message(board=board, webhook=webhook, payload=payload)
    # deliver=False — presumably queues the message on the session rather than
    # forcing immediate delivery; NOTE(review): confirm against
    # GatewayDispatchService.try_send_agent_message semantics.
    await dispatch.try_send_agent_message(
        session_key=lead.openclaw_session_id,
        config=config,
        agent_name=lead.name,
        message=message,
        deliver=False,
    )
async def _load_webhook_payload(
    *,
    session: AsyncSession,
    payload_id: UUID,
    webhook_id: UUID,
    board_id: UUID,
) -> tuple[Board, BoardWebhook, BoardWebhookPayload] | None:
    """Resolve queued identifiers back into ORM rows.

    Returns None (after logging a warning) when any row is missing or the
    stored relationships no longer match the queued metadata — e.g. a payload
    or webhook deleted/moved between enqueue and dispatch.
    """
    payload = await session.get(BoardWebhookPayload, payload_id)
    if payload is None:
        logger.warning(
            "webhook.queue.payload_missing",
            extra={
                "payload_id": str(payload_id),
                "webhook_id": str(webhook_id),
                "board_id": str(board_id),
            },
        )
        return None
    if payload.board_id != board_id or payload.webhook_id != webhook_id:
        logger.warning(
            "webhook.queue.payload_mismatch",
            extra={
                "payload_id": str(payload_id),
                "payload_webhook_id": str(payload.webhook_id),
                "payload_board_id": str(payload.board_id),
            },
        )
        return None
    board = await Board.objects.by_id(board_id).first(session)
    if board is None:
        logger.warning(
            "webhook.queue.board_missing",
            extra={"board_id": str(board_id), "payload_id": str(payload_id)},
        )
        return None
    webhook = await session.get(BoardWebhook, webhook_id)
    if webhook is None:
        logger.warning(
            "webhook.queue.webhook_missing",
            extra={"webhook_id": str(webhook_id), "board_id": str(board_id)},
        )
        return None
    if webhook.board_id != board_id:
        logger.warning(
            "webhook.queue.webhook_board_mismatch",
            extra={
                "webhook_id": str(webhook_id),
                # Bug fix: log the webhook's actual board — the original logged
                # payload.board_id, which was already validated equal to
                # board_id above and therefore never showed the mismatch.
                "webhook_board_id": str(webhook.board_id),
                "expected_board_id": str(board_id),
            },
        )
        return None
    return board, webhook, payload
async def _process_single_item(item: QueuedWebhookDelivery) -> None:
    """Load the rows referenced by *item* and notify the board lead.

    Opens its own session per item. Missing/mismatched rows are logged and
    skipped by `_load_webhook_payload`, in which case nothing is committed.
    """
    async with async_session_maker() as session:
        loaded = await _load_webhook_payload(
            session=session,
            payload_id=item.payload_id,
            webhook_id=item.webhook_id,
            board_id=item.board_id,
        )
        if loaded is None:
            return
        board, webhook, payload = loaded
        await _notify_lead(session=session, board=board, webhook=webhook, payload=payload)
        await session.commit()
# Abort a batch after this many consecutive dequeue failures instead of
# spinning forever against a broken Redis connection.
_MAX_CONSECUTIVE_DEQUEUE_FAILURES = 5


async def flush_webhook_delivery_queue() -> None:
    """Consume queued webhook events and notify board leads in a throttled batch.

    Processes items one at a time until the queue is empty, sleeping the
    configured throttle after each item. Failed items are requeued (with a
    retry cap) by `requeue_if_failed`.
    """
    processed = 0
    dequeue_failures = 0
    while True:
        try:
            item = dequeue_webhook_delivery()
        except Exception:
            logger.exception("webhook.dispatch.dequeue_failed")
            dequeue_failures += 1
            # Bug fix: the original bare `continue` retried unconditionally,
            # so a persistent Redis outage produced a tight infinite loop.
            # Give up after repeated consecutive failures; the scheduler
            # retries the whole batch on the next interval.
            if dequeue_failures >= _MAX_CONSECUTIVE_DEQUEUE_FAILURES:
                logger.error(
                    "webhook.dispatch.dequeue_aborted",
                    extra={"consecutive_failures": dequeue_failures},
                )
                break
            continue
        dequeue_failures = 0
        if item is None:
            break
        try:
            await _process_single_item(item)
            processed += 1
            logger.info(
                "webhook.dispatch.success",
                extra={
                    "payload_id": str(item.payload_id),
                    "webhook_id": str(item.webhook_id),
                    "board_id": str(item.board_id),
                    "attempt": item.attempts,
                },
            )
        except Exception as exc:
            logger.exception(
                "webhook.dispatch.failed",
                extra={
                    "payload_id": str(item.payload_id),
                    "webhook_id": str(item.webhook_id),
                    "board_id": str(item.board_id),
                    "attempt": item.attempts,
                    "error": str(exc),
                },
            )
            requeue_if_failed(item)
        # Throttle between items (also sleeps once after the final item,
        # before the empty-queue check on the next iteration).
        await asyncio.sleep(settings.webhook_dispatch_throttle_seconds)
    logger.info("webhook.dispatch.batch_complete", extra={"count": processed})
def run_flush_webhook_delivery_queue() -> None:
    """RQ entrypoint for running the async queue flush from worker jobs."""
    throttle = settings.webhook_dispatch_throttle_seconds
    logger.info(
        "webhook.dispatch.batch_started",
        extra={"throttle_seconds": throttle},
    )
    started_at = time.time()
    asyncio.run(flush_webhook_delivery_queue())
    duration_ms = int((time.time() - started_at) * 1000)
    logger.info("webhook.dispatch.batch_finished", extra={"duration_ms": duration_ms})

View File

@@ -0,0 +1,139 @@
"""Webhook queue persistence and delivery helpers."""
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from uuid import UUID
from typing import cast
import redis
from app.core.config import settings
from app.core.logging import get_logger
logger = get_logger(__name__)
@dataclass(frozen=True)
class QueuedWebhookDelivery:
    """Payload metadata stored for deferred webhook lead dispatch."""

    board_id: UUID
    webhook_id: UUID
    payload_id: UUID
    payload_event: str | None
    received_at: datetime
    attempts: int = 0

    def to_json(self) -> str:
        """Serialize to a deterministic (sorted-key) JSON document."""
        document = {
            "attempts": self.attempts,
            "board_id": str(self.board_id),
            "payload_event": self.payload_event,
            "payload_id": str(self.payload_id),
            "received_at": self.received_at.isoformat(),
            "webhook_id": str(self.webhook_id),
        }
        # sort_keys keeps the wire format identical regardless of dict order.
        return json.dumps(document, sort_keys=True)
def _redis_client() -> redis.Redis:
    """Build a Redis client for the configured webhook queue URL.

    A fresh client is created on every call; tests monkeypatch this function
    to substitute a fake. NOTE(review): each call builds a new connection
    pool — consider caching the client if enqueue volume becomes significant.
    """
    return redis.Redis.from_url(settings.webhook_redis_url)
def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
    """Persist webhook metadata in a Redis queue for batch dispatch.

    Returns True on success and False when Redis is unavailable (callers
    fall back to synchronous delivery); never raises.
    """
    try:
        connection = _redis_client()
        connection.lpush(
            settings.webhook_leads_batch_redis_list_key,
            payload.to_json(),
        )
        logger.info(
            "webhook.queue.enqueued",
            extra={
                "board_id": str(payload.board_id),
                "webhook_id": str(payload.webhook_id),
                "payload_id": str(payload.payload_id),
                "attempt": payload.attempts,
            },
        )
    except Exception as exc:
        # Deliberately broad: enqueueing is best-effort and must never take
        # down the ingest path.
        logger.warning(
            "webhook.queue.enqueue_failed",
            extra={
                "board_id": str(payload.board_id),
                "webhook_id": str(payload.webhook_id),
                "payload_id": str(payload.payload_id),
                "error": str(exc),
            },
        )
        return False
    return True
def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None:
    """Pop one queued webhook delivery payload.

    Returns None when the list is empty. Malformed entries are logged and the
    decoding error re-raised (the item has already been popped off the list).
    """
    raw = cast(
        str | bytes | None,
        _redis_client().rpop(settings.webhook_leads_batch_redis_list_key),
    )
    if raw is None:
        return None
    text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
    try:
        data: dict[str, Any] = json.loads(text)
        raw_event = data.get("payload_event")
        return QueuedWebhookDelivery(
            board_id=UUID(data["board_id"]),
            webhook_id=UUID(data["webhook_id"]),
            payload_id=UUID(data["payload_id"]),
            payload_event=None if raw_event is None else str(raw_event),
            received_at=datetime.fromisoformat(data["received_at"]),
            attempts=int(data.get("attempts", 0)),
        )
    except Exception as exc:
        logger.error(
            "webhook.queue.dequeue_failed",
            extra={"raw_payload": str(text), "error": str(exc)},
        )
        raise
def _requeue_with_attempt(payload: QueuedWebhookDelivery) -> None:
    """Re-enqueue *payload* with its attempt counter incremented by one."""
    from dataclasses import replace  # local: module only imports `dataclass`

    # `replace` copies every field and bumps only `attempts`, so this helper
    # no longer needs editing when fields are added to QueuedWebhookDelivery
    # (the original hand-copied each field, a silent-drop hazard).
    enqueue_webhook_delivery(replace(payload, attempts=payload.attempts + 1))
def requeue_if_failed(payload: QueuedWebhookDelivery) -> bool:
    """Requeue payload delivery with capped retries.

    Returns True if requeued, False when the retry cap was reached and the
    delivery was dropped (with a warning log).
    """
    within_cap = payload.attempts < settings.webhook_dispatch_max_retries
    if within_cap:
        _requeue_with_attempt(payload)
        return True
    logger.warning(
        "webhook.queue.drop_failed_delivery",
        extra={
            "board_id": str(payload.board_id),
            "webhook_id": str(payload.webhook_id),
            "payload_id": str(payload.payload_id),
            "attempts": payload.attempts,
        },
    )
    return False

View File

@@ -0,0 +1,36 @@
"""Webhook dispatch scheduler bootstrap for rq-scheduler."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from redis import Redis
from rq_scheduler import Scheduler # type: ignore[import-untyped]
from app.core.config import settings
from app.services.webhooks import dispatch
def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) -> None:
    """Register a recurring queue-flush job and keep it idempotent.

    An explicit *interval_seconds* overrides the configured default.
    """
    connection = Redis.from_url(settings.webhook_redis_url)
    scheduler = Scheduler(queue_name=settings.webhook_leads_rq_queue_name, connection=connection)

    # Cancel any previously registered copy so re-running bootstrap never
    # leaves duplicate schedules behind.
    stale_jobs = [
        job for job in scheduler.get_jobs() if job.id == settings.webhook_dispatch_schedule_id
    ]
    for job in stale_jobs:
        scheduler.cancel(job)

    interval = (
        interval_seconds
        if interval_seconds is not None
        else settings.webhook_dispatch_schedule_interval_seconds
    )
    first_run = datetime.now(tz=timezone.utc) + timedelta(seconds=5)
    scheduler.schedule(
        first_run,
        func=dispatch.run_flush_webhook_delivery_queue,
        interval=interval,
        repeat=None,  # repeat indefinitely
        id=settings.webhook_dispatch_schedule_id,
        queue_name=settings.webhook_leads_rq_queue_name,
    )

View File

@@ -25,6 +25,9 @@ dependencies = [
"sse-starlette==3.2.0",
"uvicorn[standard]==0.40.0",
"websockets==16.0",
"redis==6.3.0",
"rq==2.6.0",
"rq-scheduler==0.14.0",
]
[project.optional-dependencies]

View File

@@ -23,6 +23,7 @@ from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.organizations import Organization
from app.services.webhooks.queue import QueuedWebhookDelivery
async def _make_engine() -> AsyncEngine:
@@ -112,7 +113,7 @@ async def _seed_webhook(
@pytest.mark.asyncio
async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
async def test_ingest_board_webhook_stores_payload_and_enqueues_for_lead_dispatch(
monkeypatch: pytest.MonkeyPatch,
) -> None:
engine = await _make_engine()
@@ -122,16 +123,23 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
expire_on_commit=False,
)
app = _build_test_app(session_maker)
enqueued: list[dict[str, object]] = []
sent_messages: list[dict[str, str]] = []
async with session_maker() as session:
board, webhook = await _seed_webhook(session, enabled=True)
async def _fake_optional_gateway_config_for_board(
self: board_webhooks.GatewayDispatchService,
_board: Board,
) -> object:
return object()
def _fake_enqueue(payload: QueuedWebhookDelivery) -> bool:
enqueued.append(
{
"board_id": str(payload.board_id),
"webhook_id": str(payload.webhook_id),
"payload_id": str(payload.payload_id),
"attempts": payload.attempts,
"event": payload.payload_event,
},
)
return True
async def _fake_try_send_agent_message(
self: board_webhooks.GatewayDispatchService,
@@ -145,7 +153,7 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
del self, config, deliver
sent_messages.append(
{
"session_key": session_key,
"session_id": session_key,
"agent_name": agent_name,
"message": message,
},
@@ -153,9 +161,9 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
return None
monkeypatch.setattr(
board_webhooks.GatewayDispatchService,
"optional_gateway_config_for_board",
_fake_optional_gateway_config_for_board,
board_webhooks,
"enqueue_webhook_delivery",
_fake_enqueue,
)
monkeypatch.setattr(
board_webhooks.GatewayDispatchService,
@@ -204,11 +212,12 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
assert f"payload:{payload_id}" in memory_items[0].tags
assert f"Payload ID: {payload_id}" in memory_items[0].content
assert len(sent_messages) == 1
assert sent_messages[0]["session_key"] == "lead:session:key"
assert "WEBHOOK EVENT RECEIVED" in sent_messages[0]["message"]
assert str(payload_id) in sent_messages[0]["message"]
assert webhook.description in sent_messages[0]["message"]
assert len(enqueued) == 1
assert enqueued[0]["board_id"] == str(board.id)
assert enqueued[0]["webhook_id"] == str(webhook.id)
assert enqueued[0]["payload_id"] == str(payload_id)
assert len(sent_messages) == 0
finally:
await engine.dispose()

View File

@@ -0,0 +1,213 @@
# ruff: noqa: INP001
"""Webhook queue and dispatch worker tests."""
from __future__ import annotations
from datetime import UTC, datetime
from uuid import UUID, uuid4
import pytest
from app.services.webhooks import dispatch
from app.services.webhooks.queue import (
QueuedWebhookDelivery,
dequeue_webhook_delivery,
enqueue_webhook_delivery,
requeue_if_failed,
)
class _FakeRedis:
def __init__(self) -> None:
self.values: list[str] = []
def lpush(self, key: str, value: str) -> None:
self.values.insert(0, value)
def rpop(self, key: str) -> str | None:
if not self.values:
return None
return self.values.pop()
@pytest.mark.parametrize("attempts", [0, 1, 2])
def test_webhook_queue_roundtrip(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None:
    """Enqueue then dequeue through a fake Redis and verify field fidelity."""
    fake = _FakeRedis()

    def _fake_redis() -> _FakeRedis:
        return fake

    board_id = uuid4()
    webhook_id = uuid4()
    payload_id = uuid4()
    payload = QueuedWebhookDelivery(
        board_id=board_id,
        webhook_id=webhook_id,
        payload_id=payload_id,
        payload_event="push",
        received_at=datetime.now(UTC),
        attempts=attempts,
    )
    # Patch the queue module's client factory so no real Redis is needed.
    monkeypatch.setattr("app.services.webhooks.queue._redis_client", _fake_redis)
    assert enqueue_webhook_delivery(payload)
    dequeued = dequeue_webhook_delivery()
    assert dequeued is not None
    assert dequeued.board_id == board_id
    assert dequeued.webhook_id == webhook_id
    assert dequeued.payload_id == payload_id
    assert dequeued.payload_event == "push"
    assert dequeued.attempts == attempts
@pytest.mark.parametrize("attempts", [0, 1, 2, 3])
def test_requeue_respects_retry_cap(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None:
    """Deliveries at/above the cap are dropped; others requeue with +1 attempt.

    NOTE: the branch below hard-codes 3, mirroring the default
    `webhook_dispatch_max_retries` setting — keep in sync if that changes.
    """
    fake = _FakeRedis()

    def _fake_redis() -> _FakeRedis:
        return fake

    monkeypatch.setattr("app.services.webhooks.queue._redis_client", _fake_redis)
    payload = QueuedWebhookDelivery(
        board_id=uuid4(),
        webhook_id=uuid4(),
        payload_id=uuid4(),
        payload_event="push",
        received_at=datetime.now(UTC),
        attempts=attempts,
    )
    if attempts >= 3:
        assert requeue_if_failed(payload) is False
        assert fake.values == []
    else:
        assert requeue_if_failed(payload) is True
        requeued = dequeue_webhook_delivery()
        assert requeued is not None
        assert requeued.attempts == attempts + 1
class _FakeQueuedItem:
def __init__(self, attempts: int = 0) -> None:
self.payload_id = uuid4()
self.webhook_id = uuid4()
self.board_id = uuid4()
self.attempts = attempts
def _patch_dequeue(monkeypatch: pytest.MonkeyPatch, items: list[QueuedWebhookDelivery | None]) -> None:
    """Replace dispatch.dequeue_webhook_delivery with a pop-from-list stub.

    *items* is consumed in order; an exhausted list yields None (empty queue).
    Callers actually pass `_FakeQueuedItem` instances — duck-typed against the
    annotation above.
    """

    def _dequeue() -> QueuedWebhookDelivery | None:
        if not items:
            return None
        return items.pop(0)

    monkeypatch.setattr(dispatch, "dequeue_webhook_delivery", _dequeue)
@pytest.mark.asyncio
async def test_dispatch_flush_processes_items_and_throttles(monkeypatch: pytest.MonkeyPatch) -> None:
    """Every queued item is processed and a throttle sleep follows each one."""
    items: list[QueuedWebhookDelivery | None] = [
        _FakeQueuedItem(),
        _FakeQueuedItem(),
        None,
    ]
    _patch_dequeue(monkeypatch, items)
    processed: list[UUID] = []
    throttles: list[float] = []

    async def _process(item: QueuedWebhookDelivery) -> None:
        processed.append(item.payload_id)

    monkeypatch.setattr(dispatch, "_process_single_item", _process)
    monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)

    async def _sleep(seconds: float) -> None:
        throttles.append(float(seconds))

    # NOTE: dispatch.asyncio is the stdlib module, so this patch is global for
    # the duration of the test (monkeypatch restores it afterwards).
    monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
    await dispatch.flush_webhook_delivery_queue()
    assert len(processed) == 2
    assert throttles == [0.0, 0.0]
@pytest.mark.asyncio
async def test_dispatch_flush_requeues_on_process_error(monkeypatch: pytest.MonkeyPatch) -> None:
    """A processing failure requeues the item instead of aborting the batch."""
    item = _FakeQueuedItem()
    _patch_dequeue(monkeypatch, [item, None])

    async def _process(_: QueuedWebhookDelivery) -> None:
        raise RuntimeError("boom")

    requeued: list[QueuedWebhookDelivery] = []

    def _requeue(payload: QueuedWebhookDelivery) -> bool:
        requeued.append(payload)
        return True

    monkeypatch.setattr(dispatch, "_process_single_item", _process)
    monkeypatch.setattr(dispatch, "requeue_if_failed", _requeue)
    monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)

    async def _sleep(_: float) -> None:
        return None

    monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
    await dispatch.flush_webhook_delivery_queue()
    assert len(requeued) == 1
    assert requeued[0].payload_id == item.payload_id
@pytest.mark.asyncio
async def test_dispatch_flush_recovers_from_dequeue_error(monkeypatch: pytest.MonkeyPatch) -> None:
    """A transient dequeue failure is logged and the loop keeps consuming.

    Sequence: call 1 raises, call 2 returns an item, call 3 signals empty.
    """
    item = _FakeQueuedItem()
    call_count = 0

    def _dequeue() -> QueuedWebhookDelivery | None:
        nonlocal call_count
        call_count += 1
        if call_count == 1:
            raise RuntimeError("dequeue broken")
        if call_count == 2:
            return item
        return None

    monkeypatch.setattr(dispatch, "dequeue_webhook_delivery", _dequeue)
    processed = 0

    async def _process(_: QueuedWebhookDelivery) -> None:
        nonlocal processed
        processed += 1

    monkeypatch.setattr(dispatch, "_process_single_item", _process)
    monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)

    async def _sleep(_: float) -> None:
        return None

    monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
    await dispatch.flush_webhook_delivery_queue()
    assert call_count == 3
    assert processed == 1
def test_dispatch_run_entrypoint_calls_async_flush(monkeypatch: pytest.MonkeyPatch) -> None:
    """The sync RQ entrypoint drives the async flush via asyncio.run."""
    called: list[bool] = []

    async def _flush() -> None:
        called.append(True)

    monkeypatch.setattr(dispatch, "flush_webhook_delivery_queue", _flush)
    dispatch.run_flush_webhook_delivery_queue()
    assert called == [True]

105
backend/uv.lock generated
View File

@@ -279,6 +279,25 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/0d/4a/331fe2caf6799d591109bb9c08083080f6de90a823695d412a935622abb2/coverage-7.13.4-py3-none-any.whl", hash = "sha256:1af1641e57cf7ba1bd67d677c9abdbcd6cc2ab7da3bca7fa1e2b7e50e65f2ad0", size = 211242, upload-time = "2026-02-09T12:59:02.032Z" },
]
[[package]]
name = "croniter"
version = "6.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "python-dateutil" },
{ name = "pytz" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ad/2f/44d1ae153a0e27be56be43465e5cb39b9650c781e001e7864389deb25090/croniter-6.0.0.tar.gz", hash = "sha256:37c504b313956114a983ece2c2b07790b1f1094fe9d81cc94739214748255577", size = 64481, upload-time = "2024-12-17T17:17:47.32Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/07/4b/290b4c3efd6417a8b0c284896de19b1d5855e6dbdb97d2a35e68fa42de85/croniter-6.0.0-py2.py3-none-any.whl", hash = "sha256:2f878c3856f17896979b2a4379ba1f09c83e374931ea15cc835c5dd2eee9b368", size = 25468, upload-time = "2024-12-17T17:17:45.359Z" },
]
[[package]]
name = "crontab"
version = "1.0.5"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d6/36/a255b6f5a2e22df03fd2b2f3088974b44b8c9e9407e26b44742cb7cfbf5b/crontab-1.0.5.tar.gz", hash = "sha256:f80e01b4f07219763a9869f926dd17147278e7965a928089bca6d3dc80ae46d5", size = 21963, upload-time = "2025-07-09T17:09:38.264Z" }
[[package]]
name = "cryptography"
version = "45.0.7"
@@ -358,6 +377,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/9f/56/13ab06b4f93ca7cac71078fbe37fcea175d3216f31f85c3168a6bbd0bb9a/flake8-7.3.0-py2.py3-none-any.whl", hash = "sha256:b9696257b9ce8beb888cdbe31cf885c90d31928fe202be0889a7cdafad32f01e", size = 57922, upload-time = "2025-06-20T19:31:34.425Z" },
]
[[package]]
name = "freezegun"
version = "1.5.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "python-dateutil" },
]
sdist = { url = "https://files.pythonhosted.org/packages/95/dd/23e2f4e357f8fd3bdff613c1fe4466d21bfb00a6177f238079b17f7b1c84/freezegun-1.5.5.tar.gz", hash = "sha256:ac7742a6cc6c25a2c35e9292dfd554b897b517d2dec26891a2e8debf205cb94a", size = 35914, upload-time = "2025-08-09T10:39:08.338Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/5e/2e/b41d8a1a917d6581fc27a35d05561037b048e47df50f27f8ac9c7e27a710/freezegun-1.5.5-py3-none-any.whl", hash = "sha256:cd557f4a75cf074e84bc374249b9dd491eaeacd61376b9eb3c423282211619d2", size = 19266, upload-time = "2025-08-09T10:39:06.636Z" },
]
[[package]]
name = "greenlet"
version = "3.3.1"
@@ -697,6 +728,9 @@ dependencies = [
{ name = "psycopg", extra = ["binary"] },
{ name = "pydantic-settings" },
{ name = "python-dotenv" },
{ name = "redis" },
{ name = "rq" },
{ name = "rq-scheduler" },
{ name = "sqlalchemy", extra = ["asyncio"] },
{ name = "sqlmodel" },
{ name = "sse-starlette" },
@@ -739,6 +773,9 @@ requires-dist = [
{ name = "pytest-asyncio", marker = "extra == 'dev'", specifier = "==1.3.0" },
{ name = "pytest-cov", marker = "extra == 'dev'", specifier = "==7.0.0" },
{ name = "python-dotenv", specifier = "==1.2.1" },
{ name = "redis", specifier = "==6.3.0" },
{ name = "rq", specifier = "==2.6.0" },
{ name = "rq-scheduler", specifier = "==0.14.0" },
{ name = "ruff", marker = "extra == 'dev'", specifier = "==0.15.0" },
{ name = "sqlalchemy", extras = ["asyncio"], specifier = "==2.0.46" },
{ name = "sqlmodel", specifier = "==0.0.32" },
@@ -1030,6 +1067,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" },
]
[[package]]
name = "python-dateutil"
version = "2.9.0.post0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "six" },
]
sdist = { url = "https://files.pythonhosted.org/packages/66/c0/0c8b6ad9f17a802ee498c46e004a0eb49bc148f2fd230864601a86dcf6db/python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3", size = 342432, upload-time = "2024-03-01T18:36:20.211Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892, upload-time = "2024-03-01T18:36:18.57Z" },
]
[[package]]
name = "python-dotenv"
version = "1.2.1"
@@ -1068,6 +1117,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c6/78/397db326746f0a342855b81216ae1f0a32965deccfd7c830a2dbc66d2483/pytokens-0.4.1-py3-none-any.whl", hash = "sha256:26cef14744a8385f35d0e095dc8b3a7583f6c953c2e3d269c7f82484bf5ad2de", size = 13729, upload-time = "2026-01-30T01:03:45.029Z" },
]
[[package]]
name = "pytz"
version = "2025.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f8/bf/abbd3cdfb8fbc7fb3d4d38d320f2441b1e7cbe29be4f23797b4a2b5d8aac/pytz-2025.2.tar.gz", hash = "sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3", size = 320884, upload-time = "2025-03-25T02:25:00.538Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/81/c4/34e93fe5f5429d7570ec1fa436f1986fb1f00c3e0f43a589fe2bbcd22c3f/pytz-2025.2-py2.py3-none-any.whl", hash = "sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00", size = 509225, upload-time = "2025-03-25T02:24:58.468Z" },
]
[[package]]
name = "pyyaml"
version = "6.0.3"
@@ -1114,6 +1172,44 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f1/12/de94a39c2ef588c7e6455cfbe7343d3b2dc9d6b6b2f40c4c6565744c873d/pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b", size = 149341, upload-time = "2025-09-25T21:32:56.828Z" },
]
[[package]]
name = "redis"
version = "6.3.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/21/cd/030274634a1a052b708756016283ea3d84e91ae45f74d7f5dcf55d753a0f/redis-6.3.0.tar.gz", hash = "sha256:3000dbe532babfb0999cdab7b3e5744bcb23e51923febcfaeb52c8cfb29632ef", size = 4647275, upload-time = "2025-08-05T08:12:31.648Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/df/a7/2fe45801534a187543fc45d28b3844d84559c1589255bc2ece30d92dc205/redis-6.3.0-py3-none-any.whl", hash = "sha256:92f079d656ded871535e099080f70fab8e75273c0236797126ac60242d638e9b", size = 280018, upload-time = "2025-08-05T08:12:30.093Z" },
]
[[package]]
name = "rq"
version = "2.6.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "click" },
{ name = "croniter" },
{ name = "redis" },
]
sdist = { url = "https://files.pythonhosted.org/packages/8e/f5/46e39abc46ff6ff4f3151ee4fd2c1bf7601a8d26bd30fd951c5496b1e6c6/rq-2.6.0.tar.gz", hash = "sha256:92ad55676cda14512c4eea5782f398a102dc3af108bea197c868c4c50c5d3e81", size = 675315, upload-time = "2025-09-06T03:15:12.854Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cc/66/6cf141584526e3ed5b57a194e09cbdf7058334bd3926bb3f96e2453cf053/rq-2.6.0-py3-none-any.whl", hash = "sha256:be5ccc0f0fc5f32da0999648340e31476368f08067f0c3fce6768d00064edbb5", size = 112533, upload-time = "2025-09-06T03:15:09.894Z" },
]
[[package]]
name = "rq-scheduler"
version = "0.14.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "crontab" },
{ name = "freezegun" },
{ name = "python-dateutil" },
{ name = "rq" },
]
sdist = { url = "https://files.pythonhosted.org/packages/a0/4e/977bbcc1f3b25ed9ea60ec968b13f7147661defe5b2f9272b44fdb1c5549/rq-scheduler-0.14.0.tar.gz", hash = "sha256:2d5a14a1ab217f8693184ebaa1fe03838edcbc70b4f76572721c0b33058cd023", size = 16582, upload-time = "2024-10-29T13:30:32.641Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/bb/d0/28cedca9f3b321f30e69d644c2dcd7097ec21570ec9606fde56750621300/rq_scheduler-0.14.0-py2.py3-none-any.whl", hash = "sha256:d4ec221a3d8c11b3ff55e041f09d9af1e17f3253db737b6b97e86ab20fc3dc0d", size = 13874, upload-time = "2024-10-29T13:30:30.449Z" },
]
[[package]]
name = "ruff"
version = "0.15.0"
@@ -1139,6 +1235,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f6/b0/2d823f6e77ebe560f4e397d078487e8d52c1516b331e3521bc75db4272ca/ruff-0.15.0-py3-none-win_arm64.whl", hash = "sha256:c480d632cc0ca3f0727acac8b7d053542d9e114a462a145d0b00e7cd658c515a", size = 10865753, upload-time = "2026-02-03T17:53:03.014Z" },
]
[[package]]
name = "six"
version = "1.17.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/94/e7/b2c673351809dca68a0e064b6af791aa332cf192da575fd474ed7d6f16a2/six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81", size = 34031, upload-time = "2024-12-04T17:35:28.174Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" },
]
[[package]]
name = "sqlalchemy"
version = "2.0.46"

View File

@@ -17,6 +17,11 @@ services:
timeout: 3s
retries: 20
redis:
image: redis:7-alpine
ports:
- "${REDIS_PORT:-6379}:6379"
backend:
build:
# Build from repo root so the backend image can include repo-level assets
@@ -32,9 +37,14 @@ services:
DB_AUTO_MIGRATE: ${DB_AUTO_MIGRATE:-true}
AUTH_MODE: ${AUTH_MODE}
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
depends_on:
db:
condition: service_healthy
redis:
condition: service_started
ports:
- "${BACKEND_PORT:-8000}:8000"
@@ -58,5 +68,50 @@ services:
ports:
- "${FRONTEND_PORT:-3000}:3000"
webhook-worker:
build:
context: .
dockerfile: backend/Dockerfile
# NOTE: keep explicit queue name to preserve historical defaults.
# Can be changed by setting WEBHOOK_LEADS_RQ_QUEUE_NAME and updating this command.
command: ["rq", "worker", "webhook-dispatch", "-u", "redis://redis:6379/0"]
env_file:
- ./backend/.env.example
depends_on:
redis:
condition: service_started
db:
condition: service_healthy
environment:
DATABASE_URL: postgresql+psycopg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-postgres}@db:5432/${POSTGRES_DB:-mission_control}
AUTH_MODE: ${AUTH_MODE}
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
restart: unless-stopped
webhook-dispatch-cron:
build:
context: .
dockerfile: backend/Dockerfile
command:
- sh
- -c
- |
python -c "from app.services.webhooks.scheduler import bootstrap_webhook_dispatch_schedule; bootstrap_webhook_dispatch_schedule()" && \
rqscheduler -u redis://redis:6379/0 -i 60
depends_on:
- redis
- webhook-worker
environment:
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS: 900
restart: unless-stopped
volumes:
postgres_data: