Files
openclaw-mission-control/backend/tests/test_rate_limit.py
Hugh Brown 4960d8561b security: fix fail-open auth, streaming payload limit, and rate limiter memory leak
- agent.py: Fail closed when gateway lookup returns None instead of
  silently dropping the organization filter (cross-tenant board leak)
- board_webhooks.py: Read request body via streaming chunks so an
  oversized payload is rejected before it is fully loaded into memory
- rate_limit.py: Add periodic sweep of expired keys to prevent
  unbounded memory growth from inactive clients
- test_rate_limit.py: Add test for the new sweep behavior

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 23:35:10 +05:30

72 lines
2.5 KiB
Python

"""Tests for the in-memory rate limiter."""
from __future__ import annotations
import time
from unittest.mock import patch
from app.core.rate_limit import InMemoryRateLimiter
def test_allows_requests_within_limit() -> None:
    """Every call up to ``max_requests`` inside one window is admitted."""
    quota = 5
    rate_limiter = InMemoryRateLimiter(max_requests=quota, window_seconds=60.0)
    attempt = 0
    while attempt < quota:
        verdict = rate_limiter.is_allowed("client-a")
        assert verdict is True
        attempt += 1
def test_blocks_requests_over_limit() -> None:
    """Calls beyond ``max_requests`` within the same window are refused."""
    quota = 3
    rate_limiter = InMemoryRateLimiter(max_requests=quota, window_seconds=60.0)
    # The first `quota` calls use up the allowance; the two that follow
    # must both be turned away.
    expected_outcomes = (True,) * quota + (False, False)
    for expected in expected_outcomes:
        assert rate_limiter.is_allowed("client-a") is expected
def test_separate_keys_have_independent_limits() -> None:
    """Exhausting one key's allowance leaves another key's allowance intact."""
    rate_limiter = InMemoryRateLimiter(max_requests=2, window_seconds=60.0)
    expected_outcomes = (True, True, False)
    # "client-a" is drained completely before "client-b" makes its first
    # call, so "client-b" can only be admitted if the keys are tracked
    # separately.
    for client in ("client-a", "client-b"):
        for expected in expected_outcomes:
            assert rate_limiter.is_allowed(client) is expected
def test_window_expiry_resets_limit() -> None:
    """Once the window has elapsed, a previously blocked key is admitted again."""
    rate_limiter = InMemoryRateLimiter(max_requests=2, window_seconds=1.0)
    # Two admitted calls fill the window; the third is rejected.
    for expected in (True, True, False):
        assert rate_limiter.is_allowed("client-a") is expected

    # Rather than sleeping, pin the patched clock to a moment past the
    # end of the one-second window.
    later = time.monotonic() + 2.0
    frozen_clock = patch("time.monotonic", return_value=later)
    with frozen_clock:
        assert rate_limiter.is_allowed("client-a") is True
def test_sweep_removes_expired_keys() -> None:
    """Keys whose timestamps have all expired should be evicted during periodic sweep.

    The test records one request under each of several keys, moves the
    patched clock past the window, and then calls the limiter until its
    call counter reaches a multiple of ``_CLEANUP_INTERVAL``. Every stale
    key, not only the first, must be gone afterwards.
    """
    from app.core.rate_limit import _CLEANUP_INTERVAL

    limiter = InMemoryRateLimiter(max_requests=100, window_seconds=1.0)
    stale_keys = [f"stale-{i}" for i in range(10)]
    # Record one request under each unique key.
    for key in stale_keys:
        limiter.is_allowed(key)
    assert len(limiter._buckets) == len(stale_keys)

    # Advance time so all timestamps expire, then trigger enough calls to
    # hit the cleanup interval.
    future = time.monotonic() + 2.0
    with patch("time.monotonic", return_value=future):
        # Drive the call count up to a multiple of _CLEANUP_INTERVAL
        remaining = _CLEANUP_INTERVAL - (limiter._call_count % _CLEANUP_INTERVAL)
        for _ in range(remaining):
            limiter.is_allowed("trigger-sweep")

    # All stale keys should have been swept, not just the first one.
    # Collecting the survivors makes a failure name the keys that leaked.
    leftover = [key for key in stale_keys if key in limiter._buckets]
    assert leftover == []
    # The key used to drive the sweep was active at the patched time and
    # must still be tracked.
    assert "trigger-sweep" in limiter._buckets