diff --git a/Makefile b/Makefile index 389f7ed5..cc1056ce 100644 --- a/Makefile +++ b/Makefile @@ -142,12 +142,9 @@ frontend-build: frontend-tooling ## Build frontend (next build) api-gen: frontend-tooling ## Regenerate TS API client (requires backend running at 127.0.0.1:8000) $(NODE_WRAP) --cwd $(FRONTEND_DIR) npm run api:gen -WEBHOOK_WORKER_INTERVAL_SECONDS ?= 2 - .PHONY: rq-worker -rq-worker: ## Run background queue worker loop (optional WEBHOOK_WORKER_INTERVAL_SECONDS) - cd $(BACKEND_DIR) && uv run python ../scripts/rq worker \ - --interval-seconds "$(WEBHOOK_WORKER_INTERVAL_SECONDS)" +rq-worker: ## Run background webhook queue worker loop + cd $(BACKEND_DIR) && uv run python ../scripts/rq worker .PHONY: backend-templates-sync backend-templates-sync: ## Sync templates to existing gateway agents (usage: make backend-templates-sync GATEWAY_ID= SYNC_ARGS="--reset-sessions") diff --git a/backend/app/api/board_webhooks.py b/backend/app/api/board_webhooks.py index 1c0b3490..90f065ae 100644 --- a/backend/app/api/board_webhooks.py +++ b/backend/app/api/board_webhooks.py @@ -10,6 +10,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request, status from sqlmodel import col, select from app.api.deps import get_board_for_user_read, get_board_for_user_write, get_board_or_404 +from app.core.logging import get_logger from app.core.config import settings from app.core.time import utcnow from app.db import crud @@ -44,6 +45,7 @@ SESSION_DEP = Depends(get_session) BOARD_USER_READ_DEP = Depends(get_board_for_user_read) BOARD_USER_WRITE_DEP = Depends(get_board_for_user_write) BOARD_OR_404_DEP = Depends(get_board_or_404) +logger = get_logger(__name__) def _webhook_endpoint_path(board_id: UUID, webhook_id: UUID) -> str: @@ -403,6 +405,15 @@ async def ingest_board_webhook( board_id=board.id, webhook_id=webhook_id, ) + logger.info( + "webhook.ingest.received", + extra={ + "board_id": str(board.id), + "webhook_id": str(webhook.id), + "source_ip": request.client.host if request.client else None, + "content_type": request.headers.get("content-type"), + }, + ) if not webhook.enabled: raise HTTPException( status_code=status.HTTP_410_GONE, @@ -437,6 +448,15 @@ async def ingest_board_webhook( ) session.add(memory) await session.commit() + logger.info( + "webhook.ingest.persisted", + extra={ + "payload_id": str(payload.id), + "board_id": str(board.id), + "webhook_id": str(webhook.id), + "memory_id": str(memory.id), + }, + ) enqueued = enqueue_webhook_delivery( QueuedInboundDelivery( @@ -446,6 +466,15 @@ async def ingest_board_webhook( received_at=payload.received_at, ), ) + logger.info( + "webhook.ingest.enqueued", + extra={ + "payload_id": str(payload.id), + "board_id": str(board.id), + "webhook_id": str(webhook.id), + "enqueued": enqueued, + }, + ) if not enqueued: # Preserve historical behavior by still notifying synchronously if queueing fails. await _notify_lead_on_webhook_payload( diff --git a/backend/app/core/config.py b/backend/app/core/config.py index e78bee95..ecb5f76a 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -58,6 +58,8 @@ class Settings(BaseSettings): rq_queue_name: str = "default" rq_dispatch_throttle_seconds: float = 2.0 rq_dispatch_max_retries: int = 3 + rq_dispatch_retry_base_seconds: float = 10.0 + rq_dispatch_retry_max_seconds: float = 120.0 # Logging log_level: str = "INFO" diff --git a/backend/app/services/queue.py b/backend/app/services/queue.py index 0f3ab1a9..2b794dde 100644 --- a/backend/app/services/queue.py +++ b/backend/app/services/queue.py @@ -3,6 +3,7 @@ from __future__ import annotations import json +import time from dataclasses import dataclass from datetime import UTC, datetime from typing import Any, cast @@ -14,6 +15,9 @@ from app.core.logging import get_logger logger = get_logger(__name__) +_SCHEDULED_SUFFIX = ":scheduled" +_DRY_RUN_BATCH_SIZE = 100 + @dataclass(frozen=True) class QueuedTask: @@ -40,7 +44,84 @@ def _redis_client(redis_url: str | None = None) -> redis.Redis: return redis.Redis.from_url(redis_url or settings.rq_redis_url) -def enqueue_task(task: QueuedTask, queue_name: str, *, redis_url: str | None = None) -> bool: +def _scheduled_queue_name(queue_name: str) -> str: + return f"{queue_name}{_SCHEDULED_SUFFIX}" + + +def _now_seconds() -> float: + return time.time() + + +def _drain_ready_scheduled_tasks( + client: redis.Redis, + queue_name: str, + *, + max_items: int = _DRY_RUN_BATCH_SIZE, +) -> float | None: + scheduled_queue = _scheduled_queue_name(queue_name) + now = _now_seconds() + + ready_items = client.zrangebyscore( + scheduled_queue, + "-inf", + now, + start=0, + num=max_items, + ) + if ready_items: + client.lpush(queue_name, *ready_items) + client.zrem(scheduled_queue, *ready_items) + logger.debug( + "rq.queue.drain_ready_scheduled", + extra={ + "queue_name": queue_name, + "count": len(ready_items), + }, + ) + + next_item = client.zrangebyscore( + scheduled_queue, + now, + "+inf", + start=0, + num=1, + withscores=True, + ) + if not next_item: + return None + + next_score = float(cast(tuple[str | bytes, float], next_item[0])[1]) + return max(0.0, next_score - now) + + +def _schedule_for_later( + task: QueuedTask, + queue_name: str, + delay_seconds: float, + *, + redis_url: str | None = None, +) -> bool: + client = _redis_client(redis_url=redis_url) + scheduled_queue = _scheduled_queue_name(queue_name) + score = _now_seconds() + delay_seconds + client.zadd(scheduled_queue, {task.to_json(): score}) + logger.info( + "rq.queue.scheduled", + extra={ + "task_type": task.task_type, + "queue_name": queue_name, + "delay_seconds": delay_seconds, + }, + ) + return True + + +def enqueue_task( + task: QueuedTask, + queue_name: str, + *, + redis_url: str | None = None, +) -> bool: """Persist a task envelope in a Redis list-backed queue.""" try: client = _redis_client(redis_url=redis_url) @@ -87,14 +168,22 @@ def dequeue_task( ) -> QueuedTask | None: """Pop one task envelope from the queue.""" client = _redis_client(redis_url=redis_url) + timeout = max(0.0, float(block_timeout)) if block: - raw = cast(tuple[bytes | str, bytes | str] | None, client.brpop(queue_name, timeout=block_timeout)) + next_delay = _drain_ready_scheduled_tasks(client, queue_name) + if timeout == 0: + timeout = next_delay if next_delay is not None else 0 + else: + timeout = min(timeout, next_delay) if next_delay is not None else timeout + raw = cast(tuple[bytes | str, bytes | str] | None, client.brpop(queue_name, timeout=timeout)) if raw is None: + _drain_ready_scheduled_tasks(client, queue_name) return None raw = raw[1] else: raw = cast(str | bytes | None, client.rpop(queue_name)) if raw is None: + _drain_ready_scheduled_tasks(client, queue_name) return None return _decode_task(raw, queue_name) @@ -141,19 +230,32 @@ def requeue_if_failed( *, max_retries: int, redis_url: str | None = None, + delay_seconds: float = 0, ) -> bool: """Requeue a failed task with capped retries. Returns True if requeued. """ - if task.attempts >= max_retries: + requeued_task = _requeue_with_attempt(task) + if requeued_task.attempts > max_retries: logger.warning( "rq.queue.drop_failed_task", extra={ "task_type": task.task_type, "queue_name": queue_name, - "attempts": task.attempts, + "attempts": requeued_task.attempts, }, ) return False - return enqueue_task(_requeue_with_attempt(task), queue_name, redis_url=redis_url) + if delay_seconds > 0: + return _schedule_for_later( + requeued_task, + queue_name, + delay_seconds, + redis_url=redis_url, + ) + return enqueue_task( + requeued_task, + queue_name, + redis_url=redis_url, + ) diff --git a/backend/app/services/webhooks/dispatch.py b/backend/app/services/webhooks/dispatch.py index dc4611c5..c4a4b220 100644 --- a/backend/app/services/webhooks/dispatch.py +++ b/backend/app/services/webhooks/dispatch.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import random import time from sqlmodel.ext.asyncio.session import AsyncSession @@ -198,8 +199,13 @@ async def flush_webhook_delivery_queue(*, block: bool = False, block_timeout: fl "error": str(exc), }, ) - requeue_if_failed(item) - time.sleep(settings.rq_dispatch_throttle_seconds) + delay = min( + settings.rq_dispatch_retry_base_seconds * (2 ** max(0, item.attempts)), + settings.rq_dispatch_retry_max_seconds, + ) + jitter = random.uniform(0, min(settings.rq_dispatch_retry_max_seconds / 10, delay * 0.1)) + requeue_if_failed(item, delay_seconds=delay + jitter) + await asyncio.sleep(settings.rq_dispatch_throttle_seconds) if processed > 0: logger.info("webhook.dispatch.batch_complete", extra={"count": processed}) return processed diff --git a/backend/app/services/webhooks/queue.py b/backend/app/services/webhooks/queue.py index 4ef37880..9f0aacd5 100644 --- a/backend/app/services/webhooks/queue.py +++ b/backend/app/services/webhooks/queue.py @@ -119,7 +119,11 @@ def dequeue_webhook_delivery( raise -def requeue_if_failed(payload: QueuedInboundDelivery) -> bool: +def requeue_if_failed( + payload: QueuedInboundDelivery, + *, + delay_seconds: float = 0, +) -> bool: """Requeue payload delivery with capped retries. Returns True if requeued. @@ -130,6 +134,7 @@ def requeue_if_failed(payload: QueuedInboundDelivery) -> bool: settings.rq_queue_name, max_retries=settings.rq_dispatch_max_retries, redis_url=settings.rq_redis_url, + delay_seconds=delay_seconds, ) except Exception as exc: logger.warning( diff --git a/scripts/rq b/scripts/rq index 3b576e29..2d13541a 100755 --- a/scripts/rq +++ b/scripts/rq @@ -1,32 +1,43 @@ #!/usr/bin/env python -"""Top-level RQ helper entrypoint.""" +"""RQ worker entrypoint.""" from __future__ import annotations import argparse -import os +import asyncio import sys from pathlib import Path ROOT_DIR = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT_DIR)) +BACKEND_ROOT = ROOT_DIR / "backend" +sys.path.insert(0, str(BACKEND_ROOT)) -from scripts.rq_webhook import cmd_worker +from app.services.webhooks.dispatch import flush_webhook_delivery_queue + + +def cmd_worker(args: argparse.Namespace) -> int: + async def _run_forever() -> None: + while True: + await flush_webhook_delivery_queue( + block=True, + block_timeout=0, + ) + + try: + asyncio.run(_run_forever()) + except KeyboardInterrupt: + return 0 + return 0 def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(description="Generic background worker helpers.") + parser = argparse.ArgumentParser(description="RQ background worker helpers.") subparsers = parser.add_subparsers(dest="command", required=True) worker_parser = subparsers.add_parser( "worker", - help="Run background queue worker loop.", - ) - worker_parser.add_argument( - "--interval-seconds", - type=float, - default=float(os.environ.get("WEBHOOK_WORKER_INTERVAL_SECONDS", "2")), - help="Seconds to wait for queue work before returning when idle.", + help="Continuously process queued webhook delivery work.", ) worker_parser.set_defaults(func=cmd_worker) diff --git a/scripts/rq_webhook.py b/scripts/rq_webhook.py deleted file mode 100644 index 17c2b502..00000000 --- a/scripts/rq_webhook.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env python -"""RQ helpers for background queue processing.""" - -from __future__ import annotations - -import argparse -import asyncio -import os -import sys -from pathlib import Path - -ROOT_DIR = Path(__file__).resolve().parent.parent -BACKEND_ROOT = ROOT_DIR / "backend" -sys.path.insert(0, str(BACKEND_ROOT)) - -from app.services.webhooks.dispatch import flush_webhook_delivery_queue - - -def cmd_worker(args: argparse.Namespace) -> int: - interval_seconds = max(args.interval_seconds, 0.0) - try: - while True: - asyncio.run( - flush_webhook_delivery_queue( - block=True, - block_timeout=interval_seconds, - ), - ) - except KeyboardInterrupt: - return 0 - return 0 - - -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(description="Background RQ helper commands.") - subparsers = parser.add_subparsers(dest="command", required=True) - - worker_parser = subparsers.add_parser("worker", help="Continuously process queued work.") - worker_parser.add_argument( - "--interval-seconds", - type=float, - default=float(os.environ.get("WEBHOOK_WORKER_INTERVAL_SECONDS", "2")), - help="Seconds to wait for queue work before returning when idle.", - ) - worker_parser.set_defaults(func=cmd_worker) - - return parser - - -def main() -> None: - parser = build_parser() - args = parser.parse_args() - sys.exit(args.func(args)) - - -if __name__ == "__main__": - main()