diff --git a/backend/app/api/agent.py b/backend/app/api/agent.py index 54f34ae1..fbff168f 100644 --- a/backend/app/api/agent.py +++ b/backend/app/api/agent.py @@ -2,6 +2,7 @@ from __future__ import annotations +import json from enum import Enum from typing import TYPE_CHECKING, Any, cast from uuid import UUID @@ -20,6 +21,7 @@ from app.core.agent_auth import AgentAuthContext, get_agent_auth_context from app.db.pagination import paginate from app.db.session import get_session from app.models.agents import Agent +from app.models.board_webhook_payloads import BoardWebhookPayload from app.models.boards import Board from app.models.tags import Tag from app.models.task_dependencies import TaskDependency @@ -33,6 +35,7 @@ from app.schemas.agents import ( from app.schemas.approvals import ApprovalCreate, ApprovalRead, ApprovalStatus from app.schemas.board_memory import BoardMemoryCreate, BoardMemoryRead from app.schemas.board_onboarding import BoardOnboardingAgentUpdate, BoardOnboardingRead +from app.schemas.board_webhooks import BoardWebhookPayloadRead from app.schemas.boards import BoardRead from app.schemas.common import OkResponse from app.schemas.errors import LLMErrorResponse @@ -167,6 +170,53 @@ def _agent_board_openapi_hints( } +def _truncate_preview(raw: str, max_chars: int) -> str: + if len(raw) <= max_chars: + return raw + if max_chars <= 3: + return raw[:max_chars] + return f"{raw[: max_chars - 3]}..." + + +def _payload_preview_with_limit( + value: dict[str, object] | list[object] | str | int | float | bool | None, + *, + max_chars: int, +) -> tuple[str, bool]: + if isinstance(value, str): + return _truncate_preview(value, max_chars), len(value) > max_chars + + try: + # Stream JSON chunks so we can stop once we know truncation is required. + encoder = json.JSONEncoder(ensure_ascii=True) + parts: list[str] = [] + current_len = 0 + truncated = False + for chunk in encoder.iterencode(value): + remaining = (max_chars + 1) - current_len + if remaining <= 0: + truncated = True + break + if len(chunk) <= remaining: + parts.append(chunk) + current_len += len(chunk) + continue + parts.append(chunk[:remaining]) + current_len += remaining + truncated = True + break + raw = "".join(parts) + except TypeError: + raw = str(value) + return _truncate_preview(raw, max_chars), len(raw) > max_chars + + if len(raw) > max_chars: + truncated = True + if not truncated: + return raw, False + return _truncate_preview(raw, max_chars), True + + def _guard_board_access(agent_ctx: AgentAuthContext, board: Board) -> None: allowed = not (agent_ctx.agent.board_id and agent_ctx.agent.board_id != board.id) OpenClawAuthorizationPolicy.require_board_write_access(allowed=allowed) @@ -572,6 +622,73 @@ async def list_tags( ] +@router.get( + "/boards/{board_id}/webhooks/{webhook_id}/payloads/{payload_id}", + response_model=BoardWebhookPayloadRead, + tags=AGENT_BOARD_TAGS, + openapi_extra=_agent_board_openapi_hints( + intent="agent_board_webhook_payload_read", + when_to_use=[ + "Agent needs to inspect a previously captured webhook payload for this board.", + "Agent is reconciling missed webhook events or deduping inbound processing.", + ], + routing_examples=[ + { + "input": { + "intent": "inspect stored webhook payload by id", + "required_privilege": "any_agent", + }, + "decision": "agent_board_webhook_payload_read", + }, + { + "input": { + "intent": "list tasks for planning", + "required_privilege": "any_agent", + }, + "decision": "agent_board_task_discovery", + }, + ], + ), +) +async def get_webhook_payload( + webhook_id: UUID, + payload_id: UUID, + max_chars: int | None = Query(default=None, ge=1, le=1_000_000), + board: Board = BOARD_DEP, + session: AsyncSession = SESSION_DEP, + agent_ctx: AgentAuthContext = AGENT_CTX_DEP, +) -> BoardWebhookPayloadRead: + """Fetch a stored webhook payload (agent-accessible, read-only). + + This enables board-scoped agents to backfill dropped webhook events and enforce + idempotency by inspecting previously received payloads. + + If `max_chars` is provided and the serialized payload exceeds the limit, + the response payload is returned as a truncated string preview. + """ + + _guard_board_access(agent_ctx, board) + + payload = ( + await session.exec( + select(BoardWebhookPayload) + .where(col(BoardWebhookPayload.id) == payload_id) + .where(col(BoardWebhookPayload.board_id) == board.id) + .where(col(BoardWebhookPayload.webhook_id) == webhook_id), + ) + ).first() + if payload is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) + + response = BoardWebhookPayloadRead.model_validate(payload, from_attributes=True) + if max_chars is not None and response.payload is not None: + preview, was_truncated = _payload_preview_with_limit(response.payload, max_chars=max_chars) + if was_truncated: + response.payload = preview + + return response + + @router.post( "/boards/{board_id}/tasks", response_model=TaskRead, diff --git a/backend/tests/test_agent_webhook_payload_read_api.py b/backend/tests/test_agent_webhook_payload_read_api.py new file mode 100644 index 00000000..a4c45ece --- /dev/null +++ b/backend/tests/test_agent_webhook_payload_read_api.py @@ -0,0 +1,287 @@ +# ruff: noqa: INP001 + +from __future__ import annotations + +import json +from uuid import UUID, uuid4 + +import pytest +from fastapi import APIRouter, Depends, FastAPI +from httpx import ASGITransport, AsyncClient +from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine +from sqlmodel import SQLModel +from sqlmodel.ext.asyncio.session import AsyncSession + +from app.api.agent import router as agent_router +from app.api.deps import get_board_or_404 +from app.core.agent_tokens import hash_agent_token +from app.db.session import get_session +from app.models.agents import Agent +from app.models.board_webhook_payloads import BoardWebhookPayload +from app.models.board_webhooks import BoardWebhook +from app.models.boards import Board +from app.models.gateways import Gateway +from app.models.organizations import Organization + + +async def _make_engine() -> AsyncEngine: + engine = create_async_engine("sqlite+aiosqlite:///:memory:") + async with engine.connect() as conn, conn.begin(): + await conn.run_sync(SQLModel.metadata.create_all) + return engine + + +def _build_test_app(session_maker: async_sessionmaker[AsyncSession]) -> FastAPI: + app = FastAPI() + api_v1 = APIRouter(prefix="/api/v1") + api_v1.include_router(agent_router) + app.include_router(api_v1) + + async def _override_get_session() -> AsyncSession: + async with session_maker() as session: + yield session + + async def _override_get_board_or_404( + board_id: str, + session: AsyncSession = Depends(get_session), + ) -> Board: + board = await Board.objects.by_id(UUID(board_id)).first(session) + if board is None: + from fastapi import HTTPException, status + + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) + return board + + app.dependency_overrides[get_session] = _override_get_session + app.dependency_overrides[get_board_or_404] = _override_get_board_or_404 + return app + + +async def _seed_payload( + session: AsyncSession, + *, + payload_value: dict[str, object] | list[object] | str | int | float | bool | None = None, +) -> tuple[str, Board, BoardWebhook, BoardWebhookPayload]: + token = "test-agent-token-" + uuid4().hex + token_hash = hash_agent_token(token) + + organization_id = uuid4() + gateway_id = uuid4() + board_id = uuid4() + webhook_id = uuid4() + agent_id = uuid4() + payload_id = uuid4() + + session.add(Organization(id=organization_id, name=f"org-{organization_id}")) + session.add( + Gateway( + id=gateway_id, + organization_id=organization_id, + name="gateway", + url="https://gateway.example.local", + workspace_root="/tmp/workspace", + ), + ) + board = Board( + id=board_id, + organization_id=organization_id, + gateway_id=gateway_id, + name="Board", + slug="board", + ) + session.add(board) + session.add( + Agent( + id=agent_id, + board_id=board_id, + gateway_id=gateway_id, + name="Lead Agent", + status="online", + is_board_lead=True, + openclaw_session_id="agent:lead:session", + agent_token_hash=token_hash, + ), + ) + webhook = BoardWebhook( + id=webhook_id, + board_id=board_id, + description="Triage payload", + enabled=True, + ) + session.add(webhook) + payload = BoardWebhookPayload( + id=payload_id, + board_id=board_id, + webhook_id=webhook_id, + payload=payload_value or {"event": "push", "ref": "refs/heads/master"}, + headers={"x-github-event": "push"}, + content_type="application/json", + source_ip="127.0.0.1", + ) + session.add(payload) + await session.commit() + return token, board, webhook, payload + + +@pytest.mark.asyncio +async def test_agent_can_fetch_webhook_payload() -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + app = _build_test_app(session_maker) + + async with session_maker() as session: + token, board, webhook, payload = await _seed_payload(session) + + try: + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + response = await client.get( + f"/api/v1/agent/boards/{board.id}/webhooks/{webhook.id}/payloads/{payload.id}", + headers={"X-Agent-Token": token}, + ) + + assert response.status_code == 200 + body = response.json() + assert body["id"] == str(payload.id) + assert body["board_id"] == str(board.id) + assert body["webhook_id"] == str(webhook.id) + assert body["payload"] == {"event": "push", "ref": "refs/heads/master"} + assert body["headers"]["x-github-event"] == "push" + + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_agent_payload_read_rejects_invalid_token() -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + app = _build_test_app(session_maker) + + async with session_maker() as session: + _token, board, webhook, payload = await _seed_payload(session) + + try: + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + response = await client.get( + f"/api/v1/agent/boards/{board.id}/webhooks/{webhook.id}/payloads/{payload.id}", + headers={"X-Agent-Token": "invalid"}, + ) + + assert response.status_code == 401 + + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_agent_payload_read_truncates_json_preview_with_ellipsis() -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + app = _build_test_app(session_maker) + + async with session_maker() as session: + payload_value: dict[str, object] = {"event": "push", "ref": "refs/heads/master"} + token, board, webhook, payload = await _seed_payload(session, payload_value=payload_value) + + max_chars = 12 + raw = json.dumps(payload_value, ensure_ascii=True) + expected_preview = f"{raw[: max_chars - 3]}..." + + try: + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + response = await client.get( + f"/api/v1/agent/boards/{board.id}/webhooks/{webhook.id}/payloads/{payload.id}", + headers={"X-Agent-Token": token}, + params={"max_chars": max_chars}, + ) + + assert response.status_code == 200 + body = response.json() + assert body["payload"] == expected_preview + + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_agent_payload_read_truncates_string_preview_without_json_quoting() -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + app = _build_test_app(session_maker) + + async with session_maker() as session: + token, board, webhook, payload = await _seed_payload(session, payload_value="abcdef") + + try: + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + response = await client.get( + f"/api/v1/agent/boards/{board.id}/webhooks/{webhook.id}/payloads/{payload.id}", + headers={"X-Agent-Token": token}, + params={"max_chars": 4}, + ) + + assert response.status_code == 200 + body = response.json() + assert body["payload"] == "a..." + + finally: + await engine.dispose() + + +@pytest.mark.asyncio +async def test_agent_payload_read_rejects_cross_board_access() -> None: + engine = await _make_engine() + session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + app = _build_test_app(session_maker) + + async with session_maker() as session: + token, board, webhook, payload = await _seed_payload(session) + + # Second board + payload that should be inaccessible to the first board agent. + organization_id = uuid4() + gateway_id = uuid4() + other_board = Board( + id=uuid4(), + organization_id=organization_id, + gateway_id=gateway_id, + name="Other", + slug="other", + ) + session.add(Organization(id=organization_id, name=f"org-{organization_id}")) + session.add( + Gateway( + id=gateway_id, + organization_id=organization_id, + name="gateway", + url="https://gateway.example.local", + workspace_root="/tmp/workspace", + ), + ) + session.add(other_board) + other_webhook = BoardWebhook( + id=uuid4(), + board_id=other_board.id, + description="Other webhook", + enabled=True, + ) + session.add(other_webhook) + other_payload = BoardWebhookPayload( + id=uuid4(), + board_id=other_board.id, + webhook_id=other_webhook.id, + payload={"event": "push"}, + ) + session.add(other_payload) + await session.commit() + + try: + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + response = await client.get( + f"/api/v1/agent/boards/{other_board.id}/webhooks/{other_webhook.id}/payloads/{other_payload.id}", + headers={"X-Agent-Token": token}, + ) + + assert response.status_code == 403 + + finally: + await engine.dispose() diff --git a/backend/tests/test_openapi_agent_webhook_payload_endpoint.py b/backend/tests/test_openapi_agent_webhook_payload_endpoint.py new file mode 100644 index 00000000..f84fc95a --- /dev/null +++ b/backend/tests/test_openapi_agent_webhook_payload_endpoint.py @@ -0,0 +1,16 @@ +# ruff: noqa: INP001, S101 + +from __future__ import annotations + +from app.main import app + + +def test_openapi_includes_agent_webhook_payload_read_endpoint() -> None: + schema = app.openapi() + + path = "/api/v1/agent/boards/{board_id}/webhooks/{webhook_id}/payloads/{payload_id}" + assert path in schema["paths"] + op = schema["paths"][path]["get"] + tags = set(op.get("tags", [])) + assert "agent-worker" in tags + assert op.get("x-llm-intent") == "agent_board_webhook_payload_read"