diff --git a/backend/.env.example b/backend/.env.example index 8d3f3dec..10257962 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -12,12 +12,16 @@ BASE_URL= AUTH_MODE=local # REQUIRED when AUTH_MODE=local (must be non-placeholder and at least 50 chars). LOCAL_AUTH_TOKEN= - # Clerk (auth only; used when AUTH_MODE=clerk) CLERK_SECRET_KEY= CLERK_API_URL=https://api.clerk.com CLERK_VERIFY_IAT=true CLERK_LEEWAY=10.0 - # Database DB_AUTO_MIGRATE=false +# Webhook queue / worker +WEBHOOK_REDIS_URL=redis://localhost:6379/0 +WEBHOOK_QUEUE_NAME=webhook-dispatch +WEBHOOK_DISPATCH_THROTTLE_SECONDS=2.0 +WEBHOOK_DISPATCH_SCHEDULE_ID=webhook-dispatch-batch +WEBHOOK_DISPATCH_MAX_RETRIES=3 diff --git a/backend/app/api/board_webhooks.py b/backend/app/api/board_webhooks.py index f9ab37ca..a0318960 100644 --- a/backend/app/api/board_webhooks.py +++ b/backend/app/api/board_webhooks.py @@ -29,6 +29,7 @@ from app.schemas.board_webhooks import ( from app.schemas.common import OkResponse from app.schemas.pagination import DefaultLimitOffsetPage from app.services.openclaw.gateway_dispatch import GatewayDispatchService +from app.services.webhooks.queue import QueuedWebhookDelivery, enqueue_webhook_delivery if TYPE_CHECKING: from collections.abc import Sequence @@ -166,6 +167,12 @@ def _captured_headers(request: Request) -> dict[str, str] | None: return captured or None +def _extract_webhook_event(headers: dict[str, str] | None) -> str | None: + if not headers: + return None + return headers.get("x-github-event") or headers.get("x-event-type") + + def _payload_preview( value: dict[str, object] | list[object] | str | int | float | bool | None, ) -> str: @@ -412,6 +419,7 @@ async def ingest_board_webhook( ) content_type = request.headers.get("content-type") + headers = _captured_headers(request) payload_value = _decode_payload( await request.body(), content_type=content_type, @@ -420,7 +428,7 @@ async def ingest_board_webhook( board_id=board.id, webhook_id=webhook.id, payload=payload_value, - headers=_captured_headers(request), + headers=headers, source_ip=request.client.host if request.client else None, content_type=content_type, ) @@ -438,12 +446,25 @@ async def ingest_board_webhook( ) session.add(memory) await session.commit() - await _notify_lead_on_webhook_payload( - session=session, - board=board, - webhook=webhook, - payload=payload, + + enqueued = enqueue_webhook_delivery( + QueuedWebhookDelivery( + board_id=board.id, + webhook_id=webhook.id, + payload_id=payload.id, + payload_event=_extract_webhook_event(headers), + received_at=payload.received_at, + ), ) + if not enqueued: + # Preserve historical behavior by still notifying synchronously if queueing fails. + await _notify_lead_on_webhook_payload( + session=session, + board=board, + webhook=webhook, + payload=payload, + ) + return BoardWebhookIngestResponse( board_id=board.id, webhook_id=webhook.id, diff --git a/backend/app/core/config.py b/backend/app/core/config.py index d857e39e..1475bd6c 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -53,6 +53,12 @@ class Settings(BaseSettings): # Database lifecycle db_auto_migrate: bool = False + # Webhook queueing / dispatch + webhook_redis_url: str = "redis://localhost:6379/0" + webhook_queue_name: str = "webhook-dispatch" + webhook_dispatch_throttle_seconds: float = 2.0 + webhook_dispatch_max_retries: int = 3 + # Logging log_level: str = "INFO" log_format: str = "text" diff --git a/backend/app/services/webhooks/__init__.py b/backend/app/services/webhooks/__init__.py new file mode 100644 index 00000000..d4c7fc60 --- /dev/null +++ b/backend/app/services/webhooks/__init__.py @@ -0,0 +1,3 @@ +"""Webhook queueing and dispatch utilities.""" + +__all__ = ["dispatch", "queue", "scheduler"] diff --git a/backend/app/services/webhooks/dispatch.py b/backend/app/services/webhooks/dispatch.py new file mode 100644 index 00000000..7952e46c --- /dev/null +++ b/backend/app/services/webhooks/dispatch.py @@ -0,0 +1,213 @@ +"""Webhook dispatch worker routines.""" + +from __future__ import annotations + +import asyncio +import time + +from sqlalchemy import col, select +from sqlmodel.ext.asyncio.session import AsyncSession +from uuid import UUID + +from app.core.config import settings +from app.core.logging import get_logger +from app.db.session import async_session_maker +from app.models.agents import Agent +from app.models.board_webhook_payloads import BoardWebhookPayload +from app.models.board_webhooks import BoardWebhook +from app.models.boards import Board +from app.services.openclaw.gateway_dispatch import GatewayDispatchService +from app.services.webhooks.queue import QueuedWebhookDelivery, dequeue_webhook_delivery, requeue_if_failed + +logger = get_logger(__name__) + + +def _build_payload_preview(payload_value: object) -> str: + if isinstance(payload_value, str): + return payload_value + try: + import json + + return json.dumps(payload_value, indent=2, ensure_ascii=True) + except TypeError: + return str(payload_value) + + +def _payload_preview(payload_value: object) -> str: + preview = _build_payload_preview(payload_value) + if len(preview) <= 1600: + return preview + return f"{preview[:1597]}..." + + +def _webhook_message( + *, + board: Board, + webhook: BoardWebhook, + payload: BoardWebhookPayload, +) -> str: + preview = _payload_preview(payload.payload) + return ( + "WEBHOOK EVENT RECEIVED\n" + f"Board: {board.name}\n" + f"Webhook ID: {webhook.id}\n" + f"Payload ID: {payload.id}\n" + f"Instruction: {webhook.description}\n\n" + "Take action:\n" + "1) Triage this payload against the webhook instruction.\n" + "2) Create/update tasks as needed.\n" + f"3) Reference payload ID {payload.id} in task descriptions.\n\n" + "Payload preview:\n" + f"{preview}\n\n" + "To inspect board memory entries:\n" + f"GET /api/v1/agent/boards/{board.id}/memory?is_chat=false" + ) + + +async def _notify_lead( + *, + session: AsyncSession, + board: Board, + webhook: BoardWebhook, + payload: BoardWebhookPayload, +) -> None: + lead = ( + await Agent.objects.filter_by(board_id=board.id) + .filter(col(Agent.is_board_lead).is_(True)) + .first(session) + ) + if lead is None or not lead.openclaw_session_id: + return + + dispatch = GatewayDispatchService(session) + config = await dispatch.optional_gateway_config_for_board(board) + if config is None: + return + + message = _webhook_message(board=board, webhook=webhook, payload=payload) + await dispatch.try_send_agent_message( + session_key=lead.openclaw_session_id, + config=config, + agent_name=lead.name, + message=message, + deliver=False, + ) + + +async def _load_webhook_payload( + *, + session: AsyncSession, + payload_id: UUID, + webhook_id: UUID, + board_id: UUID, +) -> tuple[Board, BoardWebhook, BoardWebhookPayload] | None: + payload = ( + await session.exec( + select(BoardWebhookPayload) + .where(col(BoardWebhookPayload.id) == payload_id) + .where(col(BoardWebhookPayload.board_id) == board_id) + .where(col(BoardWebhookPayload.webhook_id) == webhook_id), + ) + ).first() + if payload is None: + logger.warning( + "webhook.queue.payload_missing", + extra={ + "payload_id": str(payload_id), + "webhook_id": str(webhook_id), + "board_id": str(board_id), + }, + ) + return None + + board = await Board.objects.by_id(board_id).first(session) + if board is None: + logger.warning( + "webhook.queue.board_missing", + extra={"board_id": str(board_id), "payload_id": str(payload_id)}, + ) + return None + + webhook = ( + await session.exec( + select(BoardWebhook) + .where(col(BoardWebhook.id) == webhook_id) + .where(col(BoardWebhook.board_id) == board_id), + ) + ).first() + if webhook is None: + logger.warning( + "webhook.queue.webhook_missing", + extra={"webhook_id": str(webhook_id), "board_id": str(board_id)}, + ) + return None + return board, webhook, payload + + +async def _process_single_item(item: QueuedWebhookDelivery) -> None: + async with async_session_maker() as session: + loaded = await _load_webhook_payload( + session=session, + payload_id=item.payload_id, + webhook_id=item.webhook_id, + board_id=item.board_id, + ) + if loaded is None: + return + + board, webhook, payload = loaded + await _notify_lead(session=session, board=board, webhook=webhook, payload=payload) + await session.commit() + + +async def flush_webhook_delivery_queue() -> None: + """Consume queued webhook events and notify board leads in a throttled batch.""" + processed = 0 + while True: + try: + item = dequeue_webhook_delivery() + except Exception: + logger.exception("webhook.dispatch.dequeue_failed") + continue + + if item is None: + break + + try: + await _process_single_item(item) + processed += 1 + logger.info( + "webhook.dispatch.success", + extra={ + "payload_id": str(item.payload_id), + "webhook_id": str(item.webhook_id), + "board_id": str(item.board_id), + "attempt": item.attempts, + }, + ) + except Exception as exc: + logger.exception( + "webhook.dispatch.failed", + extra={ + "payload_id": str(item.payload_id), + "webhook_id": str(item.webhook_id), + "board_id": str(item.board_id), + "attempt": item.attempts, + "error": str(exc), + }, + ) + requeue_if_failed(item) + time.sleep(settings.webhook_dispatch_throttle_seconds) + logger.info("webhook.dispatch.batch_complete", extra={"count": processed}) + + +def run_flush_webhook_delivery_queue() -> None: + """RQ entrypoint for running the async queue flush from worker jobs.""" + logger.info( + "webhook.dispatch.batch_started", + extra={"throttle_seconds": settings.webhook_dispatch_throttle_seconds}, + ) + start = time.time() + asyncio.run(flush_webhook_delivery_queue()) + elapsed_ms = int((time.time() - start) * 1000) + logger.info("webhook.dispatch.batch_finished", extra={"duration_ms": elapsed_ms}) diff --git a/backend/app/services/webhooks/queue.py b/backend/app/services/webhooks/queue.py new file mode 100644 index 00000000..41237ec1 --- /dev/null +++ b/backend/app/services/webhooks/queue.py @@ -0,0 +1,134 @@ +"""Webhook queue persistence and delivery helpers.""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from datetime import datetime +from typing import Any +from uuid import UUID + +import redis + +from app.core.config import settings +from app.core.logging import get_logger + +logger = get_logger(__name__) + + +@dataclass(frozen=True) +class QueuedWebhookDelivery: + """Payload metadata stored for deferred webhook lead dispatch.""" + + board_id: UUID + webhook_id: UUID + payload_id: UUID + payload_event: str | None + received_at: datetime + attempts: int = 0 + + def to_json(self) -> str: + return json.dumps( + { + "board_id": str(self.board_id), + "webhook_id": str(self.webhook_id), + "payload_id": str(self.payload_id), + "payload_event": self.payload_event, + "received_at": self.received_at.isoformat(), + "attempts": self.attempts, + }, + sort_keys=True, + ) + + +def _redis_client() -> redis.Redis: + return redis.Redis.from_url(settings.webhook_redis_url) + + +def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool: + """Persist webhook metadata in a Redis queue for batch dispatch.""" + try: + client = _redis_client() + client.lpush(settings.webhook_queue_name, payload.to_json()) + logger.info( + "webhook.queue.enqueued", + extra={ + "board_id": str(payload.board_id), + "webhook_id": str(payload.webhook_id), + "payload_id": str(payload.payload_id), + "attempt": payload.attempts, + }, + ) + return True + except Exception as exc: + logger.warning( + "webhook.queue.enqueue_failed", + extra={ + "board_id": str(payload.board_id), + "webhook_id": str(payload.webhook_id), + "payload_id": str(payload.payload_id), + "error": str(exc), + }, + ) + return False + + +def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None: + """Pop one queued webhook delivery payload.""" + client = _redis_client() + raw = client.rpop(settings.webhook_queue_name) + if raw is None: + return None + if isinstance(raw, bytes): + raw = raw.decode("utf-8") + try: + payload: dict[str, Any] = json.loads(raw) + event = payload.get("payload_event") + if event is not None: + event = str(event) + return QueuedWebhookDelivery( + board_id=UUID(payload["board_id"]), + webhook_id=UUID(payload["webhook_id"]), + payload_id=UUID(payload["payload_id"]), + payload_event=event, + received_at=datetime.fromisoformat(payload["received_at"]), + attempts=int(payload.get("attempts", 0)), + ) + except Exception as exc: + logger.error( + "webhook.queue.dequeue_failed", + extra={"raw_payload": str(raw), "error": str(exc)}, + ) + raise + + +def _requeue_with_attempt(payload: QueuedWebhookDelivery) -> None: + payload = QueuedWebhookDelivery( + board_id=payload.board_id, + webhook_id=payload.webhook_id, + payload_id=payload.payload_id, + payload_event=payload.payload_event, + received_at=payload.received_at, + attempts=payload.attempts + 1, + ) + enqueue_webhook_delivery(payload) + + +def requeue_if_failed(payload: QueuedWebhookDelivery) -> bool: + """Requeue payload delivery with capped retries. + + Returns True if requeued. + """ + if payload.attempts >= settings.webhook_dispatch_max_retries: + logger.warning( + "webhook.queue.drop_failed_delivery", + extra={ + "board_id": str(payload.board_id), + "webhook_id": str(payload.webhook_id), + "payload_id": str(payload.payload_id), + "attempts": payload.attempts, + }, + ) + return False + _requeue_with_attempt(payload) + return True diff --git a/backend/app/services/webhooks/scheduler.py b/backend/app/services/webhooks/scheduler.py new file mode 100644 index 00000000..6c452f5f --- /dev/null +++ b/backend/app/services/webhooks/scheduler.py @@ -0,0 +1,30 @@ +"""Webhook dispatch scheduler bootstrap for rq-scheduler.""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone + +from redis import Redis +from rq_scheduler import Scheduler + +from app.core.config import settings +from app.services.webhooks import dispatch + + +def bootstrap_webhook_dispatch_schedule(interval_seconds: int = 900) -> None: + """Register a recurring queue-flush job and keep it idempotent.""" + connection = Redis.from_url(settings.webhook_redis_url) + scheduler = Scheduler(queue_name=settings.webhook_queue_name, connection=connection) + + for job in scheduler.get_jobs(): + if job.id == settings.webhook_dispatch_schedule_id: + scheduler.cancel(job) + + scheduler.schedule( + datetime.now(tz=timezone.utc) + timedelta(seconds=5), + func=dispatch.run_flush_webhook_delivery_queue, + interval=interval_seconds, + repeat=None, + id=settings.webhook_dispatch_schedule_id, + queue_name=settings.webhook_queue_name, + ) diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 9346c23b..f1f5704e 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -25,6 +25,9 @@ dependencies = [ "sse-starlette==3.2.0", "uvicorn[standard]==0.40.0", "websockets==16.0", + "redis==6.3.0", + "rq==2.6.0", + "rq-scheduler==0.14.0", ] [project.optional-dependencies] diff --git a/backend/tests/test_board_webhooks_api.py b/backend/tests/test_board_webhooks_api.py index 77128c87..26643424 100644 --- a/backend/tests/test_board_webhooks_api.py +++ b/backend/tests/test_board_webhooks_api.py @@ -23,6 +23,7 @@ from app.models.board_webhooks import BoardWebhook from app.models.boards import Board from app.models.gateways import Gateway from app.models.organizations import Organization +from app.services.webhooks.queue import QueuedWebhookDelivery async def _make_engine() -> AsyncEngine: @@ -112,7 +113,7 @@ async def _seed_webhook( @pytest.mark.asyncio -async def test_ingest_board_webhook_stores_payload_and_notifies_lead( +async def test_ingest_board_webhook_stores_payload_and_enqueues_for_lead_dispatch( monkeypatch: pytest.MonkeyPatch, ) -> None: engine = await _make_engine() @@ -122,16 +123,23 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead( expire_on_commit=False, ) app = _build_test_app(session_maker) + enqueued: list[dict[str, object]] = [] sent_messages: list[dict[str, str]] = [] async with session_maker() as session: board, webhook = await _seed_webhook(session, enabled=True) - async def _fake_optional_gateway_config_for_board( - self: board_webhooks.GatewayDispatchService, - _board: Board, - ) -> object: - return object() + def _fake_enqueue(payload: QueuedWebhookDelivery) -> bool: + enqueued.append( + { + "board_id": str(payload.board_id), + "webhook_id": str(payload.webhook_id), + "payload_id": str(payload.payload_id), + "attempts": payload.attempts, + "event": payload.payload_event, + }, + ) + return True async def _fake_try_send_agent_message( self: board_webhooks.GatewayDispatchService, @@ -145,7 +153,7 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead( del self, config, deliver sent_messages.append( { - "session_key": session_key, + "session_id": session_key, "agent_name": agent_name, "message": message, }, @@ -153,9 +161,9 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead( return None monkeypatch.setattr( - board_webhooks.GatewayDispatchService, - "optional_gateway_config_for_board", - _fake_optional_gateway_config_for_board, + board_webhooks, + "enqueue_webhook_delivery", + _fake_enqueue, ) monkeypatch.setattr( board_webhooks.GatewayDispatchService, @@ -204,11 +212,12 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead( assert f"payload:{payload_id}" in memory_items[0].tags assert f"Payload ID: {payload_id}" in memory_items[0].content - assert len(sent_messages) == 1 - assert sent_messages[0]["session_key"] == "lead:session:key" - assert "WEBHOOK EVENT RECEIVED" in sent_messages[0]["message"] - assert str(payload_id) in sent_messages[0]["message"] - assert webhook.description in sent_messages[0]["message"] + assert len(enqueued) == 1 + assert enqueued[0]["board_id"] == str(board.id) + assert enqueued[0]["webhook_id"] == str(webhook.id) + assert enqueued[0]["payload_id"] == str(payload_id) + + assert len(sent_messages) == 0 finally: await engine.dispose() diff --git a/backend/tests/test_webhook_dispatch.py b/backend/tests/test_webhook_dispatch.py new file mode 100644 index 00000000..4df2c99a --- /dev/null +++ b/backend/tests/test_webhook_dispatch.py @@ -0,0 +1,88 @@ +# ruff: noqa: INP001 +"""Webhook queue helper unit tests.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from uuid import UUID, uuid4 + +import pytest + +from app.services.webhooks.queue import ( + QueuedWebhookDelivery, + dequeue_webhook_delivery, + enqueue_webhook_delivery, + requeue_if_failed, +) + + +class _FakeRedis: + def __init__(self) -> None: + self.values: list[str] = [] + + def lpush(self, key: str, value: str) -> None: + self.values.insert(0, value) + + def rpop(self, key: str) -> str | None: + if not self.values: + return None + return self.values.pop() + + +@pytest.mark.parametrize("attempts", [0, 1, 2]) +def test_webhook_queue_roundtrip(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None: + fake = _FakeRedis() + + def _fake_redis() -> _FakeRedis: + return fake + + board_id = uuid4() + webhook_id = uuid4() + payload_id = uuid4() + payload = QueuedWebhookDelivery( + board_id=board_id, + webhook_id=webhook_id, + payload_id=payload_id, + payload_event="push", + received_at=datetime.now(UTC), + attempts=attempts, + ) + + monkeypatch.setattr("app.services.webhooks.queue._redis_client", _fake_redis) + assert enqueue_webhook_delivery(payload) + + dequeued = dequeue_webhook_delivery() + assert dequeued is not None + assert dequeued.board_id == board_id + assert dequeued.webhook_id == webhook_id + assert dequeued.payload_id == payload_id + assert dequeued.payload_event == "push" + assert dequeued.attempts == attempts + + +@pytest.mark.parametrize("attempts", [0, 1, 2, 3]) +def test_requeue_respects_retry_cap(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None: + fake = _FakeRedis() + + def _fake_redis() -> _FakeRedis: + return fake + + monkeypatch.setattr("app.services.webhooks.queue._redis_client", _fake_redis) + + payload = QueuedWebhookDelivery( + board_id=uuid4(), + webhook_id=uuid4(), + payload_id=uuid4(), + payload_event="push", + received_at=datetime.now(UTC), + attempts=attempts, + ) + + if attempts >= 3: + assert requeue_if_failed(payload) is False + assert fake.values == [] + else: + assert requeue_if_failed(payload) is True + requeued = dequeue_webhook_delivery() + assert requeued is not None + assert requeued.attempts == attempts + 1 diff --git a/backend/uv.lock b/backend/uv.lock index 37634295..a1dcc981 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -279,6 +279,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0d/4a/331fe2caf6799d591109bb9c08083080f6de90a823695d412a935622abb2/coverage-7.13.4-py3-none-any.whl", hash = "sha256:1af1641e57cf7ba1bd67d677c9abdbcd6cc2ab7da3bca7fa1e2b7e50e65f2ad0", size = 211242, upload-time = "2026-02-09T12:59:02.032Z" }, ] +[[package]] +name = "croniter" +version = "6.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "python-dateutil" }, + { name = "pytz" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ad/2f/44d1ae153a0e27be56be43465e5cb39b9650c781e001e7864389deb25090/croniter-6.0.0.tar.gz", hash = "sha256:37c504b313956114a983ece2c2b07790b1f1094fe9d81cc94739214748255577", size = 64481, upload-time = "2024-12-17T17:17:47.32Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/07/4b/290b4c3efd6417a8b0c284896de19b1d5855e6dbdb97d2a35e68fa42de85/croniter-6.0.0-py2.py3-none-any.whl", hash = "sha256:2f878c3856f17896979b2a4379ba1f09c83e374931ea15cc835c5dd2eee9b368", size = 25468, upload-time = "2024-12-17T17:17:45.359Z" }, +] + +[[package]] +name = "crontab" +version = "1.0.5" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d6/36/a255b6f5a2e22df03fd2b2f3088974b44b8c9e9407e26b44742cb7cfbf5b/crontab-1.0.5.tar.gz", hash = "sha256:f80e01b4f07219763a9869f926dd17147278e7965a928089bca6d3dc80ae46d5", size = 21963, upload-time = "2025-07-09T17:09:38.264Z" } + [[package]] name = "cryptography" version = "45.0.7" @@ -358,6 +377,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9f/56/13ab06b4f93ca7cac71078fbe37fcea175d3216f31f85c3168a6bbd0bb9a/flake8-7.3.0-py2.py3-none-any.whl", hash = "sha256:b9696257b9ce8beb888cdbe31cf885c90d31928fe202be0889a7cdafad32f01e", size = 57922, upload-time = "2025-06-20T19:31:34.425Z" }, ] +[[package]] +name = "freezegun" +version = "1.5.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "python-dateutil" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/95/dd/23e2f4e357f8fd3bdff613c1fe4466d21bfb00a6177f238079b17f7b1c84/freezegun-1.5.5.tar.gz", hash = "sha256:ac7742a6cc6c25a2c35e9292dfd554b897b517d2dec26891a2e8debf205cb94a", size = 35914, upload-time = "2025-08-09T10:39:08.338Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5e/2e/b41d8a1a917d6581fc27a35d05561037b048e47df50f27f8ac9c7e27a710/freezegun-1.5.5-py3-none-any.whl", hash = "sha256:cd557f4a75cf074e84bc374249b9dd491eaeacd61376b9eb3c423282211619d2", size = 19266, upload-time = "2025-08-09T10:39:06.636Z" }, +] + [[package]] name = "greenlet" version = "3.3.1" @@ -697,6 +728,9 @@ dependencies = [ { name = "psycopg", extra = ["binary"] }, { name = "pydantic-settings" }, { name = "python-dotenv" }, + { name = "redis" }, + { name = "rq" }, + { name = "rq-scheduler" }, { name = "sqlalchemy", extra = ["asyncio"] }, { name = "sqlmodel" }, { name = "sse-starlette" }, @@ -739,6 +773,9 @@ requires-dist = [ { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = "==1.3.0" }, { name = "pytest-cov", marker = "extra == 'dev'", specifier = "==7.0.0" }, { name = "python-dotenv", specifier = "==1.2.1" }, + { name = "redis", specifier = "==6.3.0" }, + { name = "rq", specifier = "==2.6.0" }, + { name = "rq-scheduler", specifier = "==0.14.0" }, { name = "ruff", marker = "extra == 'dev'", specifier = "==0.15.0" }, { name = "sqlalchemy", extras = ["asyncio"], specifier = "==2.0.46" }, { name = "sqlmodel", specifier = "==0.0.32" }, @@ -1030,6 +1067,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" }, ] +[[package]] +name = "python-dateutil" +version = "2.9.0.post0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "six" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/66/c0/0c8b6ad9f17a802ee498c46e004a0eb49bc148f2fd230864601a86dcf6db/python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3", size = 342432, upload-time = "2024-03-01T18:36:20.211Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892, upload-time = "2024-03-01T18:36:18.57Z" }, +] + [[package]] name = "python-dotenv" version = "1.2.1" @@ -1068,6 +1117,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c6/78/397db326746f0a342855b81216ae1f0a32965deccfd7c830a2dbc66d2483/pytokens-0.4.1-py3-none-any.whl", hash = "sha256:26cef14744a8385f35d0e095dc8b3a7583f6c953c2e3d269c7f82484bf5ad2de", size = 13729, upload-time = "2026-01-30T01:03:45.029Z" }, ] +[[package]] +name = "pytz" +version = "2025.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f8/bf/abbd3cdfb8fbc7fb3d4d38d320f2441b1e7cbe29be4f23797b4a2b5d8aac/pytz-2025.2.tar.gz", hash = "sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3", size = 320884, upload-time = "2025-03-25T02:25:00.538Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/81/c4/34e93fe5f5429d7570ec1fa436f1986fb1f00c3e0f43a589fe2bbcd22c3f/pytz-2025.2-py2.py3-none-any.whl", hash = "sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00", size = 509225, upload-time = "2025-03-25T02:24:58.468Z" }, +] + [[package]] name = "pyyaml" version = "6.0.3" @@ -1114,6 +1172,44 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f1/12/de94a39c2ef588c7e6455cfbe7343d3b2dc9d6b6b2f40c4c6565744c873d/pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b", size = 149341, upload-time = "2025-09-25T21:32:56.828Z" }, ] +[[package]] +name = "redis" +version = "6.3.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/21/cd/030274634a1a052b708756016283ea3d84e91ae45f74d7f5dcf55d753a0f/redis-6.3.0.tar.gz", hash = "sha256:3000dbe532babfb0999cdab7b3e5744bcb23e51923febcfaeb52c8cfb29632ef", size = 4647275, upload-time = "2025-08-05T08:12:31.648Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/df/a7/2fe45801534a187543fc45d28b3844d84559c1589255bc2ece30d92dc205/redis-6.3.0-py3-none-any.whl", hash = "sha256:92f079d656ded871535e099080f70fab8e75273c0236797126ac60242d638e9b", size = 280018, upload-time = "2025-08-05T08:12:30.093Z" }, +] + +[[package]] +name = "rq" +version = "2.6.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, + { name = "croniter" }, + { name = "redis" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/8e/f5/46e39abc46ff6ff4f3151ee4fd2c1bf7601a8d26bd30fd951c5496b1e6c6/rq-2.6.0.tar.gz", hash = "sha256:92ad55676cda14512c4eea5782f398a102dc3af108bea197c868c4c50c5d3e81", size = 675315, upload-time = "2025-09-06T03:15:12.854Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cc/66/6cf141584526e3ed5b57a194e09cbdf7058334bd3926bb3f96e2453cf053/rq-2.6.0-py3-none-any.whl", hash = "sha256:be5ccc0f0fc5f32da0999648340e31476368f08067f0c3fce6768d00064edbb5", size = 112533, upload-time = "2025-09-06T03:15:09.894Z" }, +] + +[[package]] +name = "rq-scheduler" +version = "0.14.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "crontab" }, + { name = "freezegun" }, + { name = "python-dateutil" }, + { name = "rq" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a0/4e/977bbcc1f3b25ed9ea60ec968b13f7147661defe5b2f9272b44fdb1c5549/rq-scheduler-0.14.0.tar.gz", hash = "sha256:2d5a14a1ab217f8693184ebaa1fe03838edcbc70b4f76572721c0b33058cd023", size = 16582, upload-time = "2024-10-29T13:30:32.641Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bb/d0/28cedca9f3b321f30e69d644c2dcd7097ec21570ec9606fde56750621300/rq_scheduler-0.14.0-py2.py3-none-any.whl", hash = "sha256:d4ec221a3d8c11b3ff55e041f09d9af1e17f3253db737b6b97e86ab20fc3dc0d", size = 13874, upload-time = "2024-10-29T13:30:30.449Z" }, +] + [[package]] name = "ruff" version = "0.15.0" @@ -1139,6 +1235,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f6/b0/2d823f6e77ebe560f4e397d078487e8d52c1516b331e3521bc75db4272ca/ruff-0.15.0-py3-none-win_arm64.whl", hash = "sha256:c480d632cc0ca3f0727acac8b7d053542d9e114a462a145d0b00e7cd658c515a", size = 10865753, upload-time = "2026-02-03T17:53:03.014Z" }, ] +[[package]] +name = "six" +version = "1.17.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/94/e7/b2c673351809dca68a0e064b6af791aa332cf192da575fd474ed7d6f16a2/six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81", size = 34031, upload-time = "2024-12-04T17:35:28.174Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" }, +] + [[package]] name = "sqlalchemy" version = "2.0.46" diff --git a/compose.yml b/compose.yml index f3c85fd1..ea1a7501 100644 --- a/compose.yml +++ b/compose.yml @@ -17,6 +17,11 @@ services: timeout: 3s retries: 20 + redis: + image: redis:7-alpine + ports: + - "${REDIS_PORT:-6379}:6379" + backend: build: # Build from repo root so the backend image can include repo-level assets @@ -32,9 +37,12 @@ services: DB_AUTO_MIGRATE: ${DB_AUTO_MIGRATE:-true} AUTH_MODE: ${AUTH_MODE} LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN} + WEBHOOK_REDIS_URL: redis://redis:6379/0 depends_on: db: condition: service_healthy + redis: + condition: service_started ports: - "${BACKEND_PORT:-8000}:8000" @@ -58,5 +66,37 @@ services: ports: - "${FRONTEND_PORT:-3000}:3000" + webhook-worker: + build: + context: . + dockerfile: backend/Dockerfile + command: ["rq", "worker", "webhook-dispatch", "-u", "redis://redis:6379/0"] + depends_on: + - redis + - backend + environment: + WEBHOOK_REDIS_URL: redis://redis:6379/0 + WEBHOOK_QUEUE_NAME: webhook-dispatch + restart: unless-stopped + + webhook-dispatch-cron: + build: + context: . + dockerfile: backend/Dockerfile + command: + - sh + - -c + - | + python -c "from app.services.webhooks.scheduler import bootstrap_webhook_dispatch_schedule; bootstrap_webhook_dispatch_schedule()" && \ + rqscheduler -u redis://redis:6379/0 -i 60 + depends_on: + - redis + - webhook-worker + environment: + WEBHOOK_REDIS_URL: redis://redis:6379/0 + WEBHOOK_QUEUE_NAME: webhook-dispatch + WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch + restart: unless-stopped + volumes: postgres_data: