Compare commits
7 Commits
master
...
feat/webho
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bb66190913 | ||
|
|
4557fcc8ae | ||
|
|
554b73184a | ||
|
|
912387bd1c | ||
|
|
d01365abfb | ||
|
|
e535f377ff | ||
|
|
b987db58b8 |
@@ -12,12 +12,19 @@ BASE_URL=
|
||||
AUTH_MODE=local
|
||||
# REQUIRED when AUTH_MODE=local (must be non-placeholder and at least 50 chars).
|
||||
LOCAL_AUTH_TOKEN=
|
||||
|
||||
# Clerk (auth only; used when AUTH_MODE=clerk)
|
||||
CLERK_SECRET_KEY=
|
||||
CLERK_API_URL=https://api.clerk.com
|
||||
CLERK_VERIFY_IAT=true
|
||||
CLERK_LEEWAY=10.0
|
||||
|
||||
# Database
|
||||
DB_AUTO_MIGRATE=false
|
||||
# Webhook queue / worker
|
||||
WEBHOOK_REDIS_URL=redis://localhost:6379/0
|
||||
WEBHOOK_QUEUE_NAME=webhook-dispatch
|
||||
WEBHOOK_DISPATCH_THROTTLE_SECONDS=2.0
|
||||
WEBHOOK_DISPATCH_SCHEDULE_ID=webhook-dispatch-batch
|
||||
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS=900
|
||||
WEBHOOK_DISPATCH_MAX_RETRIES=3
|
||||
# Suppress routine GitHub CI telemetry events from lead notifications (still persisted to DB/memory).
|
||||
WEBHOOK_DISPATCH_SUPPRESS_ROUTINE_EVENTS=true
|
||||
|
||||
@@ -29,6 +29,7 @@ from app.schemas.board_webhooks import (
|
||||
from app.schemas.common import OkResponse
|
||||
from app.schemas.pagination import DefaultLimitOffsetPage
|
||||
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
|
||||
from app.services.webhooks.queue import QueuedWebhookDelivery, enqueue_webhook_delivery
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Sequence
|
||||
@@ -166,6 +167,12 @@ def _captured_headers(request: Request) -> dict[str, str] | None:
|
||||
return captured or None
|
||||
|
||||
|
||||
def _extract_webhook_event(headers: dict[str, str] | None) -> str | None:
|
||||
if not headers:
|
||||
return None
|
||||
return headers.get("x-github-event") or headers.get("x-event-type")
|
||||
|
||||
|
||||
def _payload_preview(
|
||||
value: dict[str, object] | list[object] | str | int | float | bool | None,
|
||||
) -> str:
|
||||
@@ -412,6 +419,7 @@ async def ingest_board_webhook(
|
||||
)
|
||||
|
||||
content_type = request.headers.get("content-type")
|
||||
headers = _captured_headers(request)
|
||||
payload_value = _decode_payload(
|
||||
await request.body(),
|
||||
content_type=content_type,
|
||||
@@ -420,7 +428,7 @@ async def ingest_board_webhook(
|
||||
board_id=board.id,
|
||||
webhook_id=webhook.id,
|
||||
payload=payload_value,
|
||||
headers=_captured_headers(request),
|
||||
headers=headers,
|
||||
source_ip=request.client.host if request.client else None,
|
||||
content_type=content_type,
|
||||
)
|
||||
@@ -438,12 +446,25 @@ async def ingest_board_webhook(
|
||||
)
|
||||
session.add(memory)
|
||||
await session.commit()
|
||||
|
||||
enqueued = enqueue_webhook_delivery(
|
||||
QueuedWebhookDelivery(
|
||||
board_id=board.id,
|
||||
webhook_id=webhook.id,
|
||||
payload_id=payload.id,
|
||||
payload_event=_extract_webhook_event(headers),
|
||||
received_at=payload.received_at,
|
||||
),
|
||||
)
|
||||
if not enqueued:
|
||||
# Preserve historical behavior by still notifying synchronously if queueing fails.
|
||||
await _notify_lead_on_webhook_payload(
|
||||
session=session,
|
||||
board=board,
|
||||
webhook=webhook,
|
||||
payload=payload,
|
||||
)
|
||||
|
||||
return BoardWebhookIngestResponse(
|
||||
board_id=board.id,
|
||||
webhook_id=webhook.id,
|
||||
|
||||
@@ -53,6 +53,17 @@ class Settings(BaseSettings):
|
||||
# Database lifecycle
|
||||
db_auto_migrate: bool = False
|
||||
|
||||
# Webhook queueing / dispatch
|
||||
webhook_redis_url: str = "redis://localhost:6379/0"
|
||||
webhook_queue_name: str = "webhook-dispatch"
|
||||
webhook_dispatch_schedule_id: str = "webhook-dispatch-batch"
|
||||
webhook_dispatch_throttle_seconds: float = 2.0
|
||||
webhook_dispatch_schedule_interval_seconds: int = 900
|
||||
webhook_dispatch_max_retries: int = 3
|
||||
# If true, suppress high-volume routine CI telemetry events (e.g. GitHub check_run success)
|
||||
# from lead notifications. Payloads are still persisted and recorded in board memory.
|
||||
webhook_dispatch_suppress_routine_events: bool = True
|
||||
|
||||
# Logging
|
||||
log_level: str = "INFO"
|
||||
log_format: str = "text"
|
||||
|
||||
3
backend/app/services/webhooks/__init__.py
Normal file
3
backend/app/services/webhooks/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""Webhook queueing and dispatch utilities."""
|
||||
|
||||
__all__ = ["dispatch", "queue", "scheduler"]
|
||||
334
backend/app/services/webhooks/dispatch.py
Normal file
334
backend/app/services/webhooks/dispatch.py
Normal file
@@ -0,0 +1,334 @@
|
||||
"""Webhook dispatch worker routines."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
from uuid import UUID
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.logging import get_logger
|
||||
from app.db.session import async_session_maker
|
||||
from app.models.agents import Agent
|
||||
from app.models.board_webhook_payloads import BoardWebhookPayload
|
||||
from app.models.board_webhooks import BoardWebhook
|
||||
from app.models.boards import Board
|
||||
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
|
||||
from app.services.webhooks.queue import (
|
||||
QueuedWebhookDelivery,
|
||||
dequeue_webhook_delivery,
|
||||
requeue_if_failed,
|
||||
)
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
_ROUTINE_GITHUB_EVENTS = frozenset({"check_run", "check_suite", "workflow_run"})
|
||||
_SUCCESS_GITHUB_CONCLUSIONS = frozenset({None, "success", "neutral", "skipped"})
|
||||
# Consider these actionable enough to page the lead / surface in task threads.
|
||||
_ACTIONABLE_GITHUB_CONCLUSIONS = frozenset(
|
||||
{
|
||||
"failure",
|
||||
"cancelled",
|
||||
"timed_out",
|
||||
"action_required",
|
||||
"stale",
|
||||
"startup_failure",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def _as_dict(value: object) -> dict[str, object] | None:
|
||||
if isinstance(value, dict):
|
||||
# Keep only string keys; payloads can include non-str keys in edge cases.
|
||||
normalized: dict[str, object] = {}
|
||||
for k, v in value.items():
|
||||
if isinstance(k, str):
|
||||
normalized[k] = v
|
||||
return normalized
|
||||
return None
|
||||
|
||||
|
||||
def _str_or_none(value: object) -> str | None:
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, str):
|
||||
return value
|
||||
return str(value)
|
||||
|
||||
|
||||
def _extract_github_conclusion(payload: dict[str, object], *, key: str) -> str | None:
|
||||
container = _as_dict(payload.get(key))
|
||||
if not container:
|
||||
return None
|
||||
return _str_or_none(container.get("conclusion"))
|
||||
|
||||
|
||||
def _extract_github_status(payload: dict[str, object], *, key: str) -> str | None:
|
||||
container = _as_dict(payload.get(key))
|
||||
if not container:
|
||||
return None
|
||||
return _str_or_none(container.get("status"))
|
||||
|
||||
|
||||
def _should_suppress_routine_delivery(
|
||||
*,
|
||||
payload_event: str | None,
|
||||
payload_value: object,
|
||||
) -> bool:
|
||||
"""Return True if this delivery is routine noise and should not notify leads.
|
||||
|
||||
This intentionally only targets high-volume GitHub CI telemetry events.
|
||||
We still persist the webhook payload + board memory entry for audit/debug.
|
||||
"""
|
||||
|
||||
if not settings.webhook_dispatch_suppress_routine_events:
|
||||
return False
|
||||
|
||||
if payload_event not in _ROUTINE_GITHUB_EVENTS:
|
||||
return False
|
||||
|
||||
payload = _as_dict(payload_value)
|
||||
if payload is None:
|
||||
return False
|
||||
|
||||
action = _str_or_none(payload.get("action"))
|
||||
# If GitHub hasn't marked it completed, it's almost always noise.
|
||||
if action and action != "completed":
|
||||
return True
|
||||
|
||||
if payload_event == "workflow_run":
|
||||
status = _extract_github_status(payload, key="workflow_run")
|
||||
if status and status != "completed":
|
||||
return True
|
||||
conclusion = _extract_github_conclusion(payload, key="workflow_run")
|
||||
elif payload_event == "check_run":
|
||||
status = _extract_github_status(payload, key="check_run")
|
||||
if status and status != "completed":
|
||||
return True
|
||||
conclusion = _extract_github_conclusion(payload, key="check_run")
|
||||
else: # check_suite
|
||||
status = _extract_github_status(payload, key="check_suite")
|
||||
if status and status != "completed":
|
||||
return True
|
||||
conclusion = _extract_github_conclusion(payload, key="check_suite")
|
||||
|
||||
if conclusion in _SUCCESS_GITHUB_CONCLUSIONS:
|
||||
return True
|
||||
|
||||
# Only page on explicitly non-success conclusions.
|
||||
return conclusion not in _ACTIONABLE_GITHUB_CONCLUSIONS
|
||||
|
||||
|
||||
def _build_payload_preview(payload_value: object) -> str:
|
||||
if isinstance(payload_value, str):
|
||||
return payload_value
|
||||
try:
|
||||
import json
|
||||
|
||||
return json.dumps(payload_value, indent=2, ensure_ascii=True)
|
||||
except TypeError:
|
||||
return str(payload_value)
|
||||
|
||||
|
||||
def _payload_preview(payload_value: object) -> str:
|
||||
preview = _build_payload_preview(payload_value)
|
||||
if len(preview) <= 1600:
|
||||
return preview
|
||||
return f"{preview[:1597]}..."
|
||||
|
||||
|
||||
def _webhook_message(
|
||||
*,
|
||||
board: Board,
|
||||
webhook: BoardWebhook,
|
||||
payload: BoardWebhookPayload,
|
||||
) -> str:
|
||||
preview = _payload_preview(payload.payload)
|
||||
return (
|
||||
"WEBHOOK EVENT RECEIVED\n"
|
||||
f"Board: {board.name}\n"
|
||||
f"Webhook ID: {webhook.id}\n"
|
||||
f"Payload ID: {payload.id}\n"
|
||||
f"Instruction: {webhook.description}\n\n"
|
||||
"Take action:\n"
|
||||
"1) Triage this payload against the webhook instruction.\n"
|
||||
"2) Create/update tasks as needed.\n"
|
||||
f"3) Reference payload ID {payload.id} in task descriptions.\n\n"
|
||||
"Payload preview:\n"
|
||||
f"{preview}\n\n"
|
||||
"To inspect board memory entries:\n"
|
||||
f"GET /api/v1/agent/boards/{board.id}/memory?is_chat=false"
|
||||
)
|
||||
|
||||
|
||||
async def _notify_lead(
|
||||
*,
|
||||
session: AsyncSession,
|
||||
board: Board,
|
||||
webhook: BoardWebhook,
|
||||
payload: BoardWebhookPayload,
|
||||
) -> None:
|
||||
lead = await Agent.objects.filter_by(board_id=board.id, is_board_lead=True).first(session)
|
||||
if lead is None or not lead.openclaw_session_id:
|
||||
return
|
||||
|
||||
dispatch = GatewayDispatchService(session)
|
||||
config = await dispatch.optional_gateway_config_for_board(board)
|
||||
if config is None:
|
||||
return
|
||||
|
||||
message = _webhook_message(board=board, webhook=webhook, payload=payload)
|
||||
await dispatch.try_send_agent_message(
|
||||
session_key=lead.openclaw_session_id,
|
||||
config=config,
|
||||
agent_name=lead.name,
|
||||
message=message,
|
||||
deliver=False,
|
||||
)
|
||||
|
||||
|
||||
async def _load_webhook_payload(
|
||||
*,
|
||||
session: AsyncSession,
|
||||
payload_id: UUID,
|
||||
webhook_id: UUID,
|
||||
board_id: UUID,
|
||||
) -> tuple[Board, BoardWebhook, BoardWebhookPayload] | None:
|
||||
payload = await session.get(BoardWebhookPayload, payload_id)
|
||||
if payload is None:
|
||||
logger.warning(
|
||||
"webhook.queue.payload_missing",
|
||||
extra={
|
||||
"payload_id": str(payload_id),
|
||||
"webhook_id": str(webhook_id),
|
||||
"board_id": str(board_id),
|
||||
},
|
||||
)
|
||||
return None
|
||||
|
||||
if payload.board_id != board_id or payload.webhook_id != webhook_id:
|
||||
logger.warning(
|
||||
"webhook.queue.payload_mismatch",
|
||||
extra={
|
||||
"payload_id": str(payload_id),
|
||||
"payload_webhook_id": str(payload.webhook_id),
|
||||
"payload_board_id": str(payload.board_id),
|
||||
},
|
||||
)
|
||||
return None
|
||||
|
||||
board = await Board.objects.by_id(board_id).first(session)
|
||||
if board is None:
|
||||
logger.warning(
|
||||
"webhook.queue.board_missing",
|
||||
extra={"board_id": str(board_id), "payload_id": str(payload_id)},
|
||||
)
|
||||
return None
|
||||
|
||||
webhook = await session.get(BoardWebhook, webhook_id)
|
||||
if webhook is None:
|
||||
logger.warning(
|
||||
"webhook.queue.webhook_missing",
|
||||
extra={"webhook_id": str(webhook_id), "board_id": str(board_id)},
|
||||
)
|
||||
return None
|
||||
|
||||
if webhook.board_id != board_id:
|
||||
logger.warning(
|
||||
"webhook.queue.webhook_board_mismatch",
|
||||
extra={
|
||||
"webhook_id": str(webhook_id),
|
||||
"payload_board_id": str(payload.board_id),
|
||||
"expected_board_id": str(board_id),
|
||||
},
|
||||
)
|
||||
return None
|
||||
|
||||
return board, webhook, payload
|
||||
|
||||
|
||||
async def _process_single_item(item: QueuedWebhookDelivery) -> None:
|
||||
async with async_session_maker() as session:
|
||||
loaded = await _load_webhook_payload(
|
||||
session=session,
|
||||
payload_id=item.payload_id,
|
||||
webhook_id=item.webhook_id,
|
||||
board_id=item.board_id,
|
||||
)
|
||||
if loaded is None:
|
||||
return
|
||||
|
||||
board, webhook, payload = loaded
|
||||
if _should_suppress_routine_delivery(
|
||||
payload_event=item.payload_event,
|
||||
payload_value=payload.payload,
|
||||
):
|
||||
logger.info(
|
||||
"webhook.dispatch.suppressed_routine",
|
||||
extra={
|
||||
"payload_id": str(item.payload_id),
|
||||
"webhook_id": str(item.webhook_id),
|
||||
"board_id": str(item.board_id),
|
||||
"event": item.payload_event,
|
||||
"attempt": item.attempts,
|
||||
},
|
||||
)
|
||||
return
|
||||
|
||||
await _notify_lead(session=session, board=board, webhook=webhook, payload=payload)
|
||||
await session.commit()
|
||||
|
||||
|
||||
async def flush_webhook_delivery_queue() -> None:
|
||||
"""Consume queued webhook events and notify board leads in a throttled batch."""
|
||||
processed = 0
|
||||
while True:
|
||||
try:
|
||||
item = dequeue_webhook_delivery()
|
||||
except Exception:
|
||||
logger.exception("webhook.dispatch.dequeue_failed")
|
||||
continue
|
||||
|
||||
if item is None:
|
||||
break
|
||||
|
||||
try:
|
||||
await _process_single_item(item)
|
||||
processed += 1
|
||||
logger.info(
|
||||
"webhook.dispatch.success",
|
||||
extra={
|
||||
"payload_id": str(item.payload_id),
|
||||
"webhook_id": str(item.webhook_id),
|
||||
"board_id": str(item.board_id),
|
||||
"attempt": item.attempts,
|
||||
},
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.exception(
|
||||
"webhook.dispatch.failed",
|
||||
extra={
|
||||
"payload_id": str(item.payload_id),
|
||||
"webhook_id": str(item.webhook_id),
|
||||
"board_id": str(item.board_id),
|
||||
"attempt": item.attempts,
|
||||
"error": str(exc),
|
||||
},
|
||||
)
|
||||
requeue_if_failed(item)
|
||||
time.sleep(settings.webhook_dispatch_throttle_seconds)
|
||||
logger.info("webhook.dispatch.batch_complete", extra={"count": processed})
|
||||
|
||||
|
||||
def run_flush_webhook_delivery_queue() -> None:
|
||||
"""RQ entrypoint for running the async queue flush from worker jobs."""
|
||||
logger.info(
|
||||
"webhook.dispatch.batch_started",
|
||||
extra={"throttle_seconds": settings.webhook_dispatch_throttle_seconds},
|
||||
)
|
||||
start = time.time()
|
||||
asyncio.run(flush_webhook_delivery_queue())
|
||||
elapsed_ms = int((time.time() - start) * 1000)
|
||||
logger.info("webhook.dispatch.batch_finished", extra={"duration_ms": elapsed_ms})
|
||||
136
backend/app/services/webhooks/queue.py
Normal file
136
backend/app/services/webhooks/queue.py
Normal file
@@ -0,0 +1,136 @@
|
||||
"""Webhook queue persistence and delivery helpers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
from typing import cast
|
||||
|
||||
import redis
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.logging import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class QueuedWebhookDelivery:
|
||||
"""Payload metadata stored for deferred webhook lead dispatch."""
|
||||
|
||||
board_id: UUID
|
||||
webhook_id: UUID
|
||||
payload_id: UUID
|
||||
payload_event: str | None
|
||||
received_at: datetime
|
||||
attempts: int = 0
|
||||
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(
|
||||
{
|
||||
"board_id": str(self.board_id),
|
||||
"webhook_id": str(self.webhook_id),
|
||||
"payload_id": str(self.payload_id),
|
||||
"payload_event": self.payload_event,
|
||||
"received_at": self.received_at.isoformat(),
|
||||
"attempts": self.attempts,
|
||||
},
|
||||
sort_keys=True,
|
||||
)
|
||||
|
||||
|
||||
def _redis_client() -> redis.Redis:
|
||||
return redis.Redis.from_url(settings.webhook_redis_url)
|
||||
|
||||
|
||||
def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
|
||||
"""Persist webhook metadata in a Redis queue for batch dispatch."""
|
||||
try:
|
||||
client = _redis_client()
|
||||
client.lpush(settings.webhook_queue_name, payload.to_json())
|
||||
logger.info(
|
||||
"webhook.queue.enqueued",
|
||||
extra={
|
||||
"board_id": str(payload.board_id),
|
||||
"webhook_id": str(payload.webhook_id),
|
||||
"payload_id": str(payload.payload_id),
|
||||
"attempt": payload.attempts,
|
||||
},
|
||||
)
|
||||
return True
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"webhook.queue.enqueue_failed",
|
||||
extra={
|
||||
"board_id": str(payload.board_id),
|
||||
"webhook_id": str(payload.webhook_id),
|
||||
"payload_id": str(payload.payload_id),
|
||||
"error": str(exc),
|
||||
},
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None:
|
||||
"""Pop one queued webhook delivery payload."""
|
||||
client = _redis_client()
|
||||
raw = cast(str | bytes | None, client.rpop(settings.webhook_queue_name))
|
||||
if raw is None:
|
||||
return None
|
||||
if isinstance(raw, bytes):
|
||||
raw = raw.decode("utf-8")
|
||||
try:
|
||||
payload: dict[str, Any] = json.loads(raw)
|
||||
event = payload.get("payload_event")
|
||||
if event is not None:
|
||||
event = str(event)
|
||||
return QueuedWebhookDelivery(
|
||||
board_id=UUID(payload["board_id"]),
|
||||
webhook_id=UUID(payload["webhook_id"]),
|
||||
payload_id=UUID(payload["payload_id"]),
|
||||
payload_event=event,
|
||||
received_at=datetime.fromisoformat(payload["received_at"]),
|
||||
attempts=int(payload.get("attempts", 0)),
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
"webhook.queue.dequeue_failed",
|
||||
extra={"raw_payload": str(raw), "error": str(exc)},
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
def _requeue_with_attempt(payload: QueuedWebhookDelivery) -> None:
|
||||
payload = QueuedWebhookDelivery(
|
||||
board_id=payload.board_id,
|
||||
webhook_id=payload.webhook_id,
|
||||
payload_id=payload.payload_id,
|
||||
payload_event=payload.payload_event,
|
||||
received_at=payload.received_at,
|
||||
attempts=payload.attempts + 1,
|
||||
)
|
||||
enqueue_webhook_delivery(payload)
|
||||
|
||||
|
||||
def requeue_if_failed(payload: QueuedWebhookDelivery) -> bool:
|
||||
"""Requeue payload delivery with capped retries.
|
||||
|
||||
Returns True if requeued.
|
||||
"""
|
||||
if payload.attempts >= settings.webhook_dispatch_max_retries:
|
||||
logger.warning(
|
||||
"webhook.queue.drop_failed_delivery",
|
||||
extra={
|
||||
"board_id": str(payload.board_id),
|
||||
"webhook_id": str(payload.webhook_id),
|
||||
"payload_id": str(payload.payload_id),
|
||||
"attempts": payload.attempts,
|
||||
},
|
||||
)
|
||||
return False
|
||||
_requeue_with_attempt(payload)
|
||||
return True
|
||||
36
backend/app/services/webhooks/scheduler.py
Normal file
36
backend/app/services/webhooks/scheduler.py
Normal file
@@ -0,0 +1,36 @@
|
||||
"""Webhook dispatch scheduler bootstrap for rq-scheduler."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from redis import Redis
|
||||
from rq_scheduler import Scheduler # type: ignore[import-untyped]
|
||||
|
||||
from app.core.config import settings
|
||||
from app.services.webhooks import dispatch
|
||||
|
||||
|
||||
def bootstrap_webhook_dispatch_schedule(interval_seconds: int | None = None) -> None:
|
||||
"""Register a recurring queue-flush job and keep it idempotent."""
|
||||
connection = Redis.from_url(settings.webhook_redis_url)
|
||||
scheduler = Scheduler(queue_name=settings.webhook_queue_name, connection=connection)
|
||||
|
||||
for job in scheduler.get_jobs():
|
||||
if job.id == settings.webhook_dispatch_schedule_id:
|
||||
scheduler.cancel(job)
|
||||
|
||||
effective_interval_seconds = (
|
||||
settings.webhook_dispatch_schedule_interval_seconds
|
||||
if interval_seconds is None
|
||||
else interval_seconds
|
||||
)
|
||||
|
||||
scheduler.schedule(
|
||||
datetime.now(tz=timezone.utc) + timedelta(seconds=5),
|
||||
func=dispatch.run_flush_webhook_delivery_queue,
|
||||
interval=effective_interval_seconds,
|
||||
repeat=None,
|
||||
id=settings.webhook_dispatch_schedule_id,
|
||||
queue_name=settings.webhook_queue_name,
|
||||
)
|
||||
@@ -25,6 +25,9 @@ dependencies = [
|
||||
"sse-starlette==3.2.0",
|
||||
"uvicorn[standard]==0.40.0",
|
||||
"websockets==16.0",
|
||||
"redis==6.3.0",
|
||||
"rq==2.6.0",
|
||||
"rq-scheduler==0.14.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
|
||||
@@ -23,6 +23,7 @@ from app.models.board_webhooks import BoardWebhook
|
||||
from app.models.boards import Board
|
||||
from app.models.gateways import Gateway
|
||||
from app.models.organizations import Organization
|
||||
from app.services.webhooks.queue import QueuedWebhookDelivery
|
||||
|
||||
|
||||
async def _make_engine() -> AsyncEngine:
|
||||
@@ -112,7 +113,7 @@ async def _seed_webhook(
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
|
||||
async def test_ingest_board_webhook_stores_payload_and_enqueues_for_lead_dispatch(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
engine = await _make_engine()
|
||||
@@ -122,16 +123,23 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
|
||||
expire_on_commit=False,
|
||||
)
|
||||
app = _build_test_app(session_maker)
|
||||
enqueued: list[dict[str, object]] = []
|
||||
sent_messages: list[dict[str, str]] = []
|
||||
|
||||
async with session_maker() as session:
|
||||
board, webhook = await _seed_webhook(session, enabled=True)
|
||||
|
||||
async def _fake_optional_gateway_config_for_board(
|
||||
self: board_webhooks.GatewayDispatchService,
|
||||
_board: Board,
|
||||
) -> object:
|
||||
return object()
|
||||
def _fake_enqueue(payload: QueuedWebhookDelivery) -> bool:
|
||||
enqueued.append(
|
||||
{
|
||||
"board_id": str(payload.board_id),
|
||||
"webhook_id": str(payload.webhook_id),
|
||||
"payload_id": str(payload.payload_id),
|
||||
"attempts": payload.attempts,
|
||||
"event": payload.payload_event,
|
||||
},
|
||||
)
|
||||
return True
|
||||
|
||||
async def _fake_try_send_agent_message(
|
||||
self: board_webhooks.GatewayDispatchService,
|
||||
@@ -145,7 +153,7 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
|
||||
del self, config, deliver
|
||||
sent_messages.append(
|
||||
{
|
||||
"session_key": session_key,
|
||||
"session_id": session_key,
|
||||
"agent_name": agent_name,
|
||||
"message": message,
|
||||
},
|
||||
@@ -153,9 +161,9 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(
|
||||
board_webhooks.GatewayDispatchService,
|
||||
"optional_gateway_config_for_board",
|
||||
_fake_optional_gateway_config_for_board,
|
||||
board_webhooks,
|
||||
"enqueue_webhook_delivery",
|
||||
_fake_enqueue,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
board_webhooks.GatewayDispatchService,
|
||||
@@ -204,11 +212,12 @@ async def test_ingest_board_webhook_stores_payload_and_notifies_lead(
|
||||
assert f"payload:{payload_id}" in memory_items[0].tags
|
||||
assert f"Payload ID: {payload_id}" in memory_items[0].content
|
||||
|
||||
assert len(sent_messages) == 1
|
||||
assert sent_messages[0]["session_key"] == "lead:session:key"
|
||||
assert "WEBHOOK EVENT RECEIVED" in sent_messages[0]["message"]
|
||||
assert str(payload_id) in sent_messages[0]["message"]
|
||||
assert webhook.description in sent_messages[0]["message"]
|
||||
assert len(enqueued) == 1
|
||||
assert enqueued[0]["board_id"] == str(board.id)
|
||||
assert enqueued[0]["webhook_id"] == str(webhook.id)
|
||||
assert enqueued[0]["payload_id"] == str(payload_id)
|
||||
|
||||
assert len(sent_messages) == 0
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
201
backend/tests/test_webhook_dispatch.py
Normal file
201
backend/tests/test_webhook_dispatch.py
Normal file
@@ -0,0 +1,201 @@
|
||||
# ruff: noqa: INP001
|
||||
"""Webhook queue and dispatch worker tests."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
import pytest
|
||||
|
||||
from app.services.webhooks import dispatch
|
||||
from app.services.webhooks.queue import (
|
||||
QueuedWebhookDelivery,
|
||||
dequeue_webhook_delivery,
|
||||
enqueue_webhook_delivery,
|
||||
requeue_if_failed,
|
||||
)
|
||||
|
||||
|
||||
class _FakeRedis:
|
||||
def __init__(self) -> None:
|
||||
self.values: list[str] = []
|
||||
|
||||
def lpush(self, key: str, value: str) -> None:
|
||||
self.values.insert(0, value)
|
||||
|
||||
def rpop(self, key: str) -> str | None:
|
||||
if not self.values:
|
||||
return None
|
||||
return self.values.pop()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("attempts", [0, 1, 2])
|
||||
def test_webhook_queue_roundtrip(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None:
|
||||
fake = _FakeRedis()
|
||||
|
||||
def _fake_redis() -> _FakeRedis:
|
||||
return fake
|
||||
|
||||
board_id = uuid4()
|
||||
webhook_id = uuid4()
|
||||
payload_id = uuid4()
|
||||
payload = QueuedWebhookDelivery(
|
||||
board_id=board_id,
|
||||
webhook_id=webhook_id,
|
||||
payload_id=payload_id,
|
||||
payload_event="push",
|
||||
received_at=datetime.now(UTC),
|
||||
attempts=attempts,
|
||||
)
|
||||
|
||||
monkeypatch.setattr("app.services.webhooks.queue._redis_client", _fake_redis)
|
||||
assert enqueue_webhook_delivery(payload)
|
||||
|
||||
dequeued = dequeue_webhook_delivery()
|
||||
assert dequeued is not None
|
||||
assert dequeued.board_id == board_id
|
||||
assert dequeued.webhook_id == webhook_id
|
||||
assert dequeued.payload_id == payload_id
|
||||
assert dequeued.payload_event == "push"
|
||||
assert dequeued.attempts == attempts
|
||||
|
||||
|
||||
@pytest.mark.parametrize("attempts", [0, 1, 2, 3])
|
||||
def test_requeue_respects_retry_cap(monkeypatch: pytest.MonkeyPatch, attempts: int) -> None:
|
||||
fake = _FakeRedis()
|
||||
|
||||
def _fake_redis() -> _FakeRedis:
|
||||
return fake
|
||||
|
||||
monkeypatch.setattr("app.services.webhooks.queue._redis_client", _fake_redis)
|
||||
|
||||
payload = QueuedWebhookDelivery(
|
||||
board_id=uuid4(),
|
||||
webhook_id=uuid4(),
|
||||
payload_id=uuid4(),
|
||||
payload_event="push",
|
||||
received_at=datetime.now(UTC),
|
||||
attempts=attempts,
|
||||
)
|
||||
|
||||
if attempts >= 3:
|
||||
assert requeue_if_failed(payload) is False
|
||||
assert fake.values == []
|
||||
else:
|
||||
assert requeue_if_failed(payload) is True
|
||||
requeued = dequeue_webhook_delivery()
|
||||
assert requeued is not None
|
||||
assert requeued.attempts == attempts + 1
|
||||
|
||||
|
||||
class _FakeQueuedItem:
|
||||
def __init__(self, attempts: int = 0) -> None:
|
||||
self.payload_id = uuid4()
|
||||
self.webhook_id = uuid4()
|
||||
self.board_id = uuid4()
|
||||
self.attempts = attempts
|
||||
|
||||
|
||||
def _patch_dequeue(monkeypatch: pytest.MonkeyPatch, items: list[QueuedWebhookDelivery | None]) -> None:
|
||||
def _dequeue() -> QueuedWebhookDelivery | None:
|
||||
if not items:
|
||||
return None
|
||||
return items.pop(0)
|
||||
|
||||
monkeypatch.setattr(dispatch, "dequeue_webhook_delivery", _dequeue)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dispatch_flush_processes_items_and_throttles(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
items: list[QueuedWebhookDelivery | None] = [
|
||||
_FakeQueuedItem(),
|
||||
_FakeQueuedItem(),
|
||||
None,
|
||||
]
|
||||
_patch_dequeue(monkeypatch, items)
|
||||
|
||||
processed: list[UUID] = []
|
||||
throttles: list[float] = []
|
||||
|
||||
async def _process(item: QueuedWebhookDelivery) -> None:
|
||||
processed.append(item.payload_id)
|
||||
|
||||
monkeypatch.setattr(dispatch, "_process_single_item", _process)
|
||||
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
|
||||
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: throttles.append(seconds))
|
||||
|
||||
await dispatch.flush_webhook_delivery_queue()
|
||||
|
||||
assert len(processed) == 2
|
||||
assert throttles == [0.0, 0.0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dispatch_flush_requeues_on_process_error(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
item = _FakeQueuedItem()
|
||||
_patch_dequeue(monkeypatch, [item, None])
|
||||
|
||||
async def _process(_: QueuedWebhookDelivery) -> None:
|
||||
raise RuntimeError("boom")
|
||||
|
||||
requeued: list[QueuedWebhookDelivery] = []
|
||||
|
||||
def _requeue(payload: QueuedWebhookDelivery) -> bool:
|
||||
requeued.append(payload)
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(dispatch, "_process_single_item", _process)
|
||||
monkeypatch.setattr(dispatch, "requeue_if_failed", _requeue)
|
||||
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
|
||||
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None)
|
||||
|
||||
await dispatch.flush_webhook_delivery_queue()
|
||||
|
||||
assert len(requeued) == 1
|
||||
assert requeued[0].payload_id == item.payload_id
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dispatch_flush_recovers_from_dequeue_error(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
item = _FakeQueuedItem()
|
||||
call_count = 0
|
||||
|
||||
def _dequeue() -> QueuedWebhookDelivery | None:
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count == 1:
|
||||
raise RuntimeError("dequeue broken")
|
||||
if call_count == 2:
|
||||
return item
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(dispatch, "dequeue_webhook_delivery", _dequeue)
|
||||
|
||||
processed = 0
|
||||
|
||||
async def _process(_: QueuedWebhookDelivery) -> None:
|
||||
nonlocal processed
|
||||
processed += 1
|
||||
|
||||
monkeypatch.setattr(dispatch, "_process_single_item", _process)
|
||||
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_throttle_seconds", 0)
|
||||
monkeypatch.setattr(dispatch.time, "sleep", lambda seconds: None)
|
||||
|
||||
await dispatch.flush_webhook_delivery_queue()
|
||||
|
||||
assert call_count == 3
|
||||
assert processed == 1
|
||||
|
||||
|
||||
def test_dispatch_run_entrypoint_calls_async_flush(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
called: list[bool] = []
|
||||
|
||||
async def _flush() -> None:
|
||||
called.append(True)
|
||||
|
||||
monkeypatch.setattr(dispatch, "flush_webhook_delivery_queue", _flush)
|
||||
|
||||
dispatch.run_flush_webhook_delivery_queue()
|
||||
|
||||
assert called == [True]
|
||||
71
backend/tests/test_webhook_routine_suppression.py
Normal file
71
backend/tests/test_webhook_routine_suppression.py
Normal file
@@ -0,0 +1,71 @@
|
||||
# ruff: noqa: INP001
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from app.services.webhooks import dispatch
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("payload_event", "payload_value", "expected"),
|
||||
[
|
||||
("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": "success"}}, True),
|
||||
("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": None}}, True),
|
||||
("check_run", {"action": "created", "check_run": {"status": "queued"}}, True),
|
||||
("check_run", {"action": "completed", "check_run": {"status": "completed", "conclusion": "failure"}}, False),
|
||||
(
|
||||
"workflow_run",
|
||||
{"action": "completed", "workflow_run": {"status": "completed", "conclusion": "success"}},
|
||||
True,
|
||||
),
|
||||
(
|
||||
"workflow_run",
|
||||
{"action": "completed", "workflow_run": {"status": "completed", "conclusion": "cancelled"}},
|
||||
False,
|
||||
),
|
||||
(
|
||||
"check_suite",
|
||||
{"action": "completed", "check_suite": {"status": "completed", "conclusion": "timed_out"}},
|
||||
False,
|
||||
),
|
||||
(
|
||||
"check_suite",
|
||||
{"action": "completed", "check_suite": {"status": "completed", "conclusion": "neutral"}},
|
||||
True,
|
||||
),
|
||||
# Non-target events should not be suppressed by this helper.
|
||||
("pull_request", {"action": "opened"}, False),
|
||||
(None, {"action": "opened"}, False),
|
||||
# Non-dict payloads: don't suppress (we can't reason about it).
|
||||
("check_run", "raw", False),
|
||||
],
|
||||
)
|
||||
def test_should_suppress_routine_delivery(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
payload_event: str | None,
|
||||
payload_value: object,
|
||||
expected: bool,
|
||||
) -> None:
|
||||
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_suppress_routine_events", True)
|
||||
assert (
|
||||
dispatch._should_suppress_routine_delivery(
|
||||
payload_event=payload_event,
|
||||
payload_value=payload_value,
|
||||
)
|
||||
is expected
|
||||
)
|
||||
|
||||
|
||||
def test_suppression_disabled_via_settings(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
monkeypatch.setattr(dispatch.settings, "webhook_dispatch_suppress_routine_events", False)
|
||||
assert (
|
||||
dispatch._should_suppress_routine_delivery(
|
||||
payload_event="check_run",
|
||||
payload_value={
|
||||
"action": "completed",
|
||||
"check_run": {"status": "completed", "conclusion": "success"},
|
||||
},
|
||||
)
|
||||
is False
|
||||
)
|
||||
105
backend/uv.lock
generated
105
backend/uv.lock
generated
@@ -279,6 +279,25 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/0d/4a/331fe2caf6799d591109bb9c08083080f6de90a823695d412a935622abb2/coverage-7.13.4-py3-none-any.whl", hash = "sha256:1af1641e57cf7ba1bd67d677c9abdbcd6cc2ab7da3bca7fa1e2b7e50e65f2ad0", size = 211242, upload-time = "2026-02-09T12:59:02.032Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "croniter"
|
||||
version = "6.0.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "python-dateutil" },
|
||||
{ name = "pytz" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/ad/2f/44d1ae153a0e27be56be43465e5cb39b9650c781e001e7864389deb25090/croniter-6.0.0.tar.gz", hash = "sha256:37c504b313956114a983ece2c2b07790b1f1094fe9d81cc94739214748255577", size = 64481, upload-time = "2024-12-17T17:17:47.32Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/07/4b/290b4c3efd6417a8b0c284896de19b1d5855e6dbdb97d2a35e68fa42de85/croniter-6.0.0-py2.py3-none-any.whl", hash = "sha256:2f878c3856f17896979b2a4379ba1f09c83e374931ea15cc835c5dd2eee9b368", size = 25468, upload-time = "2024-12-17T17:17:45.359Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crontab"
|
||||
version = "1.0.5"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/d6/36/a255b6f5a2e22df03fd2b2f3088974b44b8c9e9407e26b44742cb7cfbf5b/crontab-1.0.5.tar.gz", hash = "sha256:f80e01b4f07219763a9869f926dd17147278e7965a928089bca6d3dc80ae46d5", size = 21963, upload-time = "2025-07-09T17:09:38.264Z" }
|
||||
|
||||
[[package]]
|
||||
name = "cryptography"
|
||||
version = "45.0.7"
|
||||
@@ -358,6 +377,18 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/9f/56/13ab06b4f93ca7cac71078fbe37fcea175d3216f31f85c3168a6bbd0bb9a/flake8-7.3.0-py2.py3-none-any.whl", hash = "sha256:b9696257b9ce8beb888cdbe31cf885c90d31928fe202be0889a7cdafad32f01e", size = 57922, upload-time = "2025-06-20T19:31:34.425Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "freezegun"
|
||||
version = "1.5.5"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "python-dateutil" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/95/dd/23e2f4e357f8fd3bdff613c1fe4466d21bfb00a6177f238079b17f7b1c84/freezegun-1.5.5.tar.gz", hash = "sha256:ac7742a6cc6c25a2c35e9292dfd554b897b517d2dec26891a2e8debf205cb94a", size = 35914, upload-time = "2025-08-09T10:39:08.338Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/5e/2e/b41d8a1a917d6581fc27a35d05561037b048e47df50f27f8ac9c7e27a710/freezegun-1.5.5-py3-none-any.whl", hash = "sha256:cd557f4a75cf074e84bc374249b9dd491eaeacd61376b9eb3c423282211619d2", size = 19266, upload-time = "2025-08-09T10:39:06.636Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "greenlet"
|
||||
version = "3.3.1"
|
||||
@@ -697,6 +728,9 @@ dependencies = [
|
||||
{ name = "psycopg", extra = ["binary"] },
|
||||
{ name = "pydantic-settings" },
|
||||
{ name = "python-dotenv" },
|
||||
{ name = "redis" },
|
||||
{ name = "rq" },
|
||||
{ name = "rq-scheduler" },
|
||||
{ name = "sqlalchemy", extra = ["asyncio"] },
|
||||
{ name = "sqlmodel" },
|
||||
{ name = "sse-starlette" },
|
||||
@@ -739,6 +773,9 @@ requires-dist = [
|
||||
{ name = "pytest-asyncio", marker = "extra == 'dev'", specifier = "==1.3.0" },
|
||||
{ name = "pytest-cov", marker = "extra == 'dev'", specifier = "==7.0.0" },
|
||||
{ name = "python-dotenv", specifier = "==1.2.1" },
|
||||
{ name = "redis", specifier = "==6.3.0" },
|
||||
{ name = "rq", specifier = "==2.6.0" },
|
||||
{ name = "rq-scheduler", specifier = "==0.14.0" },
|
||||
{ name = "ruff", marker = "extra == 'dev'", specifier = "==0.15.0" },
|
||||
{ name = "sqlalchemy", extras = ["asyncio"], specifier = "==2.0.46" },
|
||||
{ name = "sqlmodel", specifier = "==0.0.32" },
|
||||
@@ -1030,6 +1067,18 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "python-dateutil"
|
||||
version = "2.9.0.post0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "six" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/66/c0/0c8b6ad9f17a802ee498c46e004a0eb49bc148f2fd230864601a86dcf6db/python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3", size = 342432, upload-time = "2024-03-01T18:36:20.211Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892, upload-time = "2024-03-01T18:36:18.57Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "python-dotenv"
|
||||
version = "1.2.1"
|
||||
@@ -1068,6 +1117,15 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/c6/78/397db326746f0a342855b81216ae1f0a32965deccfd7c830a2dbc66d2483/pytokens-0.4.1-py3-none-any.whl", hash = "sha256:26cef14744a8385f35d0e095dc8b3a7583f6c953c2e3d269c7f82484bf5ad2de", size = 13729, upload-time = "2026-01-30T01:03:45.029Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pytz"
|
||||
version = "2025.2"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/f8/bf/abbd3cdfb8fbc7fb3d4d38d320f2441b1e7cbe29be4f23797b4a2b5d8aac/pytz-2025.2.tar.gz", hash = "sha256:360b9e3dbb49a209c21ad61809c7fb453643e048b38924c765813546746e81c3", size = 320884, upload-time = "2025-03-25T02:25:00.538Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/81/c4/34e93fe5f5429d7570ec1fa436f1986fb1f00c3e0f43a589fe2bbcd22c3f/pytz-2025.2-py2.py3-none-any.whl", hash = "sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00", size = 509225, upload-time = "2025-03-25T02:24:58.468Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pyyaml"
|
||||
version = "6.0.3"
|
||||
@@ -1114,6 +1172,44 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/f1/12/de94a39c2ef588c7e6455cfbe7343d3b2dc9d6b6b2f40c4c6565744c873d/pyyaml-6.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:ebc55a14a21cb14062aa4162f906cd962b28e2e9ea38f9b4391244cd8de4ae0b", size = 149341, upload-time = "2025-09-25T21:32:56.828Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redis"
|
||||
version = "6.3.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/21/cd/030274634a1a052b708756016283ea3d84e91ae45f74d7f5dcf55d753a0f/redis-6.3.0.tar.gz", hash = "sha256:3000dbe532babfb0999cdab7b3e5744bcb23e51923febcfaeb52c8cfb29632ef", size = 4647275, upload-time = "2025-08-05T08:12:31.648Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/df/a7/2fe45801534a187543fc45d28b3844d84559c1589255bc2ece30d92dc205/redis-6.3.0-py3-none-any.whl", hash = "sha256:92f079d656ded871535e099080f70fab8e75273c0236797126ac60242d638e9b", size = 280018, upload-time = "2025-08-05T08:12:30.093Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rq"
|
||||
version = "2.6.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "click" },
|
||||
{ name = "croniter" },
|
||||
{ name = "redis" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/8e/f5/46e39abc46ff6ff4f3151ee4fd2c1bf7601a8d26bd30fd951c5496b1e6c6/rq-2.6.0.tar.gz", hash = "sha256:92ad55676cda14512c4eea5782f398a102dc3af108bea197c868c4c50c5d3e81", size = 675315, upload-time = "2025-09-06T03:15:12.854Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/cc/66/6cf141584526e3ed5b57a194e09cbdf7058334bd3926bb3f96e2453cf053/rq-2.6.0-py3-none-any.whl", hash = "sha256:be5ccc0f0fc5f32da0999648340e31476368f08067f0c3fce6768d00064edbb5", size = 112533, upload-time = "2025-09-06T03:15:09.894Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rq-scheduler"
|
||||
version = "0.14.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "crontab" },
|
||||
{ name = "freezegun" },
|
||||
{ name = "python-dateutil" },
|
||||
{ name = "rq" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/a0/4e/977bbcc1f3b25ed9ea60ec968b13f7147661defe5b2f9272b44fdb1c5549/rq-scheduler-0.14.0.tar.gz", hash = "sha256:2d5a14a1ab217f8693184ebaa1fe03838edcbc70b4f76572721c0b33058cd023", size = 16582, upload-time = "2024-10-29T13:30:32.641Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/bb/d0/28cedca9f3b321f30e69d644c2dcd7097ec21570ec9606fde56750621300/rq_scheduler-0.14.0-py2.py3-none-any.whl", hash = "sha256:d4ec221a3d8c11b3ff55e041f09d9af1e17f3253db737b6b97e86ab20fc3dc0d", size = 13874, upload-time = "2024-10-29T13:30:30.449Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ruff"
|
||||
version = "0.15.0"
|
||||
@@ -1139,6 +1235,15 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/f6/b0/2d823f6e77ebe560f4e397d078487e8d52c1516b331e3521bc75db4272ca/ruff-0.15.0-py3-none-win_arm64.whl", hash = "sha256:c480d632cc0ca3f0727acac8b7d053542d9e114a462a145d0b00e7cd658c515a", size = 10865753, upload-time = "2026-02-03T17:53:03.014Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "six"
|
||||
version = "1.17.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/94/e7/b2c673351809dca68a0e064b6af791aa332cf192da575fd474ed7d6f16a2/six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81", size = 34031, upload-time = "2024-12-04T17:35:28.174Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sqlalchemy"
|
||||
version = "2.0.46"
|
||||
|
||||
49
compose.yml
49
compose.yml
@@ -17,6 +17,11 @@ services:
|
||||
timeout: 3s
|
||||
retries: 20
|
||||
|
||||
redis:
|
||||
image: redis:7-alpine
|
||||
ports:
|
||||
- "${REDIS_PORT:-6379}:6379"
|
||||
|
||||
backend:
|
||||
build:
|
||||
# Build from repo root so the backend image can include repo-level assets
|
||||
@@ -32,9 +37,12 @@ services:
|
||||
DB_AUTO_MIGRATE: ${DB_AUTO_MIGRATE:-true}
|
||||
AUTH_MODE: ${AUTH_MODE}
|
||||
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
|
||||
WEBHOOK_REDIS_URL: redis://redis:6379/0
|
||||
depends_on:
|
||||
db:
|
||||
condition: service_healthy
|
||||
redis:
|
||||
condition: service_started
|
||||
ports:
|
||||
- "${BACKEND_PORT:-8000}:8000"
|
||||
|
||||
@@ -58,5 +66,46 @@ services:
|
||||
ports:
|
||||
- "${FRONTEND_PORT:-3000}:3000"
|
||||
|
||||
webhook-worker:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: backend/Dockerfile
|
||||
command: ["rq", "worker", "webhook-dispatch", "-u", "redis://redis:6379/0"]
|
||||
env_file:
|
||||
- ./backend/.env.example
|
||||
depends_on:
|
||||
redis:
|
||||
condition: service_started
|
||||
db:
|
||||
condition: service_healthy
|
||||
environment:
|
||||
DATABASE_URL: postgresql+psycopg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-postgres}@db:5432/${POSTGRES_DB:-mission_control}
|
||||
AUTH_MODE: ${AUTH_MODE}
|
||||
LOCAL_AUTH_TOKEN: ${LOCAL_AUTH_TOKEN}
|
||||
WEBHOOK_REDIS_URL: redis://redis:6379/0
|
||||
WEBHOOK_QUEUE_NAME: webhook-dispatch
|
||||
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
|
||||
restart: unless-stopped
|
||||
|
||||
webhook-dispatch-cron:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: backend/Dockerfile
|
||||
command:
|
||||
- sh
|
||||
- -c
|
||||
- |
|
||||
python -c "from app.services.webhooks.scheduler import bootstrap_webhook_dispatch_schedule; bootstrap_webhook_dispatch_schedule()" && \
|
||||
rqscheduler -u redis://redis:6379/0 -i 60
|
||||
depends_on:
|
||||
- redis
|
||||
- webhook-worker
|
||||
environment:
|
||||
WEBHOOK_REDIS_URL: redis://redis:6379/0
|
||||
WEBHOOK_QUEUE_NAME: webhook-dispatch
|
||||
WEBHOOK_DISPATCH_SCHEDULE_ID: webhook-dispatch-batch
|
||||
WEBHOOK_DISPATCH_SCHEDULE_INTERVAL_SECONDS: 900
|
||||
restart: unless-stopped
|
||||
|
||||
volumes:
|
||||
postgres_data:
|
||||
|
||||
Reference in New Issue
Block a user