Python Snippets

Asynchronous Web Scraper with Rate Limiting and Data Export

This snippet demonstrates a modern asynchronous web scraper that respects rate limits, handles errors gracefully, and exports data to multiple formats. It’s particularly useful for collecting data from APIs or websites while being respectful of server resources.

import asyncio
import aiohttp
import json
import csv
from datetime import datetime
from typing import List, Dict, Any
from dataclasses import dataclass, asdict
from urllib.parse import urljoin, urlparse
import time

@dataclass
class ScrapedData:
    url: str
    title: str
    status_code: int
    timestamp: str
    content_length: int

class RateLimiter:
    """Sliding-window limiter: at most max_requests per time_window seconds."""
    def __init__(self, max_requests: int, time_window: float):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = []
        self._lock = asyncio.Lock()
    
    async def acquire(self):
        # Serialize callers so concurrent coroutines can't exceed the limit together
        async with self._lock:
            while True:
                now = time.time()
                # Drop request timestamps that have aged out of the time window
                self.requests = [t for t in self.requests if now - t < self.time_window]
                if len(self.requests) < self.max_requests:
                    break
                # Sleep until the oldest timestamp leaves the window, then re-check
                await asyncio.sleep(self.time_window - (now - self.requests[0]))
            self.requests.append(time.time())

class AsyncWebScraper:
    def __init__(self, max_concurrent: int = 10, rate_limit: int = 5, time_window: float = 1.0):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limiter = RateLimiter(rate_limit, time_window)
        self.session = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def fetch_page(self, url: str) -> ScrapedData:
        # Hold a concurrency slot first, then wait for the rate limiter, so the
        # recorded timestamp reflects when the request is actually about to fire
        async with self.semaphore:
            await self.rate_limiter.acquire()
            try:
                async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                    # For HTML pages, we'd parse with BeautifulSoup here
                    # For this example, we'll just get basic info
                    content = await response.text()
                    return ScrapedData(
                        url=url,
                        title=f"Page at {urlparse(url).netloc}",
                        status_code=response.status,
                        timestamp=datetime.now().isoformat(),
                        content_length=len(content)
                    )
            except Exception as e:
                return ScrapedData(
                    url=url,
                    title=f"Error: {str(e)}",
                    status_code=0,
                    timestamp=datetime.now().isoformat(),
                    content_length=0
                )
    
    async def scrape_urls(self, urls: List[str]) -> List[ScrapedData]:
        tasks = [self.fetch_page(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Filter out exceptions
        return [r for r in results if not isinstance(r, Exception)]

def export_to_json(data: List[ScrapedData], filename: str):
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump([asdict(item) for item in data], f, indent=2, ensure_ascii=False)

def export_to_csv(data: List[ScrapedData], filename: str):
    if not data:
        return
    
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=data[0].__dataclass_fields__.keys())
        writer.writeheader()
        for item in data:
            writer.writerow(asdict(item))

# Example usage
async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://httpbin.org/json",
        "https://httpbin.org/xml"
    ]
    
    # Scrape with 3 concurrent connections, max 2 requests per second
    async with AsyncWebScraper(max_concurrent=3, rate_limit=2, time_window=1.0) as scraper:
        print("Starting scrape...")
        results = await scraper.scrape_urls(urls)
        print(f"Completed scraping {len(results)} URLs")
        
        # Export results
        export_to_json(results, "scraped_data.json")
        export_to_csv(results, "scraped_data.csv")
        
        # Print results
        for result in results:
            print(f"{result.status_code} - {result.url} ({result.content_length} bytes)")

if __name__ == "__main__":
    asyncio.run(main())

What This Code Does

This asynchronous web scraper provides a robust framework for collecting data from multiple URLs while respecting server resources and handling errors gracefully. Here’s what makes it special:

  1. Asynchronous Processing: Utilizes aiohttp and asyncio to fetch multiple URLs concurrently, dramatically reducing total scraping time compared to synchronous approaches.

  2. Rate Limiting: Implements a sliding-window rate limiter that prevents overwhelming target servers; you configure how many requests are allowed per time window. (A standalone sketch of the limiter follows this list.)

  3. Concurrency Control: Uses an asyncio.Semaphore to cap the number of simultaneous connections, preventing resource exhaustion.

  4. Error Handling: Gracefully handles network errors, timeouts, and other exceptions without stopping the entire scraping process.

  5. Data Export: Automatically exports results to both JSON and CSV formats for easy integration with other tools.

  6. Resource Management: Properly manages HTTP connections using async context managers.
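
To see the sliding-window limiter from item 2 on its own, here is a small, hypothetical sanity check (not part of the scraper itself), assuming the RateLimiter class above is pasted alongside it. With a limit of 2 requests per second, the five workers should be admitted across roughly three one-second windows.

import asyncio
import time

async def demo_rate_limiter():
    limiter = RateLimiter(max_requests=2, time_window=1.0)
    start = time.time()

    async def worker(i: int):
        await limiter.acquire()
        print(f"request {i} admitted at t={time.time() - start:.2f}s")

    # Five workers against a 2-per-second limit: the first two are admitted
    # immediately, the rest are pushed into later windows
    await asyncio.gather(*(worker(i) for i in range(5)))

asyncio.run(demo_rate_limiter())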

How to Run It

  1. Install required dependencies:
    pip install aiohttp
    
  2. Save the code to a file (e.g., scraper.py)

  3. Run the script (see the note after these steps if you're working in a notebook):
    python scraper.py
    
  4. Check the generated scraped_data.json and scraped_data.csv files for results
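
A note on notebooks: asyncio.run() cannot be called from inside an already-running event loop, so the script's entry point won't work as-is in Jupyter or IPython. In those environments, await the coroutine directly instead:

# Inside Jupyter/IPython, where an event loop is already running,
# skip asyncio.run(main()) and use top-level await:
await main()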

Customization

To scrape your own URLs:

  1. Replace the urls list in the main() function with your target URLs
  2. Adjust max_concurrent and rate limit parameters based on the target server’s capacity
  3. Modify the ScrapedData class and fetch_page method to extract the specific data you need, e.g. using BeautifulSoup for HTML parsing (see the parsing sketch after this list)
  4. Add additional export formats as needed (a SQLite exporter sketch also follows below)
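
For item 3, here is a minimal sketch, assuming BeautifulSoup is installed (pip install beautifulsoup4). The helper name extract_title is hypothetical; inside fetch_page you would replace the placeholder title=f"Page at {urlparse(url).netloc}" with title=extract_title(content).

from bs4 import BeautifulSoup  # pip install beautifulsoup4

def extract_title(html: str, fallback: str = "No title") -> str:
    # Parse the fetched HTML and return the <title> text, if present
    soup = BeautifulSoup(html, "html.parser")
    if soup.title and soup.title.string:
        return soup.title.string.strip()
    return fallback

For item 4, one possible extra exporter writes to SQLite via the standard library. This is an illustrative sketch, not part of the snippet above; the table and column names are made up, and it assumes the ScrapedData dataclass and typing imports from the snippet are in scope.

import sqlite3

def export_to_sqlite(data: List[ScrapedData], filename: str):
    # Store one row per scraped page in a local SQLite database
    conn = sqlite3.connect(filename)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS pages "
            "(url TEXT, title TEXT, status_code INTEGER, timestamp TEXT, content_length INTEGER)"
        )
        conn.executemany(
            "INSERT INTO pages VALUES (?, ?, ?, ?, ?)",
            [(d.url, d.title, d.status_code, d.timestamp, d.content_length) for d in data],
        )
        conn.commit()
    finally:
        conn.close()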

This scraper is particularly useful for monitoring websites, collecting API data, or performing competitive analysis while being respectful of server resources.