Python Snippets

Database Connection Pool Manager with Context Manager Support

import sqlite3
from contextlib import contextmanager
from queue import Queue, Empty
from threading import Lock, Thread
import time

class DatabasePool:
    def __init__(self, db_path, min_connections=2, max_connections=10):
        self.db_path = db_path
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.pool = Queue(maxsize=max_connections)
        self.lock = Lock()
        self.current_connections = 0
        
        # Pre-populate with minimum connections
        for _ in range(min_connections):
            conn = self._create_connection()
            self.pool.put(conn)
            self.current_connections += 1
    
    def _create_connection(self):
        """Create a new database connection."""
        conn = sqlite3.connect(self.db_path, check_same_thread=False)
        conn.row_factory = sqlite3.Row  # Enable dict-like access to rows
        return conn
    
    @contextmanager
    def get_connection(self, timeout=5):
        """Get a connection from the pool as a context manager."""
        conn = None
        try:
            # Try to get existing connection
            try:
                conn = self.pool.get(block=True, timeout=timeout)
            except Empty:
                # No available connections, create new one if under limit
                with self.lock:
                    if self.current_connections < self.max_connections:
                        conn = self._create_connection()
                        self.current_connections += 1
                    else:
                        # Wait for available connection
                        conn = self.pool.get(block=True, timeout=timeout)
            
            yield conn
        finally:
            # Return connection to pool
            if conn:
                try:
                    self.pool.put(conn, block=False)
                except:
                    # Pool is full, close connection
                    conn.close()
                    with self.lock:
                        self.current_connections -= 1
    
    def execute_query(self, query, params=None, timeout=5):
        """Execute a query and return results."""
        with self.get_connection(timeout) as conn:
            cursor = conn.cursor()
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)
            return cursor.fetchall()
    
    def execute_update(self, query, params=None, timeout=5):
        """Execute an update/insert/delete query."""
        with self.get_connection(timeout) as conn:
            cursor = conn.cursor()
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)
            conn.commit()
            return cursor.rowcount
    
    def close_all(self):
        """Close all connections in the pool."""
        while not self.pool.empty():
            try:
                conn = self.pool.get(block=False)
                conn.close()
            except Empty:
                break
        self.current_connections = 0

# Example usage
if __name__ == "__main__":
    # Initialize the pool
    pool = DatabasePool("example.db", min_connections=3, max_connections=5)
    
    # Create a sample table
    pool.execute_update("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    
    # Insert sample data
    try:
        pool.execute_update(
            "INSERT INTO users (name, email) VALUES (?, ?)",
            ("Alice Smith", "alice@example.com")
        )
        pool.execute_update(
            "INSERT INTO users (name, email) VALUES (?, ?)",
            ("Bob Johnson", "bob@example.com")
        )
    except sqlite3.IntegrityError:
        print("User already exists")
    
    # Query data
    users = pool.execute_query("SELECT * FROM users ORDER BY created_at DESC")
    print("Users in database:")
    for user in users:
        print(f"- {user['name']} ({user['email']})")
    
    # Close all connections when done
    pool.close_all()

What This Code Does

This code implements a thread-safe database connection pool manager with context manager support for Python. It efficiently manages a pool of database connections, allowing multiple threads to share connections without the overhead of creating new connections for each operation.

Key features include:

Why This Is Useful

Database connections are expensive to create and destroy. In applications with high database usage, creating a new connection for each operation can lead to performance bottlenecks. A connection pool solves this by:

  1. Reducing Latency: Reusing existing connections eliminates connection setup time
  2. Improving Performance: Managing a fixed number of connections prevents resource exhaustion
  3. Ensuring Thread Safety: Safe for use in multi-threaded applications
  4. Simplifying Resource Management: Automatic cleanup prevents connection leaks

This implementation is particularly valuable for web applications, data processing pipelines, or any multi-threaded application that frequently accesses a database.

How to Run

  1. Save the code to a file (e.g., db_pool.py)
  2. Run with Python: python db_pool.py
  3. The example will create a SQLite database file example.db in the current directory
  4. It will create a users table, insert sample data, and display the results

To use in your own project:

  1. Import the DatabasePool class
  2. Initialize with your database path and connection limits
  3. Use the get_connection() context manager or the convenience methods execute_query() and execute_update()
  4. Call close_all() when shutting down your application

The pool automatically handles connection lifecycle management, making database access both efficient and safe.