import time
import threading
from collections import deque
from typing import Dict, Optional
class SlidingWindowRateLimiter:
    """Per-client rate limiter that keeps a log of accepted request times.

    Each accepted request is stored as a ``time.time()`` timestamp in a
    per-client deque, oldest first. A request is accepted while fewer than
    ``max_requests`` timestamps lie within the last ``window_size`` seconds.
    All reads and writes of the shared state happen under ``self.lock``.
    """

    def __init__(self, max_requests: int, window_size: int):
        """
        Initialize the rate limiter with sliding window algorithm.

        Args:
            max_requests (int): Maximum number of requests allowed in the window
            window_size (int): Time window in seconds
        """
        self.max_requests = max_requests
        self.window_size = window_size
        # client_id -> timestamps of accepted requests, oldest first.
        self.requests: Dict[str, deque] = {}
        self.lock = threading.Lock()

    def _prune(self, timestamps: deque, now: float) -> None:
        """Drop timestamps older than ``window_size`` seconds relative to ``now``.

        The caller must already hold ``self.lock``.
        """
        while timestamps and now - timestamps[0] > self.window_size:
            timestamps.popleft()

    def is_allowed(self, client_id: str) -> bool:
        """
        Check if a request from a client is allowed based on rate limits.

        An allowed request is recorded and counts against the client's quota;
        a denied request is not recorded.

        Args:
            client_id (str): Unique identifier for the client

        Returns:
            bool: True if request is allowed, False otherwise
        """
        with self.lock:
            # Read the clock while holding the lock: a timestamp taken before
            # waiting on the lock could be appended after a newer one from
            # another thread, breaking the oldest-first order that pruning
            # relies on.
            now = time.time()
            timestamps = self.requests.setdefault(client_id, deque())
            self._prune(timestamps, now)

            if len(timestamps) >= self.max_requests:
                return False

            timestamps.append(now)
            return True

    def get_retry_after(self, client_id: str) -> Optional[float]:
        """
        Calculate how long a client needs to wait before making another request.

        Expired timestamps for the client are discarded as a side effect; no
        request is recorded.

        Args:
            client_id (str): Unique identifier for the client

        Returns:
            Optional[float]: Seconds to wait, or None if request would be allowed immediately
        """
        with self.lock:
            timestamps = self.requests.get(client_id)
            if not timestamps:
                return None

            now = time.time()
            self._prune(timestamps, now)

            # Below the limit the next request is accepted right away, so
            # there is nothing to wait for even if the window is not empty.
            if not timestamps or len(timestamps) < self.max_requests:
                return None

            # A slot frees up once the oldest recorded request leaves the window.
            remaining = timestamps[0] + self.window_size - now
            return remaining if remaining > 0 else None
# Example usage
if __name__ == "__main__":
    # Initialize rate limiter: 5 requests per 10 seconds
    limiter = SlidingWindowRateLimiter(max_requests=5, window_size=10)

    def simulate_requests():
        """Send 8 requests from one client, one per second, printing each outcome."""
        client_id = "user_123"
        print("Testing rate limiter with sliding window algorithm:")
        # Derive the banner from the limiter so it cannot drift from the
        # configuration above.
        print(
            f"Rate limit: {limiter.max_requests} requests "
            f"per {limiter.window_size} seconds\n"
        )

        # Simulate 8 requests
        for i in range(1, 9):
            if limiter.is_allowed(client_id):
                status = "ALLOWED"
                retry_info = ""
            else:
                status = "DENIED"
                # Only a denied request has a meaningful wait time. Compare
                # against None explicitly so a wait of 0.0 is not hidden.
                retry_after = limiter.get_retry_after(client_id)
                if retry_after is not None:
                    retry_info = f" (retry after {retry_after:.1f}s)"
                else:
                    retry_info = ""
            print(f"Request {i}: {status}{retry_info}")

            # Add delay between requests
            time.sleep(1)

    simulate_requests()
This implementation provides a rate limiter using the sliding window algorithm, which is more precise than fixed window approaches. The rate limiter tracks requests for each client using a unique identifier and allows a configurable maximum number of requests within a specified time window.
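Key features: per-client request tracking keyed by a client identifier, updates to the shared state guarded by a lock, and a get_retry_after helper that reports how long a client must wait.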
Rate limiting is crucial for protecting a service from abuse and overload and for sharing capacity fairly between clients.
The sliding window approach is superior to fixed window algorithms because it prevents request bursts at window boundaries, providing more consistent rate limiting.
To run the example, save the code to a file (e.g., rate_limiter.py) and run: python rate_limiter.py

Example integration into a request handler:

    limiter = SlidingWindowRateLimiter(max_requests=100, window_size=60)
    client_id = "api_client_456"
    if limiter.is_allowed(client_id):
        # Process the request
        pass
    else:
        retry_after = limiter.get_retry_after(client_id)
        # Return 429 Too Many Requests with retry_after header

The example simulates 8 requests from a single client with a limit of 5 requests per 10 seconds, demonstrating when requests are allowed or denied.