perf: bound concurrency for approval check reconciliation
This commit is contained in:
@@ -25,6 +25,8 @@ GH_TOKEN=
|
|||||||
# Periodic reconciliation safety net (rq-scheduler)
|
# Periodic reconciliation safety net (rq-scheduler)
|
||||||
GITHUB_APPROVAL_CHECK_SCHEDULE_ID=mission-control-approval-check-reconcile
|
GITHUB_APPROVAL_CHECK_SCHEDULE_ID=mission-control-approval-check-reconcile
|
||||||
GITHUB_APPROVAL_CHECK_SCHEDULE_INTERVAL_SECONDS=900
|
GITHUB_APPROVAL_CHECK_SCHEDULE_INTERVAL_SECONDS=900
|
||||||
|
GITHUB_APPROVAL_CHECK_RECONCILE_CONCURRENCY=3
|
||||||
|
GITHUB_APPROVAL_CHECK_RECONCILE_MAX_PR_URLS=500
|
||||||
|
|
||||||
# Webhook queue / worker
|
# Webhook queue / worker
|
||||||
WEBHOOK_REDIS_URL=redis://localhost:6379/0
|
WEBHOOK_REDIS_URL=redis://localhost:6379/0
|
||||||
|
|||||||
@@ -60,6 +60,8 @@ class Settings(BaseSettings):
|
|||||||
# Periodic reconciliation safety net for mission-control/approval checks.
|
# Periodic reconciliation safety net for mission-control/approval checks.
|
||||||
github_approval_check_schedule_id: str = "mission-control-approval-check-reconcile"
|
github_approval_check_schedule_id: str = "mission-control-approval-check-reconcile"
|
||||||
github_approval_check_schedule_interval_seconds: int = 900
|
github_approval_check_schedule_interval_seconds: int = 900
|
||||||
|
github_approval_check_reconcile_concurrency: int = Field(default=3, ge=1, le=10)
|
||||||
|
github_approval_check_reconcile_max_pr_urls: int = Field(default=500, ge=1)
|
||||||
|
|
||||||
# Database lifecycle
|
# Database lifecycle
|
||||||
db_auto_migrate: bool = False
|
db_auto_migrate: bool = False
|
||||||
|
|||||||
@@ -20,6 +20,8 @@ A periodic reconciliation job should call the sync functions as a safety net.
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Literal, cast
|
from typing import Literal, cast
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
@@ -368,10 +370,35 @@ async def reconcile_github_approval_checks_for_board(
|
|||||||
if isinstance(value, str) and value.strip():
|
if isinstance(value, str) and value.strip():
|
||||||
pr_urls.add(value.strip())
|
pr_urls.add(value.strip())
|
||||||
|
|
||||||
for pr_url in sorted(pr_urls):
|
pr_url_list = sorted(pr_urls)
|
||||||
await sync_github_approval_check_for_pr_url(session, board_id=board_id, pr_url=pr_url)
|
max_urls = settings.github_approval_check_reconcile_max_pr_urls
|
||||||
|
if len(pr_url_list) > max_urls:
|
||||||
|
logger.warning(
|
||||||
|
"github.approval_check.reconcile.truncated_pr_urls",
|
||||||
|
extra={"board_id": str(board_id), "count": len(pr_url_list), "max": max_urls},
|
||||||
|
)
|
||||||
|
pr_url_list = pr_url_list[:max_urls]
|
||||||
|
|
||||||
return len(pr_urls)
|
sem = asyncio.Semaphore(settings.github_approval_check_reconcile_concurrency)
|
||||||
|
|
||||||
|
async def _run(url: str) -> None:
|
||||||
|
async with sem:
|
||||||
|
await sync_github_approval_check_for_pr_url(
|
||||||
|
session,
|
||||||
|
board_id=board_id,
|
||||||
|
pr_url=url,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Process concurrently but bounded to avoid overwhelming GitHub.
|
||||||
|
results = await asyncio.gather(*[_run(url) for url in pr_url_list], return_exceptions=True)
|
||||||
|
for result in results:
|
||||||
|
if isinstance(result, Exception):
|
||||||
|
logger.exception(
|
||||||
|
"github.approval_check.reconcile.pr_failed",
|
||||||
|
extra={"board_id": str(board_id), "error": str(result)},
|
||||||
|
)
|
||||||
|
|
||||||
|
return len(pr_url_list)
|
||||||
|
|
||||||
|
|
||||||
async def reconcile_mission_control_approval_checks_for_all_boards() -> int:
|
async def reconcile_mission_control_approval_checks_for_all_boards() -> int:
|
||||||
|
|||||||
Reference in New Issue
Block a user