fix: resolve mypy typing issues in webhook queue dispatch
This commit is contained in:
@@ -56,6 +56,7 @@ class Settings(BaseSettings):
|
|||||||
# Webhook queueing / dispatch
|
# Webhook queueing / dispatch
|
||||||
webhook_redis_url: str = "redis://localhost:6379/0"
|
webhook_redis_url: str = "redis://localhost:6379/0"
|
||||||
webhook_queue_name: str = "webhook-dispatch"
|
webhook_queue_name: str = "webhook-dispatch"
|
||||||
|
webhook_dispatch_schedule_id: str = "webhook-dispatch-batch"
|
||||||
webhook_dispatch_throttle_seconds: float = 2.0
|
webhook_dispatch_throttle_seconds: float = 2.0
|
||||||
webhook_dispatch_max_retries: int = 3
|
webhook_dispatch_max_retries: int = 3
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ from __future__ import annotations
|
|||||||
import asyncio
|
import asyncio
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from sqlalchemy import col, select
|
|
||||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
@@ -17,7 +16,11 @@ from app.models.board_webhook_payloads import BoardWebhookPayload
|
|||||||
from app.models.board_webhooks import BoardWebhook
|
from app.models.board_webhooks import BoardWebhook
|
||||||
from app.models.boards import Board
|
from app.models.boards import Board
|
||||||
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
|
from app.services.openclaw.gateway_dispatch import GatewayDispatchService
|
||||||
from app.services.webhooks.queue import QueuedWebhookDelivery, dequeue_webhook_delivery, requeue_if_failed
|
from app.services.webhooks.queue import (
|
||||||
|
QueuedWebhookDelivery,
|
||||||
|
dequeue_webhook_delivery,
|
||||||
|
requeue_if_failed,
|
||||||
|
)
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
@@ -71,11 +74,7 @@ async def _notify_lead(
|
|||||||
webhook: BoardWebhook,
|
webhook: BoardWebhook,
|
||||||
payload: BoardWebhookPayload,
|
payload: BoardWebhookPayload,
|
||||||
) -> None:
|
) -> None:
|
||||||
lead = (
|
lead = await Agent.objects.filter_by(board_id=board.id, is_board_lead=True).first(session)
|
||||||
await Agent.objects.filter_by(board_id=board.id)
|
|
||||||
.filter(col(Agent.is_board_lead).is_(True))
|
|
||||||
.first(session)
|
|
||||||
)
|
|
||||||
if lead is None or not lead.openclaw_session_id:
|
if lead is None or not lead.openclaw_session_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -101,14 +100,7 @@ async def _load_webhook_payload(
|
|||||||
webhook_id: UUID,
|
webhook_id: UUID,
|
||||||
board_id: UUID,
|
board_id: UUID,
|
||||||
) -> tuple[Board, BoardWebhook, BoardWebhookPayload] | None:
|
) -> tuple[Board, BoardWebhook, BoardWebhookPayload] | None:
|
||||||
payload = (
|
payload = await session.get(BoardWebhookPayload, payload_id)
|
||||||
await session.exec(
|
|
||||||
select(BoardWebhookPayload)
|
|
||||||
.where(col(BoardWebhookPayload.id) == payload_id)
|
|
||||||
.where(col(BoardWebhookPayload.board_id) == board_id)
|
|
||||||
.where(col(BoardWebhookPayload.webhook_id) == webhook_id),
|
|
||||||
)
|
|
||||||
).first()
|
|
||||||
if payload is None:
|
if payload is None:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"webhook.queue.payload_missing",
|
"webhook.queue.payload_missing",
|
||||||
@@ -120,6 +112,17 @@ async def _load_webhook_payload(
|
|||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
if payload.board_id != board_id or payload.webhook_id != webhook_id:
|
||||||
|
logger.warning(
|
||||||
|
"webhook.queue.payload_mismatch",
|
||||||
|
extra={
|
||||||
|
"payload_id": str(payload_id),
|
||||||
|
"payload_webhook_id": str(payload.webhook_id),
|
||||||
|
"payload_board_id": str(payload.board_id),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
board = await Board.objects.by_id(board_id).first(session)
|
board = await Board.objects.by_id(board_id).first(session)
|
||||||
if board is None:
|
if board is None:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
@@ -128,19 +131,25 @@ async def _load_webhook_payload(
|
|||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
webhook = (
|
webhook = await session.get(BoardWebhook, webhook_id)
|
||||||
await session.exec(
|
|
||||||
select(BoardWebhook)
|
|
||||||
.where(col(BoardWebhook.id) == webhook_id)
|
|
||||||
.where(col(BoardWebhook.board_id) == board_id),
|
|
||||||
)
|
|
||||||
).first()
|
|
||||||
if webhook is None:
|
if webhook is None:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"webhook.queue.webhook_missing",
|
"webhook.queue.webhook_missing",
|
||||||
extra={"webhook_id": str(webhook_id), "board_id": str(board_id)},
|
extra={"webhook_id": str(webhook_id), "board_id": str(board_id)},
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
if webhook.board_id != board_id:
|
||||||
|
logger.warning(
|
||||||
|
"webhook.queue.webhook_board_mismatch",
|
||||||
|
extra={
|
||||||
|
"webhook_id": str(webhook_id),
|
||||||
|
"payload_board_id": str(payload.board_id),
|
||||||
|
"expected_board_id": str(board_id),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
return board, webhook, payload
|
return board, webhook, payload
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -8,6 +8,8 @@ from datetime import datetime
|
|||||||
from typing import Any
|
from typing import Any
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
|
|
||||||
|
from typing import cast
|
||||||
|
|
||||||
import redis
|
import redis
|
||||||
|
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
@@ -76,7 +78,7 @@ def enqueue_webhook_delivery(payload: QueuedWebhookDelivery) -> bool:
|
|||||||
def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None:
|
def dequeue_webhook_delivery() -> QueuedWebhookDelivery | None:
|
||||||
"""Pop one queued webhook delivery payload."""
|
"""Pop one queued webhook delivery payload."""
|
||||||
client = _redis_client()
|
client = _redis_client()
|
||||||
raw = client.rpop(settings.webhook_queue_name)
|
raw = cast(str | bytes | None, client.rpop(settings.webhook_queue_name))
|
||||||
if raw is None:
|
if raw is None:
|
||||||
return None
|
return None
|
||||||
if isinstance(raw, bytes):
|
if isinstance(raw, bytes):
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ from __future__ import annotations
|
|||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
|
|
||||||
from redis import Redis
|
from redis import Redis
|
||||||
from rq_scheduler import Scheduler
|
from rq_scheduler import Scheduler # type: ignore[import-untyped]
|
||||||
|
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
from app.services.webhooks import dispatch
|
from app.services.webhooks import dispatch
|
||||||
|
|||||||
Reference in New Issue
Block a user