From e28496245bc4b50f9a30ba18fb81abde61a4e512 Mon Sep 17 00:00:00 2001 From: Abhimanyu Saharan Date: Sun, 15 Feb 2026 12:48:08 +0530 Subject: [PATCH] refactor: enhance webhook delivery queue processing with configurable blocking --- Makefile | 7 +++ backend/app/services/queue.py | 24 ++++++++-- backend/app/services/webhooks/dispatch.py | 8 ++-- backend/app/services/webhooks/queue.py | 13 +++++- scripts/rq | 43 +++++++++++++++++ scripts/rq_webhook.py | 57 +++++++++++++++++++++++ 6 files changed, 143 insertions(+), 9 deletions(-) create mode 100755 scripts/rq create mode 100644 scripts/rq_webhook.py diff --git a/Makefile b/Makefile index b126da29..389f7ed5 100644 --- a/Makefile +++ b/Makefile @@ -142,6 +142,13 @@ frontend-build: frontend-tooling ## Build frontend (next build) api-gen: frontend-tooling ## Regenerate TS API client (requires backend running at 127.0.0.1:8000) $(NODE_WRAP) --cwd $(FRONTEND_DIR) npm run api:gen +WEBHOOK_WORKER_INTERVAL_SECONDS ?= 2 + +.PHONY: rq-worker +rq-worker: ## Run background queue worker loop (optional WEBHOOK_WORKER_INTERVAL_SECONDS) + cd $(BACKEND_DIR) && uv run python ../scripts/rq worker \ + --interval-seconds "$(WEBHOOK_WORKER_INTERVAL_SECONDS)" + .PHONY: backend-templates-sync backend-templates-sync: ## Sync templates to existing gateway agents (usage: make backend-templates-sync GATEWAY_ID= SYNC_ARGS="--reset-sessions") @if [ -z "$(GATEWAY_ID)" ]; then echo "GATEWAY_ID is required (uuid)"; exit 1; fi diff --git a/backend/app/services/queue.py b/backend/app/services/queue.py index 37105658..0f3ab1a9 100644 --- a/backend/app/services/queue.py +++ b/backend/app/services/queue.py @@ -78,12 +78,28 @@ def _coerce_datetime(raw: object | None) -> datetime: return datetime.now(UTC) -def dequeue_task(queue_name: str, *, redis_url: str | None = None) -> QueuedTask | None: +def dequeue_task( + queue_name: str, + *, + redis_url: str | None = None, + block: bool = False, + block_timeout: float = 0, +) -> QueuedTask | None: """Pop one task envelope from the queue.""" client = _redis_client(redis_url=redis_url) - raw = cast(str | bytes | None, client.rpop(queue_name)) - if raw is None: - return None + if block: + raw = cast(tuple[bytes | str, bytes | str] | None, client.brpop(queue_name, timeout=block_timeout)) + if raw is None: + return None + raw = raw[1] + else: + raw = cast(str | bytes | None, client.rpop(queue_name)) + if raw is None: + return None + return _decode_task(raw, queue_name) + + +def _decode_task(raw: str | bytes, queue_name: str) -> QueuedTask: if isinstance(raw, bytes): raw = raw.decode("utf-8") diff --git a/backend/app/services/webhooks/dispatch.py b/backend/app/services/webhooks/dispatch.py index 9682975e..dc4611c5 100644 --- a/backend/app/services/webhooks/dispatch.py +++ b/backend/app/services/webhooks/dispatch.py @@ -162,12 +162,12 @@ async def _process_single_item(item: QueuedInboundDelivery) -> None: await session.commit() -async def flush_webhook_delivery_queue() -> None: +async def flush_webhook_delivery_queue(*, block: bool = False, block_timeout: float = 0) -> int: """Consume queued webhook events and notify board leads in a throttled batch.""" processed = 0 while True: try: - item = dequeue_webhook_delivery() + item = dequeue_webhook_delivery(block=block, block_timeout=block_timeout) except Exception: logger.exception("webhook.dispatch.dequeue_failed") continue @@ -200,7 +200,9 @@ async def flush_webhook_delivery_queue() -> None: ) requeue_if_failed(item) time.sleep(settings.rq_dispatch_throttle_seconds) - logger.info("webhook.dispatch.batch_complete", extra={"count": processed}) + if processed > 0: + logger.info("webhook.dispatch.batch_complete", extra={"count": processed}) + return processed def run_flush_webhook_delivery_queue() -> None: diff --git a/backend/app/services/webhooks/queue.py b/backend/app/services/webhooks/queue.py index f978c607..4ef37880 100644 --- a/backend/app/services/webhooks/queue.py +++ b/backend/app/services/webhooks/queue.py @@ -92,10 +92,19 @@ def enqueue_webhook_delivery(payload: QueuedInboundDelivery) -> bool: return False -def dequeue_webhook_delivery() -> QueuedInboundDelivery | None: +def dequeue_webhook_delivery( + *, + block: bool = False, + block_timeout: float = 0, +) -> QueuedInboundDelivery | None: """Pop one queued webhook delivery payload.""" try: - task = dequeue_task(settings.rq_queue_name, redis_url=settings.rq_redis_url) + task = dequeue_task( + settings.rq_queue_name, + redis_url=settings.rq_redis_url, + block=block, + block_timeout=block_timeout, + ) if task is None: return None return _payload_from_task(task) diff --git a/scripts/rq b/scripts/rq new file mode 100755 index 00000000..3b576e29 --- /dev/null +++ b/scripts/rq @@ -0,0 +1,43 @@ +#!/usr/bin/env python +"""Top-level RQ helper entrypoint.""" + +from __future__ import annotations + +import argparse +import os +import sys +from pathlib import Path + +ROOT_DIR = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT_DIR)) + +from scripts.rq_webhook import cmd_worker + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Generic background worker helpers.") + subparsers = parser.add_subparsers(dest="command", required=True) + + worker_parser = subparsers.add_parser( + "worker", + help="Run background queue worker loop.", + ) + worker_parser.add_argument( + "--interval-seconds", + type=float, + default=float(os.environ.get("WEBHOOK_WORKER_INTERVAL_SECONDS", "2")), + help="Seconds to wait for queue work before returning when idle.", + ) + worker_parser.set_defaults(func=cmd_worker) + + return parser + + +def main() -> None: + parser = build_parser() + args = parser.parse_args() + sys.exit(args.func(args)) + + +if __name__ == "__main__": + main() diff --git a/scripts/rq_webhook.py b/scripts/rq_webhook.py new file mode 100644 index 00000000..17c2b502 --- /dev/null +++ b/scripts/rq_webhook.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +"""RQ helpers for background queue processing.""" + +from __future__ import annotations + +import argparse +import asyncio +import os +import sys +from pathlib import Path + +ROOT_DIR = Path(__file__).resolve().parent.parent +BACKEND_ROOT = ROOT_DIR / "backend" +sys.path.insert(0, str(BACKEND_ROOT)) + +from app.services.webhooks.dispatch import flush_webhook_delivery_queue + + +def cmd_worker(args: argparse.Namespace) -> int: + interval_seconds = max(args.interval_seconds, 0.0) + try: + while True: + asyncio.run( + flush_webhook_delivery_queue( + block=True, + block_timeout=interval_seconds, + ), + ) + except KeyboardInterrupt: + return 0 + return 0 + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Background RQ helper commands.") + subparsers = parser.add_subparsers(dest="command", required=True) + + worker_parser = subparsers.add_parser("worker", help="Continuously process queued work.") + worker_parser.add_argument( + "--interval-seconds", + type=float, + default=float(os.environ.get("WEBHOOK_WORKER_INTERVAL_SECONDS", "2")), + help="Seconds to wait for queue work before returning when idle.", + ) + worker_parser.set_defaults(func=cmd_worker) + + return parser + + +def main() -> None: + parser = build_parser() + args = parser.parse_args() + sys.exit(args.func(args)) + + +if __name__ == "__main__": + main()