import os
import shutil
import zipfile
import tarfile
from pathlib import Path
from typing import List, Optional
import hashlib
from datetime import datetime
import threading
from tqdm import tqdm
class DataBackupUtility:
    """
    A configurable utility for backing up directories with compression and
    progress tracking.

    Supports ZIP and TAR.GZ formats with optional integrity verification.
    Archive entries are stored relative to their owning source directory and
    prefixed with that directory's name, so the original layout is preserved
    inside the archive.
    """

    def __init__(self, source_dirs: List[str], backup_dir: str,
                 compression_format: str = 'zip',
                 exclude_patterns: Optional[List[str]] = None,
                 enable_verification: bool = True):
        """
        Initialize the backup utility.

        Args:
            source_dirs: List of directories (or single files) to back up.
            backup_dir: Directory where backups will be stored (created if missing).
            compression_format: 'zip' or 'tar' (tar produces a .tar.gz).
            exclude_patterns: File patterns to exclude (e.g., ['*.tmp', '__pycache__']).
            enable_verification: Whether to verify backup integrity after creation.
        """
        self.source_dirs = [Path(d) for d in source_dirs]
        self.backup_dir = Path(backup_dir)
        self.compression_format = compression_format.lower()
        self.exclude_patterns = exclude_patterns or []
        self.enable_verification = enable_verification
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    def _should_exclude(self, file_path: Path) -> bool:
        """Return True if the file matches any exclusion pattern.

        A pattern excludes a file either via glob-style matching or via a
        plain substring match against the full path (so '__pycache__'
        excludes everything inside such a directory).
        """
        for pattern in self.exclude_patterns:
            if file_path.match(pattern) or pattern in str(file_path):
                return True
        return False

    def _get_file_list(self) -> List[Path]:
        """Collect every non-excluded file from all configured sources."""
        files = []
        for source_dir in self.source_dirs:
            if source_dir.is_file():
                # A source entry may itself be a single file.
                if not self._should_exclude(source_dir):
                    files.append(source_dir)
            else:
                for file_path in source_dir.rglob('*'):
                    if file_path.is_file() and not self._should_exclude(file_path):
                        files.append(file_path)
        return files

    def _archive_name(self, file_path: Path) -> str:
        """Return the archive entry name for *file_path*.

        The name is the file's path relative to its owning source directory,
        prefixed with that directory's name, preserving the directory layout
        inside the archive.  (The previous inline expression flattened any
        nested file to its bare filename, losing the directory structure and
        causing entry-name collisions.)
        """
        for source_dir in self.source_dirs:
            if file_path == source_dir:
                # Source entry that is a single file, not a directory.
                return file_path.name
            try:
                relative = file_path.relative_to(source_dir)
            except ValueError:
                continue  # belongs to a different source directory
            return str(Path(source_dir.name) / relative)
        # Fallback: file not under any source dir (should not happen).
        return file_path.name

    def _calculate_checksum(self, file_path: Path) -> str:
        """Calculate the SHA-256 checksum of a file, reading in chunks."""
        hash_sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            # 64 KiB chunks keep memory flat for arbitrarily large files.
            for chunk in iter(lambda: f.read(65536), b""):
                hash_sha256.update(chunk)
        return hash_sha256.hexdigest()

    def create_backup(self) -> str:
        """
        Create a compressed backup of the configured sources.

        Returns:
            Path (as str) to the created backup file.

        Raises:
            ValueError: If compression_format is not 'zip' or 'tar'.
        """
        # Timestamped name so successive backups never overwrite each other.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"backup_{timestamp}"
        if self.compression_format == 'zip':
            return self._create_zip_backup(self.backup_dir / f"{backup_name}.zip")
        elif self.compression_format == 'tar':
            return self._create_tar_backup(self.backup_dir / f"{backup_name}.tar.gz")
        else:
            raise ValueError(f"Unsupported compression format: {self.compression_format}")

    def _create_zip_backup(self, backup_path: Path) -> str:
        """Create a ZIP backup with byte-level progress tracking."""
        files = self._get_file_list()
        total_size = sum(f.stat().st_size for f in files)
        with zipfile.ZipFile(backup_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            with tqdm(total=total_size, unit='B', unit_scale=True,
                      desc="Creating ZIP backup") as pbar:
                for file_path in files:
                    zipf.write(file_path, self._archive_name(file_path))
                    pbar.update(file_path.stat().st_size)
        if self.enable_verification:
            self._verify_backup(backup_path)
        return str(backup_path)

    def _create_tar_backup(self, backup_path: Path) -> str:
        """Create a TAR.GZ backup with byte-level progress tracking."""
        files = self._get_file_list()
        total_size = sum(f.stat().st_size for f in files)
        with tarfile.open(backup_path, 'w:gz') as tar:
            with tqdm(total=total_size, unit='B', unit_scale=True,
                      desc="Creating TAR backup") as pbar:
                for file_path in files:
                    tar.add(file_path, self._archive_name(file_path))
                    pbar.update(file_path.stat().st_size)
        if self.enable_verification:
            self._verify_backup(backup_path)
        return str(backup_path)

    def _verify_backup(self, backup_path: Path):
        """Verify the integrity of the created backup.

        ZIP archives use ZipFile.testzip() (CRC check of every member).
        TAR.GZ archives are verified by streaming each member through
        extractfile(), which forces full decompression without writing
        anything to disk.  (The previous implementation extracted members
        into a hard-coded /tmp, which was non-portable, leaked directories,
        and was exposed to path traversal via crafted member names.)

        Raises:
            Exception: If any archive member is corrupted.
        """
        if self.compression_format == 'zip':
            with zipfile.ZipFile(backup_path, 'r') as zipf:
                bad_file = zipf.testzip()
                if bad_file:
                    raise Exception(f"Backup verification failed: {bad_file} is corrupted")
        elif self.compression_format == 'tar':
            with tarfile.open(backup_path, 'r') as tar:
                for member in tar.getmembers():
                    if not member.isfile():
                        continue
                    try:
                        stream = tar.extractfile(member)
                        if stream is None:
                            raise OSError("member could not be opened")
                        # Drain the stream; decompression errors surface here.
                        while stream.read(65536):
                            pass
                    except Exception as e:
                        raise Exception(f"Backup verification failed for {member.name}: {e}") from e
# Example usage
if __name__ == "__main__":
    # Assemble the utility with the desired configuration.
    util = DataBackupUtility(
        source_dirs=['./documents', './pictures'],   # directories to back up
        backup_dir='./backups',                      # destination for archives
        compression_format='zip',                    # 'zip' or 'tar'
        exclude_patterns=['*.tmp', '__pycache__', '.git'],  # skip these
        enable_verification=True,                    # check archive integrity
    )

    # Run the backup, reporting success or failure on stdout.
    try:
        created = util.create_backup()
    except Exception as exc:
        print(f"Backup failed: {exc}")
    else:
        print(f"Backup created successfully: {created}")
# ---------------------------------------------------------------------------
# About this utility
#
# This Data Backup Utility is a comprehensive solution for creating
# compressed backups of files and directories. Creating backups is a
# critical task for data safety, but built-in OS tools often lack features
# like progress tracking, configurable exclusions, or integrity
# verification; this utility addresses those common gaps.
#
# Requirements:
#     pip install tqdm
#
# Usage:
#     1. Set source_dirs to the directories you want to back up.
#     2. Set backup_dir to where you want backups stored.
#     3. Set exclude_patterns to exclude files you do not need.
#     4. Run: python backup_utility.py
#
# The utility creates a timestamped backup file in the specified backup
# directory, showing progress as it works. If verification is enabled, it
# automatically checks the backup's integrity upon completion.
# ---------------------------------------------------------------------------