fix: resolve mypy typing issues in webhook queue dispatch

This commit is contained in:
Abhimanyu Saharan
2026-02-14 06:33:35 +00:00
parent 8aed721ce0
commit 130f2b36f0
4 changed files with 36 additions and 24 deletions

View File

@@ -56,6 +56,7 @@ class Settings(BaseSettings):
# Webhook queueing / dispatch # Webhook queueing / dispatch
webhook_redis_url: str = "redis://localhost:6379/0" webhook_redis_url: str = "redis://localhost:6379/0"
webhook_queue_name: str = "webhook-dispatch" webhook_queue_name: str = "webhook-dispatch"
webhook_dispatch_schedule_id: str = "webhook-dispatch-batch"
webhook_dispatch_throttle_seconds: float = 2.0 webhook_dispatch_throttle_seconds: float = 2.0
webhook_dispatch_max_retries: int = 3 webhook_dispatch_max_retries: int = 3

View File

@@ -5,7 +5,6 @@ from __future__ import annotations
import asyncio import asyncio
import time import time
from sqlalchemy import col, select
from sqlmodel.ext.asyncio.session import AsyncSession from sqlmodel.ext.asyncio.session import AsyncSession
from uuid import UUID from uuid import UUID
@@ -17,7 +16,11 @@ from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board from app.models.boards import Board
from app.services.openclaw.gateway_dispatch import GatewayDispatchService from app.services.openclaw.gateway_dispatch import GatewayDispatchService
from app.services.webhooks.queue import QueuedWebhookDelivery, dequeue_webhook_delivery, requeue_if_failed from app.services.webhooks.queue import (
QueuedWebhookDelivery,
dequeue_webhook_delivery,
requeue_if_failed,
)
logger = get_logger(__name__) logger = get_logger(__name__)
@@ -71,11 +74,7 @@ async def _notify_lead(
webhook: BoardWebhook, webhook: BoardWebhook,
payload: BoardWebhookPayload, payload: BoardWebhookPayload,
) -> None: ) -> None:
lead = ( lead = await Agent.objects.filter_by(board_id=board.id, is_board_lead=True).first(session)
await Agent.objects.filter_by(board_id=board.id)
.filter(col(Agent.is_board_lead).is_(True))
.first(session)
)
if lead is None or not lead.openclaw_session_id: if lead is None or not lead.openclaw_session_id:
return return
@@ -101,14 +100,7 @@ async def _load_webhook_payload(
webhook_id: UUID, webhook_id: UUID,
board_id: UUID, board_id: UUID,
) -> tuple[Board, BoardWebhook, BoardWebhookPayload] | None: ) -> tuple[Board, BoardWebhook, BoardWebhookPayload] | None:
payload = ( payload = await session.get(BoardWebhookPayload, payload_id)
await session.exec(
select(BoardWebhookPayload)
.where(col(BoardWebhookPayload.id) == payload_id)
.where(col(BoardWebhookPayload.board_id) == board_id)
.where(col(BoardWebhookPayload.webhook_id) == webhook_id),
)
).first()
if payload is None: if payload is None:
logger.warning( logger.warning(
"webhook.queue.payload_missing", "webhook.queue.payload_missing",
@@ -120,6 +112,17 @@ async def _load_webhook_payload(
) )
return None return None
if payload.board_id != board_id or payload.webhook_id != webhook_id:
logger.warning(
"webhook.queue.payload_mismatch",
extra={
"payload_id": str(payload_id),
"payload_webhook_id": str(payload.webhook_id),
"payload_board_id": str(payload.board_id),
},
)
return None
board = await Board.objects.by_id(board_id).first(session) board = await Board.objects.by_id(board_id).first(session)
if board is None: if board is None:
logger.warning( logger.warning(
@@ -128,19 +131,25 @@ async def _load_webhook_payload(
) )
return None return None
webhook = ( webhook = await session.get(BoardWebhook, webhook_id)
await session.exec(
select(BoardWebhook)
.where(col(BoardWebhook.id) == webhook_id)
.where(col(BoardWebhook.board_id) == board_id),
)
).first()
if webhook is None: if webhook is None:
logger.warning( logger.warning(
"webhook.queue.webhook_missing", "webhook.queue.webhook_missing",
extra={"webhook_id": str(webhook_id), "board_id": str(board_id)}, extra={"webhook_id": str(webhook_id), "board_id": str(board_id)},
) )
return None return None
if webhook.board_id != board_id:
logger.warning(
"webhook.queue.webhook_board_mismatch",
extra={
"webhook_id": str(webhook_id),
"payload_board_id": str(payload.board_id),
"expected_board_id": str(board_id),
},
)
return None
return board, webhook, payload return board, webhook, payload

View File

@@ -8,6 +8,8 @@ from datetime import datetime
from typing import Any from typing import Any
from uuid import UUID from uuid import UUID
from typing import cast
import redis import redis
from app.core.config import settings from app.core.config import settings
@@ -76,7 +78,7 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None: def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None:
"""Pop one queued webhook delivery payload.""" """Pop one queued webhook delivery payload."""
client = _redis_client() client = _redis_client()
raw = client.rpop(settings.webhook_queue_name) raw = cast(str | bytes | None, client.rpop(settings.webhook_queue_name))
if raw is None: if raw is None:
return None return None
if isinstance(raw, bytes): if isinstance(raw, bytes):

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from redis import Redis from redis import Redis
from rq_scheduler import Scheduler from rq_scheduler import Scheduler # type: ignore[import-untyped]
from app.core.config import settings from app.core.config import settings
from app.services.webhooks import dispatch from app.services.webhooks import dispatch