From 3fd5fe5f8ca3b85f59935b1a8e7a38e8209ca7bc Mon Sep 17 00:00:00 2001 From: Simone Date: Tue, 24 Feb 2026 17:36:44 +0100 Subject: [PATCH] fix: make security headers configurable and add tests --- backend/.env.example | 5 + backend/app/core/config.py | 5 + backend/app/core/security_headers.py | 81 ++++++++++++ backend/app/main.py | 21 ++-- .../tests/test_security_headers_middleware.py | 119 ++++++++++++++++++ 5 files changed, 219 insertions(+), 12 deletions(-) create mode 100644 backend/app/core/security_headers.py create mode 100644 backend/tests/test_security_headers_middleware.py diff --git a/backend/.env.example b/backend/.env.example index dd3f52c6..098f36b1 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -7,6 +7,11 @@ REQUEST_LOG_INCLUDE_HEALTH=false DATABASE_URL=postgresql+psycopg://postgres:postgres@localhost:5432/mission_control CORS_ORIGINS=http://localhost:3000 BASE_URL= +# Security response headers (blank values disable each header). +SECURITY_HEADER_X_CONTENT_TYPE_OPTIONS= +SECURITY_HEADER_X_FRAME_OPTIONS= +SECURITY_HEADER_REFERRER_POLICY= +SECURITY_HEADER_PERMISSIONS_POLICY= # Auth mode: clerk or local. AUTH_MODE=local diff --git a/backend/app/core/config.py b/backend/app/core/config.py index a6df6a01..67822b57 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -49,6 +49,11 @@ class Settings(BaseSettings): cors_origins: str = "" base_url: str = "" + # Security response headers (blank disables header injection) + security_header_x_content_type_options: str = "" + security_header_x_frame_options: str = "" + security_header_referrer_policy: str = "" + security_header_permissions_policy: str = "" # Database lifecycle db_auto_migrate: bool = False diff --git a/backend/app/core/security_headers.py b/backend/app/core/security_headers.py new file mode 100644 index 00000000..3aa3a78e --- /dev/null +++ b/backend/app/core/security_headers.py @@ -0,0 +1,81 @@ +"""ASGI middleware for configurable security response headers.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: # pragma: no cover + from starlette.types import ASGIApp, Message, Receive, Scope, Send + + +class SecurityHeadersMiddleware: + """Inject configured security headers into every HTTP response.""" + + _X_CONTENT_TYPE_OPTIONS = b"X-Content-Type-Options" + _X_FRAME_OPTIONS = b"X-Frame-Options" + _REFERRER_POLICY = b"Referrer-Policy" + _PERMISSIONS_POLICY = b"Permissions-Policy" + + def __init__( + self, + app: ASGIApp, + *, + x_content_type_options: str = "", + x_frame_options: str = "", + referrer_policy: str = "", + permissions_policy: str = "", + ) -> None: + self._app = app + self._configured_headers = self._build_configured_headers( + x_content_type_options=x_content_type_options, + x_frame_options=x_frame_options, + referrer_policy=referrer_policy, + permissions_policy=permissions_policy, + ) + + @classmethod + def _build_configured_headers( + cls, + *, + x_content_type_options: str, + x_frame_options: str, + referrer_policy: str, + permissions_policy: str, + ) -> tuple[tuple[bytes, bytes, bytes], ...]: + configured: list[tuple[bytes, bytes, bytes]] = [] + for header_name, value in ( + (cls._X_CONTENT_TYPE_OPTIONS, x_content_type_options), + (cls._X_FRAME_OPTIONS, x_frame_options), + (cls._REFERRER_POLICY, referrer_policy), + (cls._PERMISSIONS_POLICY, permissions_policy), + ): + normalized = value.strip() + if not normalized: + continue + configured.append( + ( + header_name.lower(), + header_name, + normalized.encode("latin-1"), + ) + ) + return tuple(configured) + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + """Append configured security headers unless already present.""" + if scope["type"] != "http" or not self._configured_headers: + await self._app(scope, receive, send) + return + + async def send_with_security_headers(message: Message) -> None: + if message["type"] == "http.response.start": + # Starlette uses `list[tuple[bytes, bytes]]` for raw headers. + headers: list[tuple[bytes, bytes]] = message.setdefault("headers", []) + existing = {key.lower() for key, _ in headers} + for key_lower, key, value in self._configured_headers: + if key_lower not in existing: + headers.append((key, value)) + existing.add(key_lower) + await send(message) + + await self._app(scope, receive, send_with_security_headers) diff --git a/backend/app/main.py b/backend/app/main.py index d922241a..4761c24f 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -5,7 +5,7 @@ from __future__ import annotations from contextlib import asynccontextmanager from typing import TYPE_CHECKING, Any -from fastapi import APIRouter, FastAPI, Request, Response, status +from fastapi import APIRouter, FastAPI, status from fastapi.middleware.cors import CORSMiddleware from fastapi.openapi.utils import get_openapi from fastapi_pagination import add_pagination @@ -34,6 +34,7 @@ from app.api.users import router as users_router from app.core.config import settings from app.core.error_handling import install_error_handling from app.core.logging import configure_logging, get_logger +from app.core.security_headers import SecurityHeadersMiddleware from app.db.session import init_db from app.schemas.health import HealthStatusResponse @@ -464,20 +465,16 @@ if origins: else: logger.info("app.cors.disabled") +app.add_middleware( + SecurityHeadersMiddleware, + x_content_type_options=settings.security_header_x_content_type_options, + x_frame_options=settings.security_header_x_frame_options, + referrer_policy=settings.security_header_referrer_policy, + permissions_policy=settings.security_header_permissions_policy, +) install_error_handling(app) -@app.middleware("http") -async def security_headers(request: Request, call_next: Any) -> Response: - """Inject standard security headers into every response.""" - response: Response = await call_next(request) - response.headers.setdefault("X-Content-Type-Options", "nosniff") - response.headers.setdefault("X-Frame-Options", "DENY") - response.headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin") - response.headers.setdefault("Permissions-Policy", "camera=(), microphone=(), geolocation=()") - return response - - @app.get( "/health", tags=["health"], diff --git a/backend/tests/test_security_headers_middleware.py b/backend/tests/test_security_headers_middleware.py new file mode 100644 index 00000000..978b6c19 --- /dev/null +++ b/backend/tests/test_security_headers_middleware.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +import pytest +from fastapi import FastAPI, Response +from fastapi.middleware.cors import CORSMiddleware +from fastapi.testclient import TestClient + +from app.core.security_headers import SecurityHeadersMiddleware + + +@pytest.mark.asyncio +async def test_security_headers_middleware_passes_through_non_http_scope() -> None: + called = False + + async def app(scope, receive, send): # type: ignore[no-untyped-def] + _ = receive + _ = send + nonlocal called + called = scope["type"] == "websocket" + + middleware = SecurityHeadersMiddleware(app, x_frame_options="SAMEORIGIN") + await middleware({"type": "websocket", "headers": []}, lambda: None, lambda _: None) + + assert called is True + + +def test_security_headers_middleware_injects_configured_headers() -> None: + app = FastAPI() + app.add_middleware( + SecurityHeadersMiddleware, + x_content_type_options="nosniff", + x_frame_options="SAMEORIGIN", + referrer_policy="strict-origin-when-cross-origin", + permissions_policy="camera=(), microphone=(), geolocation=()", + ) + + @app.get("/ok") + def ok() -> dict[str, bool]: + return {"ok": True} + + response = TestClient(app).get("/ok") + + assert response.status_code == 200 + assert response.headers["x-content-type-options"] == "nosniff" + assert response.headers["x-frame-options"] == "SAMEORIGIN" + assert response.headers["referrer-policy"] == "strict-origin-when-cross-origin" + assert response.headers["permissions-policy"] == "camera=(), microphone=(), geolocation=()" + + +def test_security_headers_middleware_does_not_override_existing_values() -> None: + app = FastAPI() + app.add_middleware( + SecurityHeadersMiddleware, + x_content_type_options="nosniff", + x_frame_options="SAMEORIGIN", + referrer_policy="strict-origin-when-cross-origin", + permissions_policy="camera=(), microphone=(), geolocation=()", + ) + + @app.get("/already-set") + def already_set(response: Response) -> dict[str, bool]: + response.headers["X-Frame-Options"] = "ALLOWALL" + response.headers["Referrer-Policy"] = "unsafe-url" + return {"ok": True} + + response = TestClient(app).get("/already-set") + + assert response.status_code == 200 + assert response.headers["x-content-type-options"] == "nosniff" + assert response.headers["x-frame-options"] == "ALLOWALL" + assert response.headers["referrer-policy"] == "unsafe-url" + assert response.headers["permissions-policy"] == "camera=(), microphone=(), geolocation=()" + + +def test_security_headers_middleware_includes_headers_on_cors_preflight() -> None: + app = FastAPI() + app.add_middleware( + CORSMiddleware, + allow_origins=["https://example.com"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + app.add_middleware( + SecurityHeadersMiddleware, + x_content_type_options="nosniff", + ) + + @app.get("/ok") + def ok() -> dict[str, bool]: + return {"ok": True} + + response = TestClient(app).options( + "/ok", + headers={ + "Origin": "https://example.com", + "Access-Control-Request-Method": "GET", + }, + ) + + assert response.status_code == 200 + assert response.headers["x-content-type-options"] == "nosniff" + + +def test_security_headers_middleware_skips_blank_config_values() -> None: + app = FastAPI() + app.add_middleware(SecurityHeadersMiddleware) + + @app.get("/ok") + def ok() -> dict[str, bool]: + return {"ok": True} + + response = TestClient(app).get("/ok") + + assert response.status_code == 200 + assert response.headers.get("x-content-type-options") is None + assert response.headers.get("x-frame-options") is None + assert response.headers.get("referrer-policy") is None + assert response.headers.get("permissions-policy") is None