refactor: improve webhook processing with enhanced logging and retry mechanisms
@@ -10,6 +10,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request, status
 from sqlmodel import col, select
 
 from app.api.deps import get_board_for_user_read, get_board_for_user_write, get_board_or_404
+from app.core.logging import get_logger
 from app.core.config import settings
 from app.core.time import utcnow
 from app.db import crud
@@ -44,6 +45,7 @@ SESSION_DEP = Depends(get_session)
 BOARD_USER_READ_DEP = Depends(get_board_for_user_read)
 BOARD_USER_WRITE_DEP = Depends(get_board_for_user_write)
 BOARD_OR_404_DEP = Depends(get_board_or_404)
+logger = get_logger(__name__)
 
 
 def _webhook_endpoint_path(board_id: UUID, webhook_id: UUID) -> str:
@@ -403,6 +405,15 @@ async def ingest_board_webhook(
         board_id=board.id,
         webhook_id=webhook_id,
     )
+    logger.info(
+        "webhook.ingest.received",
+        extra={
+            "board_id": str(board.id),
+            "webhook_id": str(webhook.id),
+            "source_ip": request.client.host if request.client else None,
+            "content_type": request.headers.get("content-type"),
+        },
+    )
     if not webhook.enabled:
         raise HTTPException(
             status_code=status.HTTP_410_GONE,
@@ -437,6 +448,15 @@ async def ingest_board_webhook(
     )
     session.add(memory)
     await session.commit()
+    logger.info(
+        "webhook.ingest.persisted",
+        extra={
+            "payload_id": str(payload.id),
+            "board_id": str(board.id),
+            "webhook_id": str(webhook.id),
+            "memory_id": str(memory.id),
+        },
+    )
 
     enqueued = enqueue_webhook_delivery(
         QueuedInboundDelivery(
@@ -446,6 +466,15 @@ async def ingest_board_webhook(
             received_at=payload.received_at,
         ),
     )
+    logger.info(
+        "webhook.ingest.enqueued",
+        extra={
+            "payload_id": str(payload.id),
+            "board_id": str(board.id),
+            "webhook_id": str(webhook.id),
+            "enqueued": enqueued,
+        },
+    )
     if not enqueued:
         # Preserve historical behavior by still notifying synchronously if queueing fails.
         await _notify_lead_on_webhook_payload(
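The logger.info calls above rely on the stdlib behavior that every key in extra becomes an attribute on the emitted LogRecord. A minimal, self-contained sketch of that mechanism, using plain logging.getLogger as a stand-in for the app's get_logger (whose implementation is not part of this diff):

import logging

class KvFormatter(logging.Formatter):
    """Append any of the webhook context fields present on the record."""

    KEYS = ("board_id", "webhook_id", "payload_id", "memory_id", "enqueued")

    def format(self, record: logging.LogRecord) -> str:
        kvs = {k: getattr(record, k) for k in self.KEYS if hasattr(record, k)}
        return f"{record.getMessage()} {kvs}"

handler = logging.StreamHandler()
handler.setFormatter(KvFormatter())
demo_logger = logging.getLogger("webhooks.demo")
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.INFO)

# Each key in extra lands on the LogRecord as an attribute.
demo_logger.info("webhook.ingest.received", extra={"board_id": "b-1", "webhook_id": "w-1"})
# -> webhook.ingest.received {'board_id': 'b-1', 'webhook_id': 'w-1'}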
@@ -58,6 +58,8 @@ class Settings(BaseSettings):
     rq_queue_name: str = "default"
     rq_dispatch_throttle_seconds: float = 2.0
     rq_dispatch_max_retries: int = 3
+    rq_dispatch_retry_base_seconds: float = 10.0
+    rq_dispatch_retry_max_seconds: float = 120.0
 
     # Logging
     log_level: str = "INFO"
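If Settings follows the usual pydantic BaseSettings conventions (an assumption; the diff only shows the field declarations, and any configured env_prefix would change the variable names), the two new knobs can be tuned per deployment through environment variables rather than code:

import os

# Hypothetical override: halve the base delay and cap waits at one minute.
os.environ["RQ_DISPATCH_RETRY_BASE_SECONDS"] = "5"
os.environ["RQ_DISPATCH_RETRY_MAX_SECONDS"] = "60"

from app.core.config import Settings  # import path assumed from the routes hunk above

settings = Settings()
assert settings.rq_dispatch_retry_base_seconds == 5.0
assert settings.rq_dispatch_retry_max_seconds == 60.0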
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 import json
+import time
 from dataclasses import dataclass
 from datetime import UTC, datetime
 from typing import Any, cast
@@ -14,6 +15,9 @@ from app.core.logging import get_logger
 
 logger = get_logger(__name__)
 
+_SCHEDULED_SUFFIX = ":scheduled"
+_DRY_RUN_BATCH_SIZE = 100
+
 
 @dataclass(frozen=True)
 class QueuedTask:
@@ -40,7 +44,84 @@ def _redis_client(redis_url: str | None = None) -> redis.Redis:
     return redis.Redis.from_url(redis_url or settings.rq_redis_url)
 
 
-def enqueue_task(task: QueuedTask, queue_name: str, *, redis_url: str | None = None) -> bool:
+def _scheduled_queue_name(queue_name: str) -> str:
+    return f"{queue_name}{_SCHEDULED_SUFFIX}"
+
+
+def _now_seconds() -> float:
+    return time.time()
+
+
+def _drain_ready_scheduled_tasks(
+    client: redis.Redis,
+    queue_name: str,
+    *,
+    max_items: int = _DRY_RUN_BATCH_SIZE,
+) -> float | None:
+    scheduled_queue = _scheduled_queue_name(queue_name)
+    now = _now_seconds()
+
+    ready_items = client.zrangebyscore(
+        scheduled_queue,
+        "-inf",
+        now,
+        start=0,
+        num=max_items,
+    )
+    if ready_items:
+        client.lpush(queue_name, *ready_items)
+        client.zrem(scheduled_queue, *ready_items)
+        logger.debug(
+            "rq.queue.drain_ready_scheduled",
+            extra={
+                "queue_name": queue_name,
+                "count": len(ready_items),
+            },
+        )
+
+    next_item = client.zrangebyscore(
+        scheduled_queue,
+        now,
+        "+inf",
+        start=0,
+        num=1,
+        withscores=True,
+    )
+    if not next_item:
+        return None
+
+    next_score = float(cast(tuple[str | bytes, float], next_item[0])[1])
+    return max(0.0, next_score - now)
+
+
+def _schedule_for_later(
+    task: QueuedTask,
+    queue_name: str,
+    delay_seconds: float,
+    *,
+    redis_url: str | None = None,
+) -> bool:
+    client = _redis_client(redis_url=redis_url)
+    scheduled_queue = _scheduled_queue_name(queue_name)
+    score = _now_seconds() + delay_seconds
+    client.zadd(scheduled_queue, {task.to_json(): score})
+    logger.info(
+        "rq.queue.scheduled",
+        extra={
+            "task_type": task.task_type,
+            "queue_name": queue_name,
+            "delay_seconds": delay_seconds,
+        },
+    )
+    return True
+
+
+def enqueue_task(
+    task: QueuedTask,
+    queue_name: str,
+    *,
+    redis_url: str | None = None,
+) -> bool:
     """Persist a task envelope in a Redis list-backed queue."""
     try:
         client = _redis_client(redis_url=redis_url)
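The helpers above implement a common Redis delayed-queue pattern: a sorted set keyed by absolute ready-time sits next to the plain list queue, and anything whose score has passed gets moved onto the list. A self-contained sketch of the same round-trip, using the fakeredis package purely so it runs without a server (an assumption; the real code uses redis-py against settings.rq_redis_url):

import json
import time

import fakeredis  # in-memory stand-in for redis.Redis

client = fakeredis.FakeRedis()
queue, scheduled = "default", "default:scheduled"

# Defer a task by 0.2s: its sorted-set score is the absolute ready-time.
task = json.dumps({"task_type": "demo", "attempts": 1})
client.zadd(scheduled, {task: time.time() + 0.2})

time.sleep(0.3)  # let the delay elapse

# Drain: everything scored at or before "now" becomes deliverable.
ready = client.zrangebyscore(scheduled, "-inf", time.time())
if ready:
    client.lpush(queue, *ready)
    client.zrem(scheduled, *ready)

print(client.rpop(queue))  # b'{"task_type": "demo", "attempts": 1}'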
@@ -87,14 +168,22 @@ def dequeue_task(
 ) -> QueuedTask | None:
     """Pop one task envelope from the queue."""
     client = _redis_client(redis_url=redis_url)
+    timeout = max(0.0, float(block_timeout))
     if block:
-        raw = cast(tuple[bytes | str, bytes | str] | None, client.brpop(queue_name, timeout=block_timeout))
+        next_delay = _drain_ready_scheduled_tasks(client, queue_name)
+        if timeout == 0:
+            timeout = next_delay if next_delay is not None else 0
+        else:
+            timeout = min(timeout, next_delay) if next_delay is not None else timeout
+        raw = cast(tuple[bytes | str, bytes | str] | None, client.brpop(queue_name, timeout=timeout))
         if raw is None:
+            _drain_ready_scheduled_tasks(client, queue_name)
             return None
         raw = raw[1]
     else:
         raw = cast(str | bytes | None, client.rpop(queue_name))
         if raw is None:
+            _drain_ready_scheduled_tasks(client, queue_name)
             return None
     return _decode_task(raw, queue_name)
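One subtlety in the blocking branch above: brpop treats timeout=0 as "block forever", so the wait has to be clamped to the next scheduled task's ready-time or the worker could sleep straight past it. The rule in isolation (a restatement of the diff's logic, not extra code from the commit):

def clamp_timeout(timeout: float, next_delay: float | None) -> float:
    if next_delay is None:
        return timeout                 # nothing scheduled; keep caller's timeout
    if timeout == 0:
        return next_delay              # 0 would block forever; wake for the task
    return min(timeout, next_delay)    # wake for whichever comes first

assert clamp_timeout(0, None) == 0
assert clamp_timeout(0, 7.5) == 7.5
assert clamp_timeout(5.0, 7.5) == 5.0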
@@ -141,19 +230,32 @@ def requeue_if_failed(
     *,
     max_retries: int,
     redis_url: str | None = None,
+    delay_seconds: float = 0,
 ) -> bool:
     """Requeue a failed task with capped retries.
 
     Returns True if requeued.
     """
-    if task.attempts >= max_retries:
+    requeued_task = _requeue_with_attempt(task)
+    if requeued_task.attempts > max_retries:
         logger.warning(
             "rq.queue.drop_failed_task",
             extra={
                 "task_type": task.task_type,
                 "queue_name": queue_name,
-                "attempts": task.attempts,
+                "attempts": requeued_task.attempts,
             },
         )
         return False
-    return enqueue_task(_requeue_with_attempt(task), queue_name, redis_url=redis_url)
+    if delay_seconds > 0:
+        return _schedule_for_later(
+            requeued_task,
+            queue_name,
+            delay_seconds,
+            redis_url=redis_url,
+        )
+    return enqueue_task(
+        requeued_task,
+        queue_name,
+        redis_url=redis_url,
+    )
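Moving the cap check after the attempt increment keeps the retry budget identical (assuming _requeue_with_attempt, not shown in this diff, bumps attempts by exactly one): task.attempts >= max_retries before the bump is the same predicate as attempts > max_retries after it. The payoff is that the drop log now reports the attempt count actually reached. A quick check of the equivalence:

MAX_RETRIES = 3

def old_drop(attempts_before_bump: int) -> bool:
    return attempts_before_bump >= MAX_RETRIES

def new_drop(attempts_before_bump: int) -> bool:
    return (attempts_before_bump + 1) > MAX_RETRIES  # check after the bump

# Identical drop decision for every attempt count.
assert all(old_drop(n) == new_drop(n) for n in range(10))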
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 import asyncio
+import random
 import time
 
 from sqlmodel.ext.asyncio.session import AsyncSession
@@ -198,8 +199,13 @@ async def flush_webhook_delivery_queue(*, block: bool = False, block_timeout: fl
                     "error": str(exc),
                 },
             )
-            requeue_if_failed(item)
-        time.sleep(settings.rq_dispatch_throttle_seconds)
+            delay = min(
+                settings.rq_dispatch_retry_base_seconds * (2 ** max(0, item.attempts)),
+                settings.rq_dispatch_retry_max_seconds,
+            )
+            jitter = random.uniform(0, min(settings.rq_dispatch_retry_max_seconds / 10, delay * 0.1))
+            requeue_if_failed(item, delay_seconds=delay + jitter)
+        await asyncio.sleep(settings.rq_dispatch_throttle_seconds)
     if processed > 0:
         logger.info("webhook.dispatch.batch_complete", extra={"count": processed})
     return processed
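With the defaults added to Settings above (base 10s, cap 120s), the worker's new retry schedule doubles per attempt up to the cap, plus a jitter bounded by the smaller of one-tenth of the cap and 10% of the delay. A sketch of the resulting curve, inlining those two settings as constants:

import random

BASE, CAP = 10.0, 120.0  # rq_dispatch_retry_base_seconds / rq_dispatch_retry_max_seconds

def retry_delay(attempts: int) -> float:
    delay = min(BASE * (2 ** max(0, attempts)), CAP)
    jitter = random.uniform(0, min(CAP / 10, delay * 0.1))
    return delay + jitter

# attempts 0..4 -> roughly 10, 20, 40, 80, 120 seconds before each redelivery
print([round(retry_delay(n), 1) for n in range(5)])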
@@ -119,7 +119,11 @@ def dequeue_webhook_delivery(
         raise
 
 
-def requeue_if_failed(payload: QueuedInboundDelivery) -> bool:
+def requeue_if_failed(
+    payload: QueuedInboundDelivery,
+    *,
+    delay_seconds: float = 0,
+) -> bool:
     """Requeue payload delivery with capped retries.
 
     Returns True if requeued.
@@ -130,6 +134,7 @@ def requeue_if_failed(payload: QueuedInboundDelivery) -> bool:
             settings.rq_queue_name,
             max_retries=settings.rq_dispatch_max_retries,
             redis_url=settings.rq_redis_url,
+            delay_seconds=delay_seconds,
         )
     except Exception as exc:
         logger.warning(