diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 1475bd6c..78af0ab7 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -56,6 +56,7 @@ class Settings(BaseSettings): # Webhook queueing / dispatch webhook_redis_url: str = "redis://localhost:6379/0" webhook_queue_name: str = "webhook-dispatch" + webhook_dispatch_schedule_id: str = "webhook-dispatch-batch" webhook_dispatch_throttle_seconds: float = 2.0 webhook_dispatch_max_retries: int = 3 diff --git a/backend/app/services/webhooks/dispatch.py b/backend/app/services/webhooks/dispatch.py index 7952e46c..9fe8632b 100644 --- a/backend/app/services/webhooks/dispatch.py +++ b/backend/app/services/webhooks/dispatch.py @@ -5,7 +5,6 @@ from __future__ import annotations import asyncio import time -from sqlalchemy import col, select from sqlmodel.ext.asyncio.session import AsyncSession from uuid import UUID @@ -17,7 +16,11 @@ from app.models.board_webhook_payloads import BoardWebhookPayload from app.models.board_webhooks import BoardWebhook from app.models.boards import Board from app.services.openclaw.gateway_dispatch import GatewayDispatchService -from app.services.webhooks.queue import QueuedWebhookDelivery, dequeue_webhook_delivery, requeue_if_failed +from app.services.webhooks.queue import ( + QueuedWebhookDelivery, + dequeue_webhook_delivery, + requeue_if_failed, +) logger = get_logger(__name__) @@ -71,11 +74,7 @@ async def _notify_lead( webhook: BoardWebhook, payload: BoardWebhookPayload, ) -> None: - lead = ( - await Agent.objects.filter_by(board_id=board.id) - .filter(col(Agent.is_board_lead).is_(True)) - .first(session) - ) + lead = await Agent.objects.filter_by(board_id=board.id, is_board_lead=True).first(session) if lead is None or not lead.openclaw_session_id: return @@ -101,14 +100,7 @@ async def _load_webhook_payload( webhook_id: UUID, board_id: UUID, ) -> tuple[Board, BoardWebhook, BoardWebhookPayload] | None: - payload = ( - await session.exec( - select(BoardWebhookPayload) - .where(col(BoardWebhookPayload.id) == payload_id) - .where(col(BoardWebhookPayload.board_id) == board_id) - .where(col(BoardWebhookPayload.webhook_id) == webhook_id), - ) - ).first() + payload = await session.get(BoardWebhookPayload, payload_id) if payload is None: logger.warning( "webhook.queue.payload_missing", @@ -120,6 +112,17 @@ async def _load_webhook_payload( ) return None + if payload.board_id != board_id or payload.webhook_id != webhook_id: + logger.warning( + "webhook.queue.payload_mismatch", + extra={ + "payload_id": str(payload_id), + "payload_webhook_id": str(payload.webhook_id), + "payload_board_id": str(payload.board_id), + }, + ) + return None + board = await Board.objects.by_id(board_id).first(session) if board is None: logger.warning( @@ -128,19 +131,25 @@ async def _load_webhook_payload( ) return None - webhook = ( - await session.exec( - select(BoardWebhook) - .where(col(BoardWebhook.id) == webhook_id) - .where(col(BoardWebhook.board_id) == board_id), - ) - ).first() + webhook = await session.get(BoardWebhook, webhook_id) if webhook is None: logger.warning( "webhook.queue.webhook_missing", extra={"webhook_id": str(webhook_id), "board_id": str(board_id)}, ) return None + + if webhook.board_id != board_id: + logger.warning( + "webhook.queue.webhook_board_mismatch", + extra={ + "webhook_id": str(webhook_id), + "payload_board_id": str(payload.board_id), + "expected_board_id": str(board_id), + }, + ) + return None + return board, webhook, payload diff --git a/backend/app/services/webhooks/queue.py b/backend/app/services/webhooks/queue.py index 41237ec1..b2d37797 100644 --- a/backend/app/services/webhooks/queue.py +++ b/backend/app/services/webhooks/queue.py @@ -8,6 +8,8 @@ from datetime import datetime from typing import Any from uuid import UUID +from typing import cast + import redis from app.core.config import settings @@ -76,7 +78,7 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool: def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None: """Pop one queued webhook delivery payload.""" client = _redis_client() - raw = client.rpop(settings.webhook_queue_name) + raw = cast(str | bytes | None, client.rpop(settings.webhook_queue_name)) if raw is None: return None if isinstance(raw, bytes): diff --git a/backend/app/services/webhooks/scheduler.py b/backend/app/services/webhooks/scheduler.py index 6c452f5f..fd1c4aec 100644 --- a/backend/app/services/webhooks/scheduler.py +++ b/backend/app/services/webhooks/scheduler.py @@ -5,7 +5,7 @@ from __future__ import annotations from datetime import datetime, timedelta, timezone from redis import Redis -from rq_scheduler import Scheduler +from rq_scheduler import Scheduler # type: ignore[import-untyped] from app.core.config import settings from app.services.webhooks import dispatch