1 Commits

Author SHA1 Message Date
Abhimanyu Saharan
bb66190913 Suppress routine GitHub CI webhook deliveries 2026-02-14 14:29:15 +00:00
8 changed files with 199 additions and 59 deletions

View File

@@ -21,18 +21,10 @@ CLERK_LEEWAY=10.0
DB_AUTO_MIGRATE=false
# Webhook queue / worker
WEBHOOK_REDIS_URL=redis://localhost:6379/0
# RQ queue that runs the batch dispatch job.
WEBHOOK_LEADS_RQ_QUEUE_NAME=webhook-dispatch
# Redis list key that stores queued webhook deliveries for batching.
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY=webhook-dispatch
# Backwards compat (deprecated): if set, used as both queue name + list key unless the
# split settings above are explicitly set.
WEBHOOK_QUEUE_NAME=
WEBHOOK_QUEUE_NAME=webhook-dispatch
WEBHOOK_DISPATCH_THROTTLE_SECONDS=2.0
WEBHOOK_DISPATCH_SCHEDULE_ID=webhook-dispatch-batch
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS=900
WEBHOOK_DISPATCH_MAX_RETRIES=3
# Suppress routine GitHub CI telemetry events from lead notifications (still persisted to DB/memory).
WEBHOOK_DISPATCH_SUPPRESS_ROUTINE_EVENTS=true

View File

@@ -55,21 +55,14 @@ class Settings(BaseSettings):
# Webhook queueing / dispatch
webhook_redis_url: str = "redis://localhost:6379/0"
# NOTE: Deprecated. Historically used for both the Redis list key *and* the RQ queue name.
# Prefer `webhook_leads_batch_redis_list_key` + `webhook_leads_rq_queue_name`.
webhook_queue_name: str = "webhook-dispatch"
# RQ queue that runs the batch dispatch job.
webhook_leads_rq_queue_name: str = "webhook-dispatch"
# Redis list key that stores queued webhook deliveries for batching.
webhook_leads_batch_redis_list_key: str = "webhook-dispatch"
webhook_dispatch_schedule_id: str = "webhook-dispatch-batch"
webhook_dispatch_throttle_seconds: float = 2.0
webhook_dispatch_schedule_interval_seconds: int = 900
webhook_dispatch_max_retries: int = 3
# If true, suppress high-volume routine CI telemetry events (e.g. GitHub check_run success)
# from lead notifications. Payloads are still persisted and recorded in board memory.
webhook_dispatch_suppress_routine_events: bool = True
# Logging
log_level: str = "INFO"
@@ -80,13 +73,6 @@ class Settings(BaseSettings):
@model_validator(mode="after")
def _defaults(self) -> Self:
# Backwards compatibility: If WEBHOOK_QUEUE_NAME was set (legacy), and the
# newer split settings were not explicitly set, mirror it.
if "webhook_queue_name" in self.model_fields_set:
if "webhook_leads_rq_queue_name" not in self.model_fields_set:
self.webhook_leads_rq_queue_name = self.webhook_queue_name
if "webhook_leads_batch_redis_list_key" not in self.model_fields_set:
self.webhook_leads_batch_redis_list_key = self.webhook_queue_name
if self.auth_mode == AuthMode.CLERK:
if not self.clerk_secret_key.strip():
raise ValueError(

View File

@@ -24,6 +24,102 @@ from app.services.webhooks.queue import (
logger = get_logger(__name__)
_ROUTINE_GITHUB_EVENTS = frozenset({"check_run", "check_suite", "workflow_run"})
_SUCCESS_GITHUB_CONCLUSIONS = frozenset({None, "success", "neutral", "skipped"})
# Consider these actionable enough to page the lead / surface in task threads.
_ACTIONABLE_GITHUB_CONCLUSIONS = frozenset(
{
"failure",
"cancelled",
"timed_out",
"action_required",
"stale",
"startup_failure",
},
)
def _as_dict(value: object) -> dict[str, object] | None:
if isinstance(value, dict):
# Keep only string keys; payloads can include non-str keys in edge cases.
normalized: dict[str, object] = {}
for k, v in value.items():
if isinstance(k, str):
normalized[k] = v
return normalized
return None
def _str_or_none(value: object) -> str | None:
if value is None:
return None
if isinstance(value, str):
return value
return str(value)
def _extract_github_conclusion(payload: dict[str, object], *, key: str) -> str | None:
    """Read ``payload[key]["conclusion"]`` as a string, or None when absent/malformed."""
    container = _as_dict(payload.get(key))
    return _str_or_none(container.get("conclusion")) if container else None
def _extract_github_status(payload: dict[str, object], *, key: str) -> str | None:
    """Read ``payload[key]["status"]`` as a string, or None when absent/malformed."""
    container = _as_dict(payload.get(key))
    return _str_or_none(container.get("status")) if container else None
def _should_suppress_routine_delivery(
    *,
    payload_event: str | None,
    payload_value: object,
) -> bool:
    """Return True if this delivery is routine noise and should not notify leads.

    This intentionally only targets high-volume GitHub CI telemetry events.
    We still persist the webhook payload + board memory entry for audit/debug.
    """
    if not settings.webhook_dispatch_suppress_routine_events:
        return False
    if payload_event not in _ROUTINE_GITHUB_EVENTS:
        return False

    payload = _as_dict(payload_value)
    if payload is None:
        # Not a dict — we can't reason about it, so err on the side of notifying.
        return False

    # If GitHub hasn't marked it completed, it's almost always noise.
    action = _str_or_none(payload.get("action"))
    if action and action != "completed":
        return True

    # Each routine event nests its details under a key that matches the event
    # name (check_run / check_suite / workflow_run), so one lookup covers all.
    status = _extract_github_status(payload, key=payload_event)
    if status and status != "completed":
        return True

    conclusion = _extract_github_conclusion(payload, key=payload_event)
    if conclusion in _SUCCESS_GITHUB_CONCLUSIONS:
        return True
    # Only page on explicitly non-success conclusions.
    return conclusion not in _ACTIONABLE_GITHUB_CONCLUSIONS
def _build_payload_preview(payload_value: object) -> str:
if isinstance(payload_value, str):
@@ -165,6 +261,22 @@ async def _process_single_item(item: QueuedWebhookDelivery) -> None:
return
board, webhook, payload = loaded
if _should_suppress_routine_delivery(
payload_event=item.payload_event,
payload_value=payload.payload,
):
logger.info(
"webhook.dispatch.suppressed_routine",
extra={
"payload_id": str(item.payload_id),
"webhook_id": str(item.webhook_id),
"board_id": str(item.board_id),
"event": item.payload_event,
"attempt": item.attempts,
},
)
return
await _notify_lead(session=session, board=board, webhook=webhook, payload=payload)
await session.commit()
@@ -206,7 +318,7 @@ async def flush_webhook_delivery_queue() -> None:
},
)
requeue_if_failed(item)
await asyncio.sleep(settings.webhook_dispatch_throttle_seconds)
time.sleep(settings.webhook_dispatch_throttle_seconds)
logger.info("webhook.dispatch.batch_complete", extra={"count": processed})

View File

@@ -51,7 +51,7 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
"""Persist webhook metadata in a Redis queue for batch dispatch."""
try:
client = _redis_client()
client.lpush(settings.webhook_leads_batch_redis_list_key, payload.to_json())
client.lpush(settings.webhook_queue_name, payload.to_json())
logger.info(
"webhook.queue.enqueued",
extra={
@@ -78,10 +78,7 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None:
"""Pop one queued webhook delivery payload."""
client = _redis_client()
raw = cast(
str | bytes | None,
client.rpop(settings.webhook_leads_batch_redis_list_key),
)
raw = cast(str | bytes | None, client.rpop(settings.webhook_queue_name))
if raw is None:
return None
if isinstance(raw, bytes):

View File

@@ -14,7 +14,7 @@ from app.services.webhooks import dispatch
def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) -> None:
"""Register a recurring queue-flush job and keep it idempotent."""
connection = Redis.from_url(settings.webhook_redis_url)
scheduler = Scheduler(queue_name=settings.webhook_leads_rq_queue_name, connection=connection)
scheduler = Scheduler(queue_name=settings.webhook_queue_name, connection=connection)
for job in scheduler.get_jobs():
if job.id == settings.webhook_dispatch_schedule_id:
@@ -32,5 +32,5 @@ def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) ->
interval=effective_interval_seconds,
repeat=None,
id=settings.webhook_dispatch_schedule_id,
queue_name=settings.webhook_leads_rq_queue_name,
queue_name=settings.webhook_queue_name,
)

View File

@@ -123,11 +123,7 @@ async def test_dispatch_flush_processes_items_and_throttles(monkeypatch: pytest.
monkeypatch.setattr(dispatch, "_process_single_item", _process)
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
async def _sleep(seconds: float) -> None:
throttles.append(float(seconds))
monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: throttles.append(seconds))
await dispatch.flush_webhook_delivery_queue()
@@ -152,11 +148,7 @@ async def test_dispatch_flush_requeues_on_process_error(monkeypatch: pytest.Monk
monkeypatch.setattr(dispatch, "_process_single_item", _process)
monkeypatch.setattr(dispatch, "requeue_if_failed", _requeue)
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
async def _sleep(_: float) -> None:
return None
monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None)
await dispatch.flush_webhook_delivery_queue()
@@ -188,11 +180,7 @@ async def test_dispatch_flush_recovers_from_dequeue_error(monkeypatch: pytest.Mo
monkeypatch.setattr(dispatch, "_process_single_item", _process)
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
async def _sleep(_: float) -> None:
return None
monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None)
await dispatch.flush_webhook_delivery_queue()

View File

@@ -0,0 +1,71 @@
# ruff: noqa: INP001
from __future__ import annotations
import pytest
from app.services.webhooks import dispatch
@pytest.mark.parametrize(
    ("payload_event", "payload_value", "expected"),
    [
        ("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": "success"}}, True),
        ("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": None}}, True),
        ("check_run", {"action": "created", "check_run": {"status": "queued"}}, True),
        ("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": "failure"}}, False),
        (
            "workflow_run",
            {"action": "completed", "workflow_run": {"status": "completed", "conclusion": "success"}},
            True,
        ),
        (
            "workflow_run",
            {"action": "completed", "workflow_run": {"status": "completed", "conclusion": "cancelled"}},
            False,
        ),
        (
            "check_suite",
            {"action": "completed", "check_suite": {"status": "completed", "conclusion": "timed_out"}},
            False,
        ),
        (
            "check_suite",
            {"action": "completed", "check_suite": {"status": "completed", "conclusion": "neutral"}},
            True,
        ),
        # Non-target events should not be suppressed by this helper.
        ("pull_request", {"action": "opened"}, False),
        (None, {"action": "opened"}, False),
        # Non-dict payloads: don't suppress (we can't reason about it).
        ("check_run", "raw", False),
    ],
)
def test_should_suppress_routine_delivery(
    monkeypatch: pytest.MonkeyPatch,
    payload_event: str | None,
    payload_value: object,
    expected: bool,
) -> None:
    """Table-driven check of the routine-CI suppression predicate.

    Suppression is force-enabled via settings so each case exercises only the
    event/payload classification logic, independent of the environment default.
    """
    monkeypatch.setattr(dispatch.settings, "webhook_dispatch_suppress_routine_events", True)
    assert (
        dispatch._should_suppress_routine_delivery(
            payload_event=payload_event,
            payload_value=payload_value,
        )
        is expected
    )
def test_suppression_disabled_via_settings(monkeypatch: pytest.MonkeyPatch) -> None:
    """A clearly-routine success event must NOT be suppressed when the flag is off."""
    monkeypatch.setattr(dispatch.settings, "webhook_dispatch_suppress_routine_events", False)
    routine_success_payload = {
        "action": "completed",
        "check_run": {"status": "completed", "conclusion": "success"},
    }
    suppressed = dispatch._should_suppress_routine_delivery(
        payload_event="check_run",
        payload_value=routine_success_payload,
    )
    assert suppressed is False

View File

@@ -38,8 +38,6 @@ services:
AUTH_MODE: ${AUTH_MODE}
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
depends_on:
db:
condition: service_healthy
@@ -72,8 +70,6 @@ services:
build:
context: .
dockerfile: backend/Dockerfile
# NOTE: keep explicit queue name to preserve historical defaults.
# Can be changed by setting WEBHOOK_LEADS_RQ_QUEUE_NAME and updating this command.
command: ["rq", "worker", "webhook-dispatch", "-u", "redis://redis:6379/0"]
env_file:
- ./backend/.env.example
@@ -87,8 +83,7 @@ services:
AUTH_MODE: ${AUTH_MODE}
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
WEBHOOK_QUEUE_NAME: webhook-dispatch
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
restart: unless-stopped
@@ -107,8 +102,7 @@ services:
- webhook-worker
environment:
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
WEBHOOK_QUEUE_NAME: webhook-dispatch
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS: 900
restart: unless-stopped