feat: add RQ-based webhook dispatch queue and delayed worker
This commit is contained in:
@@ -23,6 +23,7 @@ from app.models.board_webhooks import BoardWebhook
|
||||
from app.models.boards import Board
|
||||
from app.models.gateways import Gateway
|
||||
from app.models.organizations import Organization
|
||||
from app.services.webhooks.queue import QueuedWebhookDelivery
|
||||
|
||||
|
||||
async def _make_engine() -> AsyncEngine:
|
||||
@@ -112,7 +113,7 @@ async def _seed_webhook(
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
|
||||
async def test_ingest_board_webhook_stores_payload_and_enqueues_for_lead_dispatch(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
engine = await _make_engine()
|
||||
@@ -122,16 +123,23 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
|
||||
expire_on_commit=False,
|
||||
)
|
||||
app = _build_test_app(session_maker)
|
||||
enqueued: list[dict[str, object]] = []
|
||||
sent_messages: list[dict[str, str]] = []
|
||||
|
||||
async with session_maker() as session:
|
||||
board, webhook = await _seed_webhook(session, enabled=True)
|
||||
|
||||
async def _fake_optional_gateway_config_for_board(
|
||||
self: board_webhooks.GatewayDispatchService,
|
||||
_board: Board,
|
||||
) -> object:
|
||||
return object()
|
||||
def _fake_enqueue(payload: QueuedWebhookDelivery) -> bool:
|
||||
enqueued.append(
|
||||
{
|
||||
"board_id": str(payload.board_id),
|
||||
"webhook_id": str(payload.webhook_id),
|
||||
"payload_id": str(payload.payload_id),
|
||||
"attempts": payload.attempts,
|
||||
"event": payload.payload_event,
|
||||
},
|
||||
)
|
||||
return True
|
||||
|
||||
async def _fake_try_send_agent_message(
|
||||
self: board_webhooks.GatewayDispatchService,
|
||||
@@ -145,7 +153,7 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
|
||||
del self, config, deliver
|
||||
sent_messages.append(
|
||||
{
|
||||
"session_key": session_key,
|
||||
"session_id": session_key,
|
||||
"agent_name": agent_name,
|
||||
"message": message,
|
||||
},
|
||||
@@ -153,9 +161,9 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(
|
||||
board_webhooks.GatewayDispatchService,
|
||||
"optional_gateway_config_for_board",
|
||||
_fake_optional_gateway_config_for_board,
|
||||
board_webhooks,
|
||||
"enqueue_webhook_delivery",
|
||||
_fake_enqueue,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
board_webhooks.GatewayDispatchService,
|
||||
@@ -204,11 +212,12 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
|
||||
assert f"payload:{payload_id}" in memory_items[0].tags
|
||||
assert f"Payload ID: {payload_id}" in memory_items[0].content
|
||||
|
||||
assert len(sent_messages) == 1
|
||||
assert sent_messages[0]["session_key"] == "lead:session:key"
|
||||
assert "WEBHOOK EVENT RECEIVED" in sent_messages[0]["message"]
|
||||
assert str(payload_id) in sent_messages[0]["message"]
|
||||
assert webhook.description in sent_messages[0]["message"]
|
||||
assert len(enqueued) == 1
|
||||
assert enqueued[0]["board_id"] == str(board.id)
|
||||
assert enqueued[0]["webhook_id"] == str(webhook.id)
|
||||
assert enqueued[0]["payload_id"] == str(payload_id)
|
||||
|
||||
assert len(sent_messages) == 0
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
88
backend/tests/test_webhook_dispatch.py
Normal file
88
backend/tests/test_webhook_dispatch.py
Normal file
@@ -0,0 +1,88 @@
|
||||
# ruff: noqa: INP001
|
||||
"""Webhook queue helper unit tests."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
import pytest
|
||||
|
||||
from app.services.webhooks.queue import (
|
||||
QueuedWebhookDelivery,
|
||||
dequeue_webhook_delivery,
|
||||
enqueue_webhook_delivery,
|
||||
requeue_if_failed,
|
||||
)
|
||||
|
||||
|
||||
class _FakeRedis:
|
||||
def __init__(self) -> None:
|
||||
self.values: list[str] = []
|
||||
|
||||
def lpush(self, key: str, value: str) -> None:
|
||||
self.values.insert(0, value)
|
||||
|
||||
def rpop(self, key: str) -> str | None:
|
||||
if not self.values:
|
||||
return None
|
||||
return self.values.pop()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("attempts", [0, 1, 2])
|
||||
def test_webhook_queue_roundtrip(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None:
|
||||
fake = _FakeRedis()
|
||||
|
||||
def _fake_redis() -> _FakeRedis:
|
||||
return fake
|
||||
|
||||
board_id = uuid4()
|
||||
webhook_id = uuid4()
|
||||
payload_id = uuid4()
|
||||
payload = QueuedWebhookDelivery(
|
||||
board_id=board_id,
|
||||
webhook_id=webhook_id,
|
||||
payload_id=payload_id,
|
||||
payload_event="push",
|
||||
received_at=datetime.now(UTC),
|
||||
attempts=attempts,
|
||||
)
|
||||
|
||||
monkeypatch.setattr("app.services.webhooks.queue._redis_client", _fake_redis)
|
||||
assert enqueue_webhook_delivery(payload)
|
||||
|
||||
dequeued = dequeue_webhook_delivery()
|
||||
assert dequeued is not None
|
||||
assert dequeued.board_id == board_id
|
||||
assert dequeued.webhook_id == webhook_id
|
||||
assert dequeued.payload_id == payload_id
|
||||
assert dequeued.payload_event == "push"
|
||||
assert dequeued.attempts == attempts
|
||||
|
||||
|
||||
@pytest.mark.parametrize("attempts", [0, 1, 2, 3])
|
||||
def test_requeue_respects_retry_cap(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None:
|
||||
fake = _FakeRedis()
|
||||
|
||||
def _fake_redis() -> _FakeRedis:
|
||||
return fake
|
||||
|
||||
monkeypatch.setattr("app.services.webhooks.queue._redis_client", _fake_redis)
|
||||
|
||||
payload = QueuedWebhookDelivery(
|
||||
board_id=uuid4(),
|
||||
webhook_id=uuid4(),
|
||||
payload_id=uuid4(),
|
||||
payload_event="push",
|
||||
received_at=datetime.now(UTC),
|
||||
attempts=attempts,
|
||||
)
|
||||
|
||||
if attempts >= 3:
|
||||
assert requeue_if_failed(payload) is False
|
||||
assert fake.values == []
|
||||
else:
|
||||
assert requeue_if_failed(payload) is True
|
||||
requeued = dequeue_webhook_delivery()
|
||||
assert requeued is not None
|
||||
assert requeued.attempts == attempts + 1
|
||||
Reference in New Issue
Block a user