diff --git a/Makefile b/Makefile
index cc1056ce..79c66da2 100644
--- a/Makefile
+++ b/Makefile
@@ -143,7 +143,7 @@ api-gen: frontend-tooling ## Regenerate TS API client (requires backend running
 	$(NODE_WRAP) --cwd $(FRONTEND_DIR) npm run api:gen
 
 .PHONY: rq-worker
-rq-worker: ## Run background webhook queue worker loop
+rq-worker: ## Run background queue worker loop
 	cd $(BACKEND_DIR) && uv run python ../scripts/rq worker
 
 .PHONY: backend-templates-sync
diff --git a/backend/app/db/session.py b/backend/app/db/session.py
index ab22a8e2..2530ff06 100644
--- a/backend/app/db/session.py
+++ b/backend/app/db/session.py
@@ -6,8 +6,6 @@ import asyncio
 from pathlib import Path
 from typing import TYPE_CHECKING
 
-from alembic import command
-from alembic.config import Config
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
 from sqlmodel import SQLModel
@@ -45,8 +43,10 @@ async_session_maker = async_sessionmaker(
 logger = get_logger(__name__)
 
 
-def _alembic_config() -> Config:
+def _alembic_config():
     alembic_ini = Path(__file__).resolve().parents[2] / "alembic.ini"
+    from alembic.config import Config
+
     alembic_cfg = Config(str(alembic_ini))
     alembic_cfg.attributes["configure_logger"] = False
     return alembic_cfg
@@ -54,6 +54,8 @@ def _alembic_config() -> Config:
 
 def run_migrations() -> None:
     """Apply Alembic migrations to the latest revision."""
+    from alembic import command
+
     logger.info("Running database migrations.")
     command.upgrade(_alembic_config(), "head")
     logger.info("Database migrations complete.")
diff --git a/backend/app/services/queue_worker.py b/backend/app/services/queue_worker.py
new file mode 100644
index 00000000..d233e9c1
--- /dev/null
+++ b/backend/app/services/queue_worker.py
@@ -0,0 +1,137 @@
+"""Generic queue worker with task-type dispatch."""
+
+from __future__ import annotations
+
+import asyncio
+import random
+from collections.abc import Awaitable, Callable
+from dataclasses import dataclass
+
+from app.core.config import settings
+from app.core.logging import get_logger
+from app.services.queue import QueuedTask, dequeue_task
+from app.services.webhooks.dispatch import (
+    process_webhook_queue_task,
+    requeue_webhook_queue_task,
+)
+from app.services.webhooks.queue import TASK_TYPE as WEBHOOK_TASK_TYPE
+
+logger = get_logger(__name__)
+
+
+@dataclass(frozen=True)
+class _TaskHandler:
+    handler: Callable[[QueuedTask], Awaitable[None]]
+    attempts_to_delay: Callable[[int], float]
+    requeue: Callable[..., bool]
+
+
+_TASK_HANDLERS: dict[str, _TaskHandler] = {
+    WEBHOOK_TASK_TYPE: _TaskHandler(
+        handler=process_webhook_queue_task,
+        attempts_to_delay=lambda attempts: min(
+            settings.rq_dispatch_retry_base_seconds * (2**max(0, attempts)),
+            settings.rq_dispatch_retry_max_seconds,
+        ),
+        requeue=requeue_webhook_queue_task,
+    ),
+}
+
+
+def _compute_jitter(base_delay: float) -> float:
+    return random.uniform(0, min(settings.rq_dispatch_retry_max_seconds / 10, base_delay * 0.1))
+
+
+async def flush_queue(*, block: bool = False, block_timeout: float = 0) -> int:
+    """Consume one queue batch and dispatch by task type."""
+    processed = 0
+    while True:
+        try:
+            task = dequeue_task(
+                settings.rq_queue_name,
+                redis_url=settings.rq_redis_url,
+                block=block,
+                block_timeout=block_timeout,
+            )
+        except Exception:
+            logger.exception(
+                "queue.worker.dequeue_failed",
+                extra={"queue_name": settings.rq_queue_name},
+            )
+            continue
+
+        if task is None:
+            break
+
+        handler = _TASK_HANDLERS.get(task.task_type)
+        if handler is None:
+            logger.warning(
+                "queue.worker.task_unhandled",
+                extra={
+                    "task_type": task.task_type,
+                    "queue_name": settings.rq_queue_name,
+                },
+            )
+            continue
+
+        try:
+            await handler.handler(task)
+            processed += 1
+            logger.info(
+                "queue.worker.success",
+                extra={
+                    "task_type": task.task_type,
+                    "attempt": task.attempts,
+                },
+            )
+        except Exception as exc:
+            logger.exception(
+                "queue.worker.failed",
+                extra={
+                    "task_type": task.task_type,
+                    "attempt": task.attempts,
+                    "error": str(exc),
+                },
+            )
+            base_delay = handler.attempts_to_delay(task.attempts)
+            delay = base_delay + _compute_jitter(base_delay)
+            if not handler.requeue(task, delay_seconds=delay):
+                logger.warning(
+                    "queue.worker.drop_task",
+                    extra={
+                        "task_type": task.task_type,
+                        "attempt": task.attempts,
+                    },
+                )
+        await asyncio.sleep(settings.rq_dispatch_throttle_seconds)
+
+    if processed > 0:
+        logger.info("queue.worker.batch_complete", extra={"count": processed})
+    return processed
+
+
+async def _run_worker_loop() -> None:
+    while True:
+        try:
+            await flush_queue(
+                block=True,
+                block_timeout=0,
+            )
+        except Exception:
+            logger.exception(
+                "queue.worker.loop_failed",
+                extra={"queue_name": settings.rq_queue_name},
+            )
+            await asyncio.sleep(1)
+
+
+def run_worker() -> None:
+    """RQ entrypoint for running continuous queue processing."""
+    logger.info(
+        "queue.worker.batch_started",
+        extra={"throttle_seconds": settings.rq_dispatch_throttle_seconds},
+    )
+    try:
+        asyncio.run(_run_worker_loop())
+    finally:
+        logger.info("queue.worker.stopped", extra={"queue_name": settings.rq_queue_name})
diff --git a/backend/app/services/webhooks/dispatch.py b/backend/app/services/webhooks/dispatch.py
index c4a4b220..2e45e887 100644
--- a/backend/app/services/webhooks/dispatch.py
+++ b/backend/app/services/webhooks/dispatch.py
@@ -17,9 +17,10 @@ from app.models.board_webhook_payloads import BoardWebhookPayload
 from app.models.board_webhooks import BoardWebhook
 from app.models.boards import Board
 from app.services.openclaw.gateway_dispatch import GatewayDispatchService
+from app.services.queue import QueuedTask
 from app.services.webhooks.queue import (
     QueuedInboundDelivery,
-    dequeue_webhook_delivery,
+    decode_webhook_task,
     requeue_if_failed,
 )
@@ -163,12 +164,31 @@ async def _process_single_item(item: QueuedInboundDelivery) -> None:
         await session.commit()
 
 
+def _compute_webhook_retry_delay(attempts: int) -> float:
+    base = settings.rq_dispatch_retry_base_seconds * (2 ** max(0, attempts))
+    return min(base, settings.rq_dispatch_retry_max_seconds)
+
+
+def _compute_webhook_retry_jitter(base_delay: float) -> float:
+    return random.uniform(0, min(settings.rq_dispatch_retry_max_seconds / 10, base_delay * 0.1))
+
+
+async def process_webhook_queue_task(task: QueuedTask) -> None:
+    item = decode_webhook_task(task)
+    await _process_single_item(item)
+
+
+def requeue_webhook_queue_task(task: QueuedTask, *, delay_seconds: float = 0) -> bool:
+    payload = decode_webhook_task(task)
+    return requeue_if_failed(payload, delay_seconds=delay_seconds)
+
+
 async def flush_webhook_delivery_queue(*, block: bool = False, block_timeout: float = 0) -> int:
     """Consume queued webhook events and notify board leads in a throttled batch."""
     processed = 0
     while True:
         try:
-            item = dequeue_webhook_delivery(block=block, block_timeout=block_timeout)
+            item = dequeue_webhook_delivery_task(block=block, block_timeout=block_timeout)
         except Exception:
             logger.exception("webhook.dispatch.dequeue_failed")
             continue
@@ -199,11 +219,8 @@ async def flush_webhook_delivery_queue(*, block: bool = False, block_timeout: fl
                     "error": str(exc),
                 },
             )
-            delay = min(
-                settings.rq_dispatch_retry_base_seconds * (2 ** max(0, item.attempts)),
-                settings.rq_dispatch_retry_max_seconds,
-            )
-            jitter = random.uniform(0, min(settings.rq_dispatch_retry_max_seconds / 10, delay * 0.1))
+            delay = _compute_webhook_retry_delay(item.attempts)
+            jitter = _compute_webhook_retry_jitter(delay)
             requeue_if_failed(item, delay_seconds=delay + jitter)
         await asyncio.sleep(settings.rq_dispatch_throttle_seconds)
     if processed > 0:
@@ -211,6 +228,25 @@ async def flush_webhook_delivery_queue(*, block: bool = False, block_timeout: fl
     return processed
 
 
+def dequeue_webhook_delivery_task(
+    *,
+    block: bool = False,
+    block_timeout: float = 0,
+) -> QueuedInboundDelivery | None:
+    """Pop one queued webhook delivery payload."""
+    from app.services.queue import dequeue_task
+
+    task = dequeue_task(
+        settings.rq_queue_name,
+        redis_url=settings.rq_redis_url,
+        block=block,
+        block_timeout=block_timeout,
+    )
+    if task is None:
+        return None
+    return decode_webhook_task(task)
+
+
 def run_flush_webhook_delivery_queue() -> None:
     """RQ entrypoint for running the async queue flush from worker jobs."""
     logger.info(
diff --git a/backend/app/services/webhooks/queue.py b/backend/app/services/webhooks/queue.py
index 9f0aacd5..13887f62 100644
--- a/backend/app/services/webhooks/queue.py
+++ b/backend/app/services/webhooks/queue.py
@@ -40,7 +40,7 @@ def _task_from_payload(payload: QueuedInboundDelivery) -> QueuedTask:
     )
 
 
-def _payload_from_task(task: QueuedTask) -> QueuedInboundDelivery:
+def decode_webhook_task(task: QueuedTask) -> QueuedInboundDelivery:
     if task.task_type not in {TASK_TYPE, "legacy"}:
         raise ValueError(f"Unexpected task_type={task.task_type!r}; expected {TASK_TYPE!r}")
 
@@ -107,7 +107,7 @@ def dequeue_webhook_delivery(
         )
         if task is None:
             return None
-        return _payload_from_task(task)
+        return decode_webhook_task(task)
     except Exception as exc:
         logger.error(
             "webhook.queue.dequeue_failed",
diff --git a/scripts/rq b/scripts/rq
index 2d13541a..36a3fc08 100755
--- a/scripts/rq
+++ b/scripts/rq
@@ -4,7 +4,6 @@ from __future__ import annotations
 
 import argparse
-import asyncio
 import sys
 from pathlib import Path
 
 
@@ -13,19 +12,12 @@ sys.path.insert(0, str(ROOT_DIR))
 BACKEND_ROOT = ROOT_DIR / "backend"
 sys.path.insert(0, str(BACKEND_ROOT))
 
-from app.services.webhooks.dispatch import flush_webhook_delivery_queue
+from app.services.queue_worker import run_worker
 
 
 def cmd_worker(args: argparse.Namespace) -> int:
-    async def _run_forever() -> None:
-        while True:
-            await flush_webhook_delivery_queue(
-                block=True,
-                block_timeout=0,
-            )
-
     try:
-        asyncio.run(_run_forever())
+        run_worker()
     except KeyboardInterrupt:
         return 0
     return 0
@@ -37,7 +29,7 @@ def build_parser() -> argparse.ArgumentParser:
 
     worker_parser = subparsers.add_parser(
         "worker",
-        help="Continuously process queued webhook delivery work.",
+        help="Continuously process queued background work.",
     )
     worker_parser.set_defaults(func=cmd_worker)
 