import asyncio
import aiohttp
import time
from typing import Optional, Dict, Any
import logging
class AsyncHTTPClient:
    """Async HTTP client with retry, timeout and concurrency control.

    Wraps an ``aiohttp.ClientSession`` to provide:
      * exponential-backoff retries for 5xx responses and network failures,
      * a configurable total timeout per request,
      * a semaphore capping the number of in-flight requests.

    Use as an async context manager so the underlying session is closed:

        async with AsyncHTTPClient() as client:
            result = await client.get(url)
    """

    def __init__(self,
                 max_retries: int = 3,
                 retry_delay: float = 1.0,
                 timeout: int = 30,
                 max_concurrent: int = 100):
        """
        Args:
            max_retries: Additional attempts after the first failure.
            retry_delay: Base delay in seconds for exponential backoff.
            timeout: Total timeout in seconds applied to every request.
            max_concurrent: Maximum number of simultaneous requests.
        """
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    @staticmethod
    async def _read_body(response: aiohttp.ClientResponse) -> Any:
        """Parse the response body as JSON when advertised, else as text."""
        if response.content_type == "application/json":
            return await response.json()
        return await response.text()

    async def _request_with_retry(self,
                                  method: str,
                                  url: str,
                                  **kwargs) -> Dict[str, Any]:
        """Execute an HTTP request with exponential-backoff retry logic.

        The response body is read *inside* the ``async with`` block:
        aiohttp releases the connection when that block exits, after which
        the payload can no longer be read. (Returning the response object
        itself and reading it later would fail for exactly that reason.)

        Returns:
            Dict with "status", "headers" and "data" keys on success.

        Raises:
            aiohttp.ClientResponseError: for 4xx responses (never retried),
                or 5xx responses once retries are exhausted.
            aiohttp.ClientError, asyncio.TimeoutError: for network-level
                failures once retries are exhausted.
        """
        for attempt in range(self.max_retries + 1):
            try:
                async with self.semaphore:  # Limit concurrent requests
                    async with self.session.request(method, url, **kwargs) as response:
                        # Raise exception for bad status codes (4xx, 5xx)
                        response.raise_for_status()
                        return {
                            "status": response.status,
                            "headers": dict(response.headers),
                            "data": await self._read_body(response),
                        }
            except aiohttp.ClientResponseError as e:
                # Don't retry client errors (4xx) or if max retries exceeded
                if attempt == self.max_retries or e.status < 500:
                    logging.error(f"Request failed after {attempt+1} attempts: {e}")
                    raise
                error = e
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                # Network issues and timeouts are always worth retrying
                if attempt == self.max_retries:
                    logging.error(f"Request failed after {attempt+1} attempts: {e}")
                    raise
                error = e
            # Shared backoff path for every retryable failure
            wait_time = self.retry_delay * (2 ** attempt)  # Exponential backoff
            logging.warning(f"Attempt {attempt+1} failed: {error}. Retrying in {wait_time}s")
            await asyncio.sleep(wait_time)
        # Unreachable in practice (loop either returns or raises); kept for
        # parity with the documented fallback shape.
        return {"status": None, "data": None}

    async def get(self, url: str, **kwargs) -> Dict[str, Any]:
        """Perform GET request and return JSON response."""
        return await self._request_with_retry("GET", url, **kwargs)

    async def post(self, url: str, **kwargs) -> Dict[str, Any]:
        """Perform POST request and return JSON response."""
        return await self._request_with_retry("POST", url, **kwargs)
# Example usage
async def main():
    """Demonstrate AsyncHTTPClient against httpbin.org endpoints."""
    logging.basicConfig(level=logging.INFO)

    # Example 1: a single GET request.
    async with AsyncHTTPClient(max_retries=2) as client:
        try:
            reply = await client.get("https://httpbin.org/get")
        except Exception as exc:
            print(f"GET Request failed: {exc}")
        else:
            print(f"GET Status: {reply['status']}")
            print(f"Response Data: {reply['data']}")

    # Example 2: a POST request carrying a JSON body.
    async with AsyncHTTPClient() as client:
        payload = {"name": "test", "value": 123}
        try:
            reply = await client.post(
                "https://httpbin.org/post",
                json=payload,
            )
        except Exception as exc:
            print(f"POST Request failed: {exc}")
        else:
            print(f"POST Status: {reply['status']}")

    # Example 3: fan out several requests, at most 5 in flight at once.
    async with AsyncHTTPClient(max_concurrent=5) as client:
        targets = [
            "https://httpbin.org/delay/1",
            "https://httpbin.org/status/200",
            "https://httpbin.org/status/500",  # triggers the retry path
            "https://httpbin.org/get",
        ]
        outcomes = await asyncio.gather(
            *(client.get(target) for target in targets),
            return_exceptions=True,
        )
        for target, outcome in zip(targets, outcomes):
            if isinstance(outcome, Exception):
                print(f"Request to {target} failed: {outcome}")
            else:
                print(f"Request to {target}: {outcome['status']}")


if __name__ == "__main__":
    asyncio.run(main())
This async HTTP client provides a robust way to make HTTP requests in Python with built-in retry logic, timeout handling, and concurrency control. It features:
Retry with Exponential Backoff: Automatically retries failed requests with increasing delays between attempts, focusing on server errors (5xx) while immediately failing on client errors (4xx).
Timeout Management: Configurable request timeouts to prevent hanging connections.
Concurrency Control: Limits the number of simultaneous connections to prevent overwhelming servers or hitting resource limits.
Context Manager Support: Ensures proper resource cleanup through async context managers.
Exception Handling: Comprehensive error handling for different types of network failures.
JSON Response Parsing: Automatic handling of JSON responses with fallback to text for other content types.
This snippet solves common problems with HTTP requests in production environments. To install the dependency and run the example:
pip install aiohttp
python async_http_client.py
The example demonstrates three use cases: a simple GET request, a POST request with a JSON payload, and concurrent requests with per-request error handling.
You can customize parameters like max_retries, retry_delay, timeout, and max_concurrent based on your needs. The client works with any HTTP API and handles common failure scenarios gracefully.