diff --git a/backend/app/services/webhooks/__init__.py b/backend/app/services/webhooks/__init__.py index d4c7fc60..38aefa4b 100644 --- a/backend/app/services/webhooks/__init__.py +++ b/backend/app/services/webhooks/__init__.py @@ -1,3 +1,22 @@ -"""Webhook queueing and dispatch utilities.""" +"""Webhook queueing + dispatch utilities. -__all__ = ["dispatch", "queue", "scheduler"] +Prefer importing from this package when used by other modules. +""" + +from app.services.webhooks.dispatch import run_flush_webhook_delivery_queue +from app.services.webhooks.queue import ( + QueuedWebhookDelivery, + dequeue_webhook_delivery, + enqueue_webhook_delivery, + requeue_if_failed, +) +from app.services.webhooks.scheduler import bootstrap_webhook_dispatch_schedule + +__all__ = [ + "QueuedWebhookDelivery", + "bootstrap_webhook_dispatch_schedule", + "dequeue_webhook_delivery", + "enqueue_webhook_delivery", + "requeue_if_failed", + "run_flush_webhook_delivery_queue", +] diff --git a/backend/app/services/webhooks/scheduler.py b/backend/app/services/webhooks/scheduler.py index d96744f3..df24c9ba 100644 --- a/backend/app/services/webhooks/scheduler.py +++ b/backend/app/services/webhooks/scheduler.py @@ -1,24 +1,34 @@ -"""Webhook dispatch scheduler bootstrap for rq-scheduler.""" +"""Webhook dispatch scheduler bootstrap for rq-scheduler. + +This module is typically run once at container start to ensure the recurring +job exists (idempotent registration). +""" from __future__ import annotations +import time from datetime import datetime, timedelta, timezone from redis import Redis from rq_scheduler import Scheduler # type: ignore[import-untyped] from app.core.config import settings -from app.services.webhooks import dispatch +from app.core.logging import get_logger +from app.services.webhooks.dispatch import run_flush_webhook_delivery_queue + +logger = get_logger(__name__) -def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) -> None: - """Register a recurring queue-flush job and keep it idempotent.""" - connection = Redis.from_url(settings.webhook_redis_url) - scheduler = Scheduler(queue_name=settings.webhook_queue_name, connection=connection) +def bootstrap_webhook_dispatch_schedule( + interval_seconds: int | None = None, + *, + max_attempts: int = 5, + retry_sleep_seconds: float = 1.0, +) -> None: + """Register a recurring queue-flush job and keep it idempotent. - for job in scheduler.get_jobs(): - if job.id == settings.webhook_dispatch_schedule_id: - scheduler.cancel(job) + Retries Redis connectivity to avoid crashing on transient startup ordering. + """ effective_interval_seconds = ( settings.webhook_dispatch_schedule_interval_seconds @@ -26,11 +36,48 @@ def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) -> else interval_seconds ) - scheduler.schedule( - datetime.now(tz=timezone.utc) + timedelta(seconds=5), - func=dispatch.run_flush_webhook_delivery_queue, - interval=effective_interval_seconds, - repeat=None, - id=settings.webhook_dispatch_schedule_id, - queue_name=settings.webhook_queue_name, - ) + last_exc: Exception | None = None + for attempt in range(1, max_attempts + 1): + try: + connection = Redis.from_url(settings.webhook_redis_url) + connection.ping() + scheduler = Scheduler( + queue_name=settings.webhook_queue_name, + connection=connection, + ) + + for job in scheduler.get_jobs(): + if job.id == settings.webhook_dispatch_schedule_id: + scheduler.cancel(job) + + scheduler.schedule( + datetime.now(tz=timezone.utc) + timedelta(seconds=5), + func=run_flush_webhook_delivery_queue, + interval=effective_interval_seconds, + repeat=None, + id=settings.webhook_dispatch_schedule_id, + queue_name=settings.webhook_queue_name, + ) + logger.info( + "webhook.scheduler.bootstrapped", + extra={ + "schedule_id": settings.webhook_dispatch_schedule_id, + "queue_name": settings.webhook_queue_name, + "interval_seconds": effective_interval_seconds, + }, + ) + return + except Exception as exc: + last_exc = exc + logger.warning( + "webhook.scheduler.bootstrap_failed", + extra={ + "attempt": attempt, + "max_attempts": max_attempts, + "error": str(exc), + }, + ) + if attempt < max_attempts: + time.sleep(retry_sleep_seconds * attempt) + + raise RuntimeError("Failed to bootstrap webhook dispatch schedule") from last_exc diff --git a/compose.yml b/compose.yml index 9d97674e..12bc95d5 100644 --- a/compose.yml +++ b/compose.yml @@ -104,7 +104,7 @@ services: WEBHOOK_REDIS_URL: redis://redis:6379/0 WEBHOOK_QUEUE_NAME: webhook-dispatch WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch - WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS: 900 + WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS: ${WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS:-900} restart: unless-stopped volumes: