diff --git a/backend/.env.example b/backend/.env.example index 6f4b190f..beeccab5 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -21,7 +21,17 @@ CLERK_LEEWAY=10.0 DB_AUTO_MIGRATE=false # Webhook queue / worker WEBHOOK_REDIS_URL=redis://localhost:6379/0 -WEBHOOK_QUEUE_NAME=webhook-dispatch + +# RQ queue that runs the batch dispatch job. +WEBHOOK_LEADS_RQ_QUEUE_NAME=webhook-dispatch + +# Redis list key that stores queued webhook deliveries for batching. +WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY=webhook-dispatch + +# Backwards compat (deprecated): if set, used as both queue name + list key unless the +# split settings above are explicitly set. +WEBHOOK_QUEUE_NAME= + WEBHOOK_DISPATCH_THROTTLE_SECONDS=2.0 WEBHOOK_DISPATCH_SCHEDULE_ID=webhook-dispatch-batch WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS=900 diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 91a5603d..ec3d21e4 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -55,7 +55,17 @@ class Settings(BaseSettings): # Webhook queueing / dispatch webhook_redis_url: str = "redis://localhost:6379/0" + + # NOTE: Deprecated. Historically used for both the Redis list key *and* the RQ queue name. + # Prefer `webhook_leads_batch_redis_list_key` + `webhook_leads_rq_queue_name`. webhook_queue_name: str = "webhook-dispatch" + + # RQ queue that runs the batch dispatch job. + webhook_leads_rq_queue_name: str = "webhook-dispatch" + + # Redis list key that stores queued webhook deliveries for batching. + webhook_leads_batch_redis_list_key: str = "webhook-dispatch" + webhook_dispatch_schedule_id: str = "webhook-dispatch-batch" webhook_dispatch_throttle_seconds: float = 2.0 webhook_dispatch_schedule_interval_seconds: int = 900 @@ -70,6 +80,13 @@ class Settings(BaseSettings): @model_validator(mode="after") def _defaults(self) -> Self: + # Backwards compatibility: If WEBHOOK_QUEUE_NAME was set (legacy), and the + # newer split settings were not explicitly set, mirror it. + if "webhook_queue_name" in self.model_fields_set: + if "webhook_leads_rq_queue_name" not in self.model_fields_set: + self.webhook_leads_rq_queue_name = self.webhook_queue_name + if "webhook_leads_batch_redis_list_key" not in self.model_fields_set: + self.webhook_leads_batch_redis_list_key = self.webhook_queue_name if self.auth_mode == AuthMode.CLERK: if not self.clerk_secret_key.strip(): raise ValueError( diff --git a/backend/app/services/webhooks/dispatch.py b/backend/app/services/webhooks/dispatch.py index 9fe8632b..9df61d34 100644 --- a/backend/app/services/webhooks/dispatch.py +++ b/backend/app/services/webhooks/dispatch.py @@ -206,7 +206,7 @@ async def flush_webhook_delivery_queue() -> None: }, ) requeue_if_failed(item) - time.sleep(settings.webhook_dispatch_throttle_seconds) + await asyncio.sleep(settings.webhook_dispatch_throttle_seconds) logger.info("webhook.dispatch.batch_complete", extra={"count": processed}) diff --git a/backend/app/services/webhooks/queue.py b/backend/app/services/webhooks/queue.py index b2d37797..693f8afb 100644 --- a/backend/app/services/webhooks/queue.py +++ b/backend/app/services/webhooks/queue.py @@ -51,7 +51,7 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool: """Persist webhook metadata in a Redis queue for batch dispatch.""" try: client = _redis_client() - client.lpush(settings.webhook_queue_name, payload.to_json()) + client.lpush(settings.webhook_leads_batch_redis_list_key, payload.to_json()) logger.info( "webhook.queue.enqueued", extra={ @@ -78,7 +78,10 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool: def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None: """Pop one queued webhook delivery payload.""" client = _redis_client() - raw = cast(str | bytes | None, client.rpop(settings.webhook_queue_name)) + raw = cast( + str | bytes | None, + client.rpop(settings.webhook_leads_batch_redis_list_key), + ) if raw is None: return None if isinstance(raw, bytes): diff --git a/backend/app/services/webhooks/scheduler.py b/backend/app/services/webhooks/scheduler.py index d96744f3..3fa59155 100644 --- a/backend/app/services/webhooks/scheduler.py +++ b/backend/app/services/webhooks/scheduler.py @@ -14,7 +14,7 @@ from app.services.webhooks import dispatch def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) -> None: """Register a recurring queue-flush job and keep it idempotent.""" connection = Redis.from_url(settings.webhook_redis_url) - scheduler = Scheduler(queue_name=settings.webhook_queue_name, connection=connection) + scheduler = Scheduler(queue_name=settings.webhook_leads_rq_queue_name, connection=connection) for job in scheduler.get_jobs(): if job.id == settings.webhook_dispatch_schedule_id: @@ -32,5 +32,5 @@ def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) -> interval=effective_interval_seconds, repeat=None, id=settings.webhook_dispatch_schedule_id, - queue_name=settings.webhook_queue_name, + queue_name=settings.webhook_leads_rq_queue_name, ) diff --git a/backend/tests/test_webhook_dispatch.py b/backend/tests/test_webhook_dispatch.py index 61383bc7..c58e33b1 100644 --- a/backend/tests/test_webhook_dispatch.py +++ b/backend/tests/test_webhook_dispatch.py @@ -123,7 +123,11 @@ async def test_dispatch_flush_processes_items_and_throttles(monkeypatch: pytest. monkeypatch.setattr(dispatch, "_process_single_item", _process) monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0) - monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: throttles.append(seconds)) + + async def _sleep(seconds: float) -> None: + throttles.append(float(seconds)) + + monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep) await dispatch.flush_webhook_delivery_queue() @@ -148,7 +152,11 @@ async def test_dispatch_flush_requeues_on_process_error(monkeypatch: pytest.Monk monkeypatch.setattr(dispatch, "_process_single_item", _process) monkeypatch.setattr(dispatch, "requeue_if_failed", _requeue) monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0) - monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None) + + async def _sleep(_: float) -> None: + return None + + monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep) await dispatch.flush_webhook_delivery_queue() @@ -180,7 +188,11 @@ async def test_dispatch_flush_recovers_from_dequeue_error(monkeypatch: pytest.Mo monkeypatch.setattr(dispatch, "_process_single_item", _process) monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0) - monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None) + + async def _sleep(_: float) -> None: + return None + + monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep) await dispatch.flush_webhook_delivery_queue() diff --git a/compose.yml b/compose.yml index 9d97674e..7f779bb8 100644 --- a/compose.yml +++ b/compose.yml @@ -38,6 +38,8 @@ services: AUTH_MODE: ${AUTH_MODE} LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN} WEBHOOK_REDIS_URL: redis://redis:6379/0 + WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch + WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch depends_on: db: condition: service_healthy @@ -70,6 +72,8 @@ services: build: context: . dockerfile: backend/Dockerfile + # NOTE: keep explicit queue name to preserve historical defaults. + # Can be changed by setting WEBHOOK_LEADS_RQ_QUEUE_NAME and updating this command. command: ["rq", "worker", "webhook-dispatch", "-u", "redis://redis:6379/0"] env_file: - ./backend/.env.example @@ -83,7 +87,8 @@ services: AUTH_MODE: ${AUTH_MODE} LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN} WEBHOOK_REDIS_URL: redis://redis:6379/0 - WEBHOOK_QUEUE_NAME: webhook-dispatch + WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch + WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch restart: unless-stopped @@ -102,7 +107,8 @@ services: - webhook-worker environment: WEBHOOK_REDIS_URL: redis://redis:6379/0 - WEBHOOK_QUEUE_NAME: webhook-dispatch + WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch + WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS: 900 restart: unless-stopped