# Source: openclaw-mission-control/backend/app/services/queue_worker.py
# (file-viewer header removed — original listing: 153 lines, 4.9 KiB, Python)
"""Generic queue worker with task-type dispatch."""
from __future__ import annotations
import asyncio
import random
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from app.core.config import settings
from app.core.logging import get_logger
from app.services.openclaw.lifecycle_queue import TASK_TYPE as LIFECYCLE_RECONCILE_TASK_TYPE
from app.services.openclaw.lifecycle_queue import (
requeue_lifecycle_queue_task,
)
from app.services.openclaw.lifecycle_reconcile import process_lifecycle_queue_task
from app.services.queue import QueuedTask, dequeue_task
from app.services.webhooks.dispatch import (
process_webhook_queue_task,
requeue_webhook_queue_task,
)
from app.services.webhooks.queue import TASK_TYPE as WEBHOOK_TASK_TYPE
logger = get_logger(__name__)
_WORKER_BLOCK_TIMEOUT_SECONDS = 5.0
@dataclass(frozen=True)
class _TaskHandler:
    """Per-task-type dispatch entry: processing coroutine plus retry policy."""

    # Coroutine invoked with the dequeued task; raising triggers the
    # requeue-with-backoff path in flush_queue.
    handler: Callable[[QueuedTask], Awaitable[None]]
    # Maps the task's attempt count to a base retry delay in seconds
    # (jitter is added separately by flush_queue).
    attempts_to_delay: Callable[[int], float]
    # Re-enqueues the task with the given delay in seconds; a False return
    # means the task will not be retried and flush_queue logs a drop.
    requeue: Callable[[QueuedTask, float], bool]
def _exponential_backoff(attempts: int) -> float:
    """Return the base retry delay for *attempts* prior failures.

    Exponential backoff (base * 2**attempts), clamped to the configured
    maximum. ``max(0, attempts)`` guards against a negative attempt count
    producing a fractional delay.
    """
    return min(
        settings.rq_dispatch_retry_base_seconds * (2 ** max(0, attempts)),
        settings.rq_dispatch_retry_max_seconds,
    )


# Registry mapping task type -> handler + retry policy. Both task types
# share the same exponential-backoff schedule.
_TASK_HANDLERS: dict[str, _TaskHandler] = {
    LIFECYCLE_RECONCILE_TASK_TYPE: _TaskHandler(
        handler=process_lifecycle_queue_task,
        attempts_to_delay=_exponential_backoff,
        requeue=lambda task, delay: requeue_lifecycle_queue_task(task, delay_seconds=delay),
    ),
    WEBHOOK_TASK_TYPE: _TaskHandler(
        handler=process_webhook_queue_task,
        attempts_to_delay=_exponential_backoff,
        requeue=lambda task, delay: requeue_webhook_queue_task(task, delay_seconds=delay),
    ),
}
def _compute_jitter(base_delay: float) -> float:
    """Return random jitter to add to a retry delay (de-synchronizes retries).

    Drawn uniformly from [0, cap], where cap is the smaller of 10% of
    *base_delay* and 10% of the configured maximum retry delay.
    """
    jitter_cap = min(settings.rq_dispatch_retry_max_seconds / 10, base_delay * 0.1)
    return random.uniform(0, jitter_cap)
async def flush_queue(*, block: bool = False, block_timeout: float = 0) -> int:
    """Consume one queue batch and dispatch by task type.

    Dequeues tasks until ``dequeue_task`` returns ``None``, dispatching
    each to the handler registered for its ``task_type`` in
    ``_TASK_HANDLERS``. A failed task is re-enqueued with exponential
    backoff plus jitter; if the requeue is refused, or the task type is
    unknown, the task is dropped with a warning.

    Args:
        block: Whether ``dequeue_task`` should block waiting for a task.
        block_timeout: Maximum seconds to block per dequeue when blocking.

    Returns:
        The number of tasks processed successfully in this batch.
    """
    processed = 0
    while True:
        try:
            task = dequeue_task(
                settings.rq_queue_name,
                redis_url=settings.rq_redis_url,
                block=block,
                block_timeout=block_timeout,
            )
        except Exception:
            logger.exception(
                "queue.worker.dequeue_failed",
                extra={"queue_name": settings.rq_queue_name},
            )
            # Back off before retrying so a persistent broker outage does
            # not spin this loop (and the log) at full speed; mirrors the
            # 1s error backoff in _run_worker_loop.
            await asyncio.sleep(1)
            continue
        if task is None:
            # Queue drained for this batch.
            break
        handler = _TASK_HANDLERS.get(task.task_type)
        if handler is None:
            # Task was already dequeued, so an unknown type is dropped.
            logger.warning(
                "queue.worker.task_unhandled",
                extra={
                    "task_type": task.task_type,
                    "queue_name": settings.rq_queue_name,
                },
            )
            continue
        try:
            await handler.handler(task)
        except Exception as exc:
            logger.exception(
                "queue.worker.failed",
                extra={
                    "task_type": task.task_type,
                    "attempt": task.attempts,
                    "error": str(exc),
                },
            )
            # Retry with exponential backoff plus jitter; a False requeue
            # result means the task is dropped for good.
            base_delay = handler.attempts_to_delay(task.attempts)
            delay = base_delay + _compute_jitter(base_delay)
            if not handler.requeue(task, delay):
                logger.warning(
                    "queue.worker.drop_task",
                    extra={
                        "task_type": task.task_type,
                        "attempt": task.attempts,
                    },
                )
        else:
            # Count/log success outside the try so a failure in logging
            # cannot requeue (and double-process) a completed task.
            processed += 1
            logger.info(
                "queue.worker.success",
                extra={
                    "task_type": task.task_type,
                    "attempt": task.attempts,
                },
            )
        # Throttle between tasks regardless of outcome.
        await asyncio.sleep(settings.rq_dispatch_throttle_seconds)
    if processed > 0:
        logger.info("queue.worker.batch_complete", extra={"count": processed})
    return processed
async def _run_worker_loop() -> None:
    """Drain the queue forever, surviving any batch-level failure."""
    while True:
        try:
            # A finite block timeout makes the loop wake periodically so
            # delayed/scheduled tasks get drained even when the queue is idle.
            await flush_queue(block=True, block_timeout=_WORKER_BLOCK_TIMEOUT_SECONDS)
        except Exception:
            logger.exception(
                "queue.worker.loop_failed",
                extra={"queue_name": settings.rq_queue_name},
            )
            # Brief pause so a persistent failure does not hot-loop.
            await asyncio.sleep(1)
def run_worker() -> None:
    """RQ entrypoint for running continuous queue processing.

    Blocks until the event loop is interrupted; always emits the stopped
    log on the way out.
    """
    logger.info(
        "queue.worker.batch_started",
        extra={"throttle_seconds": settings.rq_dispatch_throttle_seconds},
    )
    try:
        asyncio.run(_run_worker_loop())
    finally:
        logger.info("queue.worker.stopped", extra={"queue_name": settings.rq_queue_name})