Fix webhook dispatch throttle + split queue/list config

This commit is contained in:
Abhimanyu Saharan
2026-02-14 17:58:50 +00:00
parent 4557fcc8ae
commit ad6ff33d16
7 changed files with 59 additions and 11 deletions

View File

@@ -21,7 +21,17 @@ CLERK_LEEWAY=10.0
DB_AUTO_MIGRATE=false
# Webhook queue / worker
WEBHOOK_REDIS_URL=redis://localhost:6379/0
WEBHOOK_QUEUE_NAME=webhook-dispatch
# RQ queue that runs the batch dispatch job.
WEBHOOK_LEADS_RQ_QUEUE_NAME=webhook-dispatch
# Redis list key that stores queued webhook deliveries for batching.
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY=webhook-dispatch
# Backwards compat (deprecated): if set, used as both queue name + list key unless the
# split settings above are explicitly set.
WEBHOOK_QUEUE_NAME=
WEBHOOK_DISPATCH_THROTTLE_SECONDS=2.0
WEBHOOK_DISPATCH_SCHEDULE_ID=webhook-dispatch-batch
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS=900

View File

@@ -55,7 +55,17 @@ class Settings(BaseSettings):
# Webhook queueing / dispatch
webhook_redis_url: str = "redis://localhost:6379/0"
# NOTE: Deprecated. Historically used for both the Redis list key *and* the RQ queue name.
# Prefer `webhook_leads_batch_redis_list_key` + `webhook_leads_rq_queue_name`.
webhook_queue_name: str = "webhook-dispatch"
# RQ queue that runs the batch dispatch job.
webhook_leads_rq_queue_name: str = "webhook-dispatch"
# Redis list key that stores queued webhook deliveries for batching.
webhook_leads_batch_redis_list_key: str = "webhook-dispatch"
webhook_dispatch_schedule_id: str = "webhook-dispatch-batch"
webhook_dispatch_throttle_seconds: float = 2.0
webhook_dispatch_schedule_interval_seconds: int = 900
@@ -70,6 +80,13 @@ class Settings(BaseSettings):
@model_validator(mode="after")
def _defaults(self) -> Self:
# Backwards compatibility: If WEBHOOK_QUEUE_NAME was set (legacy), and the
# newer split settings were not explicitly set, mirror it.
if "webhook_queue_name" in self.model_fields_set:
if "webhook_leads_rq_queue_name" not in self.model_fields_set:
self.webhook_leads_rq_queue_name = self.webhook_queue_name
if "webhook_leads_batch_redis_list_key" not in self.model_fields_set:
self.webhook_leads_batch_redis_list_key = self.webhook_queue_name
if self.auth_mode == AuthMode.CLERK:
if not self.clerk_secret_key.strip():
raise ValueError(

View File

@@ -206,7 +206,7 @@ async def flush_webhook_delivery_queue() -> None:
},
)
requeue_if_failed(item)
time.sleep(settings.webhook_dispatch_throttle_seconds)
await asyncio.sleep(settings.webhook_dispatch_throttle_seconds)
logger.info("webhook.dispatch.batch_complete", extra={"count": processed})

View File

@@ -51,7 +51,7 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
"""Persist webhook metadata in a Redis queue for batch dispatch."""
try:
client = _redis_client()
client.lpush(settings.webhook_queue_name, payload.to_json())
client.lpush(settings.webhook_leads_batch_redis_list_key, payload.to_json())
logger.info(
"webhook.queue.enqueued",
extra={
@@ -78,7 +78,10 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None:
"""Pop one queued webhook delivery payload."""
client = _redis_client()
raw = cast(str | bytes | None, client.rpop(settings.webhook_queue_name))
raw = cast(
str | bytes | None,
client.rpop(settings.webhook_leads_batch_redis_list_key),
)
if raw is None:
return None
if isinstance(raw, bytes):

View File

@@ -14,7 +14,7 @@ from app.services.webhooks import dispatch
def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) -> None:
"""Register a recurring queue-flush job and keep it idempotent."""
connection = Redis.from_url(settings.webhook_redis_url)
scheduler = Scheduler(queue_name=settings.webhook_queue_name, connection=connection)
scheduler = Scheduler(queue_name=settings.webhook_leads_rq_queue_name, connection=connection)
for job in scheduler.get_jobs():
if job.id == settings.webhook_dispatch_schedule_id:
@@ -32,5 +32,5 @@ def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) ->
interval=effective_interval_seconds,
repeat=None,
id=settings.webhook_dispatch_schedule_id,
queue_name=settings.webhook_queue_name,
queue_name=settings.webhook_leads_rq_queue_name,
)

View File

@@ -123,7 +123,11 @@ async def test_dispatch_flush_processes_items_and_throttles(monkeypatch: pytest.
monkeypatch.setattr(dispatch, "_process_single_item", _process)
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: throttles.append(seconds))
async def _sleep(seconds: float) -> None:
throttles.append(float(seconds))
monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
await dispatch.flush_webhook_delivery_queue()
@@ -148,7 +152,11 @@ async def test_dispatch_flush_requeues_on_process_error(monkeypatch: pytest.Monk
monkeypatch.setattr(dispatch, "_process_single_item", _process)
monkeypatch.setattr(dispatch, "requeue_if_failed", _requeue)
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None)
async def _sleep(_: float) -> None:
return None
monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
await dispatch.flush_webhook_delivery_queue()
@@ -180,7 +188,11 @@ async def test_dispatch_flush_recovers_from_dequeue_error(monkeypatch: pytest.Mo
monkeypatch.setattr(dispatch, "_process_single_item", _process)
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None)
async def _sleep(_: float) -> None:
return None
monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
await dispatch.flush_webhook_delivery_queue()

View File

@@ -38,6 +38,8 @@ services:
AUTH_MODE: ${AUTH_MODE}
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
depends_on:
db:
condition: service_healthy
@@ -70,6 +72,8 @@ services:
build:
context: .
dockerfile: backend/Dockerfile
# NOTE: keep explicit queue name to preserve historical defaults.
# Can be changed by setting WEBHOOK_LEADS_RQ_QUEUE_NAME and updating this command.
command: ["rq", "worker", "webhook-dispatch", "-u", "redis://redis:6379/0"]
env_file:
- ./backend/.env.example
@@ -83,7 +87,8 @@ services:
AUTH_MODE: ${AUTH_MODE}
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
restart: unless-stopped
@@ -102,7 +107,8 @@ services:
- webhook-worker
environment:
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS: 900
restart: unless-stopped