feat(agent): add read-only webhook payload fetch endpoint for backfill

This commit is contained in:
Abhimanyu Saharan
2026-02-14 19:05:33 +00:00
committed by Abhimanyu Saharan
parent 9ccb9aefd8
commit 3fc96baa10
3 changed files with 297 additions and 0 deletions

View File

@@ -21,6 +21,7 @@ from app.db.pagination import paginate
from app.db.session import get_session from app.db.session import get_session
from app.models.agents import Agent from app.models.agents import Agent
from app.models.boards import Board from app.models.boards import Board
from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.tags import Tag from app.models.tags import Tag
from app.models.task_dependencies import TaskDependency from app.models.task_dependencies import TaskDependency
from app.models.tasks import Task from app.models.tasks import Task
@@ -33,6 +34,7 @@ from app.schemas.agents import (
from app.schemas.approvals import ApprovalCreate, ApprovalRead, ApprovalStatus from app.schemas.approvals import ApprovalCreate, ApprovalRead, ApprovalStatus
from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead
from app.schemas.board_onboarding import BoardOnboardingAgentUpdate, BoardOnboardingRead from app.schemas.board_onboarding import BoardOnboardingAgentUpdate, BoardOnboardingRead
from app.schemas.board_webhooks import BoardWebhookPayloadRead
from app.schemas.boards import BoardRead from app.schemas.boards import BoardRead
from app.schemas.common import OkResponse from app.schemas.common import OkResponse
from app.schemas.errors import LLMErrorResponse from app.schemas.errors import LLMErrorResponse
@@ -572,6 +574,58 @@ async def list_tags(
] ]
@router.get(
"/boards/{board_id}/webhooks/{webhook_id}/payloads/{payload_id}",
response_model=BoardWebhookPayloadRead,
tags=AGENT_BOARD_TAGS,
)
async def get_webhook_payload(
webhook_id: UUID,
payload_id: UUID,
max_chars: int | None = Query(default=None, ge=1, le=1_000_000),
board: Board = BOARD_DEP,
session: AsyncSession = SESSION_DEP,
agent_ctx: AgentAuthContext = AGENT_CTX_DEP,
) -> BoardWebhookPayloadRead:
"""Fetch a stored webhook payload (agent-accessible, read-only).
This enables lead agents to backfill dropped webhook events and enforce
idempotency by inspecting previously received payloads.
If `max_chars` is provided and the serialized payload exceeds the limit,
the response payload is returned as a truncated string preview.
"""
_guard_board_access(agent_ctx, board)
payload = (
await session.exec(
select(BoardWebhookPayload)
.where(col(BoardWebhookPayload.id) == payload_id)
.where(col(BoardWebhookPayload.board_id) == board.id)
.where(col(BoardWebhookPayload.webhook_id) == webhook_id),
)
).first()
if payload is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
response = BoardWebhookPayloadRead.model_validate(payload, from_attributes=True)
if max_chars is not None and response.payload is not None:
import json
try:
raw = json.dumps(response.payload, ensure_ascii=True)
except TypeError:
raw = str(response.payload)
if len(raw) > max_chars:
if max_chars <= 3:
response.payload = raw[:max_chars]
else:
response.payload = f"{raw[: max_chars - 3]}..."
return response
@router.post( @router.post(
"/boards/{board_id}/tasks", "/boards/{board_id}/tasks",
response_model=TaskRead, response_model=TaskRead,
@@ -647,6 +701,7 @@ async def list_tags(
], ],
}, },
) )
async def create_task( async def create_task(
payload: TaskCreate, payload: TaskCreate,
board: Board = BOARD_DEP, board: Board = BOARD_DEP,

View File

@@ -0,0 +1,227 @@
# ruff: noqa: INP001
from __future__ import annotations
from uuid import UUID, uuid4
import pytest
from fastapi import APIRouter, Depends, FastAPI
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession
from app.api.agent import router as agent_router
from app.api.deps import get_board_or_404
from app.core.agent_tokens import hash_agent_token
from app.db.session import get_session
from app.models.agents import Agent
from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board
from app.models.gateways import Gateway
from app.models.organizations import Organization
async def _make_engine() -> AsyncEngine:
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.connect() as conn, conn.begin():
await conn.run_sync(SQLModel.metadata.create_all)
return engine
def _build_test_app(session_maker: async_sessionmaker[AsyncSession]) -> FastAPI:
app = FastAPI()
api_v1 = APIRouter(prefix="/api/v1")
api_v1.include_router(agent_router)
app.include_router(api_v1)
async def _override_get_session() -> AsyncSession:
async with session_maker() as session:
yield session
async def _override_get_board_or_404(
board_id: str,
session: AsyncSession = Depends(get_session),
) -> Board:
board = await Board.objects.by_id(UUID(board_id)).first(session)
if board is None:
from fastapi import HTTPException, status
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return board
app.dependency_overrides[get_session] = _override_get_session
app.dependency_overrides[get_board_or_404] = _override_get_board_or_404
return app
async def _seed_payload(session: AsyncSession) -> tuple[str, Board, BoardWebhook, BoardWebhookPayload]:
token = "test-agent-token-" + uuid4().hex
token_hash = hash_agent_token(token)
organization_id = uuid4()
gateway_id = uuid4()
board_id = uuid4()
webhook_id = uuid4()
agent_id = uuid4()
payload_id = uuid4()
session.add(Organization(id=organization_id, name=f"org-{organization_id}"))
session.add(
Gateway(
id=gateway_id,
organization_id=organization_id,
name="gateway",
url="https://gateway.example.local",
workspace_root="/tmp/workspace",
),
)
board = Board(
id=board_id,
organization_id=organization_id,
gateway_id=gateway_id,
name="Board",
slug="board",
)
session.add(board)
session.add(
Agent(
id=agent_id,
board_id=board_id,
gateway_id=gateway_id,
name="Lead Agent",
status="online",
is_board_lead=True,
openclaw_session_id="agent:lead:session",
agent_token_hash=token_hash,
),
)
webhook = BoardWebhook(
id=webhook_id,
board_id=board_id,
description="Triage payload",
enabled=True,
)
session.add(webhook)
payload = BoardWebhookPayload(
id=payload_id,
board_id=board_id,
webhook_id=webhook_id,
payload={"event": "push", "ref": "refs/heads/master"},
headers={"x-github-event": "push"},
content_type="application/json",
source_ip="127.0.0.1",
)
session.add(payload)
await session.commit()
return token, board, webhook, payload
@pytest.mark.asyncio
async def test_agent_can_fetch_webhook_payload() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
app = _build_test_app(session_maker)
async with session_maker() as session:
token, board, webhook, payload = await _seed_payload(session)
try:
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.get(
f"/api/v1/agent/boards/{board.id}/webhooks/{webhook.id}/payloads/{payload.id}",
headers={"X-Agent-Token": token},
)
assert response.status_code == 200
body = response.json()
assert body["id"] == str(payload.id)
assert body["board_id"] == str(board.id)
assert body["webhook_id"] == str(webhook.id)
assert body["payload"] == {"event": "push", "ref": "refs/heads/master"}
assert body["headers"]["x-github-event"] == "push"
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_agent_payload_read_rejects_invalid_token() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
app = _build_test_app(session_maker)
async with session_maker() as session:
_token, board, webhook, payload = await _seed_payload(session)
try:
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.get(
f"/api/v1/agent/boards/{board.id}/webhooks/{webhook.id}/payloads/{payload.id}",
headers={"X-Agent-Token": "invalid"},
)
assert response.status_code == 401
finally:
await engine.dispose()
@pytest.mark.asyncio
async def test_agent_payload_read_rejects_cross_board_access() -> None:
engine = await _make_engine()
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
app = _build_test_app(session_maker)
async with session_maker() as session:
token, board, webhook, payload = await _seed_payload(session)
# Second board + payload that should be inaccessible to the first board agent.
organization_id = uuid4()
gateway_id = uuid4()
other_board = Board(
id=uuid4(),
organization_id=organization_id,
gateway_id=gateway_id,
name="Other",
slug="other",
)
session.add(Organization(id=organization_id, name=f"org-{organization_id}"))
session.add(
Gateway(
id=gateway_id,
organization_id=organization_id,
name="gateway",
url="https://gateway.example.local",
workspace_root="/tmp/workspace",
),
)
session.add(other_board)
other_webhook = BoardWebhook(
id=uuid4(),
board_id=other_board.id,
description="Other webhook",
enabled=True,
)
session.add(other_webhook)
other_payload = BoardWebhookPayload(
id=uuid4(),
board_id=other_board.id,
webhook_id=other_webhook.id,
payload={"event": "push"},
)
session.add(other_payload)
await session.commit()
try:
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.get(
f"/api/v1/agent/boards/{other_board.id}/webhooks/{other_webhook.id}/payloads/{other_payload.id}",
headers={"X-Agent-Token": token},
)
assert response.status_code == 403
finally:
await engine.dispose()

View File

@@ -0,0 +1,15 @@
# ruff: noqa: INP001, S101
from __future__ import annotations
from app.main import app
def test_openapi_includes_agent_webhook_payload_read_endpoint() -> None:
schema = app.openapi()
path = "/api/v1/agent/boards/{board_id}/webhooks/{webhook_id}/payloads/{payload_id}"
assert path in schema["paths"]
op = schema["paths"][path]["get"]
tags = set(op.get("tags", []))
assert "agent-worker" in tags