This snippet demonstrates a modern asynchronous web scraper that respects rate limits, handles errors gracefully, and exports data to multiple formats. It’s particularly useful for collecting data from APIs or websites while being respectful of server resources.
import asyncio
import aiohttp
import json
import csv
from datetime import datetime
from typing import List, Dict, Any
from dataclasses import dataclass, asdict
from urllib.parse import urljoin, urlparse
import time

@dataclass
class ScrapedData:
    url: str
    title: str
    status_code: int
    timestamp: str
    content_length: int

class RateLimiter:
    def __init__(self, max_requests: int, time_window: float):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = []

    async def acquire(self):
        now = time.time()
        # Remove requests that have fallen outside the sliding time window
        self.requests = [req_time for req_time in self.requests if now - req_time < self.time_window]
        if len(self.requests) >= self.max_requests:
            # Wait until the oldest request leaves the window
            sleep_time = self.time_window - (now - self.requests[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
        # Record this request at the current time (re-read in case we slept)
        self.requests.append(time.time())

class AsyncWebScraper:
    def __init__(self, max_concurrent: int = 10, rate_limit: int = 5, time_window: float = 1.0):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limiter = RateLimiter(rate_limit, time_window)
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    async def fetch_page(self, url: str) -> ScrapedData:
        await self.rate_limiter.acquire()
        async with self.semaphore:
            try:
                async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                    # For HTML pages, we'd parse with BeautifulSoup here
                    # For this example, we'll just get basic info
                    content = await response.text()
                    return ScrapedData(
                        url=url,
                        title=f"Page at {urlparse(url).netloc}",
                        status_code=response.status,
                        timestamp=datetime.now().isoformat(),
                        content_length=len(content)
                    )
            except Exception as e:
                # Network errors and timeouts become placeholder records instead of crashing the run
                return ScrapedData(
                    url=url,
                    title=f"Error: {str(e)}",
                    status_code=0,
                    timestamp=datetime.now().isoformat(),
                    content_length=0
                )
    async def scrape_urls(self, urls: List[str]) -> List[ScrapedData]:
        tasks = [self.fetch_page(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Filter out exceptions
        return [r for r in results if not isinstance(r, Exception)]

def export_to_json(data: List[ScrapedData], filename: str):
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump([asdict(item) for item in data], f, indent=2, ensure_ascii=False)

def export_to_csv(data: List[ScrapedData], filename: str):
    if not data:
        return
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=data[0].__dataclass_fields__.keys())
        writer.writeheader()
        for item in data:
            writer.writerow(asdict(item))

# Example usage
async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://httpbin.org/json",
        "https://httpbin.org/xml"
    ]

    # Scrape with 3 concurrent connections, max 2 requests per second
    async with AsyncWebScraper(max_concurrent=3, rate_limit=2, time_window=1.0) as scraper:
        print("Starting scrape...")
        results = await scraper.scrape_urls(urls)
        print(f"Completed scraping {len(results)} URLs")

        # Export results
        export_to_json(results, "scraped_data.json")
        export_to_csv(results, "scraped_data.csv")

        # Print results
        for result in results:
            print(f"{result.status_code} - {result.url} ({result.content_length} bytes)")

if __name__ == "__main__":
    asyncio.run(main())
This asynchronous web scraper provides a robust framework for collecting data from multiple URLs while respecting server resources and handling errors gracefully. Here’s what makes it special:
Asynchronous Processing: Utilizes aiohttp and asyncio to fetch multiple URLs concurrently, dramatically reducing total scraping time compared to synchronous approaches.
Rate Limiting: Implements a sliding-window rate limiter that prevents overwhelming target servers. You can configure how many requests are allowed per time window; a standalone sketch of the limiter follows this feature rundown.
Concurrency Control: Uses semaphores to limit the maximum number of simultaneous connections, preventing resource exhaustion.
Error Handling: Gracefully handles network errors, timeouts, and other exceptions without stopping the entire scraping process.
Data Export: Automatically exports results to both JSON and CSV formats for easy integration with other tools.
Resource Management: Properly manages HTTP connections using async context managers.
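Since the rate limiter is the least obvious piece, here is a minimal sketch that exercises the RateLimiter class from the snippet above on its own, with no network calls involved; the 2-requests-per-second setting mirrors the configuration used in main():

import asyncio
import time

async def demo_rate_limiter():
    # Allow at most 2 "requests" per 1-second window
    limiter = RateLimiter(max_requests=2, time_window=1.0)
    start = time.time()
    for i in range(6):
        await limiter.acquire()
        # The elapsed times show later calls being pushed back to stay under the limit
        print(f"request {i + 1} at t={time.time() - start:.2f}s")

asyncio.run(demo_rate_limiter())

The first two calls go through almost immediately, and each later pair is delayed by roughly another second, which is exactly the budget the window allows.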
To run the scraper:

1. Install the dependency: pip install aiohttp
2. Save the code to a file (e.g., scraper.py)
3. Run it: python scraper.py
4. Check the scraped_data.json and scraped_data.csv files for the results

To scrape your own URLs:

1. Replace the urls list in the main() function with your target URLs
2. Adjust the max_concurrent and rate limit parameters based on the target server's capacity
3. Extend the ScrapedData class and fetch_page method to extract the specific data you need (e.g., using BeautifulSoup for HTML parsing; a sketch of this appears at the end of the post)

This scraper is particularly useful for monitoring websites, collecting API data, or performing competitive analysis while being respectful of server resources.
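As a rough illustration of that last customization step, here is one way the fetch_page method could be extended with BeautifulSoup to pull each page's real <title> instead of a placeholder. Treat it as a sketch rather than part of the original snippet: the html.parser choice and the fallback to the host name are assumptions, and beautifulsoup4 has to be installed separately (pip install beautifulsoup4). Everything else in the class stays exactly as shown above.

from bs4 import BeautifulSoup  # assumed extra dependency; import goes at the top of scraper.py

    # Drop-in replacement for AsyncWebScraper.fetch_page
    async def fetch_page(self, url: str) -> ScrapedData:
        await self.rate_limiter.acquire()
        async with self.semaphore:
            try:
                async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                    content = await response.text()
                    # Parse the HTML and extract the <title> tag, falling back to the host name
                    soup = BeautifulSoup(content, "html.parser")
                    title = (soup.title.string.strip()
                             if soup.title and soup.title.string
                             else urlparse(url).netloc)
                    return ScrapedData(
                        url=url,
                        title=title,
                        status_code=response.status,
                        timestamp=datetime.now().isoformat(),
                        content_length=len(content)
                    )
            except Exception as e:
                # Same error handling as the original: failures become placeholder records
                return ScrapedData(
                    url=url,
                    title=f"Error: {str(e)}",
                    status_code=0,
                    timestamp=datetime.now().isoformat(),
                    content_length=0
                )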