Agent token auth performed O(n) PBKDF2 operations per request with no rate limiting, enabling CPU exhaustion attacks. Webhook ingest had no rate limits either. Add an in-memory token-bucket rate limiter: - Agent auth: 20 requests/minute per IP - Webhook ingest: 60 requests/minute per IP Includes unit tests for the rate limiter. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
46 lines
1.6 KiB
Python
46 lines
1.6 KiB
Python
"""Tests for the in-memory rate limiter."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from unittest.mock import patch
|
|
|
|
from app.core.rate_limit import InMemoryRateLimiter
|
|
|
|
|
|
def test_allows_requests_within_limit() -> None:
|
|
limiter = InMemoryRateLimiter(max_requests=5, window_seconds=60.0)
|
|
for _ in range(5):
|
|
assert limiter.is_allowed("client-a") is True
|
|
|
|
|
|
def test_blocks_requests_over_limit() -> None:
|
|
limiter = InMemoryRateLimiter(max_requests=3, window_seconds=60.0)
|
|
for _ in range(3):
|
|
assert limiter.is_allowed("client-a") is True
|
|
assert limiter.is_allowed("client-a") is False
|
|
assert limiter.is_allowed("client-a") is False
|
|
|
|
|
|
def test_separate_keys_have_independent_limits() -> None:
|
|
limiter = InMemoryRateLimiter(max_requests=2, window_seconds=60.0)
|
|
assert limiter.is_allowed("client-a") is True
|
|
assert limiter.is_allowed("client-a") is True
|
|
assert limiter.is_allowed("client-a") is False
|
|
# Different key still allowed
|
|
assert limiter.is_allowed("client-b") is True
|
|
assert limiter.is_allowed("client-b") is True
|
|
assert limiter.is_allowed("client-b") is False
|
|
|
|
|
|
def test_window_expiry_resets_limit() -> None:
|
|
limiter = InMemoryRateLimiter(max_requests=2, window_seconds=1.0)
|
|
assert limiter.is_allowed("client-a") is True
|
|
assert limiter.is_allowed("client-a") is True
|
|
assert limiter.is_allowed("client-a") is False
|
|
|
|
# Simulate time passing beyond the window
|
|
future = time.monotonic() + 2.0
|
|
with patch("time.monotonic", return_value=future):
|
|
assert limiter.is_allowed("client-a") is True
|