Python Snippets

Parallel Task Execution with ThreadPoolExecutor

This snippet demonstrates how to efficiently execute multiple tasks in parallel using Python’s concurrent.futures.ThreadPoolExecutor. It’s useful for I/O-bound operations (e.g., web scraping, file operations) where tasks can run concurrently to save time.

import concurrent.futures
import time
from typing import List, Callable

def parallel_execute(
    tasks: List[Callable], 
    max_workers: int = 5
) -> List:
    """
    Execute a list of tasks in parallel using ThreadPoolExecutor.
    
    Args:
        tasks: List of callable tasks to execute.
        max_workers: Maximum number of worker threads.
    
    Returns:
        List of results in the order of task completion.
    """
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(task) for task in tasks}
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())
    return results

# Example usage
def mock_task(duration: float) -> str:
    """Simulate a time-consuming task (e.g., API call)."""
    time.sleep(duration)
    return f"Task completed in {duration}s"

if __name__ == "__main__":
    tasks = [lambda: mock_task(1), lambda: mock_task(2), lambda: mock_task(0.5)]
    results = parallel_execute(tasks, max_workers=3)
    print("Results:", results)

Explanation

  1. parallel_execute Function:
    • Takes a list of callable tasks and executes them concurrently.
    • Uses ThreadPoolExecutor to manage worker threads efficiently.
    • Returns results in the order tasks complete (not submission order).
  2. Example Use Case:
    • Simulates 3 tasks with different durations (e.g., API calls or file reads).
    • With max_workers=3, all tasks run in parallel, completing in ~2 seconds (the longest task) instead of ~3.5 seconds sequentially.
  3. Why It’s Useful:
    • Speeds up I/O-bound workflows (e.g., fetching multiple URLs, processing files).
    • Threads are managed automatically (no manual thread.start() or thread.join()).
    • ThreadPoolExecutor avoids resource exhaustion by reusing threads.
  4. How to Run:
    • Copy the code into a Python file (e.g., parallel_tasks.py).
    • Run with python parallel_tasks.py.
    • Adjust max_workers based on your system’s capabilities.

For CPU-bound tasks, replace ThreadPoolExecutor with ProcessPoolExecutor for true parallelism.