Add get_client_ip() helper that inspects Forwarded and X-Forwarded-For headers only when the direct peer is in TRUSTED_PROXIES (comma-separated IPs/CIDRs). Replaces raw request.client.host in rate-limit and webhook source_ip to prevent all traffic collapsing behind a reverse proxy IP. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
122 lines
3.8 KiB
Python
122 lines
3.8 KiB
Python
"""Trusted client-IP extraction from proxy headers.
|
|
|
|
In proxied deployments ``request.client.host`` returns the reverse-proxy
|
|
IP. This module provides :func:`get_client_ip` which inspects the
|
|
``Forwarded`` and ``X-Forwarded-For`` headers **only** when the direct
|
|
peer is in the configured ``TRUSTED_PROXIES`` list.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ipaddress
|
|
import re
|
|
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
|
|
from typing import TYPE_CHECKING
|
|
|
|
from app.core.logging import get_logger
|
|
|
|
if TYPE_CHECKING:
|
|
from fastapi import Request
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
# RFC 7239 ``for=`` directive value. Handles quoted/unquoted, optional
|
|
# port, and IPv6 bracket notation.
|
|
_FORWARDED_FOR_RE = re.compile(r'for="?(\[[^\]]+\]|[^";,\s]+)', re.IGNORECASE)
|
|
|
|
|
|
def _parse_trusted_networks(raw: str) -> list[IPv4Network | IPv6Network]:
|
|
"""Parse a comma-separated list of IPs/CIDRs into network objects."""
|
|
networks: list[IPv4Network | IPv6Network] = []
|
|
for entry in raw.split(","):
|
|
entry = entry.strip()
|
|
if not entry:
|
|
continue
|
|
try:
|
|
networks.append(ipaddress.ip_network(entry, strict=False))
|
|
except ValueError:
|
|
logger.warning("trusted_proxies: ignoring invalid entry %r", entry)
|
|
return networks
|
|
|
|
|
|
def _is_trusted(peer_ip: str, networks: list[IPv4Network | IPv6Network]) -> bool:
|
|
"""Check whether *peer_ip* falls within any trusted network."""
|
|
try:
|
|
addr = ipaddress.ip_address(peer_ip)
|
|
except ValueError:
|
|
return False
|
|
return any(addr in net for net in networks)
|
|
|
|
|
|
def _strip_port(raw: str) -> str:
|
|
"""Strip port suffix from a ``Forwarded: for=`` value.
|
|
|
|
Handles ``1.2.3.4:8080``, ``[::1]:8080``, and bare addresses.
|
|
"""
|
|
# Bracketed IPv6: ``[::1]:port`` or ``[::1]``
|
|
if raw.startswith("["):
|
|
bracket_end = raw.find("]")
|
|
if bracket_end != -1:
|
|
return raw[1:bracket_end]
|
|
return raw.strip("[]")
|
|
# IPv4 with port: ``1.2.3.4:8080``
|
|
if "." in raw and ":" in raw:
|
|
return raw.rsplit(":", 1)[0]
|
|
return raw
|
|
|
|
|
|
def _extract_from_forwarded(header: str) -> str | None:
|
|
"""Return the leftmost ``for=`` client IP from an RFC 7239 Forwarded header."""
|
|
match = _FORWARDED_FOR_RE.search(header)
|
|
if not match:
|
|
return None
|
|
value = _strip_port(match.group(1))
|
|
return value or None
|
|
|
|
|
|
def _extract_from_x_forwarded_for(header: str) -> str | None:
|
|
"""Return the leftmost entry from an X-Forwarded-For header."""
|
|
first = header.split(",", 1)[0].strip()
|
|
return first or None
|
|
|
|
|
|
def get_client_ip(request: Request) -> str:
|
|
"""Extract the real client IP, respecting proxy headers when trusted.
|
|
|
|
Falls back to ``request.client.host`` when no trusted proxies are
|
|
configured or the direct peer is not trusted.
|
|
"""
|
|
peer_ip = request.client.host if request.client else "unknown"
|
|
|
|
if not _trusted_networks:
|
|
return peer_ip
|
|
|
|
if not _is_trusted(peer_ip, _trusted_networks):
|
|
return peer_ip
|
|
|
|
# Prefer standardized Forwarded header (RFC 7239).
|
|
forwarded = request.headers.get("forwarded")
|
|
if forwarded:
|
|
client_ip = _extract_from_forwarded(forwarded)
|
|
if client_ip:
|
|
return client_ip
|
|
|
|
# Fall back to de-facto X-Forwarded-For.
|
|
xff = request.headers.get("x-forwarded-for")
|
|
if xff:
|
|
client_ip = _extract_from_x_forwarded_for(xff)
|
|
if client_ip:
|
|
return client_ip
|
|
|
|
return peer_ip
|
|
|
|
|
|
def _load_trusted_networks() -> list[IPv4Network | IPv6Network]:
|
|
"""Load trusted proxy networks from settings (called once at import)."""
|
|
from app.core.config import settings
|
|
|
|
return _parse_trusted_networks(settings.trusted_proxies)
|
|
|
|
|
|
_trusted_networks: list[IPv4Network | IPv6Network] = _load_trusted_networks()
|