1 Commits

Author SHA1 Message Date
Abhimanyu Saharan
ad6ff33d16 Fix webhook dispatch throttle + split queue/list config 2026-02-14 17:58:50 +00:00
8 changed files with 59 additions and 199 deletions

View File

@@ -21,10 +21,18 @@ CLERK_LEEWAY=10.0
DB_AUTO_MIGRATE=false
# Webhook queue / worker
WEBHOOK_REDIS_URL=redis://localhost:6379/0
WEBHOOK_QUEUE_NAME=webhook-dispatch
# RQ queue that runs the batch dispatch job.
WEBHOOK_LEADS_RQ_QUEUE_NAME=webhook-dispatch
# Redis list key that stores queued webhook deliveries for batching.
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY=webhook-dispatch
# Backwards compat (deprecated): if set, used as both queue name + list key unless the
# split settings above are explicitly set.
WEBHOOK_QUEUE_NAME=
WEBHOOK_DISPATCH_THROTTLE_SECONDS=2.0
WEBHOOK_DISPATCH_SCHEDULE_ID=webhook-dispatch-batch
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS=900
WEBHOOK_DISPATCH_MAX_RETRIES=3
# Suppress routine GitHub CI telemetry events from lead notifications (still persisted to DB/memory).
WEBHOOK_DISPATCH_SUPPRESS_ROUTINE_EVENTS=true

View File

@@ -55,14 +55,21 @@ class Settings(BaseSettings):
# Webhook queueing / dispatch
webhook_redis_url: str = "redis://localhost:6379/0"
# NOTE: Deprecated. Historically used for both the Redis list key *and* the RQ queue name.
# Prefer `webhook_leads_batch_redis_list_key` + `webhook_leads_rq_queue_name`.
webhook_queue_name: str = "webhook-dispatch"
# RQ queue that runs the batch dispatch job.
webhook_leads_rq_queue_name: str = "webhook-dispatch"
# Redis list key that stores queued webhook deliveries for batching.
webhook_leads_batch_redis_list_key: str = "webhook-dispatch"
webhook_dispatch_schedule_id: str = "webhook-dispatch-batch"
webhook_dispatch_throttle_seconds: float = 2.0
webhook_dispatch_schedule_interval_seconds: int = 900
webhook_dispatch_max_retries: int = 3
# If true, suppress high-volume routine CI telemetry events (e.g. GitHub check_run success)
# from lead notifications. Payloads are still persisted and recorded in board memory.
webhook_dispatch_suppress_routine_events: bool = True
# Logging
log_level: str = "INFO"
@@ -73,6 +80,13 @@ class Settings(BaseSettings):
@model_validator(mode="after")
def _defaults(self) -> Self:
# Backwards compatibility: If WEBHOOK_QUEUE_NAME was set (legacy), and the
# newer split settings were not explicitly set, mirror it.
if "webhook_queue_name" in self.model_fields_set:
if "webhook_leads_rq_queue_name" not in self.model_fields_set:
self.webhook_leads_rq_queue_name = self.webhook_queue_name
if "webhook_leads_batch_redis_list_key" not in self.model_fields_set:
self.webhook_leads_batch_redis_list_key = self.webhook_queue_name
if self.auth_mode == AuthMode.CLERK:
if not self.clerk_secret_key.strip():
raise ValueError(

View File

@@ -24,102 +24,6 @@ from app.services.webhooks.queue import (
logger = get_logger(__name__)
_ROUTINE_GITHUB_EVENTS = frozenset({"check_run", "check_suite", "workflow_run"})
_SUCCESS_GITHUB_CONCLUSIONS = frozenset({None, "success", "neutral", "skipped"})
# Consider these actionable enough to page the lead / surface in task threads.
_ACTIONABLE_GITHUB_CONCLUSIONS = frozenset(
{
"failure",
"cancelled",
"timed_out",
"action_required",
"stale",
"startup_failure",
},
)
def _as_dict(value: object) -> dict[str, object] | None:
if isinstance(value, dict):
# Keep only string keys; payloads can include non-str keys in edge cases.
normalized: dict[str, object] = {}
for k, v in value.items():
if isinstance(k, str):
normalized[k] = v
return normalized
return None
def _str_or_none(value: object) -> str | None:
if value is None:
return None
if isinstance(value, str):
return value
return str(value)
def _extract_github_conclusion(payload: dict[str, object], *, key: str) -> str | None:
    """Read `conclusion` from the nested object at ``payload[key]``.

    Returns None when the container is missing, empty, or not a dict.
    """
    nested = _as_dict(payload.get(key))
    return _str_or_none(nested.get("conclusion")) if nested else None
def _extract_github_status(payload: dict[str, object], *, key: str) -> str | None:
    """Read `status` from the nested object at ``payload[key]``.

    Returns None when the container is missing, empty, or not a dict.
    """
    nested = _as_dict(payload.get(key))
    return _str_or_none(nested.get("status")) if nested else None
def _should_suppress_routine_delivery(
    *,
    payload_event: str | None,
    payload_value: object,
) -> bool:
    """Return True if this delivery is routine noise and should not notify leads.

    This intentionally only targets high-volume GitHub CI telemetry events
    (check_run / check_suite / workflow_run). The webhook payload and its board
    memory entry are still persisted for audit/debug — only the lead
    notification is skipped.
    """
    # Global kill switch: when disabled, never suppress anything.
    if not settings.webhook_dispatch_suppress_routine_events:
        return False
    # Only the whitelisted CI telemetry events are candidates for suppression.
    if payload_event not in _ROUTINE_GITHUB_EVENTS:
        return False
    payload = _as_dict(payload_value)
    if payload is None:
        # Non-dict payloads: don't suppress — we can't reason about them.
        return False

    # If GitHub hasn't marked it completed, it's almost always noise.
    action = _str_or_none(payload.get("action"))
    if action and action != "completed":
        return True

    # For every routine event the nested container key equals the event name
    # ("check_run", "check_suite", "workflow_run"), so one lookup covers all.
    status = _extract_github_status(payload, key=payload_event)
    if status and status != "completed":
        return True
    conclusion = _extract_github_conclusion(payload, key=payload_event)

    # Successful/neutral outcomes are routine noise.
    if conclusion in _SUCCESS_GITHUB_CONCLUSIONS:
        return True
    # Only page on explicitly non-success conclusions; unknown values stay quiet.
    return conclusion not in _ACTIONABLE_GITHUB_CONCLUSIONS
def _build_payload_preview(payload_value: object) -> str:
if isinstance(payload_value, str):
@@ -261,22 +165,6 @@ async def _process_single_item(item: QueuedWebhookDelivery) -> None:
return
board, webhook, payload = loaded
if _should_suppress_routine_delivery(
payload_event=item.payload_event,
payload_value=payload.payload,
):
logger.info(
"webhook.dispatch.suppressed_routine",
extra={
"payload_id": str(item.payload_id),
"webhook_id": str(item.webhook_id),
"board_id": str(item.board_id),
"event": item.payload_event,
"attempt": item.attempts,
},
)
return
await _notify_lead(session=session, board=board, webhook=webhook, payload=payload)
await session.commit()
@@ -318,7 +206,7 @@ async def flush_webhook_delivery_queue() -> None:
},
)
requeue_if_failed(item)
time.sleep(settings.webhook_dispatch_throttle_seconds)
await asyncio.sleep(settings.webhook_dispatch_throttle_seconds)
logger.info("webhook.dispatch.batch_complete", extra={"count": processed})

View File

@@ -51,7 +51,7 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
"""Persist webhook metadata in a Redis queue for batch dispatch."""
try:
client = _redis_client()
client.lpush(settings.webhook_queue_name, payload.to_json())
client.lpush(settings.webhook_leads_batch_redis_list_key, payload.to_json())
logger.info(
"webhook.queue.enqueued",
extra={
@@ -78,7 +78,10 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None:
"""Pop one queued webhook delivery payload."""
client = _redis_client()
raw = cast(str | bytes | None, client.rpop(settings.webhook_queue_name))
raw = cast(
str | bytes | None,
client.rpop(settings.webhook_leads_batch_redis_list_key),
)
if raw is None:
return None
if isinstance(raw, bytes):

View File

@@ -14,7 +14,7 @@ from app.services.webhooks import dispatch
def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) -> None:
"""Register a recurring queue-flush job and keep it idempotent."""
connection = Redis.from_url(settings.webhook_redis_url)
scheduler = Scheduler(queue_name=settings.webhook_queue_name, connection=connection)
scheduler = Scheduler(queue_name=settings.webhook_leads_rq_queue_name, connection=connection)
for job in scheduler.get_jobs():
if job.id == settings.webhook_dispatch_schedule_id:
@@ -32,5 +32,5 @@ def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) ->
interval=effective_interval_seconds,
repeat=None,
id=settings.webhook_dispatch_schedule_id,
queue_name=settings.webhook_queue_name,
queue_name=settings.webhook_leads_rq_queue_name,
)

View File

@@ -123,7 +123,11 @@ async def test_dispatch_flush_processes_items_and_throttles(monkeypatch: pytest.
monkeypatch.setattr(dispatch, "_process_single_item", _process)
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: throttles.append(seconds))
async def _sleep(seconds: float) -> None:
throttles.append(float(seconds))
monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
await dispatch.flush_webhook_delivery_queue()
@@ -148,7 +152,11 @@ async def test_dispatch_flush_requeues_on_process_error(monkeypatch: pytest.Monk
monkeypatch.setattr(dispatch, "_process_single_item", _process)
monkeypatch.setattr(dispatch, "requeue_if_failed", _requeue)
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None)
async def _sleep(_: float) -> None:
return None
monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
await dispatch.flush_webhook_delivery_queue()
@@ -180,7 +188,11 @@ async def test_dispatch_flush_recovers_from_dequeue_error(monkeypatch: pytest.Mo
monkeypatch.setattr(dispatch, "_process_single_item", _process)
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None)
async def _sleep(_: float) -> None:
return None
monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
await dispatch.flush_webhook_delivery_queue()

View File

@@ -1,71 +0,0 @@
# ruff: noqa: INP001
from __future__ import annotations
import pytest
from app.services.webhooks import dispatch
# Table of (event, payload, expected-suppression) cases for the routine-noise filter.
_SUPPRESSION_CASES = [
    ("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": "success"}}, True),
    ("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": None}}, True),
    ("check_run", {"action": "created", "check_run": {"status": "queued"}}, True),
    ("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": "failure"}}, False),
    (
        "workflow_run",
        {"action": "completed", "workflow_run": {"status": "completed", "conclusion": "success"}},
        True,
    ),
    (
        "workflow_run",
        {"action": "completed", "workflow_run": {"status": "completed", "conclusion": "cancelled"}},
        False,
    ),
    (
        "check_suite",
        {"action": "completed", "check_suite": {"status": "completed", "conclusion": "timed_out"}},
        False,
    ),
    (
        "check_suite",
        {"action": "completed", "check_suite": {"status": "completed", "conclusion": "neutral"}},
        True,
    ),
    # Non-target events should not be suppressed by this helper.
    ("pull_request", {"action": "opened"}, False),
    (None, {"action": "opened"}, False),
    # Non-dict payloads: don't suppress (we can't reason about it).
    ("check_run", "raw", False),
]


@pytest.mark.parametrize(("payload_event", "payload_value", "expected"), _SUPPRESSION_CASES)
def test_should_suppress_routine_delivery(
    monkeypatch: pytest.MonkeyPatch,
    payload_event: str | None,
    payload_value: object,
    expected: bool,
) -> None:
    """Each case pins the suppression verdict with the feature flag forced on."""
    monkeypatch.setattr(dispatch.settings, "webhook_dispatch_suppress_routine_events", True)
    verdict = dispatch._should_suppress_routine_delivery(
        payload_event=payload_event,
        payload_value=payload_value,
    )
    assert verdict is expected
def test_suppression_disabled_via_settings(monkeypatch: pytest.MonkeyPatch) -> None:
    """With the kill switch off, even a routine successful check_run is not suppressed."""
    monkeypatch.setattr(dispatch.settings, "webhook_dispatch_suppress_routine_events", False)
    payload = {
        "action": "completed",
        "check_run": {"status": "completed", "conclusion": "success"},
    }
    suppressed = dispatch._should_suppress_routine_delivery(
        payload_event="check_run",
        payload_value=payload,
    )
    assert suppressed is False

View File

@@ -38,6 +38,8 @@ services:
AUTH_MODE: ${AUTH_MODE}
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
depends_on:
db:
condition: service_healthy
@@ -70,6 +72,8 @@ services:
build:
context: .
dockerfile: backend/Dockerfile
# NOTE: the worker's queue name is hard-coded in this command to preserve the historical default.
# Setting WEBHOOK_LEADS_RQ_QUEUE_NAME alone is not enough — this command must be updated to match.
command: ["rq", "worker", "webhook-dispatch", "-u", "redis://redis:6379/0"]
env_file:
- ./backend/.env.example
@@ -83,7 +87,8 @@ services:
AUTH_MODE: ${AUTH_MODE}
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
restart: unless-stopped
@@ -102,7 +107,8 @@ services:
- webhook-worker
environment:
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS: 900
restart: unless-stopped