Python Snippets

API Rate Limiter with Sliding Window Algorithm

import time
import threading
from collections import deque
from typing import Dict, Optional

class SlidingWindowRateLimiter:
    def __init__(self, max_requests: int, window_size: int):
        """
        Initialize the rate limiter with sliding window algorithm.
        
        Args:
            max_requests (int): Maximum number of requests allowed in the window
            window_size (int): Time window in seconds
        """
        self.max_requests = max_requests
        self.window_size = window_size
        self.requests: Dict[str, deque] = {}
        self.lock = threading.Lock()
    
    def is_allowed(self, client_id: str) -> bool:
        """
        Check if a request from a client is allowed based on rate limits.
        
        Args:
            client_id (str): Unique identifier for the client
            
        Returns:
            bool: True if request is allowed, False otherwise
        """
        current_time = time.time()
        
        with self.lock:
            # Initialize deque for new client
            if client_id not in self.requests:
                self.requests[client_id] = deque()
            
            # Remove outdated requests outside the window
            while (self.requests[client_id] and 
                   current_time - self.requests[client_id][0] > self.window_size):
                self.requests[client_id].popleft()
            
            # Check if request is allowed
            if len(self.requests[client_id]) < self.max_requests:
                self.requests[client_id].append(current_time)
                return True
            else:
                return False
    
    def get_retry_after(self, client_id: str) -> Optional[float]:
        """
        Calculate how long a client needs to wait before making another request.
        
        Args:
            client_id (str): Unique identifier for the client
            
        Returns:
            Optional[float]: Seconds to wait, or None if request would be allowed immediately
        """
        current_time = time.time()
        
        with self.lock:
            if client_id not in self.requests or not self.requests[client_id]:
                return None
            
            # Get the oldest request in window
            oldest_request = self.requests[client_id][0]
            window_reset = oldest_request + self.window_size
            
            if current_time < window_reset:
                return window_reset - current_time
            else:
                return None

# Example usage
if __name__ == "__main__":
    # Initialize rate limiter: 5 requests per 10 seconds
    limiter = SlidingWindowRateLimiter(max_requests=5, window_size=10)
    
    def simulate_requests():
        client_id = "user_123"
        
        print("Testing rate limiter with sliding window algorithm:")
        print(f"Rate limit: 5 requests per 10 seconds\n")
        
        # Simulate 8 requests
        for i in range(1, 9):
            allowed = limiter.is_allowed(client_id)
            retry_after = limiter.get_retry_after(client_id)
            
            status = "ALLOWED" if allowed else "DENIED"
            retry_info = f" (retry after {retry_after:.1f}s)" if retry_after else ""
            
            print(f"Request {i}: {status}{retry_info}")
            
            # Add delay between requests
            time.sleep(1)
    
    simulate_requests()

What This Code Does

This implementation provides a rate limiter using the sliding window algorithm, which is more precise than fixed window approaches. The rate limiter tracks requests for each client using a unique identifier and allows a configurable maximum number of requests within a specified time window.

Key features:

Why This Is Useful

Rate limiting is crucial for:

  1. Protecting APIs from abuse and denial-of-service attacks
  2. Ensuring fair usage among multiple clients
  3. Preventing system overload during traffic spikes
  4. Complying with service level agreements and usage policies

The sliding window approach is superior to fixed window algorithms because it prevents request bursts at window boundaries, providing more consistent rate limiting.

How to Run

  1. Save the code to a file (e.g., rate_limiter.py)
  2. Run directly with Python: python rate_limiter.py
  3. For integration in your application:
    limiter = SlidingWindowRateLimiter(max_requests=100, window_size=60)
       
    client_id = "api_client_456"
    if limiter.is_allowed(client_id):
        # Process the request
        pass
    else:
        retry_after = limiter.get_retry_after(client_id)
        # Return 429 Too Many Requests with retry_after header
    

The example simulates 8 requests from a single client with a limit of 5 requests per 10 seconds, demonstrating when requests are allowed or denied.