import sqlite3
import time
from contextlib import contextmanager
from queue import Empty, Full, Queue
from threading import Lock, Thread
class DatabasePool:
    """Bounded pool of reusable SQLite connections.

    Idle connections are kept in a ``queue.Queue``. ``current_connections``
    counts every open connection the pool owns (idle or borrowed) and, once
    the pool is constructed, is only changed while holding ``lock``.
    Connections are opened with ``check_same_thread=False`` so a connection
    may be used by a thread other than the one that created it; the pool
    hands each connection to one borrower at a time.
    """

    def __init__(self, db_path, min_connections=2, max_connections=10):
        """Create the pool and eagerly open ``min_connections`` connections.

        Args:
            db_path: Database path passed to ``sqlite3.connect``.
            min_connections: Number of connections opened up front.
            max_connections: Upper bound on connections owned by the pool.

        Raises:
            ValueError: If ``max_connections`` is below 1 or
                ``min_connections`` is not within ``0..max_connections``.
        """
        if max_connections < 1:
            raise ValueError("max_connections must be at least 1")
        # Without this check the pre-population loop below would block
        # forever on a full bounded queue.
        if not 0 <= min_connections <= max_connections:
            raise ValueError(
                "min_connections must be between 0 and max_connections"
            )

        self.db_path = db_path
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.pool = Queue(maxsize=max_connections)
        self.lock = Lock()
        self.current_connections = 0

        # Pre-populate with the minimum number of connections.
        try:
            for _ in range(min_connections):
                self.pool.put(self._create_connection())
                self.current_connections += 1
        except BaseException:
            # Do not leak the connections that were already opened.
            self.close_all()
            raise

    def _create_connection(self):
        """Open a new connection whose rows support access by column name."""
        conn = sqlite3.connect(self.db_path, check_same_thread=False)
        conn.row_factory = sqlite3.Row  # Enable dict-like access to rows
        return conn

    def _acquire(self, timeout):
        """Return an idle connection, a new one, or wait for a returned one."""
        try:
            return self.pool.get(block=False)
        except Empty:
            pass

        # Reserve a slot under the lock, but open the connection outside it
        # so other threads are not serialised behind the connect call.
        with self.lock:
            can_grow = self.current_connections < self.max_connections
            if can_grow:
                self.current_connections += 1

        if can_grow:
            try:
                return self._create_connection()
            except BaseException:
                with self.lock:
                    self.current_connections -= 1
                raise

        # At the limit: wait (without holding the lock) for a borrower to
        # hand a connection back. Raises queue.Empty on timeout.
        return self.pool.get(block=True, timeout=timeout)

    def _release(self, conn):
        """Hand a connection back to the pool, closing it if there is no room."""
        try:
            self.pool.put(conn, block=False)
        except Full:
            conn.close()
            with self.lock:
                self.current_connections -= 1

    @contextmanager
    def get_connection(self, timeout=5):
        """Borrow a connection for the duration of a ``with`` block.

        An idle connection is used when one is available; otherwise a new
        one is opened if the pool is below ``max_connections``; otherwise
        the call waits up to ``timeout`` seconds for a connection to be
        returned.

        If the ``with`` body raises, any uncommitted work on the connection
        is rolled back before it goes back to the pool, so the next borrower
        does not inherit an open transaction.

        Args:
            timeout: Seconds to wait when the pool is exhausted.

        Yields:
            A ``sqlite3.Connection``.

        Raises:
            queue.Empty: If no connection becomes available within
                ``timeout``.
        """
        conn = self._acquire(timeout)
        try:
            yield conn
        except BaseException:
            try:
                conn.rollback()
            except sqlite3.Error:
                # Best effort: the original exception is the one that matters.
                pass
            raise
        finally:
            self._release(conn)

    def execute_query(self, query, params=None, timeout=5):
        """Run ``query`` and return all result rows.

        Args:
            query: SQL text.
            params: Optional bind parameters; a falsy value means none.
            timeout: Seconds to wait for a pooled connection.

        Returns:
            The list produced by ``cursor.fetchall()``.
        """
        with self.get_connection(timeout) as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(query, params or ())
                return cursor.fetchall()
            finally:
                cursor.close()

    def execute_update(self, query, params=None, timeout=5):
        """Run a data-modifying statement, commit, and return ``rowcount``.

        If the statement or the commit raises, ``get_connection`` rolls the
        transaction back before the connection is returned to the pool.

        Args:
            query: SQL text.
            params: Optional bind parameters; a falsy value means none.
            timeout: Seconds to wait for a pooled connection.

        Returns:
            ``cursor.rowcount`` for the executed statement.
        """
        with self.get_connection(timeout) as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(query, params or ())
                conn.commit()
                return cursor.rowcount
            finally:
                cursor.close()

    def close_all(self):
        """Close every idle connection currently in the pool.

        Connections that are borrowed at the time of the call are not
        touched and remain counted in ``current_connections``; they go back
        into the pool when their borrower releases them.
        """
        while True:
            try:
                conn = self.pool.get(block=False)
            except Empty:
                break
            try:
                conn.close()
            finally:
                with self.lock:
                    self.current_connections -= 1
# Example usage: build a small pool against example.db, create a table,
# insert two sample rows, and print what the table contains.
if __name__ == "__main__":
    # Initialize the pool
    pool = DatabasePool("example.db", min_connections=3, max_connections=5)
    try:
        # Create a sample table
        pool.execute_update("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
        # Insert sample data; the UNIQUE constraint on email makes a second
        # run of this script raise IntegrityError, which is reported here.
        try:
            pool.execute_update(
                "INSERT INTO users (name, email) VALUES (?, ?)",
                ("Alice Smith", "alice@example.com")
            )
            pool.execute_update(
                "INSERT INTO users (name, email) VALUES (?, ?)",
                ("Bob Johnson", "bob@example.com")
            )
        except sqlite3.IntegrityError:
            print("User already exists")
        # Query data
        users = pool.execute_query("SELECT * FROM users ORDER BY created_at DESC")
        print("Users in database:")
        for user in users:
            print(f"- {user['name']} ({user['email']})")
    finally:
        # Close the pooled connections even if one of the steps above raised.
        pool.close_all()
This code implements a thread-safe database connection pool manager with context manager support for Python. It efficiently manages a pool of database connections, allowing multiple threads to share connections without the overhead of creating new connections for each operation.
Key features include:
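- Context-manager access to connections through the `with` statement

Database connections are expensive to create and destroy. In applications with high database usage, creating a new connection for each operation can lead to performance bottlenecks. A connection pool solves this by keeping a bounded set of connections open and reusing them across operations.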
This implementation is particularly valuable for web applications, data processing pipelines, or any multi-threaded application that frequently accesses a database.
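To run the example:
1. Save the code to a file (for example, `db_pool.py`).
2. Run `python db_pool.py`.
3. The script creates `example.db` in the current directory.

To use in your own project: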
1. Import the `DatabasePool` class.
2. Use the `get_connection()` context manager or the convenience methods `execute_query()` and `execute_update()`.
3. Call `close_all()` when shutting down your application.

The pool automatically handles connection lifecycle management, making database access both efficient and safe.