# Phase 17: Rate Limiting with `limits` Library
**Priority**: P0 CRITICAL - Prevents API blocks
**Effort**: ~1 hour
**Dependencies**: None
---
## CRITICAL: Async Safety Requirements
**WARNING**: The rate limiter MUST be async-safe. Blocking the event loop will freeze:
- The Gradio UI
- All parallel searches
- The orchestrator
**Rules**:
1. **NEVER use `time.sleep()`** - Always use `await asyncio.sleep()`
2. **NEVER use blocking while loops** - Use async-aware polling
3. **The `limits` library check is synchronous** - Wrap it carefully
The implementation below uses a polling pattern that:
- Checks the limit (synchronous, fast)
- Awaits `asyncio.sleep()` if the limit is exceeded (non-blocking)
- Retries the check
**Alternative**: If `limits` proves problematic, use `aiolimiter`, which is pure-async (sketched below).
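A minimal sketch of the `aiolimiter` fallback under the same 3-requests-per-second budget (the function names below are illustrative, not existing project code):

```python
# Hypothetical fallback using aiolimiter (pure-async, no polling loop needed).
import asyncio

from aiolimiter import AsyncLimiter

# Allow at most 3 acquisitions per 1-second window.
pubmed_limiter = AsyncLimiter(max_rate=3, time_period=1)


async def fetch_with_limit(i: int) -> None:
    # The context manager awaits until capacity is available,
    # yielding to the event loop instead of blocking it.
    async with pubmed_limiter:
        print(f"request {i} allowed")


async def main() -> None:
    await asyncio.gather(*(fetch_with_limit(i) for i in range(6)))


if __name__ == "__main__":
    asyncio.run(main())
```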
---
## Overview
Replace the naive `asyncio.sleep` rate limiting with a proper rate limiter built on the `limits` library, which provides:
- Moving window rate limiting
- Per-API configurable limits
- Thread-safe storage
- Already used in the reference repo
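For orientation, the raw `limits` primitives that Step 3 wraps look roughly like this (a standalone sketch, not project code):

```python
# Sketch of the bare limits API (Step 3 wraps this in an async-friendly class).
from limits import parse
from limits.storage import MemoryStorage
from limits.strategies import MovingWindowRateLimiter

storage = MemoryStorage()
limiter = MovingWindowRateLimiter(storage)
three_per_second = parse("3/second")

# hit() consumes a slot and reports whether the call stayed within the limit.
for i in range(5):
    allowed = limiter.hit(three_per_second, "ncbi")
    print(f"request {i + 1}: {'allowed' if allowed else 'rate limited'}")
# Expected: the first 3 are allowed, the last 2 are rejected until
# the 1-second moving window slides past the earlier hits.
```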
**Why This Matters**
- NCBI will block us without proper rate limiting (3/sec without a key, 10/sec with one)
- The current implementation only uses a simple sleep delay
- We need coordinated limits across all PubMed calls
- Professional-grade rate limiting prevents production issues
---
## Current State
### What We Have (`src/tools/pubmed.py:20-21, 34-41`)
```python
RATE_LIMIT_DELAY = 0.34  # ~3 requests/sec without API key

async def _rate_limit(self) -> None:
    """Enforce NCBI rate limiting."""
    loop = asyncio.get_running_loop()
    now = loop.time()
    elapsed = now - self._last_request_time
    if elapsed < self.RATE_LIMIT_DELAY:
        await asyncio.sleep(self.RATE_LIMIT_DELAY - elapsed)
    self._last_request_time = loop.time()
```
### Problems
1. **Not shared across instances**: Each `PubMedTool()` has its own counter
2. **Simple delay vs moving window**: Doesn't handle bursts properly
3. **Hardcoded rate**: Doesn't adapt to API key presence
4. **No backoff on 429**: Just retries blindly
---
## TDD Implementation Plan
### Step 1: Add Dependency
**File**: `pyproject.toml`
```toml
dependencies = [
    # ... existing deps ...
    "limits>=3.0",
]
```
Then run:
```bash
uv sync
```
---
### Step 2: Write the Tests First
**File**: `tests/unit/tools/test_rate_limiting.py`
```python
"""Tests for rate limiting functionality."""
import asyncio
import time
import pytest
from src.tools.rate_limiter import RateLimiter, get_pubmed_limiter
class TestRateLimiter:
"""Test suite for rate limiter."""
def test_create_limiter_without_api_key(self) -> None:
"""Should create 3/sec limiter without API key."""
limiter = RateLimiter(rate="3/second")
assert limiter.rate == "3/second"
def test_create_limiter_with_api_key(self) -> None:
"""Should create 10/sec limiter with API key."""
limiter = RateLimiter(rate="10/second")
assert limiter.rate == "10/second"
@pytest.mark.asyncio
async def test_limiter_allows_requests_under_limit(self) -> None:
"""Should allow requests under the rate limit."""
limiter = RateLimiter(rate="10/second")
# 3 requests should all succeed immediately
for _ in range(3):
allowed = await limiter.acquire()
assert allowed is True
@pytest.mark.asyncio
async def test_limiter_blocks_when_exceeded(self) -> None:
"""Should wait when rate limit exceeded."""
limiter = RateLimiter(rate="2/second")
# First 2 should be instant
await limiter.acquire()
await limiter.acquire()
# Third should block briefly
start = time.monotonic()
await limiter.acquire()
elapsed = time.monotonic() - start
# Should have waited ~0.5 seconds (half second window for 2/sec)
assert elapsed >= 0.3
@pytest.mark.asyncio
async def test_limiter_resets_after_window(self) -> None:
"""Rate limit should reset after time window."""
limiter = RateLimiter(rate="5/second")
# Use up the limit
for _ in range(5):
await limiter.acquire()
# Wait for window to pass
await asyncio.sleep(1.1)
# Should be allowed again
start = time.monotonic()
await limiter.acquire()
elapsed = time.monotonic() - start
assert elapsed < 0.1 # Should be nearly instant
class TestGetPubmedLimiter:
"""Test PubMed-specific limiter factory."""
def test_limiter_without_api_key(self) -> None:
"""Should return 3/sec limiter without key."""
limiter = get_pubmed_limiter(api_key=None)
assert "3" in limiter.rate
def test_limiter_with_api_key(self) -> None:
"""Should return 10/sec limiter with key."""
limiter = get_pubmed_limiter(api_key="my-api-key")
assert "10" in limiter.rate
def test_limiter_is_singleton(self) -> None:
"""Same API key should return same limiter instance."""
limiter1 = get_pubmed_limiter(api_key="key1")
limiter2 = get_pubmed_limiter(api_key="key1")
assert limiter1 is limiter2
def test_different_keys_different_limiters(self) -> None:
"""Different API keys should return different limiters."""
limiter1 = get_pubmed_limiter(api_key="key1")
limiter2 = get_pubmed_limiter(api_key="key2")
# Clear cache for clean test
# Actually, different keys SHOULD share the same limiter
# since we're limiting against the same API
assert limiter1 is limiter2 # Shared NCBI rate limit
```
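Because `get_pubmed_limiter` caches a module-level singleton, the tests above can interfere with each other depending on execution order (for example, the with-key test would see the 3/sec limiter if the without-key test ran first). A small autouse fixture in a hypothetical `tests/unit/tools/conftest.py` keeps them independent:

```python
"""Shared fixtures for rate limiter tests (hypothetical conftest)."""
from collections.abc import Iterator

import pytest

from src.tools.rate_limiter import RateLimiterFactory, reset_pubmed_limiter


@pytest.fixture(autouse=True)
def _reset_rate_limiters() -> Iterator[None]:
    """Reset module-level limiter state before and after each test."""
    reset_pubmed_limiter()
    RateLimiterFactory.reset_all()
    yield
    reset_pubmed_limiter()
    RateLimiterFactory.reset_all()
```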
---
### Step 3: Create Rate Limiter Module
**File**: `src/tools/rate_limiter.py`
```python
"""Rate limiting utilities using the limits library."""
import asyncio
from typing import ClassVar
from limits import RateLimitItem, parse
from limits.storage import MemoryStorage
from limits.strategies import MovingWindowRateLimiter
class RateLimiter:
"""
Async-compatible rate limiter using limits library.
Uses moving window algorithm for smooth rate limiting.
"""
def __init__(self, rate: str) -> None:
"""
Initialize rate limiter.
Args:
rate: Rate string like "3/second" or "10/second"
"""
self.rate = rate
self._storage = MemoryStorage()
self._limiter = MovingWindowRateLimiter(self._storage)
self._rate_limit: RateLimitItem = parse(rate)
self._identity = "default" # Single identity for shared limiting
async def acquire(self, wait: bool = True) -> bool:
"""
Acquire permission to make a request.
ASYNC-SAFE: Uses asyncio.sleep(), never time.sleep().
The polling pattern allows other coroutines to run while waiting.
Args:
wait: If True, wait until allowed. If False, return immediately.
Returns:
True if allowed, False if not (only when wait=False)
"""
while True:
# Check if we can proceed (synchronous, fast - ~microseconds)
if self._limiter.hit(self._rate_limit, self._identity):
return True
if not wait:
return False
# CRITICAL: Use asyncio.sleep(), NOT time.sleep()
# This yields control to the event loop, allowing other
# coroutines (UI, parallel searches) to run
await asyncio.sleep(0.1)
def reset(self) -> None:
"""Reset the rate limiter (for testing)."""
self._storage.reset()
# Singleton limiter for PubMed/NCBI
_pubmed_limiter: RateLimiter | None = None
def get_pubmed_limiter(api_key: str | None = None) -> RateLimiter:
"""
Get the shared PubMed rate limiter.
Rate depends on whether API key is provided:
- Without key: 3 requests/second
- With key: 10 requests/second
Args:
api_key: NCBI API key (optional)
Returns:
Shared RateLimiter instance
"""
global _pubmed_limiter
if _pubmed_limiter is None:
rate = "10/second" if api_key else "3/second"
_pubmed_limiter = RateLimiter(rate)
return _pubmed_limiter
def reset_pubmed_limiter() -> None:
"""Reset the PubMed limiter (for testing)."""
global _pubmed_limiter
_pubmed_limiter = None
# Factory for other APIs
class RateLimiterFactory:
"""Factory for creating/getting rate limiters for different APIs."""
_limiters: ClassVar[dict[str, RateLimiter]] = {}
@classmethod
def get(cls, api_name: str, rate: str) -> RateLimiter:
"""
Get or create a rate limiter for an API.
Args:
api_name: Unique identifier for the API
rate: Rate limit string (e.g., "10/second")
Returns:
RateLimiter instance (shared for same api_name)
"""
if api_name not in cls._limiters:
cls._limiters[api_name] = RateLimiter(rate)
return cls._limiters[api_name]
@classmethod
def reset_all(cls) -> None:
"""Reset all limiters (for testing)."""
cls._limiters.clear()
```
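If the installed `limits` release exposes the asyncio namespace (`limits.aio`), the polling wrapper can be avoided entirely by using the coroutine-based strategy. A sketch of that variant, under the assumption that `limits.aio` is available in the pinned version:

```python
# Variant of RateLimiter built on limits.aio (assumes the installed
# limits release ships the asyncio storage/strategy classes).
import asyncio

from limits import parse
from limits.aio.storage import MemoryStorage as AsyncMemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter as AsyncMovingWindow


class AsyncNativeRateLimiter:
    """Rate limiter using the coroutine-based limits API."""

    def __init__(self, rate: str) -> None:
        self.rate = rate
        self._storage = AsyncMemoryStorage()
        self._limiter = AsyncMovingWindow(self._storage)
        self._rate_limit = parse(rate)
        self._identity = "default"

    async def acquire(self, wait: bool = True) -> bool:
        while True:
            # hit() is a coroutine here, so there is no sync call to wrap.
            if await self._limiter.hit(self._rate_limit, self._identity):
                return True
            if not wait:
                return False
            await asyncio.sleep(0.1)
```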
---
### Step 4: Update PubMed Tool
**File**: `src/tools/pubmed.py` (replace rate limiting code)
```python
# Replace imports and rate limiting
from src.tools.rate_limiter import get_pubmed_limiter


class PubMedTool:
    """Search tool for PubMed/NCBI."""

    BASE_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
    HTTP_TOO_MANY_REQUESTS = 429

    def __init__(self, api_key: str | None = None) -> None:
        self.api_key = api_key or settings.ncbi_api_key
        if self.api_key == "your-ncbi-key-here":
            self.api_key = None
        # Use shared rate limiter
        self._limiter = get_pubmed_limiter(self.api_key)

    async def _rate_limit(self) -> None:
        """Enforce NCBI rate limiting using shared limiter."""
        await self._limiter.acquire()

    # ... rest of class unchanged ...
```
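For context, the call pattern in the request path stays the same: await the limiter immediately before each outbound E-utilities call. A rough standalone sketch (the `esearch_ids` helper and its `httpx` usage are illustrative, not the actual `search()` method):

```python
# Illustrative only: where the shared limiter sits in the request flow.
import httpx

from src.tools.rate_limiter import get_pubmed_limiter

ESEARCH_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"


async def esearch_ids(query: str, max_results: int = 10, api_key: str | None = None) -> list[str]:
    limiter = get_pubmed_limiter(api_key)
    await limiter.acquire()  # waits (async) until a slot is free
    params: dict[str, str | int] = {
        "db": "pubmed",
        "term": query,
        "retmax": max_results,
        "retmode": "json",
    }
    if api_key:
        params["api_key"] = api_key
    async with httpx.AsyncClient(timeout=30.0) as client:
        response = await client.get(ESEARCH_URL, params=params)
        response.raise_for_status()
        data = response.json()
    return data["esearchresult"]["idlist"]
```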
---
### Step 5: Add Rate Limiters for Other APIs
**File**: `src/tools/clinicaltrials.py` (optional)
```python
from src.tools.rate_limiter import RateLimiterFactory


class ClinicalTrialsTool:
    def __init__(self) -> None:
        # ClinicalTrials.gov doesn't document limits, but be conservative
        self._limiter = RateLimiterFactory.get("clinicaltrials", "5/second")

    async def search(self, query: str, max_results: int = 10) -> list[Evidence]:
        await self._limiter.acquire()
        # ... rest of method ...
```
**File**: `src/tools/europepmc.py` (optional)
```python
from src.tools.rate_limiter import RateLimiterFactory


class EuropePMCTool:
    def __init__(self) -> None:
        # Europe PMC is generous, but still be respectful
        self._limiter = RateLimiterFactory.get("europepmc", "10/second")

    async def search(self, query: str, max_results: int = 10) -> list[Evidence]:
        await self._limiter.acquire()
        # ... rest of method ...
```
---
## Demo Script
**File**: `examples/rate_limiting_demo.py`
```python
#!/usr/bin/env python3
"""Demo script to verify rate limiting works correctly."""
import asyncio
import time

from src.tools.pubmed import PubMedTool
from src.tools.rate_limiter import RateLimiter, get_pubmed_limiter, reset_pubmed_limiter


async def test_basic_limiter():
    """Test basic rate limiter behavior."""
    print("=" * 60)
    print("Rate Limiting Demo")
    print("=" * 60)

    # Test 1: Basic limiter
    print("\n[Test 1] Testing 3/second limiter...")
    limiter = RateLimiter("3/second")
    start = time.monotonic()
    for i in range(6):
        await limiter.acquire()
        elapsed = time.monotonic() - start
        print(f"  Request {i+1} at {elapsed:.2f}s")
    total = time.monotonic() - start
    print(f"  Total time for 6 requests: {total:.2f}s (expected ~1s: two bursts of 3 separated by the 1s window)")


async def test_pubmed_limiter():
    """Test PubMed-specific limiter."""
    print("\n[Test 2] Testing PubMed limiter (shared)...")
    reset_pubmed_limiter()  # Clean state

    # Without API key: 3/sec
    limiter = get_pubmed_limiter(api_key=None)
    print(f"  Rate without key: {limiter.rate}")

    # Multiple tools should share the same limiter
    tool1 = PubMedTool()
    tool2 = PubMedTool()

    # Verify they share the limiter
    print(f"  Tools share limiter: {tool1._limiter is tool2._limiter}")


async def test_concurrent_requests():
    """Test rate limiting under concurrent load."""
    print("\n[Test 3] Testing concurrent request limiting...")
    limiter = RateLimiter("5/second")

    async def make_request(i: int):
        await limiter.acquire()
        return time.monotonic()

    start = time.monotonic()
    # Launch 10 concurrent requests
    tasks = [make_request(i) for i in range(10)]
    times = await asyncio.gather(*tasks)

    # Calculate distribution
    relative_times = [t - start for t in times]
    print(f"  Request times: {[f'{t:.2f}s' for t in sorted(relative_times)]}")
    total = max(relative_times)
    print(f"  All 10 requests completed in {total:.2f}s (expected ~1s: two bursts of 5)")


async def main():
    await test_basic_limiter()
    await test_pubmed_limiter()
    await test_concurrent_requests()
    print("\n" + "=" * 60)
    print("Demo complete!")


if __name__ == "__main__":
    asyncio.run(main())
```
---
## Verification Checklist
### Unit Tests
```bash
# Run rate limiting tests
uv run pytest tests/unit/tools/test_rate_limiting.py -v
# Expected: All tests pass
```
### Integration Test (Manual)
```bash
# Run demo
uv run python examples/rate_limiting_demo.py
# Expected: Requests properly spaced
```
### Full Test Suite
```bash
make check
# Expected: All tests pass, mypy clean
```
---
## Success Criteria
1. **`limits` library installed**: Dependency added to pyproject.toml
2. **RateLimiter class works**: Can create and use limiters
3. **PubMed uses new limiter**: Shared limiter across instances
4. **Rate adapts to API key**: 3/sec without, 10/sec with
5. **Concurrent requests handled**: Multiple async requests properly queued
6. **No regressions**: All existing tests pass
---
## API Rate Limit Reference
| API | Without Key | With Key |
|-----|-------------|----------|
| PubMed/NCBI | 3/sec | 10/sec |
| ClinicalTrials.gov | Undocumented (~5/sec safe) | N/A |
| Europe PMC | ~10-20/sec (generous) | N/A |
| OpenAlex | ~100k/day (no per-sec limit) | Faster with `mailto` |
---
## Notes
- `limits` library uses moving window algorithm (fairer than fixed window)
- Singleton pattern ensures all PubMed calls share the limit
- The factory pattern allows easy extension to other APIs
- Consider adding 429 response detection + exponential backoff (a sketch follows below)
- In production, consider Redis storage for distributed rate limiting
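
A minimal sketch of the 429 handling mentioned above, combining the shared limiter with exponential backoff (the helper name and retry constants are assumptions, not existing code):

```python
# Hypothetical helper: retry on HTTP 429 with exponential backoff,
# while still going through the shared rate limiter for every attempt.
import asyncio

import httpx

from src.tools.rate_limiter import RateLimiter

MAX_RETRIES = 4
BASE_DELAY_SECONDS = 1.0


async def get_with_backoff(
    client: httpx.AsyncClient, url: str, limiter: RateLimiter, **kwargs
) -> httpx.Response:
    for attempt in range(MAX_RETRIES):
        await limiter.acquire()
        response = await client.get(url, **kwargs)
        if response.status_code != 429:
            response.raise_for_status()
            return response
        # Honor a numeric Retry-After header if present; otherwise back off exponentially.
        retry_after = response.headers.get("Retry-After")
        if retry_after and retry_after.isdigit():
            delay = float(retry_after)
        else:
            delay = BASE_DELAY_SECONDS * (2 ** attempt)
        await asyncio.sleep(delay)
    raise RuntimeError(f"Rate limited after {MAX_RETRIES} attempts: {url}")
```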