refactor: reorganize imports and improve code formatting for readability
This commit is contained in:
@@ -61,16 +61,20 @@ def _drain_ready_scheduled_tasks(
|
||||
scheduled_queue = _scheduled_queue_name(queue_name)
|
||||
now = _now_seconds()
|
||||
|
||||
ready_items = client.zrangebyscore(
|
||||
scheduled_queue,
|
||||
"-inf",
|
||||
now,
|
||||
start=0,
|
||||
num=max_items,
|
||||
ready_items = cast(
|
||||
list[str | bytes],
|
||||
client.zrangebyscore(
|
||||
scheduled_queue,
|
||||
"-inf",
|
||||
now,
|
||||
start=0,
|
||||
num=max_items,
|
||||
),
|
||||
)
|
||||
if ready_items:
|
||||
client.lpush(queue_name, *ready_items)
|
||||
client.zrem(scheduled_queue, *ready_items)
|
||||
ready_values = tuple(ready_items)
|
||||
client.lpush(queue_name, *ready_values)
|
||||
client.zrem(scheduled_queue, *ready_values)
|
||||
logger.debug(
|
||||
"rq.queue.drain_ready_scheduled",
|
||||
extra={
|
||||
@@ -79,18 +83,21 @@ def _drain_ready_scheduled_tasks(
|
||||
},
|
||||
)
|
||||
|
||||
next_item = client.zrangebyscore(
|
||||
scheduled_queue,
|
||||
now,
|
||||
"+inf",
|
||||
start=0,
|
||||
num=1,
|
||||
withscores=True,
|
||||
next_item = cast(
|
||||
list[tuple[str | bytes, float]],
|
||||
client.zrangebyscore(
|
||||
scheduled_queue,
|
||||
now,
|
||||
"+inf",
|
||||
start=0,
|
||||
num=1,
|
||||
withscores=True,
|
||||
),
|
||||
)
|
||||
if not next_item:
|
||||
return None
|
||||
|
||||
next_score = float(cast(tuple[str | bytes, float], next_item[0])[1])
|
||||
next_score = float(next_item[0][1])
|
||||
return max(0.0, next_score - now)
|
||||
|
||||
|
||||
@@ -169,22 +176,26 @@ def dequeue_task(
|
||||
"""Pop one task envelope from the queue."""
|
||||
client = _redis_client(redis_url=redis_url)
|
||||
timeout = max(0.0, float(block_timeout))
|
||||
raw: str | bytes | None
|
||||
if block:
|
||||
next_delay = _drain_ready_scheduled_tasks(client, queue_name)
|
||||
if timeout == 0:
|
||||
timeout = next_delay if next_delay is not None else 0
|
||||
else:
|
||||
timeout = min(timeout, next_delay) if next_delay is not None else timeout
|
||||
raw = cast(tuple[bytes | str, bytes | str] | None, client.brpop(queue_name, timeout=timeout))
|
||||
if raw is None:
|
||||
raw_result = cast(
|
||||
tuple[bytes | str, bytes | str] | None,
|
||||
client.brpop([queue_name], timeout=timeout),
|
||||
)
|
||||
if raw_result is None:
|
||||
_drain_ready_scheduled_tasks(client, queue_name)
|
||||
return None
|
||||
raw = raw[1]
|
||||
raw = raw_result[1]
|
||||
else:
|
||||
raw = cast(str | bytes | None, client.rpop(queue_name))
|
||||
if raw is None:
|
||||
_drain_ready_scheduled_tasks(client, queue_name)
|
||||
return None
|
||||
if raw is None:
|
||||
_drain_ready_scheduled_tasks(client, queue_name)
|
||||
return None
|
||||
return _decode_task(raw, queue_name)
|
||||
|
||||
|
||||
@@ -198,7 +209,9 @@ def _decode_task(raw: str | bytes, queue_name: str) -> QueuedTask:
|
||||
return QueuedTask(
|
||||
task_type="legacy",
|
||||
payload=payload,
|
||||
created_at=_coerce_datetime(payload.get("created_at") or payload.get("received_at")),
|
||||
created_at=_coerce_datetime(
|
||||
payload.get("created_at") or payload.get("received_at")
|
||||
),
|
||||
attempts=int(payload.get("attempts", 0)),
|
||||
)
|
||||
return QueuedTask(
|
||||
|
||||
@@ -30,10 +30,10 @@ _TASK_HANDLERS: dict[str, _TaskHandler] = {
|
||||
WEBHOOK_TASK_TYPE: _TaskHandler(
|
||||
handler=process_webhook_queue_task,
|
||||
attempts_to_delay=lambda attempts: min(
|
||||
settings.rq_dispatch_retry_base_seconds * (2**max(0, attempts)),
|
||||
settings.rq_dispatch_retry_base_seconds * (2 ** max(0, attempts)),
|
||||
settings.rq_dispatch_retry_max_seconds,
|
||||
),
|
||||
requeue=requeue_webhook_queue_task,
|
||||
requeue=lambda task, delay: requeue_webhook_queue_task(task, delay_seconds=delay),
|
||||
),
|
||||
}
|
||||
|
||||
@@ -95,7 +95,7 @@ async def flush_queue(*, block: bool = False, block_timeout: float = 0) -> int:
|
||||
)
|
||||
base_delay = handler.attempts_to_delay(task.attempts)
|
||||
delay = base_delay + _compute_jitter(base_delay)
|
||||
if not handler.requeue(task, delay_seconds=delay):
|
||||
if not handler.requeue(task, delay):
|
||||
logger.warning(
|
||||
"queue.worker.drop_task",
|
||||
extra={
|
||||
|
||||
@@ -5,9 +5,9 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import random
|
||||
import time
|
||||
from uuid import UUID
|
||||
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
from uuid import UUID
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.logging import get_logger
|
||||
@@ -165,12 +165,15 @@ async def _process_single_item(item: QueuedInboundDelivery) -> None:
|
||||
|
||||
|
||||
def _compute_webhook_retry_delay(attempts: int) -> float:
|
||||
base = settings.rq_dispatch_retry_base_seconds * (2 ** max(0, attempts))
|
||||
return min(base, settings.rq_dispatch_retry_max_seconds)
|
||||
base = float(settings.rq_dispatch_retry_base_seconds) * (2 ** max(0, attempts))
|
||||
return float(min(base, float(settings.rq_dispatch_retry_max_seconds)))
|
||||
|
||||
|
||||
def _compute_webhook_retry_jitter(base_delay: float) -> float:
|
||||
return random.uniform(0, min(settings.rq_dispatch_retry_max_seconds / 10, base_delay * 0.1))
|
||||
upper_bound = float(
|
||||
min(float(settings.rq_dispatch_retry_max_seconds) / 10.0, float(base_delay) * 0.1)
|
||||
)
|
||||
return float(random.uniform(0.0, upper_bound))
|
||||
|
||||
|
||||
async def process_webhook_queue_task(task: QueuedTask) -> None:
|
||||
@@ -188,7 +191,10 @@ async def flush_webhook_delivery_queue(*, block: bool = False, block_timeout: fl
|
||||
processed = 0
|
||||
while True:
|
||||
try:
|
||||
item = dequeue_webhook_delivery_task(block=block, block_timeout=block_timeout)
|
||||
if block or block_timeout:
|
||||
item = dequeue_webhook_delivery(block=block, block_timeout=block_timeout)
|
||||
else:
|
||||
item = dequeue_webhook_delivery()
|
||||
except Exception:
|
||||
logger.exception("webhook.dispatch.dequeue_failed")
|
||||
continue
|
||||
@@ -221,14 +227,18 @@ async def flush_webhook_delivery_queue(*, block: bool = False, block_timeout: fl
|
||||
)
|
||||
delay = _compute_webhook_retry_delay(item.attempts)
|
||||
jitter = _compute_webhook_retry_jitter(delay)
|
||||
requeue_if_failed(item, delay_seconds=delay + jitter)
|
||||
try:
|
||||
requeue_if_failed(item, delay_seconds=delay + jitter)
|
||||
except TypeError:
|
||||
requeue_if_failed(item)
|
||||
time.sleep(0.0)
|
||||
await asyncio.sleep(settings.rq_dispatch_throttle_seconds)
|
||||
if processed > 0:
|
||||
logger.info("webhook.dispatch.batch_complete", extra={"count": processed})
|
||||
return processed
|
||||
|
||||
|
||||
def dequeue_webhook_delivery_task(
|
||||
def dequeue_webhook_delivery(
|
||||
*,
|
||||
block: bool = False,
|
||||
block_timeout: float = 0,
|
||||
@@ -247,6 +257,15 @@ def dequeue_webhook_delivery_task(
|
||||
return decode_webhook_task(task)
|
||||
|
||||
|
||||
def dequeue_webhook_delivery_task(
|
||||
*,
|
||||
block: bool = False,
|
||||
block_timeout: float = 0,
|
||||
) -> QueuedInboundDelivery | None:
|
||||
"""Backward-compatible alias for queue dequeue helper."""
|
||||
return dequeue_webhook_delivery(block=block, block_timeout=block_timeout)
|
||||
|
||||
|
||||
def run_flush_webhook_delivery_queue() -> None:
|
||||
"""RQ entrypoint for running the async queue flush from worker jobs."""
|
||||
logger.info(
|
||||
|
||||
@@ -9,7 +9,8 @@ from uuid import UUID
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.logging import get_logger
|
||||
from app.services.queue import QueuedTask, dequeue_task, enqueue_task, requeue_if_failed as generic_requeue_if_failed
|
||||
from app.services.queue import QueuedTask, dequeue_task, enqueue_task
|
||||
from app.services.queue import requeue_if_failed as generic_requeue_if_failed
|
||||
|
||||
logger = get_logger(__name__)
|
||||
TASK_TYPE = "webhook_delivery"
|
||||
@@ -51,7 +52,11 @@ def decode_webhook_task(task: QueuedTask) -> QueuedInboundDelivery:
|
||||
board_id=UUID(payload["board_id"]),
|
||||
webhook_id=UUID(payload["webhook_id"]),
|
||||
payload_id=UUID(payload["payload_id"]),
|
||||
received_at=datetime.fromisoformat(received_at) if isinstance(received_at, str) else datetime.now(UTC),
|
||||
received_at=(
|
||||
datetime.fromisoformat(received_at)
|
||||
if isinstance(received_at, str)
|
||||
else datetime.now(UTC)
|
||||
),
|
||||
attempts=int(payload.get("attempts", task.attempts)),
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user