import logging
import sqlite3
import time
from contextlib import contextmanager
from queue import Empty, Full, Queue
from threading import Lock, Thread
from typing import Generator, Optional
class DatabaseConnectionPool:
    """
    A thread-safe SQLite connection pool with context manager support.

    Idle connections are kept in a bounded queue (``self.pool``).
    ``self.current_connections`` counts every open connection owned by the
    pool, whether idle or checked out, and is only modified while
    ``self.lock`` is held (or during construction, before the pool is shared).
    """

    def __init__(self, db_path: str, min_connections: int = 2,
                 max_connections: int = 10, acquire_timeout: float = 5.0):
        """
        Open ``min_connections`` connections and place them in the pool.

        Args:
            db_path: Path passed to ``sqlite3.connect``.
            min_connections: Number of connections opened eagerly.
            max_connections: Upper bound on simultaneously open connections.
            acquire_timeout: Seconds ``get_connection`` waits for a free
                connection once the pool is at capacity.

        Raises:
            ValueError: If the connection limits are inconsistent.
            sqlite3.Error: If an initial connection cannot be opened.
        """
        if max_connections < 1:
            raise ValueError("max_connections must be at least 1")
        if not 0 <= min_connections <= max_connections:
            raise ValueError(
                "min_connections must be between 0 and max_connections"
            )

        self.db_path = db_path
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.acquire_timeout = acquire_timeout
        self.pool = Queue(maxsize=max_connections)
        self.lock = Lock()
        self.current_connections = 0

        # Seed the pool. The connections must actually be queued; otherwise
        # they are counted against max_connections but can never be handed out.
        try:
            for _ in range(min_connections):
                self.pool.put_nowait(self._create_connection())
        except Exception:
            # Do not leave half-initialised connections open.
            self.close_all()
            raise

    def _create_connection(self) -> sqlite3.Connection:
        """
        Open a new connection and count it.

        The caller must hold ``self.lock`` (or be inside ``__init__``).
        """
        conn = sqlite3.connect(self.db_path, check_same_thread=False)
        try:
            # Enable WAL mode for better concurrency.
            conn.execute("PRAGMA journal_mode=WAL")
        except sqlite3.Error:
            # The connection was never counted, so just release it.
            conn.close()
            raise
        self.current_connections += 1
        return conn

    def _close_connection(self, conn: sqlite3.Connection) -> None:
        """
        Close a connection and remove it from the count.

        The caller must hold ``self.lock``. Closing is best-effort: a failure
        is logged rather than raised, and the count is decremented regardless.
        """
        try:
            conn.close()
        except sqlite3.Error:
            logging.getLogger(__name__).warning(
                "Failed to close pooled connection", exc_info=True
            )
        finally:
            self.current_connections -= 1

    def _acquire_connection(self) -> sqlite3.Connection:
        """
        Return an idle connection, a new one, or wait for one to be returned.

        Raises:
            queue.Empty: If the pool is at capacity and no connection is
                returned within ``acquire_timeout`` seconds.
        """
        try:
            return self.pool.get_nowait()
        except Empty:
            pass

        with self.lock:
            if self.current_connections < self.max_connections:
                return self._create_connection()

        # At capacity: wait WITHOUT holding the lock so that other threads
        # can still create or close connections in the meantime.
        return self.pool.get(timeout=self.acquire_timeout)

    @contextmanager
    def get_connection(self) -> Generator[sqlite3.Connection, None, None]:
        """
        Check a connection out of the pool for the duration of a ``with`` block.

        On normal exit the connection goes back to the pool. If validation or
        the ``with`` body raises, the connection is closed instead of being
        reused, and the exception propagates unchanged.

        Raises:
            queue.Empty: If no connection becomes available in time.
        """
        conn = self._acquire_connection()
        try:
            # Verify the connection is still usable before handing it out.
            conn.execute("SELECT 1")
            yield conn
        except BaseException:
            # BaseException so that KeyboardInterrupt and friends do not leak
            # the checked-out connection or skew the counter.
            with self.lock:
                self._close_connection(conn)
            raise
        else:
            try:
                self.pool.put_nowait(conn)
            except Full:
                # Should not happen while the count is accurate; stay safe.
                with self.lock:
                    self._close_connection(conn)

    def close_all(self) -> None:
        """
        Close every idle connection currently in the pool.

        Connections that are checked out at the time of the call are not
        touched; they return to the pool when their ``with`` block ends.
        """
        with self.lock:
            while True:
                try:
                    conn = self.pool.get_nowait()
                except Empty:
                    break
                self._close_connection(conn)
# Example usage: concurrent inserts through the pool against a demo database.
if __name__ == "__main__":
    from concurrent.futures import ThreadPoolExecutor

    # Set up logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Create database and table for demo.
    # A sqlite3 connection used as a context manager only commits or rolls
    # back; it does not close the connection, so close it explicitly.
    demo_db = "demo.db"
    setup_conn = sqlite3.connect(demo_db)
    try:
        with setup_conn:
            setup_conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY,
                    name TEXT NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
    finally:
        setup_conn.close()

    # Initialize connection pool
    pool = DatabaseConnectionPool(demo_db, min_connections=3, max_connections=8)

    def insert_user(name: str, email: str) -> None:
        """Insert a user through the pool; log (do not raise) on failure."""
        try:
            with pool.get_connection() as conn:
                conn.execute(
                    "INSERT INTO users (name, email) VALUES (?, ?)",
                    (name, email),
                )
                conn.commit()
                logger.info(f"Inserted user: {name} ({email})")
        except sqlite3.IntegrityError:
            # The email column is UNIQUE, so a repeat run hits this branch.
            logger.warning(f"User with email {email} already exists")
        except Exception as e:
            logger.error(f"Error inserting user {name}: {e}")

    def get_user_count() -> int:
        """Return the number of rows in ``users``, or 0 if the query fails."""
        try:
            with pool.get_connection() as conn:
                cursor = conn.execute("SELECT COUNT(*) FROM users")
                count = cursor.fetchone()[0]
                return count
        except Exception as e:
            logger.error(f"Error getting user count: {e}")
            return 0

    users_data = [
        ("Alice Johnson", "alice@example.com"),
        ("Bob Smith", "bob@example.com"),
        ("Charlie Brown", "charlie@example.com"),
        ("Diana Prince", "diana@example.com"),
        ("Eve Wilson", "eve@example.com"),
    ]

    try:
        # Insert multiple users concurrently
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [
                executor.submit(insert_user, name, email)
                for name, email in users_data
            ]
            # Wait for all insertions to complete
            for future in futures:
                future.result()

        # Check final count
        count = get_user_count()
        logger.info(f"Total users in database: {count}")
    finally:
        # Clean up even if the demo fails part-way through.
        pool.close_all()
This snippet implements a Database Connection Pool Manager with the following key features:
Thread-Safe Connection Management: Uses a queue-based approach with locks to ensure thread safety when multiple threads request connections simultaneously.
Context Manager Support: Provides a clean get_connection() method that works as a context manager (with statement), automatically returning connections to the pool.
Dynamic Connection Creation: Creates new connections when needed (up to a maximum limit) and reuses existing ones when possible.
Connection Validation: Checks that connections are still valid before yielding them to callers.
Automatic Resource Management: Automatically handles connection cleanup and ensures connections are properly closed when the pool is shut down.
Graceful Error Handling: Handles database errors appropriately, ensuring that broken connections don’t get returned to the pool.
Connection pooling is essential for applications that:
This implementation is particularly useful for:
To try it out, save the code to a file (db_pool.py) and run it (python db_pool.py); the demo creates a SQLite database file (demo.db).
You can customize the pool behavior by adjusting:
- min_connections: Minimum number of connections to maintain
- max_connections: Maximum number of connections allowed
- Change demo.db to any SQLite database file
- Modify _create_connection() for different database configurations
The pool works with SQLite by default but can be adapted for other databases by changing the connection creation logic.