security: add HMAC signature verification to webhook ingest

Webhook ingest endpoint was completely unauthenticated. Add an optional
`secret` field to BoardWebhook. When configured, inbound requests must
include a valid HMAC-SHA256 signature in X-Hub-Signature-256 or
X-Webhook-Signature headers. Uses hmac.compare_digest for timing safety.
Includes migration to add the secret column.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Hugh Brown
2026-03-03 13:33:28 -07:00
committed by Abhimanyu Saharan
parent 10848b98cb
commit 4d1dbb4098
4 changed files with 87 additions and 1 deletions

View File

@@ -2,6 +2,8 @@
from __future__ import annotations
import hashlib
import hmac
import json
from typing import TYPE_CHECKING
from uuid import UUID
@@ -67,6 +69,7 @@ def _to_webhook_read(webhook: BoardWebhook) -> BoardWebhookRead:
agent_id=webhook.agent_id,
description=webhook.description,
enabled=webhook.enabled,
has_secret=webhook.secret is not None,
endpoint_path=endpoint_path,
endpoint_url=_webhook_endpoint_url(endpoint_path),
created_at=webhook.created_at,
@@ -160,6 +163,43 @@ def _decode_payload(
return body_text
def _verify_webhook_signature(
webhook: BoardWebhook,
raw_body: bytes,
request: Request,
) -> None:
"""Verify HMAC-SHA256 signature if the webhook has a secret configured.
When a secret is set, the sender must include a valid signature in one of:
X-Hub-Signature-256: sha256=<hex-digest> (GitHub-style)
X-Webhook-Signature: sha256=<hex-digest>
If no secret is configured, signature verification is skipped.
"""
if not webhook.secret:
return
sig_header = (
request.headers.get("x-hub-signature-256")
or request.headers.get("x-webhook-signature")
)
if not sig_header:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Missing webhook signature header.",
)
if sig_header.startswith("sha256="):
sig_header = sig_header[7:]
expected = hmac.new(
webhook.secret.encode("utf-8"),
raw_body,
hashlib.sha256,
).hexdigest()
if not hmac.compare_digest(sig_header, expected):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid webhook signature.",
)
def _captured_headers(request: Request) -> dict[str, str] | None:
captured: dict[str, str] = {}
for header, value in request.headers.items():
@@ -454,10 +494,13 @@ async def ingest_board_webhook(
detail="Webhook is disabled.",
)
raw_body = await request.body()
_verify_webhook_signature(webhook, raw_body, request)
content_type = request.headers.get("content-type")
headers = _captured_headers(request)
payload_value = _decode_payload(
await request.body(),
raw_body,
content_type=content_type,
)
payload = BoardWebhookPayload(

View File

@@ -23,5 +23,6 @@ class BoardWebhook(QueryModel, table=True):
agent_id: UUID | None = Field(default=None, foreign_key="agents.id", index=True)
description: str
enabled: bool = Field(default=True, index=True)
secret: str | None = Field(default=None)
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)

View File

@@ -18,6 +18,7 @@ class BoardWebhookCreate(SQLModel):
description: NonEmptyStr
enabled: bool = True
agent_id: UUID | None = None
secret: str | None = None
class BoardWebhookUpdate(SQLModel):
@@ -26,6 +27,7 @@ class BoardWebhookUpdate(SQLModel):
description: NonEmptyStr | None = None
enabled: bool | None = None
agent_id: UUID | None = None
secret: str | None = None
class BoardWebhookRead(SQLModel):
@@ -36,6 +38,7 @@ class BoardWebhookRead(SQLModel):
agent_id: UUID | None = None
description: str
enabled: bool
has_secret: bool = False
endpoint_path: str
endpoint_url: str | None = None
created_at: datetime

View File

@@ -0,0 +1,39 @@
"""Add optional secret column to board_webhooks for HMAC signature verification.
Revision ID: a1b2c3d4e5f6
Revises: fa6e83f8d9a1
Create Date: 2026-03-03 00:00:00.000000
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "a1b2c3d4e5f6"
down_revision = "f1b2c3d4e5a6"
branch_labels = None
depends_on = None
def upgrade() -> None:
"""Add secret column to board_webhooks table."""
bind = op.get_bind()
inspector = sa.inspect(bind)
columns = {c["name"] for c in inspector.get_columns("board_webhooks")}
if "secret" not in columns:
op.add_column(
"board_webhooks",
sa.Column("secret", sa.String(), nullable=True),
)
def downgrade() -> None:
"""Remove secret column from board_webhooks table."""
bind = op.get_bind()
inspector = sa.inspect(bind)
columns = {c["name"] for c in inspector.get_columns("board_webhooks")}
if "secret" in columns:
op.drop_column("board_webhooks", "secret")