import time
import functools
from collections import defaultdict
from typing import Dict, Callable, Any
class RateLimiter:
def __init__(self, calls: int, period: float):
"""
Initialize rate limiter.
Args:
calls: Number of allowed calls
period: Time period in seconds
"""
self.calls = calls
self.period = period
self.call_times: Dict[str, list] = defaultdict(list)
def is_allowed(self, key: str) -> bool:
"""Check if call is allowed for given key."""
now = time.time()
# Remove old calls outside the time window
self.call_times[key] = [
call_time for call_time in self.call_times[key]
if now - call_time < self.period
]
# Check if we're under the limit
if len(self.call_times[key]) < self.calls:
self.call_times[key].append(now)
return True
return False
def wait_time(self, key: str) -> float:
"""Calculate time to wait before next allowed call."""
if not self.call_times[key]:
return 0
oldest_call = min(self.call_times[key])
now = time.time()
return max(0, self.period - (now - oldest_call))
def rate_limit(calls: int, period: float, key_func: Callable = None):
"""
Decorator to rate limit function calls.
Args:
calls: Number of allowed calls per period
period: Time period in seconds
key_func: Function to generate key from args (defaults to function name)
"""
limiter = RateLimiter(calls, period)
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
# Generate key for this call
if key_func:
key = key_func(*args, **kwargs)
else:
key = func.__name__
# Check if call is allowed
if not limiter.is_allowed(key):
wait = limiter.wait_time(key)
raise Exception(f"Rate limit exceeded. Wait {wait:.2f} seconds.")
return func(*args, **kwargs)
return wrapper
return decorator
# Example usage with different rate limiting strategies
@rate_limit(calls=5, period=60) # 5 calls per minute
def api_call(endpoint: str) -> dict:
"""Simulate an API call."""
print(f"Calling API endpoint: {endpoint}")
return {"status": "success", "endpoint": endpoint}
@rate_limit(calls=10, period=1, key_func=lambda user_id, *args, **kwargs: f"user_{user_id}")
def process_user_data(user_id: int, data: dict) -> str:
"""Process user data with per-user rate limiting."""
print(f"Processing data for user {user_id}")
return f"Processed for user {user_id}"
# Example with custom key based on multiple parameters
def generate_key(resource_type: str, user_id: int, *args, **kwargs) -> str:
return f"{resource_type}:{user_id}"
@rate_limit(calls=3, period=10, key_func=generate_key)
def access_resource(resource_type: str, user_id: int) -> dict:
"""Access a resource with composite key rate limiting."""
print(f"Accessing {resource_type} for user {user_id}")
return {"resource": resource_type, "user": user_id}
# Example usage
if __name__ == "__main__":
# Test basic rate limiting
try:
for i in range(7):
result = api_call(f"/data/{i}")
print(f"API Result: {result}")
time.sleep(0.1) # Small delay
except Exception as e:
print(f"Error: {e}")
print("\n" + "="*50 + "\n")
# Test per-user rate limiting
try:
for i in range(15):
user_id = i % 3 # Cycle through 3 users
result = process_user_data(user_id, {"value": i})
print(f"Processing Result: {result}")
time.sleep(0.05)
except Exception as e:
print(f"Error: {e}")
This snippet implements a flexible rate limiting system using Python decorators. It prevents functions from being called too frequently, which is crucial for:
The implementation features:
@rate_limit(calls=10, period=60) limits to 10 calls per minutekey_func to generate keys based on user identifiersRate limiting is essential in production applications to:
The decorator approach makes it easy to add rate limiting to existing functions without changing their implementation, promoting clean separation of concerns.