import asyncio
import aiohttp
import aiofiles
from pathlib import Path
import time
from typing import List, Tuple
from dataclasses import dataclass


@dataclass
class DownloadProgress:
    url: str
    filename: str
    downloaded: int
    total_size: int
    completed: bool = False


class AsyncFileDownloader:
    def __init__(self, max_concurrent: int = 5):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.progress_callbacks = []

    def add_progress_callback(self, callback):
        """Add a callback function to receive progress updates."""
        self.progress_callbacks.append(callback)

    async def download_file(self, session: aiohttp.ClientSession, url: str,
                            filename: str, chunk_size: int = 8192) -> DownloadProgress:
        """Download a single file asynchronously using a shared session."""
        async with self.semaphore:  # Limit concurrent downloads
            try:
                async with session.get(url) as response:
                    response.raise_for_status()
                    total_size = int(response.headers.get('content-length', 0))
                    progress = DownloadProgress(url, filename, 0, total_size)

                    # Create the target directory if it doesn't exist
                    Path(filename).parent.mkdir(parents=True, exist_ok=True)

                    async with aiofiles.open(filename, 'wb') as file:
                        async for chunk in response.content.iter_chunked(chunk_size):
                            await file.write(chunk)
                            progress.downloaded += len(chunk)
                            # Notify progress callbacks after each chunk
                            for callback in self.progress_callbacks:
                                callback(progress)

                    progress.completed = True
                    for callback in self.progress_callbacks:
                        callback(progress)
                    return progress
            except Exception as e:
                print(f"Error downloading {url}: {e}")
                raise

    async def download_files(self, urls_and_filenames: List[Tuple[str, str]]) -> List[DownloadProgress]:
        """Download multiple files concurrently."""
        # Reuse a single ClientSession for all downloads, as the aiohttp
        # docs recommend, rather than opening one session per request
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.download_file(session, url, filename)
                for url, filename in urls_and_filenames
            ]
            return await asyncio.gather(*tasks, return_exceptions=True)


def print_progress(progress: DownloadProgress):
    """Print progress to the console."""
    if progress.total_size > 0:
        percentage = (progress.downloaded / progress.total_size) * 100
        print(f"\r{progress.filename}: {percentage:.1f}% "
              f"({progress.downloaded}/{progress.total_size} bytes)", end='')
    else:
        print(f"\r{progress.filename}: {progress.downloaded} bytes downloaded", end='')


def main():
    # Test URLs (httpbin.org returns the requested number of random bytes);
    # replace these with your actual (url, local_filename) pairs
    files_to_download = [
        ("https://httpbin.org/bytes/102400", "test_files/file1.bin"),
        ("https://httpbin.org/bytes/204800", "test_files/file2.bin"),
        ("https://httpbin.org/bytes/51200", "test_files/file3.bin"),
    ]

    print("Starting asynchronous file downloads...")

    # Create a downloader that allows 3 concurrent downloads
    downloader = AsyncFileDownloader(max_concurrent=3)
    downloader.add_progress_callback(print_progress)

    start_time = time.time()
    try:
        results = asyncio.run(downloader.download_files(files_to_download))
        elapsed = time.time() - start_time
        print(f"\n\nDownload completed in {elapsed:.2f} seconds")

        successful_downloads = 0
        for result in results:
            if isinstance(result, DownloadProgress) and result.completed:
                successful_downloads += 1
            elif isinstance(result, Exception):
                print(f"Download failed: {result}")
        print(f"Successfully downloaded {successful_downloads}/{len(files_to_download)} files")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()
This is an asynchronous file downloader that handles multiple downloads concurrently. It uses modern Python async features (asyncio, aiohttp, aiofiles) to download files in parallel with real-time progress updates, while an asyncio.Semaphore caps the number of concurrent downloads so the script does not exhaust system resources. The sketch below shows the semaphore pattern in isolation.
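If the pattern is unfamiliar, here is a minimal, self-contained sketch of the same idea, separate from the downloader above: ten tasks compete for three slots, so at most three run at a time (the sleep stands in for real I/O).

import asyncio

async def worker(sem: asyncio.Semaphore, task_id: int) -> None:
    async with sem:  # Wait for a free slot; released automatically on exit
        print(f"task {task_id} started")
        await asyncio.sleep(1)  # Stand-in for real I/O work
        print(f"task {task_id} finished")

async def run_all() -> None:
    sem = asyncio.Semaphore(3)  # At most 3 workers at once
    await asyncio.gather(*(worker(sem, i) for i in range(10)))

asyncio.run(run_all())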
To use the downloader, first install the dependencies:

pip install aiohttp aiofiles
Then replace the files_to_download list with your actual URLs and filenames:
files_to_download = [
    ("https://example.com/file1.pdf", "downloads/file1.pdf"),
    ("https://example.com/file2.jpg", "downloads/file2.jpg"),
]
Then run the script:

python download_files.py
You can adjust the max_concurrent parameter to tune the number of simultaneous downloads to your system and network conditions, and you can register additional progress handlers for other visualization needs, such as GUI updates or logging; a sketch of a logging handler follows below.
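For example, a callback that logs completions instead of printing a progress line might look like this. This is only a sketch: the logger name "downloader" is an arbitrary choice, and the callback reuses the DownloadProgress dataclass defined in the script above.

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("downloader")

def log_progress(progress: DownloadProgress):
    # Log only when a file finishes, to avoid one log line per chunk
    if progress.completed:
        logger.info("finished %s (%d bytes)", progress.filename, progress.downloaded)

# Register alongside (or instead of) print_progress, e.g. in main():
# downloader.add_progress_callback(log_progress)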