Python Snippets

Database Connection Pool Manager with Context Manager Support

import logging
import sqlite3
import time
from contextlib import contextmanager
from queue import Empty, Full, Queue
from threading import Lock, Thread
from typing import Generator, Optional

class DatabaseConnectionPool:
    """
    A thread-safe SQLite connection pool with context manager support.

    Idle connections are held in a bounded ``Queue``. ``current_connections``
    counts every connection the pool has opened and not yet closed (idle or
    checked out) and is only modified while ``lock`` is held, except during
    construction when no other thread can see the pool yet.

    Args:
        db_path: Path passed to ``sqlite3.connect``.
        min_connections: Number of connections opened eagerly and placed in
            the pool at construction time.
        max_connections: Upper bound on simultaneously open connections.
        acquire_timeout: Seconds ``get_connection`` waits for a connection to
            be returned once the pool is exhausted.

    Raises:
        ValueError: If ``max_connections`` is less than 1 or
            ``min_connections`` is not within ``0..max_connections``.
    """

    def __init__(
        self,
        db_path: str,
        min_connections: int = 2,
        max_connections: int = 10,
        acquire_timeout: float = 5.0,
    ):
        if max_connections < 1:
            raise ValueError("max_connections must be at least 1")
        if not 0 <= min_connections <= max_connections:
            raise ValueError(
                "min_connections must be between 0 and max_connections"
            )

        self.db_path = db_path
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.acquire_timeout = acquire_timeout
        self.pool = Queue(maxsize=max_connections)
        self.lock = Lock()
        self.current_connections = 0

        # Open the minimum connections and actually park them in the pool so
        # the first callers reuse them instead of opening new ones.
        try:
            for _ in range(min_connections):
                self.pool.put_nowait(self._create_connection())
        except Exception:
            # Do not leave half a pool open if one of the connects fails.
            self.close_all()
            raise

    def _create_connection(self) -> sqlite3.Connection:
        """
        Open a new connection and count it in ``current_connections``.

        Callers other than ``__init__`` must hold ``self.lock``.
        """
        conn = sqlite3.connect(self.db_path, check_same_thread=False)
        try:
            # Enable WAL mode for better concurrency
            conn.execute("PRAGMA journal_mode=WAL")
        except Exception:
            # The connection is not counted yet, so close it here or it leaks.
            conn.close()
            raise
        self.current_connections += 1
        return conn

    def _close_connection(self, conn: sqlite3.Connection) -> None:
        """
        Close a connection and remove it from ``current_connections``.

        Closing is best-effort: a failure is logged and otherwise ignored so
        that the counter is always decremented. Callers must hold ``self.lock``.
        """
        try:
            conn.close()
        except Exception:
            logging.getLogger(__name__).debug(
                "Ignoring error while closing pooled connection", exc_info=True
            )
        finally:
            self.current_connections -= 1

    def _acquire(self) -> sqlite3.Connection:
        """Take an idle connection, open a new one, or wait for a return."""
        try:
            return self.pool.get_nowait()
        except Empty:
            pass

        # The lock is held only for the capacity check and the connect.
        with self.lock:
            if self.current_connections < self.max_connections:
                return self._create_connection()

        # Wait outside the lock so threads that need it to close a connection
        # are not stalled for the whole timeout.
        return self.pool.get(timeout=self.acquire_timeout)

    def _release(self, conn: sqlite3.Connection) -> None:
        """Return a healthy connection to the pool, or close it if that fails."""
        try:
            # Discard any transaction the borrower left open so it cannot be
            # committed by whoever borrows this connection next.
            conn.rollback()
            self.pool.put_nowait(conn)
        except (sqlite3.Error, Full):
            with self.lock:
                self._close_connection(conn)

    @contextmanager
    def get_connection(self) -> Generator[sqlite3.Connection, None, None]:
        """
        Get a database connection from the pool.

        The connection is returned to the pool when the ``with`` block exits
        normally. If validation fails or the block raises, the connection is
        closed instead of being reused, and the exception propagates.

        Yields:
            An open ``sqlite3.Connection``.

        Raises:
            queue.Empty: If the pool is at ``max_connections`` and no
                connection is returned within ``acquire_timeout`` seconds.
            sqlite3.Error: If the connection fails the ``SELECT 1`` check.
        """
        conn = self._acquire()
        try:
            # Verify connection is still valid
            conn.execute("SELECT 1")
            yield conn
        except BaseException:
            # BaseException covers KeyboardInterrupt and generator shutdown as
            # well, so the connection is never left checked out and uncounted.
            with self.lock:
                self._close_connection(conn)
            raise
        else:
            self._release(conn)

    def close_all(self) -> None:
        """
        Close every idle connection currently in the pool.

        Connections that are checked out at the time of the call are not
        touched; they are returned or closed when their ``with`` block exits.
        """
        with self.lock:
            while True:
                try:
                    conn = self.pool.get_nowait()
                except Empty:
                    break
                self._close_connection(conn)

# Example usage: create a demo table, insert a few users from worker threads
# through the pool, log the resulting row count, then close the pool.
if __name__ == "__main__":
    from concurrent.futures import ThreadPoolExecutor

    # Set up logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Create database and table for demo
    demo_db = "demo.db"
    setup_conn = sqlite3.connect(demo_db)
    try:
        # The connection's own context manager only commits or rolls back;
        # it does not close, hence the explicit close() in the finally.
        with setup_conn:
            setup_conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
    finally:
        setup_conn.close()

    # Initialize connection pool
    pool = DatabaseConnectionPool(demo_db, min_connections=3, max_connections=8)

    def insert_user(name: str, email: str) -> None:
        """Insert a user through the pool; log instead of raising on failure."""
        try:
            with pool.get_connection() as conn:
                conn.execute(
                    "INSERT INTO users (name, email) VALUES (?, ?)",
                    (name, email)
                )
                conn.commit()
                logger.info("Inserted user: %s (%s)", name, email)
        except sqlite3.IntegrityError:
            # The email column is UNIQUE, so re-running the demo lands here.
            logger.warning("User with email %s already exists", email)
        except Exception as e:
            logger.error("Error inserting user %s: %s", name, e)

    def get_user_count() -> int:
        """Return the number of rows in ``users``, or 0 if the query fails."""
        try:
            with pool.get_connection() as conn:
                cursor = conn.execute("SELECT COUNT(*) FROM users")
                return cursor.fetchone()[0]
        except Exception as e:
            logger.error("Error getting user count: %s", e)
            return 0

    # Sample rows inserted concurrently below
    users_data = [
        ("Alice Johnson", "alice@example.com"),
        ("Bob Smith", "bob@example.com"),
        ("Charlie Brown", "charlie@example.com"),
        ("Diana Prince", "diana@example.com"),
        ("Eve Wilson", "eve@example.com"),
    ]

    try:
        # Simulate concurrent database operations
        with ThreadPoolExecutor(max_workers=5) as executor:
            # Submit insert tasks
            futures = [
                executor.submit(insert_user, name, email)
                for name, email in users_data
            ]

            # Wait for all insertions to complete
            for future in futures:
                future.result()

        # Check final count
        count = get_user_count()
        logger.info("Total users in database: %s", count)
    finally:
        # Clean up even if one of the steps above raised
        pool.close_all()

What This Code Does

This snippet implements a Database Connection Pool Manager with the following key features:

  1. Thread-Safe Connection Management: Uses a queue-based approach with locks to ensure thread safety when multiple threads request connections simultaneously.

  2. Context Manager Support: Provides a clean get_connection() method that works as a context manager (with statement), automatically returning connections to the pool.

  3. Dynamic Connection Creation: Creates new connections when needed (up to a maximum limit) and reuses existing ones when possible.

  4. Connection Validation: Checks that connections are still valid before yielding them to callers.

  5. Automatic Resource Management: Automatically handles connection cleanup and ensures connections are properly closed when the pool is shut down.

  6. Graceful Error Handling: Handles database errors appropriately, ensuring that broken connections don’t get returned to the pool.

Why This Is Useful

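Connection pooling is essential for applications that open database connections frequently or from many threads at once: reusing a small set of connections avoids repeated connection setup and caps how many connections are open at the same time.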

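This implementation is particularly useful for multithreaded programs that share a single SQLite database file, such as the concurrent inserts shown in the example above.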

How to Run

  1. Save the code to a file (e.g., db_pool.py)
  2. Run it directly: python db_pool.py
  3. The script will:
    • Create a SQLite database file (demo.db)
    • Initialize a connection pool
    • Insert sample users concurrently
    • Report the total number of users stored in the database
    • Clean up all connections

Customization Options

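You can customize the pool behavior by adjusting the min_connections and max_connections constructor arguments, which set how many connections are created at startup and the maximum number of connections the pool will open.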

The pool works with SQLite by default but can be adapted for other databases by changing the connection creation logic.