This snippet demonstrates how to efficiently execute multiple tasks in parallel using Python’s concurrent.futures.ThreadPoolExecutor. It’s useful for I/O-bound operations (e.g., web scraping, file operations) where tasks can run concurrently to save time.
import concurrent.futures
import time
from typing import List, Callable
def parallel_execute(
    tasks: List[Callable],
    max_workers: int = 5
) -> List:
    """
    Execute a list of tasks in parallel using ThreadPoolExecutor.

    Args:
        tasks: List of callable tasks to execute.
        max_workers: Maximum number of worker threads.

    Returns:
        List of results in the order of task completion.
    """
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(task) for task in tasks}
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())
    return results
# Example usage
def mock_task(duration: float) -> str:
    """Simulate a time-consuming task (e.g., API call)."""
    time.sleep(duration)
    return f"Task completed in {duration}s"
if __name__ == "__main__":
    tasks = [lambda: mock_task(1), lambda: mock_task(2), lambda: mock_task(0.5)]
    results = parallel_execute(tasks, max_workers=3)
    print("Results:", results)
parallel_execute function:
- Uses ThreadPoolExecutor to manage worker threads efficiently; no manual thread management is needed (no thread.start() or thread.join()).
- With max_workers=3, all three tasks run in parallel, completing in ~2 seconds (the longest task) instead of ~3.5 seconds sequentially.

Running the example:
- Save the code to a file (e.g., parallel_tasks.py) and run it with python parallel_tasks.py.
- Tune max_workers based on your system's capabilities.
- For CPU-bound tasks, replace ThreadPoolExecutor with ProcessPoolExecutor for true parallelism (see the sketch after this list).
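The last point warrants a quick illustration. Below is a minimal sketch of the CPU-bound variant, assuming an illustrative heavy_sum workload; ProcessPoolExecutor pickles each task and sends it to a separate process, so module-level functions are required instead of the lambdas used above, and the __main__ guard matters on platforms that spawn worker processes.

import concurrent.futures

def heavy_sum(n: int) -> int:
    """A CPU-heavy placeholder: sum the first n integers in pure Python."""
    return sum(range(n))

if __name__ == "__main__":
    inputs = [10_000_000, 20_000_000, 5_000_000]
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        # executor.map preserves input order, unlike the as_completed loop above.
        for n, total in zip(inputs, executor.map(heavy_sum, inputs)):
            print(f"sum(range({n})) = {total}")

Unlike threads, separate processes sidestep the GIL, so this variant scales with CPU cores for genuinely compute-heavy work.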