Merge pull request #135 from abhi1693/feat/agent-webhook-payload-read
feat(agent): allow agents to read stored webhook payloads for backfill
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
from uuid import UUID
|
||||
@@ -20,6 +21,7 @@ from app.core.agent_auth import AgentAuthContext, get_agent_auth_context
|
||||
from app.db.pagination import paginate
|
||||
from app.db.session import get_session
|
||||
from app.models.agents import Agent
|
||||
from app.models.board_webhook_payloads import BoardWebhookPayload
|
||||
from app.models.boards import Board
|
||||
from app.models.tags import Tag
|
||||
from app.models.task_dependencies import TaskDependency
|
||||
@@ -33,6 +35,7 @@ from app.schemas.agents import (
|
||||
from app.schemas.approvals import ApprovalCreate, ApprovalRead, ApprovalStatus
|
||||
from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead
|
||||
from app.schemas.board_onboarding import BoardOnboardingAgentUpdate, BoardOnboardingRead
|
||||
from app.schemas.board_webhooks import BoardWebhookPayloadRead
|
||||
from app.schemas.boards import BoardRead
|
||||
from app.schemas.common import OkResponse
|
||||
from app.schemas.errors import LLMErrorResponse
|
||||
@@ -167,6 +170,53 @@ def _agent_board_openapi_hints(
|
||||
}
|
||||
|
||||
|
||||
def _truncate_preview(raw: str, max_chars: int) -> str:
|
||||
if len(raw) <= max_chars:
|
||||
return raw
|
||||
if max_chars <= 3:
|
||||
return raw[:max_chars]
|
||||
return f"{raw[: max_chars - 3]}..."
|
||||
|
||||
|
||||
def _payload_preview_with_limit(
|
||||
value: dict[str, object] | list[object] | str | int | float | bool | None,
|
||||
*,
|
||||
max_chars: int,
|
||||
) -> tuple[str, bool]:
|
||||
if isinstance(value, str):
|
||||
return _truncate_preview(value, max_chars), len(value) > max_chars
|
||||
|
||||
try:
|
||||
# Stream JSON chunks so we can stop once we know truncation is required.
|
||||
encoder = json.JSONEncoder(ensure_ascii=True)
|
||||
parts: list[str] = []
|
||||
current_len = 0
|
||||
truncated = False
|
||||
for chunk in encoder.iterencode(value):
|
||||
remaining = (max_chars + 1) - current_len
|
||||
if remaining <= 0:
|
||||
truncated = True
|
||||
break
|
||||
if len(chunk) <= remaining:
|
||||
parts.append(chunk)
|
||||
current_len += len(chunk)
|
||||
continue
|
||||
parts.append(chunk[:remaining])
|
||||
current_len += remaining
|
||||
truncated = True
|
||||
break
|
||||
raw = "".join(parts)
|
||||
except TypeError:
|
||||
raw = str(value)
|
||||
return _truncate_preview(raw, max_chars), len(raw) > max_chars
|
||||
|
||||
if len(raw) > max_chars:
|
||||
truncated = True
|
||||
if not truncated:
|
||||
return raw, False
|
||||
return _truncate_preview(raw, max_chars), True
|
||||
|
||||
|
||||
def _guard_board_access(agent_ctx: AgentAuthContext, board: Board) -> None:
|
||||
allowed = not (agent_ctx.agent.board_id and agent_ctx.agent.board_id != board.id)
|
||||
OpenClawAuthorizationPolicy.require_board_write_access(allowed=allowed)
|
||||
@@ -572,6 +622,73 @@ async def list_tags(
|
||||
]
|
||||
|
||||
|
||||
@router.get(
|
||||
"/boards/{board_id}/webhooks/{webhook_id}/payloads/{payload_id}",
|
||||
response_model=BoardWebhookPayloadRead,
|
||||
tags=AGENT_BOARD_TAGS,
|
||||
openapi_extra=_agent_board_openapi_hints(
|
||||
intent="agent_board_webhook_payload_read",
|
||||
when_to_use=[
|
||||
"Agent needs to inspect a previously captured webhook payload for this board.",
|
||||
"Agent is reconciling missed webhook events or deduping inbound processing.",
|
||||
],
|
||||
routing_examples=[
|
||||
{
|
||||
"input": {
|
||||
"intent": "inspect stored webhook payload by id",
|
||||
"required_privilege": "any_agent",
|
||||
},
|
||||
"decision": "agent_board_webhook_payload_read",
|
||||
},
|
||||
{
|
||||
"input": {
|
||||
"intent": "list tasks for planning",
|
||||
"required_privilege": "any_agent",
|
||||
},
|
||||
"decision": "agent_board_task_discovery",
|
||||
},
|
||||
],
|
||||
),
|
||||
)
|
||||
async def get_webhook_payload(
|
||||
webhook_id: UUID,
|
||||
payload_id: UUID,
|
||||
max_chars: int | None = Query(default=None, ge=1, le=1_000_000),
|
||||
board: Board = BOARD_DEP,
|
||||
session: AsyncSession = SESSION_DEP,
|
||||
agent_ctx: AgentAuthContext = AGENT_CTX_DEP,
|
||||
) -> BoardWebhookPayloadRead:
|
||||
"""Fetch a stored webhook payload (agent-accessible, read-only).
|
||||
|
||||
This enables board-scoped agents to backfill dropped webhook events and enforce
|
||||
idempotency by inspecting previously received payloads.
|
||||
|
||||
If `max_chars` is provided and the serialized payload exceeds the limit,
|
||||
the response payload is returned as a truncated string preview.
|
||||
"""
|
||||
|
||||
_guard_board_access(agent_ctx, board)
|
||||
|
||||
payload = (
|
||||
await session.exec(
|
||||
select(BoardWebhookPayload)
|
||||
.where(col(BoardWebhookPayload.id) == payload_id)
|
||||
.where(col(BoardWebhookPayload.board_id) == board.id)
|
||||
.where(col(BoardWebhookPayload.webhook_id) == webhook_id),
|
||||
)
|
||||
).first()
|
||||
if payload is None:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
|
||||
|
||||
response = BoardWebhookPayloadRead.model_validate(payload, from_attributes=True)
|
||||
if max_chars is not None and response.payload is not None:
|
||||
preview, was_truncated = _payload_preview_with_limit(response.payload, max_chars=max_chars)
|
||||
if was_truncated:
|
||||
response.payload = preview
|
||||
|
||||
return response
|
||||
|
||||
|
||||
@router.post(
|
||||
"/boards/{board_id}/tasks",
|
||||
response_model=TaskRead,
|
||||
|
||||
287
backend/tests/test_agent_webhook_payload_read_api.py
Normal file
287
backend/tests/test_agent_webhook_payload_read_api.py
Normal file
@@ -0,0 +1,287 @@
|
||||
# ruff: noqa: INP001
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
import pytest
|
||||
from fastapi import APIRouter, Depends, FastAPI
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
|
||||
from sqlmodel import SQLModel
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
|
||||
from app.api.agent import router as agent_router
|
||||
from app.api.deps import get_board_or_404
|
||||
from app.core.agent_tokens import hash_agent_token
|
||||
from app.db.session import get_session
|
||||
from app.models.agents import Agent
|
||||
from app.models.board_webhook_payloads import BoardWebhookPayload
|
||||
from app.models.board_webhooks import BoardWebhook
|
||||
from app.models.boards import Board
|
||||
from app.models.gateways import Gateway
|
||||
from app.models.organizations import Organization
|
||||
|
||||
|
||||
async def _make_engine() -> AsyncEngine:
|
||||
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
|
||||
async with engine.connect() as conn, conn.begin():
|
||||
await conn.run_sync(SQLModel.metadata.create_all)
|
||||
return engine
|
||||
|
||||
|
||||
def _build_test_app(session_maker: async_sessionmaker[AsyncSession]) -> FastAPI:
|
||||
app = FastAPI()
|
||||
api_v1 = APIRouter(prefix="/api/v1")
|
||||
api_v1.include_router(agent_router)
|
||||
app.include_router(api_v1)
|
||||
|
||||
async def _override_get_session() -> AsyncSession:
|
||||
async with session_maker() as session:
|
||||
yield session
|
||||
|
||||
async def _override_get_board_or_404(
|
||||
board_id: str,
|
||||
session: AsyncSession = Depends(get_session),
|
||||
) -> Board:
|
||||
board = await Board.objects.by_id(UUID(board_id)).first(session)
|
||||
if board is None:
|
||||
from fastapi import HTTPException, status
|
||||
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
|
||||
return board
|
||||
|
||||
app.dependency_overrides[get_session] = _override_get_session
|
||||
app.dependency_overrides[get_board_or_404] = _override_get_board_or_404
|
||||
return app
|
||||
|
||||
|
||||
async def _seed_payload(
|
||||
session: AsyncSession,
|
||||
*,
|
||||
payload_value: dict[str, object] | list[object] | str | int | float | bool | None = None,
|
||||
) -> tuple[str, Board, BoardWebhook, BoardWebhookPayload]:
|
||||
token = "test-agent-token-" + uuid4().hex
|
||||
token_hash = hash_agent_token(token)
|
||||
|
||||
organization_id = uuid4()
|
||||
gateway_id = uuid4()
|
||||
board_id = uuid4()
|
||||
webhook_id = uuid4()
|
||||
agent_id = uuid4()
|
||||
payload_id = uuid4()
|
||||
|
||||
session.add(Organization(id=organization_id, name=f"org-{organization_id}"))
|
||||
session.add(
|
||||
Gateway(
|
||||
id=gateway_id,
|
||||
organization_id=organization_id,
|
||||
name="gateway",
|
||||
url="https://gateway.example.local",
|
||||
workspace_root="/tmp/workspace",
|
||||
),
|
||||
)
|
||||
board = Board(
|
||||
id=board_id,
|
||||
organization_id=organization_id,
|
||||
gateway_id=gateway_id,
|
||||
name="Board",
|
||||
slug="board",
|
||||
)
|
||||
session.add(board)
|
||||
session.add(
|
||||
Agent(
|
||||
id=agent_id,
|
||||
board_id=board_id,
|
||||
gateway_id=gateway_id,
|
||||
name="Lead Agent",
|
||||
status="online",
|
||||
is_board_lead=True,
|
||||
openclaw_session_id="agent:lead:session",
|
||||
agent_token_hash=token_hash,
|
||||
),
|
||||
)
|
||||
webhook = BoardWebhook(
|
||||
id=webhook_id,
|
||||
board_id=board_id,
|
||||
description="Triage payload",
|
||||
enabled=True,
|
||||
)
|
||||
session.add(webhook)
|
||||
payload = BoardWebhookPayload(
|
||||
id=payload_id,
|
||||
board_id=board_id,
|
||||
webhook_id=webhook_id,
|
||||
payload=payload_value or {"event": "push", "ref": "refs/heads/master"},
|
||||
headers={"x-github-event": "push"},
|
||||
content_type="application/json",
|
||||
source_ip="127.0.0.1",
|
||||
)
|
||||
session.add(payload)
|
||||
await session.commit()
|
||||
return token, board, webhook, payload
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_agent_can_fetch_webhook_payload() -> None:
|
||||
engine = await _make_engine()
|
||||
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
||||
app = _build_test_app(session_maker)
|
||||
|
||||
async with session_maker() as session:
|
||||
token, board, webhook, payload = await _seed_payload(session)
|
||||
|
||||
try:
|
||||
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
|
||||
response = await client.get(
|
||||
f"/api/v1/agent/boards/{board.id}/webhooks/{webhook.id}/payloads/{payload.id}",
|
||||
headers={"X-Agent-Token": token},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
body = response.json()
|
||||
assert body["id"] == str(payload.id)
|
||||
assert body["board_id"] == str(board.id)
|
||||
assert body["webhook_id"] == str(webhook.id)
|
||||
assert body["payload"] == {"event": "push", "ref": "refs/heads/master"}
|
||||
assert body["headers"]["x-github-event"] == "push"
|
||||
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_agent_payload_read_rejects_invalid_token() -> None:
|
||||
engine = await _make_engine()
|
||||
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
||||
app = _build_test_app(session_maker)
|
||||
|
||||
async with session_maker() as session:
|
||||
_token, board, webhook, payload = await _seed_payload(session)
|
||||
|
||||
try:
|
||||
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
|
||||
response = await client.get(
|
||||
f"/api/v1/agent/boards/{board.id}/webhooks/{webhook.id}/payloads/{payload.id}",
|
||||
headers={"X-Agent-Token": "invalid"},
|
||||
)
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_agent_payload_read_truncates_json_preview_with_ellipsis() -> None:
|
||||
engine = await _make_engine()
|
||||
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
||||
app = _build_test_app(session_maker)
|
||||
|
||||
async with session_maker() as session:
|
||||
payload_value: dict[str, object] = {"event": "push", "ref": "refs/heads/master"}
|
||||
token, board, webhook, payload = await _seed_payload(session, payload_value=payload_value)
|
||||
|
||||
max_chars = 12
|
||||
raw = json.dumps(payload_value, ensure_ascii=True)
|
||||
expected_preview = f"{raw[: max_chars - 3]}..."
|
||||
|
||||
try:
|
||||
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
|
||||
response = await client.get(
|
||||
f"/api/v1/agent/boards/{board.id}/webhooks/{webhook.id}/payloads/{payload.id}",
|
||||
headers={"X-Agent-Token": token},
|
||||
params={"max_chars": max_chars},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
body = response.json()
|
||||
assert body["payload"] == expected_preview
|
||||
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_agent_payload_read_truncates_string_preview_without_json_quoting() -> None:
|
||||
engine = await _make_engine()
|
||||
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
||||
app = _build_test_app(session_maker)
|
||||
|
||||
async with session_maker() as session:
|
||||
token, board, webhook, payload = await _seed_payload(session, payload_value="abcdef")
|
||||
|
||||
try:
|
||||
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
|
||||
response = await client.get(
|
||||
f"/api/v1/agent/boards/{board.id}/webhooks/{webhook.id}/payloads/{payload.id}",
|
||||
headers={"X-Agent-Token": token},
|
||||
params={"max_chars": 4},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
body = response.json()
|
||||
assert body["payload"] == "a..."
|
||||
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_agent_payload_read_rejects_cross_board_access() -> None:
|
||||
engine = await _make_engine()
|
||||
session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
||||
app = _build_test_app(session_maker)
|
||||
|
||||
async with session_maker() as session:
|
||||
token, board, webhook, payload = await _seed_payload(session)
|
||||
|
||||
# Second board + payload that should be inaccessible to the first board agent.
|
||||
organization_id = uuid4()
|
||||
gateway_id = uuid4()
|
||||
other_board = Board(
|
||||
id=uuid4(),
|
||||
organization_id=organization_id,
|
||||
gateway_id=gateway_id,
|
||||
name="Other",
|
||||
slug="other",
|
||||
)
|
||||
session.add(Organization(id=organization_id, name=f"org-{organization_id}"))
|
||||
session.add(
|
||||
Gateway(
|
||||
id=gateway_id,
|
||||
organization_id=organization_id,
|
||||
name="gateway",
|
||||
url="https://gateway.example.local",
|
||||
workspace_root="/tmp/workspace",
|
||||
),
|
||||
)
|
||||
session.add(other_board)
|
||||
other_webhook = BoardWebhook(
|
||||
id=uuid4(),
|
||||
board_id=other_board.id,
|
||||
description="Other webhook",
|
||||
enabled=True,
|
||||
)
|
||||
session.add(other_webhook)
|
||||
other_payload = BoardWebhookPayload(
|
||||
id=uuid4(),
|
||||
board_id=other_board.id,
|
||||
webhook_id=other_webhook.id,
|
||||
payload={"event": "push"},
|
||||
)
|
||||
session.add(other_payload)
|
||||
await session.commit()
|
||||
|
||||
try:
|
||||
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
|
||||
response = await client.get(
|
||||
f"/api/v1/agent/boards/{other_board.id}/webhooks/{other_webhook.id}/payloads/{other_payload.id}",
|
||||
headers={"X-Agent-Token": token},
|
||||
)
|
||||
|
||||
assert response.status_code == 403
|
||||
|
||||
finally:
|
||||
await engine.dispose()
|
||||
16
backend/tests/test_openapi_agent_webhook_payload_endpoint.py
Normal file
16
backend/tests/test_openapi_agent_webhook_payload_endpoint.py
Normal file
@@ -0,0 +1,16 @@
|
||||
# ruff: noqa: INP001, S101
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from app.main import app
|
||||
|
||||
|
||||
def test_openapi_includes_agent_webhook_payload_read_endpoint() -> None:
|
||||
schema = app.openapi()
|
||||
|
||||
path = "/api/v1/agent/boards/{board_id}/webhooks/{webhook_id}/payloads/{payload_id}"
|
||||
assert path in schema["paths"]
|
||||
op = schema["paths"][path]["get"]
|
||||
tags = set(op.get("tags", []))
|
||||
assert "agent-worker" in tags
|
||||
assert op.get("x-llm-intent") == "agent_board_webhook_payload_read"
|
||||
Reference in New Issue
Block a user