refactor: implement generic queue worker with task-type dispatch and improved retry logic
This commit is contained in:
2
Makefile
2
Makefile
@@ -143,7 +143,7 @@ api-gen: frontend-tooling ## Regenerate TS API client (requires backend running
|
||||
$(NODE_WRAP) --cwd $(FRONTEND_DIR) npm run api:gen
|
||||
|
||||
.PHONY: rq-worker
|
||||
rq-worker: ## Run background webhook queue worker loop
|
||||
rq-worker: ## Run background queue worker loop
|
||||
cd $(BACKEND_DIR) && uv run python ../scripts/rq worker
|
||||
|
||||
.PHONY: backend-templates-sync
|
||||
|
||||
@@ -6,8 +6,6 @@ import asyncio
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from alembic import command
|
||||
from alembic.config import Config
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
|
||||
from sqlmodel import SQLModel
|
||||
@@ -45,8 +43,10 @@ async_session_maker = async_sessionmaker(
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
def _alembic_config() -> Config:
|
||||
def _alembic_config():
|
||||
alembic_ini = Path(__file__).resolve().parents[2] / "alembic.ini"
|
||||
from alembic.config import Config
|
||||
|
||||
alembic_cfg = Config(str(alembic_ini))
|
||||
alembic_cfg.attributes["configure_logger"] = False
|
||||
return alembic_cfg
|
||||
@@ -54,6 +54,8 @@ def _alembic_config() -> Config:
|
||||
|
||||
def run_migrations() -> None:
|
||||
"""Apply Alembic migrations to the latest revision."""
|
||||
from alembic import command
|
||||
|
||||
logger.info("Running database migrations.")
|
||||
command.upgrade(_alembic_config(), "head")
|
||||
logger.info("Database migrations complete.")
|
||||
|
||||
137
backend/app/services/queue_worker.py
Normal file
137
backend/app/services/queue_worker.py
Normal file
@@ -0,0 +1,137 @@
|
||||
"""Generic queue worker with task-type dispatch."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import random
|
||||
from collections.abc import Awaitable, Callable
|
||||
from dataclasses import dataclass
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.logging import get_logger
|
||||
from app.services.queue import QueuedTask, dequeue_task
|
||||
from app.services.webhooks.dispatch import (
|
||||
process_webhook_queue_task,
|
||||
requeue_webhook_queue_task,
|
||||
)
|
||||
from app.services.webhooks.queue import TASK_TYPE as WEBHOOK_TASK_TYPE
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class _TaskHandler:
    """Per-task-type dispatch entry: how to run, back off, and requeue a task."""

    # Async callable that performs the work for one dequeued task.
    handler: Callable[[QueuedTask], Awaitable[None]]
    # Maps the task's attempt count to a base retry delay in seconds.
    attempts_to_delay: Callable[[int], float]
    # Requeues a failed task with a delay; returns False when the task is
    # dropped (e.g. retry budget exhausted).
    # NOTE(review): invoked as requeue(task, delay_seconds=delay), and the
    # concrete implementation takes delay_seconds keyword-only — this
    # positional Callable annotation does not capture that; confirm with a
    # Protocol if strict typing is desired.
    requeue: Callable[[QueuedTask, float], bool]
|
||||
|
||||
|
||||
# Registry mapping task_type -> handler bundle. New background task types are
# supported by adding an entry here; flush_queue() warns on and drops any
# task_type absent from this mapping.
_TASK_HANDLERS: dict[str, _TaskHandler] = {
    WEBHOOK_TASK_TYPE: _TaskHandler(
        handler=process_webhook_queue_task,
        # Exponential backoff: base * 2^attempts, capped at the configured max.
        attempts_to_delay=lambda attempts: min(
            settings.rq_dispatch_retry_base_seconds * (2**max(0, attempts)),
            settings.rq_dispatch_retry_max_seconds,
        ),
        requeue=requeue_webhook_queue_task,
    ),
}
|
||||
|
||||
|
||||
def _compute_jitter(base_delay: float) -> float:
    """Return a random jitter amount for a retry delay.

    The jitter is drawn uniformly from [0, cap], where cap is the smaller of
    10% of *base_delay* and one-tenth of the configured retry ceiling, so
    retries spread out without materially extending the backoff.
    """
    cap = min(settings.rq_dispatch_retry_max_seconds / 10, base_delay * 0.1)
    return random.uniform(0, cap)
|
||||
|
||||
|
||||
async def flush_queue(*, block: bool = False, block_timeout: float = 0) -> int:
    """Consume one queue batch and dispatch by task type.

    Pops tasks from the configured queue and routes each to the handler
    registered for its ``task_type`` in ``_TASK_HANDLERS``. Failed tasks are
    requeued with exponential backoff plus jitter; tasks with no registered
    handler are logged and dropped.

    Args:
        block: When True, block on the broker waiting for new tasks instead of
            returning as soon as the queue is empty.
        block_timeout: Seconds to wait per blocking pop (0 waits indefinitely).

    Returns:
        The number of tasks processed successfully in this batch.
    """
    processed = 0
    while True:
        try:
            task = dequeue_task(
                settings.rq_queue_name,
                redis_url=settings.rq_redis_url,
                block=block,
                block_timeout=block_timeout,
            )
        except Exception:
            logger.exception(
                "queue.worker.dequeue_failed",
                extra={"queue_name": settings.rq_queue_name},
            )
            # Fix: back off before retrying. A bare `continue` here spins in a
            # hot loop while the broker is unreachable (and never exits in
            # non-blocking mode).
            await asyncio.sleep(1)
            continue

        if task is None:
            # Queue drained (or blocking pop timed out): batch is complete.
            break

        handler = _TASK_HANDLERS.get(task.task_type)
        if handler is None:
            # NOTE(review): unhandled task types are dropped after this
            # warning — confirm there is no dead-letter requirement.
            logger.warning(
                "queue.worker.task_unhandled",
                extra={
                    "task_type": task.task_type,
                    "queue_name": settings.rq_queue_name,
                },
            )
            continue

        try:
            await handler.handler(task)
            processed += 1
            logger.info(
                "queue.worker.success",
                extra={
                    "task_type": task.task_type,
                    "attempt": task.attempts,
                },
            )
        except Exception as exc:
            logger.exception(
                "queue.worker.failed",
                extra={
                    "task_type": task.task_type,
                    "attempt": task.attempts,
                    "error": str(exc),
                },
            )
            # Retry with exponential backoff plus jitter; requeue() returns
            # False when the task is dropped instead of retried.
            base_delay = handler.attempts_to_delay(task.attempts)
            delay = base_delay + _compute_jitter(base_delay)
            if not handler.requeue(task, delay_seconds=delay):
                logger.warning(
                    "queue.worker.drop_task",
                    extra={
                        "task_type": task.task_type,
                        "attempt": task.attempts,
                    },
                )
        # Per-task throttle to avoid hammering downstream services.
        await asyncio.sleep(settings.rq_dispatch_throttle_seconds)

    if processed > 0:
        logger.info("queue.worker.batch_complete", extra={"count": processed})
    return processed
|
||||
|
||||
|
||||
async def _run_worker_loop() -> None:
    """Drain the queue forever, surviving (and logging) per-batch failures."""
    while True:
        try:
            await flush_queue(block=True, block_timeout=0)
        except Exception:
            logger.exception(
                "queue.worker.loop_failed",
                extra={"queue_name": settings.rq_queue_name},
            )
            # Brief pause so a persistent failure cannot spin the loop.
            await asyncio.sleep(1)
|
||||
|
||||
|
||||
def run_worker() -> None:
    """RQ entrypoint for running continuous queue processing.

    Logs worker start/stop and drives the blocking worker loop until the
    process is interrupted.
    """
    # Fix: this event marks worker startup, not a batch — the previous
    # "queue.worker.batch_started" name was a copy-paste from the batch path.
    logger.info(
        "queue.worker.started",
        extra={"throttle_seconds": settings.rq_dispatch_throttle_seconds},
    )
    try:
        asyncio.run(_run_worker_loop())
    finally:
        # Always record shutdown, even on KeyboardInterrupt or crash.
        logger.info("queue.worker.stopped", extra={"queue_name": settings.rq_queue_name})
|
||||
@@ -17,9 +17,10 @@ from app.models.board_webhook_payloads import BoardWebhookPayload
|
||||
from app.models.board_webhooks import BoardWebhook
|
||||
from app.models.boards import Board
|
||||
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
|
||||
from app.services.queue import QueuedTask
|
||||
from app.services.webhooks.queue import (
|
||||
QueuedInboundDelivery,
|
||||
dequeue_webhook_delivery,
|
||||
decode_webhook_task,
|
||||
requeue_if_failed,
|
||||
)
|
||||
|
||||
@@ -163,12 +164,31 @@ async def _process_single_item(item: QueuedInboundDelivery) -> None:
|
||||
await session.commit()
|
||||
|
||||
|
||||
def _compute_webhook_retry_delay(attempts: int) -> float:
    """Return the exponential-backoff delay for the given attempt count.

    Grows as base * 2^attempts (attempts clamped to >= 0) and is capped at the
    configured maximum retry delay.
    """
    exponent = max(0, attempts)
    uncapped = settings.rq_dispatch_retry_base_seconds * (2**exponent)
    return min(uncapped, settings.rq_dispatch_retry_max_seconds)
|
||||
|
||||
|
||||
def _compute_webhook_retry_jitter(base_delay: float) -> float:
    """Return a random jitter for a webhook retry delay.

    Drawn uniformly from [0, cap], where cap is the smaller of 10% of
    *base_delay* and one-tenth of the configured retry ceiling.
    """
    cap = min(settings.rq_dispatch_retry_max_seconds / 10, base_delay * 0.1)
    return random.uniform(0, cap)
|
||||
|
||||
|
||||
async def process_webhook_queue_task(task: QueuedTask) -> None:
    """Decode a generic queue task into a webhook delivery and process it.

    Adapter used by the generic queue worker's dispatch table; raises whatever
    decode_webhook_task or _process_single_item raises so the caller can
    decide on retry.
    """
    item = decode_webhook_task(task)
    await _process_single_item(item)
|
||||
|
||||
|
||||
def requeue_webhook_queue_task(task: QueuedTask, *, delay_seconds: float = 0) -> bool:
    """Requeue a failed webhook task after *delay_seconds*.

    Adapter used by the generic queue worker's dispatch table. Returns the
    result of requeue_if_failed — presumably False when the task is dropped
    instead of retried (verify against that helper's contract).
    """
    payload = decode_webhook_task(task)
    return requeue_if_failed(payload, delay_seconds=delay_seconds)
|
||||
|
||||
|
||||
async def flush_webhook_delivery_queue(*, block: bool = False, block_timeout: float = 0) -> int:
|
||||
"""Consume queued webhook events and notify board leads in a throttled batch."""
|
||||
processed = 0
|
||||
while True:
|
||||
try:
|
||||
item = dequeue_webhook_delivery(block=block, block_timeout=block_timeout)
|
||||
item = dequeue_webhook_delivery_task(block=block, block_timeout=block_timeout)
|
||||
except Exception:
|
||||
logger.exception("webhook.dispatch.dequeue_failed")
|
||||
continue
|
||||
@@ -199,11 +219,8 @@ async def flush_webhook_delivery_queue(*, block: bool = False, block_timeout: fl
|
||||
"error": str(exc),
|
||||
},
|
||||
)
|
||||
delay = min(
|
||||
settings.rq_dispatch_retry_base_seconds * (2 ** max(0, item.attempts)),
|
||||
settings.rq_dispatch_retry_max_seconds,
|
||||
)
|
||||
jitter = random.uniform(0, min(settings.rq_dispatch_retry_max_seconds / 10, delay * 0.1))
|
||||
delay = _compute_webhook_retry_delay(item.attempts)
|
||||
jitter = _compute_webhook_retry_jitter(delay)
|
||||
requeue_if_failed(item, delay_seconds=delay + jitter)
|
||||
await asyncio.sleep(settings.rq_dispatch_throttle_seconds)
|
||||
if processed > 0:
|
||||
@@ -211,6 +228,25 @@ async def flush_webhook_delivery_queue(*, block: bool = False, block_timeout: fl
|
||||
return processed
|
||||
|
||||
|
||||
def dequeue_webhook_delivery_task(
    *,
    block: bool = False,
    block_timeout: float = 0,
) -> QueuedInboundDelivery | None:
    """Pop one queued webhook delivery payload.

    Returns the decoded delivery, or None when the queue is empty (or a
    blocking pop times out).
    """
    # Local import keeps the generic queue layer out of module import time.
    from app.services.queue import dequeue_task

    raw = dequeue_task(
        settings.rq_queue_name,
        redis_url=settings.rq_redis_url,
        block=block,
        block_timeout=block_timeout,
    )
    return None if raw is None else decode_webhook_task(raw)
|
||||
|
||||
|
||||
def run_flush_webhook_delivery_queue() -> None:
|
||||
"""RQ entrypoint for running the async queue flush from worker jobs."""
|
||||
logger.info(
|
||||
|
||||
@@ -40,7 +40,7 @@ def _task_from_payload(payload: QueuedInboundDelivery) -> QueuedTask:
|
||||
)
|
||||
|
||||
|
||||
def _payload_from_task(task: QueuedTask) -> QueuedInboundDelivery:
|
||||
def decode_webhook_task(task: QueuedTask) -> QueuedInboundDelivery:
|
||||
if task.task_type not in {TASK_TYPE, "legacy"}:
|
||||
raise ValueError(f"Unexpected task_type={task.task_type!r}; expected {TASK_TYPE!r}")
|
||||
|
||||
@@ -107,7 +107,7 @@ def dequeue_webhook_delivery(
|
||||
)
|
||||
if task is None:
|
||||
return None
|
||||
return _payload_from_task(task)
|
||||
return decode_webhook_task(task)
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
"webhook.queue.dequeue_failed",
|
||||
|
||||
14
scripts/rq
14
scripts/rq
@@ -4,7 +4,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
@@ -13,19 +12,12 @@ sys.path.insert(0, str(ROOT_DIR))
|
||||
BACKEND_ROOT = ROOT_DIR / "backend"
|
||||
sys.path.insert(0, str(BACKEND_ROOT))
|
||||
|
||||
from app.services.webhooks.dispatch import flush_webhook_delivery_queue
|
||||
from app.services.queue_worker import run_worker
|
||||
|
||||
|
||||
def cmd_worker(args: argparse.Namespace) -> int:
|
||||
async def _run_forever() -> None:
|
||||
while True:
|
||||
await flush_webhook_delivery_queue(
|
||||
block=True,
|
||||
block_timeout=0,
|
||||
)
|
||||
|
||||
try:
|
||||
asyncio.run(_run_forever())
|
||||
run_worker()
|
||||
except KeyboardInterrupt:
|
||||
return 0
|
||||
return 0
|
||||
@@ -37,7 +29,7 @@ def build_parser() -> argparse.ArgumentParser:
|
||||
|
||||
worker_parser = subparsers.add_parser(
|
||||
"worker",
|
||||
help="Continuously process queued webhook delivery work.",
|
||||
help="Continuously process queued background work.",
|
||||
)
|
||||
worker_parser.set_defaults(func=cmd_worker)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user