Python Snippets

Database Connection Pool Manager with Context Manager Support

import sqlite3
from contextlib import contextmanager
from threading import Lock
from queue import Queue, Empty
import time
from typing import Optional, Generator

class DatabasePool:
    def __init__(self, db_path: str, min_connections: int = 2, max_connections: int = 10):
        self.db_path = db_path
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.pool = Queue(maxsize=max_connections)
        self.lock = Lock()
        self.current_connections = 0
        
        # Initialize minimum connections
        for _ in range(min_connections):
            self._create_connection()
    
    def _create_connection(self) -> sqlite3.Connection:
        """Create a new database connection"""
        conn = sqlite3.connect(self.db_path, check_same_thread=False)
        conn.row_factory = sqlite3.Row  # Enable column access by name
        with self.lock:
            self.current_connections += 1
        return conn
    
    def _destroy_connection(self, conn: sqlite3.Connection) -> None:
        """Close a database connection"""
        conn.close()
        with self.lock:
            self.current_connections -= 1
    
    def acquire(self) -> sqlite3.Connection:
        """Acquire a connection from the pool"""
        try:
            # Try to get existing connection
            return self.pool.get_nowait()
        except Empty:
            # No available connections, create new one if under limit
            with self.lock:
                if self.current_connections < self.max_connections:
                    return self._create_connection()
            # Wait for available connection
            return self.pool.get()
    
    def release(self, conn: sqlite3.Connection) -> None:
        """Release a connection back to the pool"""
        if conn:
            try:
                # Reset connection state
                conn.rollback()
                self.pool.put_nowait(conn)
            except:
                # If pool is full, close the connection
                self._destroy_connection(conn)
    
    @contextmanager
    def get_connection(self) -> Generator[sqlite3.Connection, None, None]:
        """Context manager for acquiring and releasing connections"""
        conn = self.acquire()
        try:
            yield conn
        finally:
            self.release(conn)
    
    def close_all(self) -> None:
        """Close all connections in the pool"""
        while not self.pool.empty():
            try:
                conn = self.pool.get_nowait()
                self._destroy_connection(conn)
            except Empty:
                break

# Example usage
if __name__ == "__main__":
    # Create database and pool
    pool = DatabasePool("example.db", min_connections=3, max_connections=5)
    
    # Create sample table
    with pool.get_connection() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
    
    # Insert sample data
    def insert_user(name: str, email: str):
        with pool.get_connection() as conn:
            conn.execute("INSERT INTO users (name, email) VALUES (?, ?)", (name, email))
            conn.commit()
            print(f"Inserted user: {name}")
    
    # Query data
    def get_users():
        with pool.get_connection() as conn:
            cursor = conn.execute("SELECT * FROM users ORDER BY created_at DESC")
            return cursor.fetchall()
    
    # Example operations
    insert_user("Alice Johnson", "alice@example.com")
    insert_user("Bob Smith", "bob@example.com")
    insert_user("Carol Davis", "carol@example.com")
    
    users = get_users()
    print("\nAll users:")
    for user in users:
        print(f"- {user['name']} ({user['email']})")
    
    # Clean up
    pool.close_all()

What This Code Does

This snippet implements a database connection pool manager for SQLite with context manager support. It efficiently manages a pool of database connections to avoid the overhead of creating new connections for each database operation while preventing resource exhaustion.

Key features include:

Why This is Useful

Connection pooling is essential for applications that perform frequent database operations because:

  1. Performance: Eliminates the overhead of creating/destroying connections repeatedly
  2. Resource Management: Prevents connection leaks and resource exhaustion
  3. Scalability: Allows efficient handling of concurrent database operations
  4. Thread Safety: Ensures safe database access in multi-threaded applications
  5. Automatic Cleanup: Context managers ensure connections are always returned to the pool

How to Run

  1. Save the code to a Python file (e.g., db_pool.py)
  2. Run it directly: python db_pool.py
  3. The example will create a SQLite database file (example.db) and demonstrate basic usage

To integrate into your project:

  1. Import the DatabasePool class
  2. Create a pool instance with your database path
  3. Use the get_connection() context manager for database operations

The pool automatically handles connection lifecycle management, making database operations both efficient and safe.