7 Commits

Author SHA1 Message Date
Abhimanyu Saharan
bb66190913 Suppress routine GitHub CI webhook deliveries 2026-02-14 14:29:15 +00:00
Abhimanyu Saharan
4557fcc8ae test: add webhook dispatch worker coverage for process loop 2026-02-14 06:38:19 +00:00
Abhimanyu Saharan
554b73184a chore: decouple webhook-worker from backend service dependency 2026-02-14 06:36:46 +00:00
Abhimanyu Saharan
912387bd1c chore: centralize webhook scheduler interval config 2026-02-14 06:36:09 +00:00
Abhimanyu Saharan
d01365abfb fix: resolve mypy typing issues in webhook queue dispatch 2026-02-14 06:33:35 +00:00
Abhimanyu Saharan
e535f377ff fix: remove unused UUID import in webhook dispatch test 2026-02-14 06:30:11 +00:00
Abhimanyu Saharan
b987db58b8 feat: add RQ-based webhook dispatch queue and delayed worker 2026-02-14 06:23:55 +00:00
13 changed files with 1009 additions and 23 deletions

View File

@@ -12,12 +12,19 @@ BASE_URL=
AUTH_MODE=local
# REQUIRED when AUTH_MODE=local (must be non-placeholder and at least 50 chars).
LOCAL_AUTH_TOKEN=
# Clerk (auth only; used when AUTH_MODE=clerk)
CLERK_SECRET_KEY=
CLERK_API_URL=https://api.clerk.com
CLERK_VERIFY_IAT=true
CLERK_LEEWAY=10.0
# Database
DB_AUTO_MIGRATE=false
# Webhook queue / worker
WEBHOOK_REDIS_URL=redis://localhost:6379/0
WEBHOOK_QUEUE_NAME=webhook-dispatch
WEBHOOK_DISPATCH_THROTTLE_SECONDS=2.0
WEBHOOK_DISPATCH_SCHEDULE_ID=webhook-dispatch-batch
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS=900
WEBHOOK_DISPATCH_MAX_RETRIES=3
# Suppress routine GitHub CI telemetry events from lead notifications (still persisted to DB/memory).
WEBHOOK_DISPATCH_SUPPRESS_ROUTINE_EVENTS=true

View File

@@ -29,6 +29,7 @@ from app.schemas.board_webhooks import (
from app.schemas.common import OkResponse
from app.schemas.pagination import DefaultLimitOffsetPage
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.webhooks.queue import QueuedWebhookDelivery, enqueue_webhook_delivery
if TYPE_CHECKING:
from collections.abc import Sequence
@@ -166,6 +167,12 @@ def _captured_headers(request: Request) -> dict[str, str] | None:
return captured or None
def _extract_webhook_event(headers: dict[str, str] | None) -> str | None:
if not headers:
return None
return headers.get("x-github-event") or headers.get("x-event-type")
def _payload_preview(
value: dict[str, object] | list[object] | str | int | float | bool | None,
) -> str:
@@ -412,6 +419,7 @@ async def ingest_board_webhook(
)
content_type = request.headers.get("content-type")
headers = _captured_headers(request)
payload_value = _decode_payload(
await request.body(),
content_type=content_type,
@@ -420,7 +428,7 @@ async def ingest_board_webhook(
board_id=board.id,
webhook_id=webhook.id,
payload=payload_value,
headers=_captured_headers(request),
headers=headers,
source_ip=request.client.host if request.client else None,
content_type=content_type,
)
@@ -438,12 +446,25 @@ async def ingest_board_webhook(
)
session.add(memory)
await session.commit()
enqueued = enqueue_webhook_delivery(
QueuedWebhookDelivery(
board_id=board.id,
webhook_id=webhook.id,
payload_id=payload.id,
payload_event=_extract_webhook_event(headers),
received_at=payload.received_at,
),
)
if not enqueued:
# Preserve historical behavior by still notifying synchronously if queueing fails.
await _notify_lead_on_webhook_payload(
session=session,
board=board,
webhook=webhook,
payload=payload,
)
return BoardWebhookIngestResponse(
board_id=board.id,
webhook_id=webhook.id,

View File

@@ -53,6 +53,17 @@ class Settings(BaseSettings):
# Database lifecycle
db_auto_migrate: bool = False
# Webhook queueing / dispatch
webhook_redis_url: str = "redis://localhost:6379/0"
webhook_queue_name: str = "webhook-dispatch"
webhook_dispatch_schedule_id: str = "webhook-dispatch-batch"
webhook_dispatch_throttle_seconds: float = 2.0
webhook_dispatch_schedule_interval_seconds: int = 900
webhook_dispatch_max_retries: int = 3
# If true, suppress high-volume routine CI telemetry events (e.g. GitHub check_run success)
# from lead notifications. Payloads are still persisted and recorded in board memory.
webhook_dispatch_suppress_routine_events: bool = True
# Logging
log_level: str = "INFO"
log_format: str = "text"

View File

@@ -0,0 +1,3 @@
"""Webhook queueing and dispatch utilities."""
__all__ = ["dispatch", "queue", "scheduler"]

View File

@@ -0,0 +1,334 @@
"""Webhook dispatch worker routines."""
from __future__ import annotations
import asyncio
import time
from sqlmodel.ext.asyncio.session import AsyncSession
from uuid import UUID
from app.core.config import settings
from app.core.logging import get_logger
from app.db.session import async_session_maker
from app.models.agents import Agent
from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.webhooks.queue import (
QueuedWebhookDelivery,
dequeue_webhook_delivery,
requeue_if_failed,
)
logger = get_logger(__name__)
_ROUTINE_GITHUB_EVENTS = frozenset({"check_run", "check_suite", "workflow_run"})
_SUCCESS_GITHUB_CONCLUSIONS = frozenset({None, "success", "neutral", "skipped"})
# Consider these actionable enough to page the lead / surface in task threads.
_ACTIONABLE_GITHUB_CONCLUSIONS = frozenset(
{
"failure",
"cancelled",
"timed_out",
"action_required",
"stale",
"startup_failure",
},
)
def _as_dict(value: object) -> dict[str, object] | None:
if isinstance(value, dict):
# Keep only string keys; payloads can include non-str keys in edge cases.
normalized: dict[str, object] = {}
for k, v in value.items():
if isinstance(k, str):
normalized[k] = v
return normalized
return None
def _str_or_none(value: object) -> str | None:
if value is None:
return None
if isinstance(value, str):
return value
return str(value)
def _extract_github_conclusion(payload: dict[str, object], *, key: str) -> str | None:
container = _as_dict(payload.get(key))
if not container:
return None
return _str_or_none(container.get("conclusion"))
def _extract_github_status(payload: dict[str, object], *, key: str) -> str | None:
container = _as_dict(payload.get(key))
if not container:
return None
return _str_or_none(container.get("status"))
def _should_suppress_routine_delivery(
*,
payload_event: str | None,
payload_value: object,
) -> bool:
"""Return True if this delivery is routine noise and should not notify leads.
This intentionally only targets high-volume GitHub CI telemetry events.
We still persist the webhook payload + board memory entry for audit/debug.
"""
if not settings.webhook_dispatch_suppress_routine_events:
return False
if payload_event not in _ROUTINE_GITHUB_EVENTS:
return False
payload = _as_dict(payload_value)
if payload is None:
return False
action = _str_or_none(payload.get("action"))
# If GitHub hasn't marked it completed, it's almost always noise.
if action and action != "completed":
return True
if payload_event == "workflow_run":
status = _extract_github_status(payload, key="workflow_run")
if status and status != "completed":
return True
conclusion = _extract_github_conclusion(payload, key="workflow_run")
elif payload_event == "check_run":
status = _extract_github_status(payload, key="check_run")
if status and status != "completed":
return True
conclusion = _extract_github_conclusion(payload, key="check_run")
else: # check_suite
status = _extract_github_status(payload, key="check_suite")
if status and status != "completed":
return True
conclusion = _extract_github_conclusion(payload, key="check_suite")
if conclusion in _SUCCESS_GITHUB_CONCLUSIONS:
return True
# Only page on explicitly non-success conclusions.
return conclusion not in _ACTIONABLE_GITHUB_CONCLUSIONS
def _build_payload_preview(payload_value: object) -> str:
if isinstance(payload_value, str):
return payload_value
try:
import json
return json.dumps(payload_value, indent=2, ensure_ascii=True)
except TypeError:
return str(payload_value)
def _payload_preview(payload_value: object) -> str:
preview = _build_payload_preview(payload_value)
if len(preview) <= 1600:
return preview
return f"{preview[:1597]}..."
def _webhook_message(
*,
board: Board,
webhook: BoardWebhook,
payload: BoardWebhookPayload,
) -> str:
preview = _payload_preview(payload.payload)
return (
"WEBHOOK EVENT RECEIVED\n"
f"Board: {board.name}\n"
f"Webhook ID: {webhook.id}\n"
f"Payload ID: {payload.id}\n"
f"Instruction: {webhook.description}\n\n"
"Take action:\n"
"1) Triage this payload against the webhook instruction.\n"
"2) Create/update tasks as needed.\n"
f"3) Reference payload ID {payload.id} in task descriptions.\n\n"
"Payload preview:\n"
f"{preview}\n\n"
"To inspect board memory entries:\n"
f"GET /api/v1/agent/boards/{board.id}/memory?is_chat=false"
)
async def _notify_lead(
*,
session: AsyncSession,
board: Board,
webhook: BoardWebhook,
payload: BoardWebhookPayload,
) -> None:
lead = await Agent.objects.filter_by(board_id=board.id, is_board_lead=True).first(session)
if lead is None or not lead.openclaw_session_id:
return
dispatch = GatewayDispatchService(session)
config = await dispatch.optional_gateway_config_for_board(board)
if config is None:
return
message = _webhook_message(board=board, webhook=webhook, payload=payload)
await dispatch.try_send_agent_message(
session_key=lead.openclaw_session_id,
config=config,
agent_name=lead.name,
message=message,
deliver=False,
)
async def _load_webhook_payload(
*,
session: AsyncSession,
payload_id: UUID,
webhook_id: UUID,
board_id: UUID,
) -> tuple[Board, BoardWebhook, BoardWebhookPayload] | None:
payload = await session.get(BoardWebhookPayload, payload_id)
if payload is None:
logger.warning(
"webhook.queue.payload_missing",
extra={
"payload_id": str(payload_id),
"webhook_id": str(webhook_id),
"board_id": str(board_id),
},
)
return None
if payload.board_id != board_id or payload.webhook_id != webhook_id:
logger.warning(
"webhook.queue.payload_mismatch",
extra={
"payload_id": str(payload_id),
"payload_webhook_id": str(payload.webhook_id),
"payload_board_id": str(payload.board_id),
},
)
return None
board = await Board.objects.by_id(board_id).first(session)
if board is None:
logger.warning(
"webhook.queue.board_missing",
extra={"board_id": str(board_id), "payload_id": str(payload_id)},
)
return None
webhook = await session.get(BoardWebhook, webhook_id)
if webhook is None:
logger.warning(
"webhook.queue.webhook_missing",
extra={"webhook_id": str(webhook_id), "board_id": str(board_id)},
)
return None
if webhook.board_id != board_id:
logger.warning(
"webhook.queue.webhook_board_mismatch",
extra={
"webhook_id": str(webhook_id),
"payload_board_id": str(payload.board_id),
"expected_board_id": str(board_id),
},
)
return None
return board, webhook, payload
async def _process_single_item(item: QueuedWebhookDelivery) -> None:
async with async_session_maker() as session:
loaded = await _load_webhook_payload(
session=session,
payload_id=item.payload_id,
webhook_id=item.webhook_id,
board_id=item.board_id,
)
if loaded is None:
return
board, webhook, payload = loaded
if _should_suppress_routine_delivery(
payload_event=item.payload_event,
payload_value=payload.payload,
):
logger.info(
"webhook.dispatch.suppressed_routine",
extra={
"payload_id": str(item.payload_id),
"webhook_id": str(item.webhook_id),
"board_id": str(item.board_id),
"event": item.payload_event,
"attempt": item.attempts,
},
)
return
await _notify_lead(session=session, board=board, webhook=webhook, payload=payload)
await session.commit()
async def flush_webhook_delivery_queue() -> None:
"""Consume queued webhook events and notify board leads in a throttled batch."""
processed = 0
while True:
try:
item = dequeue_webhook_delivery()
except Exception:
logger.exception("webhook.dispatch.dequeue_failed")
continue
if item is None:
break
try:
await _process_single_item(item)
processed += 1
logger.info(
"webhook.dispatch.success",
extra={
"payload_id": str(item.payload_id),
"webhook_id": str(item.webhook_id),
"board_id": str(item.board_id),
"attempt": item.attempts,
},
)
except Exception as exc:
logger.exception(
"webhook.dispatch.failed",
extra={
"payload_id": str(item.payload_id),
"webhook_id": str(item.webhook_id),
"board_id": str(item.board_id),
"attempt": item.attempts,
"error": str(exc),
},
)
requeue_if_failed(item)
time.sleep(settings.webhook_dispatch_throttle_seconds)
logger.info("webhook.dispatch.batch_complete", extra={"count": processed})
def run_flush_webhook_delivery_queue() -> None:
"""RQ entrypoint for running the async queue flush from worker jobs."""
logger.info(
"webhook.dispatch.batch_started",
extra={"throttle_seconds": settings.webhook_dispatch_throttle_seconds},
)
start = time.time()
asyncio.run(flush_webhook_delivery_queue())
elapsed_ms = int((time.time() - start) * 1000)
logger.info("webhook.dispatch.batch_finished", extra={"duration_ms": elapsed_ms})

View File

@@ -0,0 +1,136 @@
"""Webhook queue persistence and delivery helpers."""
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from uuid import UUID
from typing import cast
import redis
from app.core.config import settings
from app.core.logging import get_logger
logger = get_logger(__name__)
@dataclass(frozen=True)
class QueuedWebhookDelivery:
"""Payload metadata stored for deferred webhook lead dispatch."""
board_id: UUID
webhook_id: UUID
payload_id: UUID
payload_event: str | None
received_at: datetime
attempts: int = 0
def to_json(self) -> str:
return json.dumps(
{
"board_id": str(self.board_id),
"webhook_id": str(self.webhook_id),
"payload_id": str(self.payload_id),
"payload_event": self.payload_event,
"received_at": self.received_at.isoformat(),
"attempts": self.attempts,
},
sort_keys=True,
)
def _redis_client() -> redis.Redis:
return redis.Redis.from_url(settings.webhook_redis_url)
def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
"""Persist webhook metadata in a Redis queue for batch dispatch."""
try:
client = _redis_client()
client.lpush(settings.webhook_queue_name, payload.to_json())
logger.info(
"webhook.queue.enqueued",
extra={
"board_id": str(payload.board_id),
"webhook_id": str(payload.webhook_id),
"payload_id": str(payload.payload_id),
"attempt": payload.attempts,
},
)
return True
except Exception as exc:
logger.warning(
"webhook.queue.enqueue_failed",
extra={
"board_id": str(payload.board_id),
"webhook_id": str(payload.webhook_id),
"payload_id": str(payload.payload_id),
"error": str(exc),
},
)
return False
def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None:
"""Pop one queued webhook delivery payload."""
client = _redis_client()
raw = cast(str | bytes | None, client.rpop(settings.webhook_queue_name))
if raw is None:
return None
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
try:
payload: dict[str, Any] = json.loads(raw)
event = payload.get("payload_event")
if event is not None:
event = str(event)
return QueuedWebhookDelivery(
board_id=UUID(payload["board_id"]),
webhook_id=UUID(payload["webhook_id"]),
payload_id=UUID(payload["payload_id"]),
payload_event=event,
received_at=datetime.fromisoformat(payload["received_at"]),
attempts=int(payload.get("attempts", 0)),
)
except Exception as exc:
logger.error(
"webhook.queue.dequeue_failed",
extra={"raw_payload": str(raw), "error": str(exc)},
)
raise
def _requeue_with_attempt(payload: QueuedWebhookDelivery) -> None:
payload = QueuedWebhookDelivery(
board_id=payload.board_id,
webhook_id=payload.webhook_id,
payload_id=payload.payload_id,
payload_event=payload.payload_event,
received_at=payload.received_at,
attempts=payload.attempts + 1,
)
enqueue_webhook_delivery(payload)
def requeue_if_failed(payload: QueuedWebhookDelivery) -> bool:
"""Requeue payload delivery with capped retries.
Returns True if requeued.
"""
if payload.attempts >= settings.webhook_dispatch_max_retries:
logger.warning(
"webhook.queue.drop_failed_delivery",
extra={
"board_id": str(payload.board_id),
"webhook_id": str(payload.webhook_id),
"payload_id": str(payload.payload_id),
"attempts": payload.attempts,
},
)
return False
_requeue_with_attempt(payload)
return True

View File

@@ -0,0 +1,36 @@
"""Webhook dispatch scheduler bootstrap for rq-scheduler."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from redis import Redis
from rq_scheduler import Scheduler # type: ignore[import-untyped]
from app.core.config import settings
from app.services.webhooks import dispatch
def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) -> None:
"""Register a recurring queue-flush job and keep it idempotent."""
connection = Redis.from_url(settings.webhook_redis_url)
scheduler = Scheduler(queue_name=settings.webhook_queue_name, connection=connection)
for job in scheduler.get_jobs():
if job.id == settings.webhook_dispatch_schedule_id:
scheduler.cancel(job)
effective_interval_seconds = (
settings.webhook_dispatch_schedule_interval_seconds
if interval_seconds is None
else interval_seconds
)
scheduler.schedule(
datetime.now(tz=timezone.utc) + timedelta(seconds=5),
func=dispatch.run_flush_webhook_delivery_queue,
interval=effective_interval_seconds,
repeat=None,
id=settings.webhook_dispatch_schedule_id,
queue_name=settings.webhook_queue_name,
)

View File

@@ -25,6 +25,9 @@ dependencies = [
"sse-starlette==3.2.0",
"uvicorn[standard]==0.40.0",
"websockets==16.0",
"redis==6.3.0",
"rq==2.6.0",
"rq-scheduler==0.14.0",
]
[project.optional-dependencies]

View File

@@ -23,6 +23,7 @@ from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.organizations import Organization
from app.services.webhooks.queue import QueuedWebhookDelivery
async def _make_engine() -> AsyncEngine:
@@ -112,7 +113,7 @@ async def _seed_webhook(
@pytest.mark.asyncio
async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
async def test_ingest_board_webhook_stores_payload_and_enqueues_for_lead_dispatch(
monkeypatch: pytest.MonkeyPatch,
) -> None:
engine = await _make_engine()
@@ -122,16 +123,23 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
expire_on_commit=False,
)
app = _build_test_app(session_maker)
enqueued: list[dict[str, object]] = []
sent_messages: list[dict[str, str]] = []
async with session_maker() as session:
board, webhook = await _seed_webhook(session, enabled=True)
async def _fake_optional_gateway_config_for_board(
self: board_webhooks.GatewayDispatchService,
_board: Board,
) -> object:
return object()
def _fake_enqueue(payload: QueuedWebhookDelivery) -> bool:
enqueued.append(
{
"board_id": str(payload.board_id),
"webhook_id": str(payload.webhook_id),
"payload_id": str(payload.payload_id),
"attempts": payload.attempts,
"event": payload.payload_event,
},
)
return True
async def _fake_try_send_agent_message(
self: board_webhooks.GatewayDispatchService,
@@ -145,7 +153,7 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
del self, config, deliver
sent_messages.append(
{
"session_key": session_key,
"session_id": session_key,
"agent_name": agent_name,
"message": message,
},
@@ -153,9 +161,9 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
return None
monkeypatch.setattr(
board_webhooks.GatewayDispatchService,
"optional_gateway_config_for_board",
_fake_optional_gateway_config_for_board,
board_webhooks,
"enqueue_webhook_delivery",
_fake_enqueue,
)
monkeypatch.setattr(
board_webhooks.GatewayDispatchService,
@@ -204,11 +212,12 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
assert f"payload:{payload_id}" in memory_items[0].tags
assert f"Payload ID: {payload_id}" in memory_items[0].content
assert len(sent_messages) == 1
assert sent_messages[0]["session_key"] == "lead:session:key"
assert "WEBHOOK EVENT RECEIVED" in sent_messages[0]["message"]
assert str(payload_id) in sent_messages[0]["message"]
assert webhook.description in sent_messages[0]["message"]
assert len(enqueued) == 1
assert enqueued[0]["board_id"] == str(board.id)
assert enqueued[0]["webhook_id"] == str(webhook.id)
assert enqueued[0]["payload_id"] == str(payload_id)
assert len(sent_messages) == 0
finally:
await engine.dispose()

View File

@@ -0,0 +1,201 @@
# ruff: noqa: INP001
"""Webhook queue and dispatch worker tests."""
from __future__ import annotations
from datetime import UTC, datetime
from uuid import UUID, uuid4
import pytest
from app.services.webhooks import dispatch
from app.services.webhooks.queue import (
QueuedWebhookDelivery,
dequeue_webhook_delivery,
enqueue_webhook_delivery,
requeue_if_failed,
)
class _FakeRedis:
def __init__(self) -> None:
self.values: list[str] = []
def lpush(self, key: str, value: str) -> None:
self.values.insert(0, value)
def rpop(self, key: str) -> str | None:
if not self.values:
return None
return self.values.pop()
@pytest.mark.parametrize("attempts", [0, 1, 2])
def test_webhook_queue_roundtrip(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None:
fake = _FakeRedis()
def _fake_redis() -> _FakeRedis:
return fake
board_id = uuid4()
webhook_id = uuid4()
payload_id = uuid4()
payload = QueuedWebhookDelivery(
board_id=board_id,
webhook_id=webhook_id,
payload_id=payload_id,
payload_event="push",
received_at=datetime.now(UTC),
attempts=attempts,
)
monkeypatch.setattr("app.services.webhooks.queue._redis_client", _fake_redis)
assert enqueue_webhook_delivery(payload)
dequeued = dequeue_webhook_delivery()
assert dequeued is not None
assert dequeued.board_id == board_id
assert dequeued.webhook_id == webhook_id
assert dequeued.payload_id == payload_id
assert dequeued.payload_event == "push"
assert dequeued.attempts == attempts
@pytest.mark.parametrize("attempts", [0, 1, 2, 3])
def test_requeue_respects_retry_cap(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None:
fake = _FakeRedis()
def _fake_redis() -> _FakeRedis:
return fake
monkeypatch.setattr("app.services.webhooks.queue._redis_client", _fake_redis)
payload = QueuedWebhookDelivery(
board_id=uuid4(),
webhook_id=uuid4(),
payload_id=uuid4(),
payload_event="push",
received_at=datetime.now(UTC),
attempts=attempts,
)
if attempts >= 3:
assert requeue_if_failed(payload) is False
assert fake.values == []
else:
assert requeue_if_failed(payload) is True
requeued = dequeue_webhook_delivery()
assert requeued is not None
assert requeued.attempts == attempts + 1
class _FakeQueuedItem:
def __init__(self, attempts: int = 0) -> None:
self.payload_id = uuid4()
self.webhook_id = uuid4()
self.board_id = uuid4()
self.attempts = attempts
def _patch_dequeue(monkeypatch: pytest.MonkeyPatch, items: list[QueuedWebhookDelivery | None]) -> None:
def _dequeue() -> QueuedWebhookDelivery | None:
if not items:
return None
return items.pop(0)
monkeypatch.setattr(dispatch, "dequeue_webhook_delivery", _dequeue)
@pytest.mark.asyncio
async def test_dispatch_flush_processes_items_and_throttles(monkeypatch: pytest.MonkeyPatch) -> None:
items: list[QueuedWebhookDelivery | None] = [
_FakeQueuedItem(),
_FakeQueuedItem(),
None,
]
_patch_dequeue(monkeypatch, items)
processed: list[UUID] = []
throttles: list[float] = []
async def _process(item: QueuedWebhookDelivery) -> None:
processed.append(item.payload_id)
monkeypatch.setattr(dispatch, "_process_single_item", _process)
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: throttles.append(seconds))
await dispatch.flush_webhook_delivery_queue()
assert len(processed) == 2
assert throttles == [0.0, 0.0]
@pytest.mark.asyncio
async def test_dispatch_flush_requeues_on_process_error(monkeypatch: pytest.MonkeyPatch) -> None:
item = _FakeQueuedItem()
_patch_dequeue(monkeypatch, [item, None])
async def _process(_: QueuedWebhookDelivery) -> None:
raise RuntimeError("boom")
requeued: list[QueuedWebhookDelivery] = []
def _requeue(payload: QueuedWebhookDelivery) -> bool:
requeued.append(payload)
return True
monkeypatch.setattr(dispatch, "_process_single_item", _process)
monkeypatch.setattr(dispatch, "requeue_if_failed", _requeue)
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None)
await dispatch.flush_webhook_delivery_queue()
assert len(requeued) == 1
assert requeued[0].payload_id == item.payload_id
@pytest.mark.asyncio
async def test_dispatch_flush_recovers_from_dequeue_error(monkeypatch: pytest.MonkeyPatch) -> None:
item = _FakeQueuedItem()
call_count = 0
def _dequeue() -> QueuedWebhookDelivery | None:
nonlocal call_count
call_count += 1
if call_count == 1:
raise RuntimeError("dequeue broken")
if call_count == 2:
return item
return None
monkeypatch.setattr(dispatch, "dequeue_webhook_delivery", _dequeue)
processed = 0
async def _process(_: QueuedWebhookDelivery) -> None:
nonlocal processed
processed += 1
monkeypatch.setattr(dispatch, "_process_single_item", _process)
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None)
await dispatch.flush_webhook_delivery_queue()
assert call_count == 3
assert processed == 1
def test_dispatch_run_entrypoint_calls_async_flush(monkeypatch: pytest.MonkeyPatch) -> None:
called: list[bool] = []
async def _flush() -> None:
called.append(True)
monkeypatch.setattr(dispatch, "flush_webhook_delivery_queue", _flush)
dispatch.run_flush_webhook_delivery_queue()
assert called == [True]

View File

@@ -0,0 +1,71 @@
# ruff: noqa: INP001
from __future__ import annotations
import pytest
from app.services.webhooks import dispatch
@pytest.mark.parametrize(
("payload_event", "payload_value", "expected"),
[
("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": "success"}}, True),
("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": None}}, True),
("check_run", {"action": "created", "check_run": {"status": "queued"}}, True),
("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": "failure"}}, False),
(
"workflow_run",
{"action": "completed", "workflow_run": {"status": "completed", "conclusion": "success"}},
True,
),
(
"workflow_run",
{"action": "completed", "workflow_run": {"status": "completed", "conclusion": "cancelled"}},
False,
),
(
"check_suite",
{"action": "completed", "check_suite": {"status": "completed", "conclusion": "timed_out"}},
False,
),
(
"check_suite",
{"action": "completed", "check_suite": {"status": "completed", "conclusion": "neutral"}},
True,
),
# Non-target events should not be suppressed by this helper.
("pull_request", {"action": "opened"}, False),
(None, {"action": "opened"}, False),
# Non-dict payloads: don't suppress (we can't reason about it).
("check_run", "raw", False),
],
)
def test_should_suppress_routine_delivery(
monkeypatch: pytest.MonkeyPatch,
payload_event: str | None,
payload_value: object,
expected: bool,
) -> None:
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_suppress_routine_events", True)
assert (
dispatch._should_suppress_routine_delivery(
payload_event=payload_event,
payload_value=payload_value,
)
is expected
)
def test_suppression_disabled_via_settings(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_suppress_routine_events", False)
assert (
dispatch._should_suppress_routine_delivery(
payload_event="check_run",
payload_value={
"action": "completed",
"check_run": {"status": "completed", "conclusion": "success"},
},
)
is False
)

105
backend/uv.lock generated
View File

@@ -279,6 +279,25 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/0d/4a/331fe2caf6799d591109bb9c08083080f6de90a823695d412a935622abb2/coverage-7.13.4-py3-none-any.whl", hash = "sha256:1af1641e57cf7ba1bd67d677c9abdbcd6cc2ab7da3bca7fa1e2b7e50e65f2ad0", size = 211242, upload-time = "2026-02-09T12:59:02.032Z" },
]
[[package]]
name = "croniter"
version = "6.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "python-dateutil" },
{ name = "pytz" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ad/2f/44d1ae153a0e27be56be43465e5cb39b9650c781e001e7864389deb25090/croniter-6.0.0.tar.gz", hash = "sha256:37c504b313956114a983ece2c2b07790b1f1094fe9d81cc94739214748255577", size = 64481, upload-time = "2024-12-17T17:17:47.32Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/07/4b/290b4c3efd6417a8b0c284896de19b1d5855e6dbdb97d2a35e68fa42de85/croniter-6.0.0-py2.py3-none-any.whl", hash = "sha256:2f878c3856f17896979b2a4379ba1f09c83e374931ea15cc835c5dd2eee9b368", size = 25468, upload-time = "2024-12-17T17:17:45.359Z" },
]
[[package]]
name = "crontab"
version = "1.0.5"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d6/36/a255b6f5a2e22df03fd2b2f3088974b44b8c9e9407e26b44742cb7cfbf5b/crontab-1.0.5.tar.gz", hash = "sha256:f80e01b4f07219763a9869f926dd17147278e7965a928089bca6d3dc80ae46d5", size = 21963, upload-time = "2025-07-09T17:09:38.264Z" }
[[package]]
name = "cryptography"
version = "45.0.7"
@@ -358,6 +377,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/9f/56/13ab06b4f93ca7cac71078fbe37fcea175d3216f31f85c3168a6bbd0bb9a/flake8-7.3.0-py2.py3-none-any.whl", hash = "sha256:b9696257b9ce8beb888cdbe31cf885c90d31928fe202be0889a7cdafad32f01e", size = 57922, upload-time = "2025-06-20T19:31:34.425Z" },
]
[[package]]
name = "freezegun"
version = "1.5.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "python-dateutil" },
]
sdist = { url = "https://files.pythonhosted.org/packages/95/dd/23e2f4e357f8fd3bdff613c1fe4466d21bfb00a6177f238079b17f7b1c84/freezegun-1.5.5.tar.gz", hash = "sha256:ac7742a6cc6c25a2c35e9292dfd554b897b517d2dec26891a2e8debf205cb94a", size = 35914, upload-time = "2025-08-09T10:39:08.338Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/5e/2e/b41d8a1a917d6581fc27a35d05561037b048e47df50f27f8ac9c7e27a710/freezegun-1.5.5-py3-none-any.whl", hash = "sha256:cd557f4a75cf074e84bc374249b9dd491eaeacd61376b9eb3c423282211619d2", size = 19266, upload-time = "2025-08-09T10:39:06.636Z" },
]
[[package]]
name = "greenlet"
version = "3.3.1"
@@ -697,6 +728,9 @@ dependencies = [
{ name = "psycopg", extra = ["binary"] },
{ name = "pydantic-settings" },
{ name = "python-dotenv" },
{ name = "redis" },
{ name = "rq" },
{ name = "rq-scheduler" },
{ name = "sqlalchemy", extra = ["asyncio"] },
{ name = "sqlmodel" },
{ name = "sse-starlette" },
@@ -739,6 +773,9 @@ requires-dist = [
{ name = "pytest-asyncio", marker = "extra == 'dev'", specifier = "==1.3.0" },
{ name = "pytest-cov", marker = "extra == 'dev'", specifier = "==7.0.0" },
{ name = "python-dotenv", specifier = "==1.2.1" },
{ name = "redis", specifier = "==6.3.0" },
{ name = "rq", specifier = "==2.6.0" },
{ name = "rq-scheduler", specifier = "==0.14.0" },
{ name = "ruff", marker = "extra == 'dev'", specifier = "==0.15.0" },
{ name = "sqlalchemy", extras = ["asyncio"], specifier = "==2.0.46" },
{ name = "sqlmodel", specifier = "==0.0.32" },
@@ -1030,6 +1067,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" },
]
[[package]]
name = "python-dateutil"
version = "2.9.0.post0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "six" },
]
sdist = { url = "https://files.pythonhosted.org/packages/66/c0/0c8b6ad9f17a802ee498c46e004a0eb49bc148f2fd230864601a86dcf6db/python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3", size = 342432, upload-time = "2024-03-01T18:36:20.211Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892, upload-time = "2024-03-01T18:36:18.57Z" },
]
[[package]]
name = "python-dotenv"
version = "1.2.1"
@@ -1068,6 +1117,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c6/78/397db326746f0a342855b81216ae1f0a32965deccfd7c830a2dbc66d2483/pytokens-0.4.1-py3-none-any.whl", hash = "sha256:26cef14744a8385f35d0e095dc8b3a7583f6c953c2e3d269c7f82484bf5ad2de", size = 13729, upload-time = "2026-01-30T01:03:45.029Z" },
]
[[package]]
name = "pytz"
version = "2025.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f8/bf/abbd3cdfb8fbc7fb3d4d38d320f2441b1e7cbe29be4f23797b4a2b5d8aac/pytz-2025.2.tar.gz", hash = "sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3", size = 320884, upload-time = "2025-03-25T02:25:00.538Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/81/c4/34e93fe5f5429d7570ec1fa436f1986fb1f00c3e0f43a589fe2bbcd22c3f/pytz-2025.2-py2.py3-none-any.whl", hash = "sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00", size = 509225, upload-time = "2025-03-25T02:24:58.468Z" },
]
[[package]]
name = "pyyaml"
version = "6.0.3"
@@ -1114,6 +1172,44 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f1/12/de94a39c2ef588c7e6455cfbe7343d3b2dc9d6b6b2f40c4c6565744c873d/pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b", size = 149341, upload-time = "2025-09-25T21:32:56.828Z" },
]
[[package]]
name = "redis"
version = "6.3.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/21/cd/030274634a1a052b708756016283ea3d84e91ae45f74d7f5dcf55d753a0f/redis-6.3.0.tar.gz", hash = "sha256:3000dbe532babfb0999cdab7b3e5744bcb23e51923febcfaeb52c8cfb29632ef", size = 4647275, upload-time = "2025-08-05T08:12:31.648Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/df/a7/2fe45801534a187543fc45d28b3844d84559c1589255bc2ece30d92dc205/redis-6.3.0-py3-none-any.whl", hash = "sha256:92f079d656ded871535e099080f70fab8e75273c0236797126ac60242d638e9b", size = 280018, upload-time = "2025-08-05T08:12:30.093Z" },
]
[[package]]
name = "rq"
version = "2.6.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "click" },
{ name = "croniter" },
{ name = "redis" },
]
sdist = { url = "https://files.pythonhosted.org/packages/8e/f5/46e39abc46ff6ff4f3151ee4fd2c1bf7601a8d26bd30fd951c5496b1e6c6/rq-2.6.0.tar.gz", hash = "sha256:92ad55676cda14512c4eea5782f398a102dc3af108bea197c868c4c50c5d3e81", size = 675315, upload-time = "2025-09-06T03:15:12.854Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cc/66/6cf141584526e3ed5b57a194e09cbdf7058334bd3926bb3f96e2453cf053/rq-2.6.0-py3-none-any.whl", hash = "sha256:be5ccc0f0fc5f32da0999648340e31476368f08067f0c3fce6768d00064edbb5", size = 112533, upload-time = "2025-09-06T03:15:09.894Z" },
]
[[package]]
name = "rq-scheduler"
version = "0.14.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "crontab" },
{ name = "freezegun" },
{ name = "python-dateutil" },
{ name = "rq" },
]
sdist = { url = "https://files.pythonhosted.org/packages/a0/4e/977bbcc1f3b25ed9ea60ec968b13f7147661defe5b2f9272b44fdb1c5549/rq-scheduler-0.14.0.tar.gz", hash = "sha256:2d5a14a1ab217f8693184ebaa1fe03838edcbc70b4f76572721c0b33058cd023", size = 16582, upload-time = "2024-10-29T13:30:32.641Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/bb/d0/28cedca9f3b321f30e69d644c2dcd7097ec21570ec9606fde56750621300/rq_scheduler-0.14.0-py2.py3-none-any.whl", hash = "sha256:d4ec221a3d8c11b3ff55e041f09d9af1e17f3253db737b6b97e86ab20fc3dc0d", size = 13874, upload-time = "2024-10-29T13:30:30.449Z" },
]
[[package]]
name = "ruff"
version = "0.15.0"
@@ -1139,6 +1235,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f6/b0/2d823f6e77ebe560f4e397d078487e8d52c1516b331e3521bc75db4272ca/ruff-0.15.0-py3-none-win_arm64.whl", hash = "sha256:c480d632cc0ca3f0727acac8b7d053542d9e114a462a145d0b00e7cd658c515a", size = 10865753, upload-time = "2026-02-03T17:53:03.014Z" },
]
[[package]]
name = "six"
version = "1.17.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/94/e7/b2c673351809dca68a0e064b6af791aa332cf192da575fd474ed7d6f16a2/six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81", size = 34031, upload-time = "2024-12-04T17:35:28.174Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" },
]
[[package]]
name = "sqlalchemy"
version = "2.0.46"

View File

@@ -17,6 +17,11 @@ services:
timeout: 3s
retries: 20
redis:
image: redis:7-alpine
ports:
- "${REDIS_PORT:-6379}:6379"
backend:
build:
# Build from repo root so the backend image can include repo-level assets
@@ -32,9 +37,12 @@ services:
DB_AUTO_MIGRATE: ${DB_AUTO_MIGRATE:-true}
AUTH_MODE: ${AUTH_MODE}
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
WEBHOOK_REDIS_URL: redis://redis:6379/0
depends_on:
db:
condition: service_healthy
redis:
condition: service_started
ports:
- "${BACKEND_PORT:-8000}:8000"
@@ -58,5 +66,46 @@ services:
ports:
- "${FRONTEND_PORT:-3000}:3000"
webhook-worker:
build:
context: .
dockerfile: backend/Dockerfile
command: ["rq", "worker", "webhook-dispatch", "-u", "redis://redis:6379/0"]
env_file:
- ./backend/.env.example
depends_on:
redis:
condition: service_started
db:
condition: service_healthy
environment:
DATABASE_URL: postgresql+psycopg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-postgres}@db:5432/${POSTGRES_DB:-mission_control}
AUTH_MODE: ${AUTH_MODE}
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_QUEUE_NAME: webhook-dispatch
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
restart: unless-stopped
webhook-dispatch-cron:
build:
context: .
dockerfile: backend/Dockerfile
command:
- sh
- -c
- |
python -c "from app.services.webhooks.scheduler import bootstrap_webhook_dispatch_schedule; bootstrap_webhook_dispatch_schedule()" && \
rqscheduler -u redis://redis:6379/0 -i 60
depends_on:
- redis
- webhook-worker
environment:
WEBHOOK_REDIS_URL: redis://redis:6379/0
WEBHOOK_QUEUE_NAME: webhook-dispatch
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS: 900
restart: unless-stopped
volumes:
postgres_data: