186 lines
6.2 KiB
Python
186 lines
6.2 KiB
Python
"""Tests for trusted client-IP extraction."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from unittest.mock import patch
|
|
|
|
from app.core.client_ip import (
|
|
_extract_from_forwarded,
|
|
_extract_from_x_forwarded_for,
|
|
_parse_trusted_networks,
|
|
_strip_port,
|
|
get_client_ip,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class _FakeClient:
|
|
def __init__(self, host: str) -> None:
|
|
self.host = host
|
|
|
|
|
|
class _FakeRequest:
|
|
def __init__(self, peer_ip: str, headers: dict[str, str] | None = None) -> None:
|
|
self.client = _FakeClient(peer_ip)
|
|
self._headers = headers or {}
|
|
|
|
@property
|
|
def headers(self) -> dict[str, str]:
|
|
return self._headers
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Unit tests for internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_strip_port_ipv4() -> None:
|
|
assert _strip_port("1.2.3.4:8080") == "1.2.3.4"
|
|
|
|
|
|
def test_strip_port_ipv4_no_port() -> None:
|
|
assert _strip_port("1.2.3.4") == "1.2.3.4"
|
|
|
|
|
|
def test_strip_port_ipv6_bracketed_with_port() -> None:
|
|
assert _strip_port("[::1]:8080") == "::1"
|
|
|
|
|
|
def test_strip_port_ipv6_bracketed_no_port() -> None:
|
|
assert _strip_port("[::1]") == "::1"
|
|
|
|
|
|
def test_extract_forwarded_simple() -> None:
|
|
assert _extract_from_forwarded("for=192.0.2.60") == "192.0.2.60"
|
|
|
|
|
|
def test_extract_forwarded_quoted_with_port() -> None:
|
|
assert _extract_from_forwarded('for="192.0.2.60:8080"') == "192.0.2.60"
|
|
|
|
|
|
def test_extract_forwarded_ipv6() -> None:
|
|
assert _extract_from_forwarded('for="[2001:db8::1]"') == "2001:db8::1"
|
|
|
|
|
|
def test_extract_forwarded_multiple_takes_first() -> None:
|
|
assert _extract_from_forwarded("for=203.0.113.50, for=198.51.100.1") == "203.0.113.50"
|
|
|
|
|
|
def test_extract_forwarded_with_other_directives() -> None:
|
|
assert _extract_from_forwarded("for=192.0.2.43;proto=https;by=203.0.113.60") == "192.0.2.43"
|
|
|
|
|
|
def test_extract_forwarded_empty() -> None:
|
|
assert _extract_from_forwarded("proto=https") is None
|
|
|
|
|
|
def test_extract_xff_simple() -> None:
|
|
assert _extract_from_x_forwarded_for("203.0.113.50") == "203.0.113.50"
|
|
|
|
|
|
def test_extract_xff_multiple_takes_first() -> None:
|
|
assert _extract_from_x_forwarded_for("203.0.113.50, 198.51.100.1, 10.0.0.1") == "203.0.113.50"
|
|
|
|
|
|
def test_extract_xff_empty() -> None:
|
|
assert _extract_from_x_forwarded_for("") is None
|
|
|
|
|
|
def test_parse_trusted_networks_valid() -> None:
|
|
nets = _parse_trusted_networks("127.0.0.1, 10.0.0.0/8, ::1")
|
|
assert len(nets) == 3
|
|
|
|
|
|
def test_parse_trusted_networks_empty() -> None:
|
|
assert _parse_trusted_networks("") == []
|
|
|
|
|
|
def test_parse_trusted_networks_ignores_invalid() -> None:
|
|
nets = _parse_trusted_networks("127.0.0.1, not-an-ip, 10.0.0.0/8")
|
|
assert len(nets) == 2
|
|
|
|
|
|
def test_parse_trusted_networks_does_not_log_invalid_value(caplog) -> None:
|
|
with caplog.at_level(logging.WARNING, logger="app.core.client_ip"):
|
|
_parse_trusted_networks("127.0.0.1, not-an-ip, 10.0.0.0/8")
|
|
|
|
assert "trusted_proxies: ignoring invalid entry in configuration" in caplog.text
|
|
assert "not-an-ip" not in caplog.text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Integration tests for get_client_ip
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_returns_peer_ip_when_no_trusted_proxies() -> None:
|
|
req = _FakeRequest("10.0.0.1", {"x-forwarded-for": "203.0.113.50"})
|
|
with patch("app.core.client_ip._trusted_networks", []):
|
|
assert get_client_ip(req) == "10.0.0.1" # type: ignore[arg-type]
|
|
|
|
|
|
def test_returns_peer_ip_when_peer_not_trusted() -> None:
|
|
nets = _parse_trusted_networks("172.16.0.0/12")
|
|
req = _FakeRequest("10.0.0.1", {"x-forwarded-for": "203.0.113.50"})
|
|
with patch("app.core.client_ip._trusted_networks", nets):
|
|
assert get_client_ip(req) == "10.0.0.1" # type: ignore[arg-type]
|
|
|
|
|
|
def test_extracts_from_x_forwarded_for() -> None:
|
|
nets = _parse_trusted_networks("10.0.0.1")
|
|
req = _FakeRequest("10.0.0.1", {"x-forwarded-for": "203.0.113.50, 10.0.0.1"})
|
|
with patch("app.core.client_ip._trusted_networks", nets):
|
|
assert get_client_ip(req) == "203.0.113.50" # type: ignore[arg-type]
|
|
|
|
|
|
def test_extracts_from_forwarded_header() -> None:
|
|
nets = _parse_trusted_networks("10.0.0.1")
|
|
req = _FakeRequest("10.0.0.1", {"forwarded": "for=203.0.113.50;proto=https"})
|
|
with patch("app.core.client_ip._trusted_networks", nets):
|
|
assert get_client_ip(req) == "203.0.113.50" # type: ignore[arg-type]
|
|
|
|
|
|
def test_forwarded_takes_precedence_over_xff() -> None:
|
|
nets = _parse_trusted_networks("10.0.0.1")
|
|
req = _FakeRequest(
|
|
"10.0.0.1",
|
|
{
|
|
"forwarded": "for=198.51.100.1",
|
|
"x-forwarded-for": "203.0.113.50",
|
|
},
|
|
)
|
|
with patch("app.core.client_ip._trusted_networks", nets):
|
|
assert get_client_ip(req) == "198.51.100.1" # type: ignore[arg-type]
|
|
|
|
|
|
def test_returns_peer_when_headers_empty() -> None:
|
|
nets = _parse_trusted_networks("10.0.0.1")
|
|
req = _FakeRequest("10.0.0.1", {})
|
|
with patch("app.core.client_ip._trusted_networks", nets):
|
|
assert get_client_ip(req) == "10.0.0.1" # type: ignore[arg-type]
|
|
|
|
|
|
def test_cidr_matching() -> None:
|
|
nets = _parse_trusted_networks("10.0.0.0/8")
|
|
req = _FakeRequest("10.255.0.1", {"x-forwarded-for": "203.0.113.50"})
|
|
with patch("app.core.client_ip._trusted_networks", nets):
|
|
assert get_client_ip(req) == "203.0.113.50" # type: ignore[arg-type]
|
|
|
|
|
|
def test_strips_port_from_forwarded() -> None:
|
|
nets = _parse_trusted_networks("10.0.0.1")
|
|
req = _FakeRequest("10.0.0.1", {"forwarded": 'for="192.0.2.60:8080"'})
|
|
with patch("app.core.client_ip._trusted_networks", nets):
|
|
assert get_client_ip(req) == "192.0.2.60" # type: ignore[arg-type]
|
|
|
|
|
|
def test_strips_port_from_forwarded_ipv6() -> None:
|
|
nets = _parse_trusted_networks("10.0.0.1")
|
|
req = _FakeRequest("10.0.0.1", {"forwarded": 'for="[2001:db8::1]:9090"'})
|
|
with patch("app.core.client_ip._trusted_networks", nets):
|
|
assert get_client_ip(req) == "2001:db8::1" # type: ignore[arg-type]
|