Compare commits
1 Commits
feat/webho
...
noah/pr130
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ad6ff33d16 |
@@ -21,10 +21,18 @@ CLERK_LEEWAY=10.0
|
||||
DB_AUTO_MIGRATE=false
|
||||
# Webhook queue / worker
|
||||
WEBHOOK_REDIS_URL=redis://localhost:6379/0
|
||||
WEBHOOK_QUEUE_NAME=webhook-dispatch
|
||||
|
||||
# RQ queue that runs the batch dispatch job.
|
||||
WEBHOOK_LEADS_RQ_QUEUE_NAME=webhook-dispatch
|
||||
|
||||
# Redis list key that stores queued webhook deliveries for batching.
|
||||
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY=webhook-dispatch
|
||||
|
||||
# Backwards compat (deprecated): if set, used as both queue name + list key unless the
|
||||
# split settings above are explicitly set.
|
||||
WEBHOOK_QUEUE_NAME=
|
||||
|
||||
WEBHOOK_DISPATCH_THROTTLE_SECONDS=2.0
|
||||
WEBHOOK_DISPATCH_SCHEDULE_ID=webhook-dispatch-batch
|
||||
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS=900
|
||||
WEBHOOK_DISPATCH_MAX_RETRIES=3
|
||||
# Suppress routine GitHub CI telemetry events from lead notifications (still persisted to DB/memory).
|
||||
WEBHOOK_DISPATCH_SUPPRESS_ROUTINE_EVENTS=true
|
||||
|
||||
@@ -55,14 +55,21 @@ class Settings(BaseSettings):
|
||||
|
||||
# Webhook queueing / dispatch
|
||||
webhook_redis_url: str = "redis://localhost:6379/0"
|
||||
|
||||
# NOTE: Deprecated. Historically used for both the Redis list key *and* the RQ queue name.
|
||||
# Prefer `webhook_leads_batch_redis_list_key` + `webhook_leads_rq_queue_name`.
|
||||
webhook_queue_name: str = "webhook-dispatch"
|
||||
|
||||
# RQ queue that runs the batch dispatch job.
|
||||
webhook_leads_rq_queue_name: str = "webhook-dispatch"
|
||||
|
||||
# Redis list key that stores queued webhook deliveries for batching.
|
||||
webhook_leads_batch_redis_list_key: str = "webhook-dispatch"
|
||||
|
||||
webhook_dispatch_schedule_id: str = "webhook-dispatch-batch"
|
||||
webhook_dispatch_throttle_seconds: float = 2.0
|
||||
webhook_dispatch_schedule_interval_seconds: int = 900
|
||||
webhook_dispatch_max_retries: int = 3
|
||||
# If true, suppress high-volume routine CI telemetry events (e.g. GitHub check_run success)
|
||||
# from lead notifications. Payloads are still persisted and recorded in board memory.
|
||||
webhook_dispatch_suppress_routine_events: bool = True
|
||||
|
||||
# Logging
|
||||
log_level: str = "INFO"
|
||||
@@ -73,6 +80,13 @@ class Settings(BaseSettings):
|
||||
|
||||
@model_validator(mode="after")
|
||||
def _defaults(self) -> Self:
|
||||
# Backwards compatibility: If WEBHOOK_QUEUE_NAME was set (legacy), and the
|
||||
# newer split settings were not explicitly set, mirror it.
|
||||
if "webhook_queue_name" in self.model_fields_set:
|
||||
if "webhook_leads_rq_queue_name" not in self.model_fields_set:
|
||||
self.webhook_leads_rq_queue_name = self.webhook_queue_name
|
||||
if "webhook_leads_batch_redis_list_key" not in self.model_fields_set:
|
||||
self.webhook_leads_batch_redis_list_key = self.webhook_queue_name
|
||||
if self.auth_mode == AuthMode.CLERK:
|
||||
if not self.clerk_secret_key.strip():
|
||||
raise ValueError(
|
||||
|
||||
@@ -24,102 +24,6 @@ from app.services.webhooks.queue import (
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
_ROUTINE_GITHUB_EVENTS = frozenset({"check_run", "check_suite", "workflow_run"})
|
||||
_SUCCESS_GITHUB_CONCLUSIONS = frozenset({None, "success", "neutral", "skipped"})
|
||||
# Consider these actionable enough to page the lead / surface in task threads.
|
||||
_ACTIONABLE_GITHUB_CONCLUSIONS = frozenset(
|
||||
{
|
||||
"failure",
|
||||
"cancelled",
|
||||
"timed_out",
|
||||
"action_required",
|
||||
"stale",
|
||||
"startup_failure",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def _as_dict(value: object) -> dict[str, object] | None:
|
||||
if isinstance(value, dict):
|
||||
# Keep only string keys; payloads can include non-str keys in edge cases.
|
||||
normalized: dict[str, object] = {}
|
||||
for k, v in value.items():
|
||||
if isinstance(k, str):
|
||||
normalized[k] = v
|
||||
return normalized
|
||||
return None
|
||||
|
||||
|
||||
def _str_or_none(value: object) -> str | None:
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, str):
|
||||
return value
|
||||
return str(value)
|
||||
|
||||
|
||||
def _extract_github_conclusion(payload: dict[str, object], *, key: str) -> str | None:
|
||||
container = _as_dict(payload.get(key))
|
||||
if not container:
|
||||
return None
|
||||
return _str_or_none(container.get("conclusion"))
|
||||
|
||||
|
||||
def _extract_github_status(payload: dict[str, object], *, key: str) -> str | None:
|
||||
container = _as_dict(payload.get(key))
|
||||
if not container:
|
||||
return None
|
||||
return _str_or_none(container.get("status"))
|
||||
|
||||
|
||||
def _should_suppress_routine_delivery(
|
||||
*,
|
||||
payload_event: str | None,
|
||||
payload_value: object,
|
||||
) -> bool:
|
||||
"""Return True if this delivery is routine noise and should not notify leads.
|
||||
|
||||
This intentionally only targets high-volume GitHub CI telemetry events.
|
||||
We still persist the webhook payload + board memory entry for audit/debug.
|
||||
"""
|
||||
|
||||
if not settings.webhook_dispatch_suppress_routine_events:
|
||||
return False
|
||||
|
||||
if payload_event not in _ROUTINE_GITHUB_EVENTS:
|
||||
return False
|
||||
|
||||
payload = _as_dict(payload_value)
|
||||
if payload is None:
|
||||
return False
|
||||
|
||||
action = _str_or_none(payload.get("action"))
|
||||
# If GitHub hasn't marked it completed, it's almost always noise.
|
||||
if action and action != "completed":
|
||||
return True
|
||||
|
||||
if payload_event == "workflow_run":
|
||||
status = _extract_github_status(payload, key="workflow_run")
|
||||
if status and status != "completed":
|
||||
return True
|
||||
conclusion = _extract_github_conclusion(payload, key="workflow_run")
|
||||
elif payload_event == "check_run":
|
||||
status = _extract_github_status(payload, key="check_run")
|
||||
if status and status != "completed":
|
||||
return True
|
||||
conclusion = _extract_github_conclusion(payload, key="check_run")
|
||||
else: # check_suite
|
||||
status = _extract_github_status(payload, key="check_suite")
|
||||
if status and status != "completed":
|
||||
return True
|
||||
conclusion = _extract_github_conclusion(payload, key="check_suite")
|
||||
|
||||
if conclusion in _SUCCESS_GITHUB_CONCLUSIONS:
|
||||
return True
|
||||
|
||||
# Only page on explicitly non-success conclusions.
|
||||
return conclusion not in _ACTIONABLE_GITHUB_CONCLUSIONS
|
||||
|
||||
|
||||
def _build_payload_preview(payload_value: object) -> str:
|
||||
if isinstance(payload_value, str):
|
||||
@@ -261,22 +165,6 @@ async def _process_single_item(item: QueuedWebhookDelivery) -> None:
|
||||
return
|
||||
|
||||
board, webhook, payload = loaded
|
||||
if _should_suppress_routine_delivery(
|
||||
payload_event=item.payload_event,
|
||||
payload_value=payload.payload,
|
||||
):
|
||||
logger.info(
|
||||
"webhook.dispatch.suppressed_routine",
|
||||
extra={
|
||||
"payload_id": str(item.payload_id),
|
||||
"webhook_id": str(item.webhook_id),
|
||||
"board_id": str(item.board_id),
|
||||
"event": item.payload_event,
|
||||
"attempt": item.attempts,
|
||||
},
|
||||
)
|
||||
return
|
||||
|
||||
await _notify_lead(session=session, board=board, webhook=webhook, payload=payload)
|
||||
await session.commit()
|
||||
|
||||
@@ -318,7 +206,7 @@ async def flush_webhook_delivery_queue() -> None:
|
||||
},
|
||||
)
|
||||
requeue_if_failed(item)
|
||||
time.sleep(settings.webhook_dispatch_throttle_seconds)
|
||||
await asyncio.sleep(settings.webhook_dispatch_throttle_seconds)
|
||||
logger.info("webhook.dispatch.batch_complete", extra={"count": processed})
|
||||
|
||||
|
||||
|
||||
@@ -51,7 +51,7 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
|
||||
"""Persist webhook metadata in a Redis queue for batch dispatch."""
|
||||
try:
|
||||
client = _redis_client()
|
||||
client.lpush(settings.webhook_queue_name, payload.to_json())
|
||||
client.lpush(settings.webhook_leads_batch_redis_list_key, payload.to_json())
|
||||
logger.info(
|
||||
"webhook.queue.enqueued",
|
||||
extra={
|
||||
@@ -78,7 +78,10 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
|
||||
def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None:
|
||||
"""Pop one queued webhook delivery payload."""
|
||||
client = _redis_client()
|
||||
raw = cast(str | bytes | None, client.rpop(settings.webhook_queue_name))
|
||||
raw = cast(
|
||||
str | bytes | None,
|
||||
client.rpop(settings.webhook_leads_batch_redis_list_key),
|
||||
)
|
||||
if raw is None:
|
||||
return None
|
||||
if isinstance(raw, bytes):
|
||||
|
||||
@@ -14,7 +14,7 @@ from app.services.webhooks import dispatch
|
||||
def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) -> None:
|
||||
"""Register a recurring queue-flush job and keep it idempotent."""
|
||||
connection = Redis.from_url(settings.webhook_redis_url)
|
||||
scheduler = Scheduler(queue_name=settings.webhook_queue_name, connection=connection)
|
||||
scheduler = Scheduler(queue_name=settings.webhook_leads_rq_queue_name, connection=connection)
|
||||
|
||||
for job in scheduler.get_jobs():
|
||||
if job.id == settings.webhook_dispatch_schedule_id:
|
||||
@@ -32,5 +32,5 @@ def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) ->
|
||||
interval=effective_interval_seconds,
|
||||
repeat=None,
|
||||
id=settings.webhook_dispatch_schedule_id,
|
||||
queue_name=settings.webhook_queue_name,
|
||||
queue_name=settings.webhook_leads_rq_queue_name,
|
||||
)
|
||||
|
||||
@@ -123,7 +123,11 @@ async def test_dispatch_flush_processes_items_and_throttles(monkeypatch: pytest.
|
||||
|
||||
monkeypatch.setattr(dispatch, "_process_single_item", _process)
|
||||
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
|
||||
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: throttles.append(seconds))
|
||||
|
||||
async def _sleep(seconds: float) -> None:
|
||||
throttles.append(float(seconds))
|
||||
|
||||
monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
|
||||
|
||||
await dispatch.flush_webhook_delivery_queue()
|
||||
|
||||
@@ -148,7 +152,11 @@ async def test_dispatch_flush_requeues_on_process_error(monkeypatch: pytest.Monk
|
||||
monkeypatch.setattr(dispatch, "_process_single_item", _process)
|
||||
monkeypatch.setattr(dispatch, "requeue_if_failed", _requeue)
|
||||
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
|
||||
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None)
|
||||
|
||||
async def _sleep(_: float) -> None:
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
|
||||
|
||||
await dispatch.flush_webhook_delivery_queue()
|
||||
|
||||
@@ -180,7 +188,11 @@ async def test_dispatch_flush_recovers_from_dequeue_error(monkeypatch: pytest.Mo
|
||||
|
||||
monkeypatch.setattr(dispatch, "_process_single_item", _process)
|
||||
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
|
||||
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None)
|
||||
|
||||
async def _sleep(_: float) -> None:
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(dispatch.asyncio, "sleep", _sleep)
|
||||
|
||||
await dispatch.flush_webhook_delivery_queue()
|
||||
|
||||
|
||||
@@ -1,71 +0,0 @@
|
||||
# ruff: noqa: INP001
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from app.services.webhooks import dispatch
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("payload_event", "payload_value", "expected"),
|
||||
[
|
||||
("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": "success"}}, True),
|
||||
("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": None}}, True),
|
||||
("check_run", {"action": "created", "check_run": {"status": "queued"}}, True),
|
||||
("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": "failure"}}, False),
|
||||
(
|
||||
"workflow_run",
|
||||
{"action": "completed", "workflow_run": {"status": "completed", "conclusion": "success"}},
|
||||
True,
|
||||
),
|
||||
(
|
||||
"workflow_run",
|
||||
{"action": "completed", "workflow_run": {"status": "completed", "conclusion": "cancelled"}},
|
||||
False,
|
||||
),
|
||||
(
|
||||
"check_suite",
|
||||
{"action": "completed", "check_suite": {"status": "completed", "conclusion": "timed_out"}},
|
||||
False,
|
||||
),
|
||||
(
|
||||
"check_suite",
|
||||
{"action": "completed", "check_suite": {"status": "completed", "conclusion": "neutral"}},
|
||||
True,
|
||||
),
|
||||
# Non-target events should not be suppressed by this helper.
|
||||
("pull_request", {"action": "opened"}, False),
|
||||
(None, {"action": "opened"}, False),
|
||||
# Non-dict payloads: don't suppress (we can't reason about it).
|
||||
("check_run", "raw", False),
|
||||
],
|
||||
)
|
||||
def test_should_suppress_routine_delivery(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
payload_event: str | None,
|
||||
payload_value: object,
|
||||
expected: bool,
|
||||
) -> None:
|
||||
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_suppress_routine_events", True)
|
||||
assert (
|
||||
dispatch._should_suppress_routine_delivery(
|
||||
payload_event=payload_event,
|
||||
payload_value=payload_value,
|
||||
)
|
||||
is expected
|
||||
)
|
||||
|
||||
|
||||
def test_suppression_disabled_via_settings(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_suppress_routine_events", False)
|
||||
assert (
|
||||
dispatch._should_suppress_routine_delivery(
|
||||
payload_event="check_run",
|
||||
payload_value={
|
||||
"action": "completed",
|
||||
"check_run": {"status": "completed", "conclusion": "success"},
|
||||
},
|
||||
)
|
||||
is False
|
||||
)
|
||||
10
compose.yml
10
compose.yml
@@ -38,6 +38,8 @@ services:
|
||||
AUTH_MODE: ${AUTH_MODE}
|
||||
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
|
||||
WEBHOOK_REDIS_URL: redis://redis:6379/0
|
||||
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
|
||||
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
|
||||
depends_on:
|
||||
db:
|
||||
condition: service_healthy
|
||||
@@ -70,6 +72,8 @@ services:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: backend/Dockerfile
|
||||
# NOTE: keep explicit queue name to preserve historical defaults.
|
||||
# Can be changed by setting WEBHOOK_LEADS_RQ_QUEUE_NAME and updating this command.
|
||||
command: ["rq", "worker", "webhook-dispatch", "-u", "redis://redis:6379/0"]
|
||||
env_file:
|
||||
- ./backend/.env.example
|
||||
@@ -83,7 +87,8 @@ services:
|
||||
AUTH_MODE: ${AUTH_MODE}
|
||||
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
|
||||
WEBHOOK_REDIS_URL: redis://redis:6379/0
|
||||
WEBHOOK_QUEUE_NAME: webhook-dispatch
|
||||
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
|
||||
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
|
||||
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
|
||||
restart: unless-stopped
|
||||
|
||||
@@ -102,7 +107,8 @@ services:
|
||||
- webhook-worker
|
||||
environment:
|
||||
WEBHOOK_REDIS_URL: redis://redis:6379/0
|
||||
WEBHOOK_QUEUE_NAME: webhook-dispatch
|
||||
WEBHOOK_LEADS_RQ_QUEUE_NAME: webhook-dispatch
|
||||
WEBHOOK_LEADS_BATCH_REDIS_LIST_KEY: webhook-dispatch
|
||||
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
|
||||
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS: 900
|
||||
restart: unless-stopped
|
||||
|
||||
Reference in New Issue
Block a user