diff --git a/backend/.env.example b/backend/.env.example index 6f4b190f..4d1cd99e 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -19,10 +19,8 @@ CLERK_VERIFY_IAT=true CLERK_LEEWAY=10.0 # Database DB_AUTO_MIGRATE=false -# Webhook queue / worker -WEBHOOK_REDIS_URL=redis://localhost:6379/0 -WEBHOOK_QUEUE_NAME=webhook-dispatch -WEBHOOK_DISPATCH_THROTTLE_SECONDS=2.0 -WEBHOOK_DISPATCH_SCHEDULE_ID=webhook-dispatch-batch -WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS=900 -WEBHOOK_DISPATCH_MAX_RETRIES=3 +# Generic RQ queue / dispatch settings +RQ_REDIS_URL=redis://localhost:6379/0 +RQ_QUEUE_NAME=default +RQ_DISPATCH_THROTTLE_SECONDS=2.0 +RQ_DISPATCH_MAX_RETRIES=3 diff --git a/backend/app/api/board_webhooks.py b/backend/app/api/board_webhooks.py index a0318960..c4515528 100644 --- a/backend/app/api/board_webhooks.py +++ b/backend/app/api/board_webhooks.py @@ -29,7 +29,7 @@ from app.schemas.board_webhooks import ( from app.schemas.common import OkResponse from app.schemas.pagination import DefaultLimitOffsetPage from app.services.openclaw.gateway_dispatch import GatewayDispatchService -from app.services.webhooks.queue import QueuedWebhookDelivery, enqueue_webhook_delivery +from app.services.webhooks.queue import QueuedInboundDelivery, enqueue_webhook_delivery if TYPE_CHECKING: from collections.abc import Sequence @@ -167,12 +167,6 @@ def _captured_headers(request: Request) -> dict[str, str] | None: return captured or None -def _extract_webhook_event(headers: dict[str, str] | None) -> str | None: - if not headers: - return None - return headers.get("x-github-event") or headers.get("x-event-type") - - def _payload_preview( value: dict[str, object] | list[object] | str | int | float | bool | None, ) -> str: @@ -448,11 +442,10 @@ async def ingest_board_webhook( await session.commit() enqueued = enqueue_webhook_delivery( - QueuedWebhookDelivery( + QueuedInboundDelivery( board_id=board.id, webhook_id=webhook.id, payload_id=payload.id, - payload_event=_extract_webhook_event(headers), received_at=payload.received_at, ), ) diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 91a5603d..e78bee95 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -53,13 +53,11 @@ class Settings(BaseSettings): # Database lifecycle db_auto_migrate: bool = False - # Webhook queueing / dispatch - webhook_redis_url: str = "redis://localhost:6379/0" - webhook_queue_name: str = "webhook-dispatch" - webhook_dispatch_schedule_id: str = "webhook-dispatch-batch" - webhook_dispatch_throttle_seconds: float = 2.0 - webhook_dispatch_schedule_interval_seconds: int = 900 - webhook_dispatch_max_retries: int = 3 + # RQ queueing / dispatch + rq_redis_url: str = "redis://localhost:6379/0" + rq_queue_name: str = "default" + rq_dispatch_throttle_seconds: float = 2.0 + rq_dispatch_max_retries: int = 3 # Logging log_level: str = "INFO" diff --git a/backend/app/services/queue.py b/backend/app/services/queue.py new file mode 100644 index 00000000..37105658 --- /dev/null +++ b/backend/app/services/queue.py @@ -0,0 +1,143 @@ +"""Generic Redis-backed queue helpers for RQ-backed background workloads.""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from datetime import UTC, datetime +from typing import Any, cast + +import redis + +from app.core.config import settings +from app.core.logging import get_logger + +logger = get_logger(__name__) + + +@dataclass(frozen=True) +class QueuedTask: + """Generic queued task envelope.""" + + task_type: str + payload: dict[str, Any] + created_at: datetime + attempts: int = 0 + + def to_json(self) -> str: + return json.dumps( + { + "task_type": self.task_type, + "payload": self.payload, + "created_at": self.created_at.isoformat(), + "attempts": self.attempts, + }, + sort_keys=True, + ) + + +def _redis_client(redis_url: str | None = None) -> redis.Redis: + return redis.Redis.from_url(redis_url or settings.rq_redis_url) + + +def enqueue_task(task: QueuedTask, queue_name: str, *, redis_url: str | None = None) -> bool: + """Persist a task envelope in a Redis list-backed queue.""" + try: + client = _redis_client(redis_url=redis_url) + client.lpush(queue_name, task.to_json()) + logger.info( + "rq.queue.enqueued", + extra={ + "task_type": task.task_type, + "queue_name": queue_name, + "attempt": task.attempts, + }, + ) + return True + except Exception as exc: + logger.warning( + "rq.queue.enqueue_failed", + extra={"task_type": task.task_type, "queue_name": queue_name, "error": str(exc)}, + ) + return False + + +def _coerce_datetime(raw: object | None) -> datetime: + if raw is None: + return datetime.now(UTC) + if isinstance(raw, str): + try: + return datetime.fromisoformat(raw) + except ValueError: + return datetime.now(UTC) + if isinstance(raw, (int, float)): + try: + return datetime.fromtimestamp(raw, tz=UTC) + except (TypeError, ValueError, OverflowError): + return datetime.now(UTC) + return datetime.now(UTC) + + +def dequeue_task(queue_name: str, *, redis_url: str | None = None) -> QueuedTask | None: + """Pop one task envelope from the queue.""" + client = _redis_client(redis_url=redis_url) + raw = cast(str | bytes | None, client.rpop(queue_name)) + if raw is None: + return None + if isinstance(raw, bytes): + raw = raw.decode("utf-8") + + try: + payload: dict[str, Any] = json.loads(raw) + if "task_type" not in payload and "payload" not in payload: + return QueuedTask( + task_type="legacy", + payload=payload, + created_at=_coerce_datetime(payload.get("created_at") or payload.get("received_at")), + attempts=int(payload.get("attempts", 0)), + ) + return QueuedTask( + task_type=str(payload["task_type"]), + payload=payload["payload"], + created_at=datetime.fromisoformat(payload["created_at"]), + attempts=int(payload.get("attempts", 0)), + ) + except Exception as exc: + logger.error( + "rq.queue.dequeue_failed", + extra={"queue_name": queue_name, "raw_payload": str(raw), "error": str(exc)}, + ) + raise + + +def _requeue_with_attempt(task: QueuedTask) -> QueuedTask: + return QueuedTask( + task_type=task.task_type, + payload=task.payload, + created_at=task.created_at, + attempts=task.attempts + 1, + ) + + +def requeue_if_failed( + task: QueuedTask, + queue_name: str, + *, + max_retries: int, + redis_url: str | None = None, +) -> bool: + """Requeue a failed task with capped retries. + + Returns True if requeued. + """ + if task.attempts >= max_retries: + logger.warning( + "rq.queue.drop_failed_task", + extra={ + "task_type": task.task_type, + "queue_name": queue_name, + "attempts": task.attempts, + }, + ) + return False + return enqueue_task(_requeue_with_attempt(task), queue_name, redis_url=redis_url) diff --git a/backend/app/services/webhooks/__init__.py b/backend/app/services/webhooks/__init__.py index 38aefa4b..1b7ed538 100644 --- a/backend/app/services/webhooks/__init__.py +++ b/backend/app/services/webhooks/__init__.py @@ -5,16 +5,14 @@ Prefer importing from this package when used by other modules. from app.services.webhooks.dispatch import run_flush_webhook_delivery_queue from app.services.webhooks.queue import ( - QueuedWebhookDelivery, + QueuedInboundDelivery, dequeue_webhook_delivery, enqueue_webhook_delivery, requeue_if_failed, ) -from app.services.webhooks.scheduler import bootstrap_webhook_dispatch_schedule __all__ = [ - "QueuedWebhookDelivery", - "bootstrap_webhook_dispatch_schedule", + "QueuedInboundDelivery", "dequeue_webhook_delivery", "enqueue_webhook_delivery", "requeue_if_failed", diff --git a/backend/app/services/webhooks/dispatch.py b/backend/app/services/webhooks/dispatch.py index 9fe8632b..f57a681f 100644 --- a/backend/app/services/webhooks/dispatch.py +++ b/backend/app/services/webhooks/dispatch.py @@ -17,7 +17,7 @@ from app.models.board_webhooks import BoardWebhook from app.models.boards import Board from app.services.openclaw.gateway_dispatch import GatewayDispatchService from app.services.webhooks.queue import ( - QueuedWebhookDelivery, + QueuedInboundDelivery, dequeue_webhook_delivery, requeue_if_failed, ) @@ -153,7 +153,7 @@ async def _load_webhook_payload( return board, webhook, payload -async def _process_single_item(item: QueuedWebhookDelivery) -> None: +async def _process_single_item(item: QueuedInboundDelivery) -> None: async with async_session_maker() as session: loaded = await _load_webhook_payload( session=session, @@ -206,7 +206,7 @@ async def flush_webhook_delivery_queue() -> None: }, ) requeue_if_failed(item) - time.sleep(settings.webhook_dispatch_throttle_seconds) + time.sleep(settings.rq_dispatch_throttle_seconds) logger.info("webhook.dispatch.batch_complete", extra={"count": processed}) @@ -214,7 +214,7 @@ def run_flush_webhook_delivery_queue() -> None: """RQ entrypoint for running the async queue flush from worker jobs.""" logger.info( "webhook.dispatch.batch_started", - extra={"throttle_seconds": settings.webhook_dispatch_throttle_seconds}, + extra={"throttle_seconds": settings.rq_dispatch_throttle_seconds}, ) start = time.time() asyncio.run(flush_webhook_delivery_queue()) diff --git a/backend/app/services/webhooks/queue.py b/backend/app/services/webhooks/queue.py index b2d37797..f978c607 100644 --- a/backend/app/services/webhooks/queue.py +++ b/backend/app/services/webhooks/queue.py @@ -2,56 +2,73 @@ from __future__ import annotations -import json from dataclasses import dataclass -from datetime import datetime +from datetime import UTC, datetime from typing import Any from uuid import UUID -from typing import cast - -import redis - from app.core.config import settings from app.core.logging import get_logger +from app.services.queue import QueuedTask, dequeue_task, enqueue_task, requeue_if_failed as generic_requeue_if_failed logger = get_logger(__name__) +TASK_TYPE = "webhook_delivery" @dataclass(frozen=True) -class QueuedWebhookDelivery: +class QueuedInboundDelivery: """Payload metadata stored for deferred webhook lead dispatch.""" board_id: UUID webhook_id: UUID payload_id: UUID - payload_event: str | None received_at: datetime attempts: int = 0 - def to_json(self) -> str: - return json.dumps( - { - "board_id": str(self.board_id), - "webhook_id": str(self.webhook_id), - "payload_id": str(self.payload_id), - "payload_event": self.payload_event, - "received_at": self.received_at.isoformat(), - "attempts": self.attempts, - }, - sort_keys=True, + +def _task_from_payload(payload: QueuedInboundDelivery) -> QueuedTask: + return QueuedTask( + task_type=TASK_TYPE, + payload={ + "board_id": str(payload.board_id), + "webhook_id": str(payload.webhook_id), + "payload_id": str(payload.payload_id), + "received_at": payload.received_at.isoformat(), + }, + created_at=payload.received_at, + attempts=payload.attempts, + ) + + +def _payload_from_task(task: QueuedTask) -> QueuedInboundDelivery: + if task.task_type not in {TASK_TYPE, "legacy"}: + raise ValueError(f"Unexpected task_type={task.task_type!r}; expected {TASK_TYPE!r}") + + payload: dict[str, Any] = task.payload + if task.task_type == "legacy": + received_at = payload.get("received_at") or payload.get("created_at") + return QueuedInboundDelivery( + board_id=UUID(payload["board_id"]), + webhook_id=UUID(payload["webhook_id"]), + payload_id=UUID(payload["payload_id"]), + received_at=datetime.fromisoformat(received_at) if isinstance(received_at, str) else datetime.now(UTC), + attempts=int(payload.get("attempts", task.attempts)), ) - -def _redis_client() -> redis.Redis: - return redis.Redis.from_url(settings.webhook_redis_url) + return QueuedInboundDelivery( + board_id=UUID(payload["board_id"]), + webhook_id=UUID(payload["webhook_id"]), + payload_id=UUID(payload["payload_id"]), + received_at=datetime.fromisoformat(payload["received_at"]), + attempts=int(payload.get("attempts", task.attempts)), + ) -def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool: +def enqueue_webhook_delivery(payload: QueuedInboundDelivery) -> bool: """Persist webhook metadata in a Redis queue for batch dispatch.""" try: - client = _redis_client() - client.lpush(settings.webhook_queue_name, payload.to_json()) + queued = _task_from_payload(payload) + enqueue_task(queued, settings.rq_queue_name, redis_url=settings.rq_redis_url) logger.info( "webhook.queue.enqueued", extra={ @@ -75,62 +92,44 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool: return False -def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None: +def dequeue_webhook_delivery() -> QueuedInboundDelivery | None: """Pop one queued webhook delivery payload.""" - client = _redis_client() - raw = cast(str | bytes | None, client.rpop(settings.webhook_queue_name)) - if raw is None: - return None - if isinstance(raw, bytes): - raw = raw.decode("utf-8") try: - payload: dict[str, Any] = json.loads(raw) - event = payload.get("payload_event") - if event is not None: - event = str(event) - return QueuedWebhookDelivery( - board_id=UUID(payload["board_id"]), - webhook_id=UUID(payload["webhook_id"]), - payload_id=UUID(payload["payload_id"]), - payload_event=event, - received_at=datetime.fromisoformat(payload["received_at"]), - attempts=int(payload.get("attempts", 0)), - ) + task = dequeue_task(settings.rq_queue_name, redis_url=settings.rq_redis_url) + if task is None: + return None + return _payload_from_task(task) except Exception as exc: logger.error( "webhook.queue.dequeue_failed", - extra={"raw_payload": str(raw), "error": str(exc)}, + extra={ + "queue_name": settings.rq_queue_name, + "error": str(exc), + }, ) raise -def _requeue_with_attempt(payload: QueuedWebhookDelivery) -> None: - payload = QueuedWebhookDelivery( - board_id=payload.board_id, - webhook_id=payload.webhook_id, - payload_id=payload.payload_id, - payload_event=payload.payload_event, - received_at=payload.received_at, - attempts=payload.attempts + 1, - ) - enqueue_webhook_delivery(payload) - - -def requeue_if_failed(payload: QueuedWebhookDelivery) -> bool: +def requeue_if_failed(payload: QueuedInboundDelivery) -> bool: """Requeue payload delivery with capped retries. Returns True if requeued. """ - if payload.attempts >= settings.webhook_dispatch_max_retries: + try: + return generic_requeue_if_failed( + _task_from_payload(payload), + settings.rq_queue_name, + max_retries=settings.rq_dispatch_max_retries, + redis_url=settings.rq_redis_url, + ) + except Exception as exc: logger.warning( - "webhook.queue.drop_failed_delivery", + "webhook.queue.requeue_failed", extra={ "board_id": str(payload.board_id), "webhook_id": str(payload.webhook_id), "payload_id": str(payload.payload_id), - "attempts": payload.attempts, + "error": str(exc), }, ) - return False - _requeue_with_attempt(payload) - return True + raise diff --git a/backend/app/services/webhooks/scheduler.py b/backend/app/services/webhooks/scheduler.py deleted file mode 100644 index df24c9ba..00000000 --- a/backend/app/services/webhooks/scheduler.py +++ /dev/null @@ -1,83 +0,0 @@ -"""Webhook dispatch scheduler bootstrap for rq-scheduler. - -This module is typically run once at container start to ensure the recurring -job exists (idempotent registration). -""" - -from __future__ import annotations - -import time -from datetime import datetime, timedelta, timezone - -from redis import Redis -from rq_scheduler import Scheduler # type: ignore[import-untyped] - -from app.core.config import settings -from app.core.logging import get_logger -from app.services.webhooks.dispatch import run_flush_webhook_delivery_queue - -logger = get_logger(__name__) - - -def bootstrap_webhook_dispatch_schedule( - interval_seconds: int | None = None, - *, - max_attempts: int = 5, - retry_sleep_seconds: float = 1.0, -) -> None: - """Register a recurring queue-flush job and keep it idempotent. - - Retries Redis connectivity to avoid crashing on transient startup ordering. - """ - - effective_interval_seconds = ( - settings.webhook_dispatch_schedule_interval_seconds - if interval_seconds is None - else interval_seconds - ) - - last_exc: Exception | None = None - for attempt in range(1, max_attempts + 1): - try: - connection = Redis.from_url(settings.webhook_redis_url) - connection.ping() - scheduler = Scheduler( - queue_name=settings.webhook_queue_name, - connection=connection, - ) - - for job in scheduler.get_jobs(): - if job.id == settings.webhook_dispatch_schedule_id: - scheduler.cancel(job) - - scheduler.schedule( - datetime.now(tz=timezone.utc) + timedelta(seconds=5), - func=run_flush_webhook_delivery_queue, - interval=effective_interval_seconds, - repeat=None, - id=settings.webhook_dispatch_schedule_id, - queue_name=settings.webhook_queue_name, - ) - logger.info( - "webhook.scheduler.bootstrapped", - extra={ - "schedule_id": settings.webhook_dispatch_schedule_id, - "queue_name": settings.webhook_queue_name, - "interval_seconds": effective_interval_seconds, - }, - ) - return - except Exception as exc: - last_exc = exc - logger.warning( - "webhook.scheduler.bootstrap_failed", - extra={ - "attempt": attempt, - "max_attempts": max_attempts, - "error": str(exc), - }, - ) - if attempt < max_attempts: - time.sleep(retry_sleep_seconds * attempt) - - raise RuntimeError("Failed to bootstrap webhook dispatch schedule") from last_exc diff --git a/backend/pyproject.toml b/backend/pyproject.toml index f1f5704e..c5ce49e5 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -27,7 +27,6 @@ dependencies = [ "websockets==16.0", "redis==6.3.0", "rq==2.6.0", - "rq-scheduler==0.14.0", ] [project.optional-dependencies] diff --git a/backend/tests/test_board_webhooks_api.py b/backend/tests/test_board_webhooks_api.py index 26643424..d23686e5 100644 --- a/backend/tests/test_board_webhooks_api.py +++ b/backend/tests/test_board_webhooks_api.py @@ -23,7 +23,7 @@ from app.models.board_webhooks import BoardWebhook from app.models.boards import Board from app.models.gateways import Gateway from app.models.organizations import Organization -from app.services.webhooks.queue import QueuedWebhookDelivery +from app.services.webhooks.queue import QueuedInboundDelivery async def _make_engine() -> AsyncEngine: @@ -129,14 +129,13 @@ async def test_ingest_board_webhook_stores_payload_and_enqueues_for_lead_dispatc async with session_maker() as session: board, webhook = await _seed_webhook(session, enabled=True) - def _fake_enqueue(payload: QueuedWebhookDelivery) -> bool: + def _fake_enqueue(payload: QueuedInboundDelivery) -> bool: enqueued.append( { "board_id": str(payload.board_id), "webhook_id": str(payload.webhook_id), "payload_id": str(payload.payload_id), "attempts": payload.attempts, - "event": payload.payload_event, }, ) return True diff --git a/backend/tests/test_queue.py b/backend/tests/test_queue.py new file mode 100644 index 00000000..ea845c49 --- /dev/null +++ b/backend/tests/test_queue.py @@ -0,0 +1,103 @@ +# ruff: noqa: INP001 +"""Generic RQ queue helper tests.""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime + +import pytest + +from app.services.queue import QueuedTask, dequeue_task, enqueue_task, requeue_if_failed + + +class _FakeRedis: + def __init__(self) -> None: + self.values: list[str] = [] + + def lpush(self, key: str, value: str) -> None: + del key + self.values.insert(0, value) + + def rpop(self, key: str) -> str | None: + del key + if not self.values: + return None + return self.values.pop() + + +@pytest.mark.parametrize("attempts", [0, 1, 2]) +def test_generic_queue_roundtrip(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None: + fake = _FakeRedis() + + def _fake_redis(*, redis_url: str | None = None) -> _FakeRedis: + return fake + + monkeypatch.setattr("app.services.queue._redis_client", _fake_redis) + payload = QueuedTask( + task_type="generic-task", + payload={"name": "webhook.delivery"}, + created_at=datetime.now(UTC), + attempts=attempts, + ) + + assert enqueue_task(payload, "generic-queue") + item = dequeue_task("generic-queue") + assert item is not None + assert item.task_type == payload.task_type + assert item.payload == payload.payload + assert item.attempts == attempts + + +@pytest.mark.parametrize("attempts", [0, 1, 2, 3]) +def test_generic_requeue_respects_retry_cap(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None: + fake = _FakeRedis() + + def _fake_redis(*, redis_url: str | None = None) -> _FakeRedis: + return fake + + monkeypatch.setattr("app.services.queue._redis_client", _fake_redis) + payload = QueuedTask( + task_type="generic-task", + payload={"attempt": attempts}, + created_at=datetime.now(UTC), + attempts=attempts, + ) + + if attempts >= 3: + assert requeue_if_failed(payload, "generic-queue", max_retries=3) is False + assert fake.values == [] + else: + assert requeue_if_failed(payload, "generic-queue", max_retries=3) is True + requeued = dequeue_task("generic-queue") + assert requeued is not None + assert requeued.attempts == attempts + 1 + + +def test_dequeue_task_tolerates_legacy_payload_without_envelope(monkeypatch: pytest.MonkeyPatch) -> None: + fake = _FakeRedis() + + def _fake_redis(*, redis_url: str | None = None) -> _FakeRedis: + return fake + + monkeypatch.setattr("app.services.queue._redis_client", _fake_redis) + created_at = datetime.now(UTC) + fake.values.append( + json.dumps( + { + "board_id": "6f3ab1ec-3ef6-4f4d-a6a7-e2d6e5d6f7a8", + "webhook_id": "e5cf5d2a-3f7d-4f3a-b2b0-b3b4f6f3a8ad", + "payload_id": "3f1f0b9e-4f7a-4fbe-b0f1-1a6f0f4f9e70", + "payload_event": "push", + "received_at": created_at.isoformat(), + "attempts": 2, + } + ) + ) + + task = dequeue_task("generic-queue") + + assert task is not None + assert task.task_type == "legacy" + assert task.attempts == 2 + assert task.payload["board_id"] == "6f3ab1ec-3ef6-4f4d-a6a7-e2d6e5d6f7a8" diff --git a/backend/tests/test_webhook_dispatch.py b/backend/tests/test_webhook_dispatch.py index 61383bc7..0c3bd61d 100644 --- a/backend/tests/test_webhook_dispatch.py +++ b/backend/tests/test_webhook_dispatch.py @@ -3,6 +3,7 @@ from __future__ import annotations +import json from datetime import UTC, datetime from uuid import UUID, uuid4 @@ -10,7 +11,7 @@ import pytest from app.services.webhooks import dispatch from app.services.webhooks.queue import ( - QueuedWebhookDelivery, + QueuedInboundDelivery, dequeue_webhook_delivery, enqueue_webhook_delivery, requeue_if_failed, @@ -34,22 +35,21 @@ class _FakeRedis: def test_webhook_queue_roundtrip(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None: fake = _FakeRedis() - def _fake_redis() -> _FakeRedis: + def _fake_redis(*, redis_url: str | None = None) -> _FakeRedis: return fake board_id = uuid4() webhook_id = uuid4() payload_id = uuid4() - payload = QueuedWebhookDelivery( + payload = QueuedInboundDelivery( board_id=board_id, webhook_id=webhook_id, payload_id=payload_id, - payload_event="push", received_at=datetime.now(UTC), attempts=attempts, ) - monkeypatch.setattr("app.services.webhooks.queue._redis_client", _fake_redis) + monkeypatch.setattr("app.services.queue._redis_client", _fake_redis) assert enqueue_webhook_delivery(payload) dequeued = dequeue_webhook_delivery() @@ -57,24 +57,54 @@ def test_webhook_queue_roundtrip(monkeypatch: pytest.MonkeyPatch, attempts: int) assert dequeued.board_id == board_id assert dequeued.webhook_id == webhook_id assert dequeued.payload_id == payload_id - assert dequeued.payload_event == "push" assert dequeued.attempts == attempts +def test_webhook_queue_dequeue_legacy_payload(monkeypatch: pytest.MonkeyPatch) -> None: + fake = _FakeRedis() + + def _fake_redis(*, redis_url: str | None = None) -> _FakeRedis: + return fake + + payload_id = uuid4() + board_id = uuid4() + webhook_id = uuid4() + received_at = datetime.now(UTC) + fake.values.append( + json.dumps( + { + "board_id": str(board_id), + "webhook_id": str(webhook_id), + "payload_id": str(payload_id), + "received_at": received_at.isoformat(), + "attempts": 2, + } + ) + ) + + monkeypatch.setattr("app.services.queue._redis_client", _fake_redis) + dequeued = dequeue_webhook_delivery() + + assert dequeued is not None + assert dequeued.board_id == board_id + assert dequeued.webhook_id == webhook_id + assert dequeued.payload_id == payload_id + assert dequeued.attempts == 2 + + @pytest.mark.parametrize("attempts", [0, 1, 2, 3]) def test_requeue_respects_retry_cap(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None: fake = _FakeRedis() - def _fake_redis() -> _FakeRedis: + def _fake_redis(*, redis_url: str | None = None) -> _FakeRedis: return fake - monkeypatch.setattr("app.services.webhooks.queue._redis_client", _fake_redis) + monkeypatch.setattr("app.services.queue._redis_client", _fake_redis) - payload = QueuedWebhookDelivery( + payload = QueuedInboundDelivery( board_id=uuid4(), webhook_id=uuid4(), payload_id=uuid4(), - payload_event="push", received_at=datetime.now(UTC), attempts=attempts, ) @@ -97,8 +127,8 @@ class _FakeQueuedItem: self.attempts = attempts -def _patch_dequeue(monkeypatch: pytest.MonkeyPatch, items: list[QueuedWebhookDelivery | None]) -> None: - def _dequeue() -> QueuedWebhookDelivery | None: +def _patch_dequeue(monkeypatch: pytest.MonkeyPatch, items: list[QueuedInboundDelivery | None]) -> None: + def _dequeue() -> QueuedInboundDelivery | None: if not items: return None return items.pop(0) @@ -108,7 +138,7 @@ def _patch_dequeue(monkeypatch: pytest.MonkeyPatch, items: list[QueuedWebhookDel @pytest.mark.asyncio async def test_dispatch_flush_processes_items_and_throttles(monkeypatch: pytest.MonkeyPatch) -> None: - items: list[QueuedWebhookDelivery | None] = [ + items: list[QueuedInboundDelivery | None] = [ _FakeQueuedItem(), _FakeQueuedItem(), None, @@ -118,11 +148,11 @@ async def test_dispatch_flush_processes_items_and_throttles(monkeypatch: pytest. processed: list[UUID] = [] throttles: list[float] = [] - async def _process(item: QueuedWebhookDelivery) -> None: + async def _process(item: QueuedInboundDelivery) -> None: processed.append(item.payload_id) monkeypatch.setattr(dispatch, "_process_single_item", _process) - monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0) + monkeypatch.setattr(dispatch.settings, "rq_dispatch_throttle_seconds", 0) monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: throttles.append(seconds)) await dispatch.flush_webhook_delivery_queue() @@ -136,18 +166,18 @@ async def test_dispatch_flush_requeues_on_process_error(monkeypatch: pytest.Monk item = _FakeQueuedItem() _patch_dequeue(monkeypatch, [item, None]) - async def _process(_: QueuedWebhookDelivery) -> None: + async def _process(_: QueuedInboundDelivery) -> None: raise RuntimeError("boom") - requeued: list[QueuedWebhookDelivery] = [] + requeued: list[QueuedInboundDelivery] = [] - def _requeue(payload: QueuedWebhookDelivery) -> bool: + def _requeue(payload: QueuedInboundDelivery) -> bool: requeued.append(payload) return True monkeypatch.setattr(dispatch, "_process_single_item", _process) monkeypatch.setattr(dispatch, "requeue_if_failed", _requeue) - monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0) + monkeypatch.setattr(dispatch.settings, "rq_dispatch_throttle_seconds", 0) monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None) await dispatch.flush_webhook_delivery_queue() @@ -161,7 +191,7 @@ async def test_dispatch_flush_recovers_from_dequeue_error(monkeypatch: pytest.Mo item = _FakeQueuedItem() call_count = 0 - def _dequeue() -> QueuedWebhookDelivery | None: + def _dequeue() -> QueuedInboundDelivery | None: nonlocal call_count call_count += 1 if call_count == 1: @@ -174,12 +204,12 @@ async def test_dispatch_flush_recovers_from_dequeue_error(monkeypatch: pytest.Mo processed = 0 - async def _process(_: QueuedWebhookDelivery) -> None: + async def _process(_: QueuedInboundDelivery) -> None: nonlocal processed processed += 1 monkeypatch.setattr(dispatch, "_process_single_item", _process) - monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0) + monkeypatch.setattr(dispatch.settings, "rq_dispatch_throttle_seconds", 0) monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None) await dispatch.flush_webhook_delivery_queue() diff --git a/backend/uv.lock b/backend/uv.lock index a1dcc981..c01cab38 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -730,7 +730,6 @@ dependencies = [ { name = "python-dotenv" }, { name = "redis" }, { name = "rq" }, - { name = "rq-scheduler" }, { name = "sqlalchemy", extra = ["asyncio"] }, { name = "sqlmodel" }, { name = "sse-starlette" }, @@ -775,7 +774,6 @@ requires-dist = [ { name = "python-dotenv", specifier = "==1.2.1" }, { name = "redis", specifier = "==6.3.0" }, { name = "rq", specifier = "==2.6.0" }, - { name = "rq-scheduler", specifier = "==0.14.0" }, { name = "ruff", marker = "extra == 'dev'", specifier = "==0.15.0" }, { name = "sqlalchemy", extras = ["asyncio"], specifier = "==2.0.46" }, { name = "sqlmodel", specifier = "==0.0.32" }, @@ -1195,21 +1193,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cc/66/6cf141584526e3ed5b57a194e09cbdf7058334bd3926bb3f96e2453cf053/rq-2.6.0-py3-none-any.whl", hash = "sha256:be5ccc0f0fc5f32da0999648340e31476368f08067f0c3fce6768d00064edbb5", size = 112533, upload-time = "2025-09-06T03:15:09.894Z" }, ] -[[package]] -name = "rq-scheduler" -version = "0.14.0" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "crontab" }, - { name = "freezegun" }, - { name = "python-dateutil" }, - { name = "rq" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/a0/4e/977bbcc1f3b25ed9ea60ec968b13f7147661defe5b2f9272b44fdb1c5549/rq-scheduler-0.14.0.tar.gz", hash = "sha256:2d5a14a1ab217f8693184ebaa1fe03838edcbc70b4f76572721c0b33058cd023", size = 16582, upload-time = "2024-10-29T13:30:32.641Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/bb/d0/28cedca9f3b321f30e69d644c2dcd7097ec21570ec9606fde56750621300/rq_scheduler-0.14.0-py2.py3-none-any.whl", hash = "sha256:d4ec221a3d8c11b3ff55e041f09d9af1e17f3253db737b6b97e86ab20fc3dc0d", size = 13874, upload-time = "2024-10-29T13:30:30.449Z" }, -] - [[package]] name = "ruff" version = "0.15.0" diff --git a/compose.yml b/compose.yml index 8e797904..85930802 100644 --- a/compose.yml +++ b/compose.yml @@ -37,7 +37,7 @@ services: DB_AUTO_MIGRATE: ${DB_AUTO_MIGRATE:-true} AUTH_MODE: ${AUTH_MODE} LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN} - WEBHOOK_REDIS_URL: redis://redis:6379/0 + RQ_REDIS_URL: redis://redis:6379/0 depends_on: db: condition: service_healthy @@ -70,7 +70,7 @@ services: build: context: . dockerfile: backend/Dockerfile - command: ["rq", "worker", "webhook-dispatch", "-u", "redis://redis:6379/0"] + command: ["rq", "worker", "-u", "redis://redis:6379/0"] env_file: - ./backend/.env.example depends_on: @@ -82,29 +82,10 @@ services: DATABASE_URL: postgresql+psycopg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-postgres}@db:5432/${POSTGRES_DB:-mission_control} AUTH_MODE: ${AUTH_MODE} LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN} - WEBHOOK_REDIS_URL: redis://redis:6379/0 - WEBHOOK_QUEUE_NAME: webhook-dispatch - WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch - restart: unless-stopped - - webhook-dispatch-cron: - build: - context: . - dockerfile: backend/Dockerfile - command: - - sh - - -c - - | - python -c "from app.services.webhooks.scheduler import bootstrap_webhook_dispatch_schedule; bootstrap_webhook_dispatch_schedule()" && \ - rqscheduler -u "${WEBHOOK_REDIS_URL:-redis://redis:6379/0}" -i 60 - depends_on: - - redis - - webhook-worker - environment: - WEBHOOK_REDIS_URL: redis://redis:6379/0 - WEBHOOK_QUEUE_NAME: webhook-dispatch - WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch - WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS: ${WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS:-900} + RQ_REDIS_URL: redis://redis:6379/0 + RQ_QUEUE_NAME: ${RQ_QUEUE_NAME:-default} + RQ_DISPATCH_THROTTLE_SECONDS: ${RQ_DISPATCH_THROTTLE_SECONDS:-2.0} + RQ_DISPATCH_MAX_RETRIES: ${RQ_DISPATCH_MAX_RETRIES:-3} restart: unless-stopped volumes: