Python Snippets

Database Connection Pool Manager with Context Manager

import sqlite3
import time
from contextlib import contextmanager
from queue import Empty, Full, Queue
from threading import Lock, Thread
from typing import Generator, Optional

class DatabaseConnectionPool:
    """Thread-safe pool of reusable SQLite connections.

    Idle connections wait in ``self.pool`` (a ``Queue`` bounded by
    ``max_connections``). ``self.current_connections`` counts every open
    connection owned by the pool, whether idle or checked out, and is only
    modified while holding ``self.lock``.
    """

    def __init__(self, db_path: str, min_connections: int = 2, max_connections: int = 10):
        """Create the pool and pre-open ``min_connections`` idle connections.

        Args:
            db_path: Path handed to ``sqlite3.connect``.
            min_connections: Number of connections opened eagerly.
            max_connections: Upper bound on simultaneously open connections.

        Raises:
            ValueError: If ``max_connections`` is below 1 or
                ``min_connections`` is outside ``0..max_connections``.
            sqlite3.Error: If one of the initial connections cannot be opened.
        """
        if max_connections < 1:
            raise ValueError("max_connections must be at least 1")
        if not 0 <= min_connections <= max_connections:
            raise ValueError("min_connections must be between 0 and max_connections")

        self.db_path = db_path
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.pool = Queue(maxsize=max_connections)
        self.lock = Lock()
        self.current_connections = 0

        # Pre-populate the pool so the first callers get a connection at once.
        try:
            for _ in range(min_connections):
                self.pool.put_nowait(self._create_connection())
        except Exception:
            # Do not leak the connections that were opened before the failure.
            self.close_all()
            raise

    def _open_connection(self) -> sqlite3.Connection:
        """Open a raw connection without touching the connection counter."""
        # check_same_thread=False: a pooled connection is handed to whichever
        # thread asks next, not only to the thread that opened it.
        conn = sqlite3.connect(self.db_path, check_same_thread=False)
        conn.row_factory = sqlite3.Row  # Enable access to columns by name
        return conn

    def _create_connection(self) -> sqlite3.Connection:
        """Open a new connection and count it as owned by the pool.

        Must not be called while holding ``self.lock``: the lock is not
        reentrant and is acquired here to update the counter.
        """
        conn = self._open_connection()
        with self.lock:
            self.current_connections += 1
        return conn

    def _close_connection(self, conn: sqlite3.Connection) -> None:
        """Close a connection (best effort) and remove it from the count."""
        try:
            conn.close()
        except sqlite3.Error:
            # Closing is best effort; the connection is discarded either way.
            pass
        finally:
            with self.lock:
                self.current_connections -= 1

    def get_connection(self, timeout: float = 5.0) -> sqlite3.Connection:
        """Check out a connection, opening a new one if the pool may grow.

        An idle connection is returned immediately when available. Otherwise
        a new connection is opened if fewer than ``max_connections`` exist.
        Only when the pool is at capacity does the call block.

        Args:
            timeout: Seconds to wait for a connection to be returned when the
                pool is at capacity.

        Raises:
            queue.Empty: If no connection becomes available within ``timeout``.
            sqlite3.Error: If a new connection cannot be opened.
        """
        try:
            return self.pool.get_nowait()
        except Empty:
            pass

        # Reserve a slot under the lock, but open the connection outside it:
        # the check and the increment must be atomic, and the lock must not
        # be held during I/O.
        with self.lock:
            may_grow = self.current_connections < self.max_connections
            if may_grow:
                self.current_connections += 1

        if may_grow:
            try:
                return self._open_connection()
            except Exception:
                # Give the reserved slot back so the pool can grow later.
                with self.lock:
                    self.current_connections -= 1
                raise

        # At capacity: wait for another caller to return a connection.
        return self.pool.get(timeout=timeout)

    def return_connection(self, conn: Optional[sqlite3.Connection]) -> None:
        """Return a connection to the pool; ``None`` is ignored.

        Any open transaction is rolled back first so the next user starts
        from a clean state. A connection that cannot be reset, or that does
        not fit because the pool is full, is closed instead.
        """
        if conn is None:
            return

        try:
            # Reset connection state
            conn.rollback()
        except sqlite3.Error:
            # Broken or already-closed connection: do not hand it out again.
            self._close_connection(conn)
            return

        try:
            self.pool.put_nowait(conn)
        except Full:
            # Pool is full, close the connection
            self._close_connection(conn)

    def close_all(self) -> None:
        """Close every idle connection currently in the pool.

        Connections that are checked out are not affected; they are handled
        when they are passed to ``return_connection``.
        """
        while True:
            try:
                conn = self.pool.get_nowait()
            except Empty:
                break
            self._close_connection(conn)

@contextmanager
def get_db_connection(pool: DatabaseConnectionPool) -> Generator[sqlite3.Connection, None, None]:
    """Check a connection out of ``pool`` for the duration of a ``with`` block.

    The connection is always handed back to the pool on exit. If the body
    raises, the open transaction is rolled back (best effort) and the
    original exception propagates unchanged. Committing is the caller's
    responsibility.

    Args:
        pool: Pool to borrow the connection from.

    Yields:
        The checked-out ``sqlite3.Connection``.

    Raises:
        Whatever ``pool.get_connection()`` raises when no connection can be
        obtained, or whatever the body of the ``with`` block raises.
    """
    # Acquire outside the try: if this fails there is nothing to give back.
    conn = pool.get_connection()
    try:
        yield conn
    except Exception:
        try:
            conn.rollback()
        except sqlite3.Error:
            # A failed rollback must not replace the caller's exception; the
            # pool decides what to do with the connection when it is returned.
            pass
        raise  # bare raise keeps the original traceback intact
    finally:
        pool.return_connection(conn)

# Example usage
if __name__ == "__main__":
    # Create database and table for demonstration
    init_conn = sqlite3.connect("example.db")
    try:
        init_conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        init_conn.commit()
    finally:
        init_conn.close()

    # Create connection pool
    db_pool = DatabaseConnectionPool("example.db", min_connections=3, max_connections=8)

    def insert_user(name: str, email: str) -> None:
        """Insert a user through the pool, skipping e-mails that already exist."""
        try:
            with get_db_connection(db_pool) as conn:
                conn.execute(
                    "INSERT INTO users (name, email) VALUES (?, ?)",
                    (name, email),
                )
                conn.commit()
        except sqlite3.IntegrityError:
            # example.db persists between runs and email is UNIQUE, so a
            # second run would otherwise kill every worker thread here.
            print(f"Skipped existing user: {name}")
        else:
            print(f"Inserted user: {name}")

    def get_users() -> list:
        """Retrieve all users from the database, newest first."""
        with get_db_connection(db_pool) as conn:
            cursor = conn.execute("SELECT * FROM users ORDER BY created_at DESC")
            return [dict(row) for row in cursor.fetchall()]

    # Example usage with multiple threads
    def worker_thread(thread_id: int) -> None:
        """Worker thread that inserts three users with a short pause between."""
        for i in range(3):
            insert_user(f"User-{thread_id}-{i}", f"user{thread_id}_{i}@example.com")
            time.sleep(0.1)

    try:
        # Create and start worker threads
        threads = [Thread(target=worker_thread, args=(i,)) for i in range(5)]
        for thread in threads:
            thread.start()

        # Wait for all threads to complete
        for thread in threads:
            thread.join()

        # Retrieve and display all users
        users = get_users()
        print(f"\nTotal users in database: {len(users)}")
        for user in users[:10]:  # Show first 10 users
            print(f"ID: {user['id']}, Name: {user['name']}, Email: {user['email']}")
    finally:
        # Clean up even if the demo fails part-way through
        db_pool.close_all()

Explanation

This database connection pool manager provides an efficient way to handle database connections in multi-threaded applications. It solves the common problem of connection overhead and resource exhaustion that occurs when creating new database connections for every operation.

Key Features:

  1. Connection Pooling: Maintains a pool of reusable database connections, reducing the overhead of creating new connections
  2. Thread Safety: Uses locks and thread-safe queues to ensure safe concurrent access
  3. Automatic Scaling: Dynamically creates new connections when the pool is empty (up to a maximum limit)
  4. Context Manager: Provides a clean, Pythonic interface using with statements
  5. Connection Reset: Automatically rolls back transactions and resets connection state before returning to pool
  6. Resource Management: Properly closes connections when pool is full or when shutting down

How It Works:

  1. Initialization: Creates a minimum number of connections on startup
  2. Connection Acquisition: When a connection is requested, it either returns an available connection from the pool or creates a new one (if under the maximum limit)
  3. Connection Release: Connections are returned to the pool after use and can be reused by other operations
  4. Automatic Cleanup: Excess connections are closed when the pool is full

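Why It’s Useful:

Reusing a bounded set of connections avoids the overhead of opening a new connection for every operation and caps how many connections are open at once, which matters most when many threads access the database concurrently.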

How to Run:

  1. Save the code to a file (e.g., db_pool.py)
  2. Run with: python db_pool.py
  3. The script will create a SQLite database, demonstrate concurrent operations, and show the results

To use in your own project, simply create a DatabaseConnectionPool instance and use the get_db_connection context manager for all database operations.