import sqlite3
from contextlib import contextmanager
from queue import Queue, Empty
from threading import Lock, Thread
import time
from typing import Optional, Generator
class DatabaseConnectionPool:
def __init__(self, db_path: str, min_connections: int = 2, max_connections: int = 10):
self.db_path = db_path
self.min_connections = min_connections
self.max_connections = max_connections
self.pool = Queue(maxsize=max_connections)
self.lock = Lock()
self.current_connections = 0
# Initialize with minimum connections
for _ in range(min_connections):
self._create_connection()
def _create_connection(self) -> sqlite3.Connection:
"""Create a new database connection"""
conn = sqlite3.connect(self.db_path, check_same_thread=False)
conn.row_factory = sqlite3.Row # Enable dictionary-like access to rows
with self.lock:
self.current_connections += 1
return conn
def _close_connection(self, conn: sqlite3.Connection) -> None:
"""Close a database connection"""
try:
conn.close()
except:
pass
finally:
with self.lock:
self.current_connections -= 1
def get_connection(self, timeout: float = 5.0) -> sqlite3.Connection:
"""Get a connection from the pool or create a new one if needed"""
try:
# Try to get existing connection
return self.pool.get(timeout=timeout)
except Empty:
# No available connections, create new one if under limit
with self.lock:
if self.current_connections < self.max_connections:
return self._create_connection()
# Wait for connection to become available
return self.pool.get(timeout=timeout)
def return_connection(self, conn: sqlite3.Connection) -> None:
"""Return a connection to the pool"""
if conn:
try:
# Reset connection state
conn.rollback()
# Try to put back in pool, close if pool is full
self.pool.put(conn, timeout=0.1)
except:
# Pool is full, close the connection
self._close_connection(conn)
def close_all(self) -> None:
"""Close all connections in the pool"""
while not self.pool.empty():
try:
conn = self.pool.get_nowait()
self._close_connection(conn)
except Empty:
break
@contextmanager
def get_db_connection(pool: DatabaseConnectionPool) -> Generator[sqlite3.Connection, None, None]:
"""Context manager for database connections"""
conn = None
try:
conn = pool.get_connection()
yield conn
except Exception as e:
if conn:
conn.rollback()
raise e
finally:
if conn:
pool.return_connection(conn)
# Example usage
if __name__ == "__main__":
# Create database and table for demonstration
init_conn = sqlite3.connect("example.db")
init_conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
init_conn.close()
# Create connection pool
db_pool = DatabaseConnectionPool("example.db", min_connections=3, max_connections=8)
def insert_user(name: str, email: str) -> None:
"""Insert a user into the database using connection pool"""
with get_db_connection(db_pool) as conn:
conn.execute(
"INSERT INTO users (name, email) VALUES (?, ?)",
(name, email)
)
conn.commit()
print(f"Inserted user: {name}")
def get_users() -> list:
"""Retrieve all users from the database"""
with get_db_connection(db_pool) as conn:
cursor = conn.execute("SELECT * FROM users ORDER BY created_at DESC")
return [dict(row) for row in cursor.fetchall()]
# Example usage with multiple threads
def worker_thread(thread_id: int) -> None:
"""Worker thread that performs database operations"""
for i in range(3):
insert_user(f"User-{thread_id}-{i}", f"user{thread_id}_{i}@example.com")
time.sleep(0.1)
# Create and start worker threads
threads = []
for i in range(5):
thread = Thread(target=worker_thread, args=(i,))
threads.append(thread)
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
# Retrieve and display all users
users = get_users()
print(f"\nTotal users in database: {len(users)}")
for user in users[:10]: # Show first 10 users
print(f"ID: {user['id']}, Name: {user['name']}, Email: {user['email']}")
# Clean up
db_pool.close_all()
This database connection pool manager provides an efficient way to handle database connections in multi-threaded applications. It solves the common problem of connection overhead and resource exhaustion that occurs when creating new database connections for every operation.
with
statementsdb_pool.py
)python db_pool.py
To use in your own project, simply create a DatabaseConnectionPool
instance and use the get_db_connection
context manager for all database operations.