import sqlite3
import time
from contextlib import contextmanager
from queue import Empty, Full, Queue
from threading import Lock
from typing import Generator, Optional
class DatabasePool:
    """Bounded pool of reusable SQLite connections.

    Idle connections are kept in a ``Queue``; ``current_connections`` counts
    every connection the pool has opened and not yet closed (idle or checked
    out) and is guarded by ``lock``.
    """

    def __init__(self, db_path: str, min_connections: int = 2, max_connections: int = 10):
        """Create the pool and pre-open ``min_connections`` idle connections.

        Args:
            db_path: Path handed to ``sqlite3.connect``.
            min_connections: Number of connections opened eagerly.
            max_connections: Upper bound on simultaneously open connections.

        Raises:
            ValueError: If ``max_connections`` is below 1 or
                ``min_connections`` is outside ``0..max_connections``.
        """
        if max_connections < 1:
            raise ValueError("max_connections must be at least 1")
        if not 0 <= min_connections <= max_connections:
            raise ValueError("min_connections must be between 0 and max_connections")

        self.db_path = db_path
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.pool = Queue(maxsize=max_connections)
        self.lock = Lock()
        self.current_connections = 0

        # Initialize minimum connections and actually park them in the pool so
        # that acquire() can hand them out (and close_all() can close them).
        for _ in range(min_connections):
            self.pool.put_nowait(self._create_connection())

    def _open_connection(self) -> sqlite3.Connection:
        """Open and configure a connection without touching the counter."""
        # check_same_thread=False: a pooled connection may be used by a thread
        # other than the one that opened it.
        conn = sqlite3.connect(self.db_path, check_same_thread=False)
        conn.row_factory = sqlite3.Row  # Enable column access by name
        return conn

    def _create_connection(self) -> sqlite3.Connection:
        """Create a new database connection and count it.

        Must NOT be called while holding ``self.lock``: the lock is not
        reentrant and is taken here to update the counter.
        """
        conn = self._open_connection()
        with self.lock:
            self.current_connections += 1
        return conn

    def _destroy_connection(self, conn: sqlite3.Connection) -> None:
        """Close a database connection and remove it from the count."""
        try:
            conn.close()
        finally:
            # Decrement even if close() raised, so the slot is not lost.
            with self.lock:
                self.current_connections -= 1

    def acquire(self) -> sqlite3.Connection:
        """Acquire a connection from the pool.

        Returns an idle connection if one is available; otherwise opens a new
        one while the pool is below ``max_connections``; otherwise blocks
        until another caller releases a connection.
        """
        try:
            # Try to get existing connection
            return self.pool.get_nowait()
        except Empty:
            pass

        # Reserve a slot under the lock, but open the connection after the
        # lock is released: the lock is non-reentrant, and connecting may be
        # slow.
        with self.lock:
            may_grow = self.current_connections < self.max_connections
            if may_grow:
                self.current_connections += 1

        if not may_grow:
            # Wait for available connection
            return self.pool.get()

        try:
            return self._open_connection()
        except BaseException:
            # Give the reserved slot back, then propagate the failure.
            with self.lock:
                self.current_connections -= 1
            raise

    def release(self, conn: sqlite3.Connection) -> None:
        """Release a connection back to the pool.

        Uncommitted work is rolled back first. A connection that cannot be
        rolled back, or that does not fit because the pool is full, is closed
        instead of being returned. ``None`` is ignored.
        """
        if conn is None:
            return

        try:
            # Reset connection state
            conn.rollback()
        except sqlite3.Error:
            # Broken or already-closed connection: do not put it back.
            self._destroy_connection(conn)
            return

        try:
            self.pool.put_nowait(conn)
        except Full:
            # If pool is full, close the connection
            self._destroy_connection(conn)

    @contextmanager
    def get_connection(self) -> Generator[sqlite3.Connection, None, None]:
        """Context manager for acquiring and releasing connections.

        The connection is released when the ``with`` body exits, whether it
        exits normally or by raising.
        """
        conn = self.acquire()
        try:
            yield conn
        finally:
            self.release(conn)

    def close_all(self) -> None:
        """Close all idle connections currently sitting in the pool.

        Connections that are checked out at the time of the call are not
        touched.
        """
        while True:
            try:
                conn = self.pool.get_nowait()
            except Empty:
                break
            self._destroy_connection(conn)
# Example usage
if __name__ == "__main__":
    # Demonstration script: creates example.db, inserts three users and
    # prints them, using a DatabasePool for every database operation.

    # Create database and pool
    pool = DatabasePool("example.db", min_connections=3, max_connections=5)

    # Insert sample data
    def insert_user(name: str, email: str) -> None:
        """Insert one user row; report instead of crashing on a duplicate email."""
        with pool.get_connection() as conn:
            try:
                conn.execute("INSERT INTO users (name, email) VALUES (?, ?)", (name, email))
                conn.commit()
            except sqlite3.IntegrityError:
                # email is UNIQUE, so re-running the example against an
                # existing example.db would otherwise abort here.
                print(f"User already exists: {email}")
                return
            print(f"Inserted user: {name}")

    # Query data
    def get_users():
        """Return all user rows, newest first."""
        with pool.get_connection() as conn:
            cursor = conn.execute("SELECT * FROM users ORDER BY created_at DESC")
            return cursor.fetchall()

    try:
        # Create sample table
        with pool.get_connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY,
                    name TEXT NOT NULL,
                    email TEXT UNIQUE NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.commit()

        # Example operations
        insert_user("Alice Johnson", "alice@example.com")
        insert_user("Bob Smith", "bob@example.com")
        insert_user("Carol Davis", "carol@example.com")

        users = get_users()
        print("\nAll users:")
        for user in users:
            print(f"- {user['name']} ({user['email']})")
    finally:
        # Clean up, even if one of the steps above failed
        pool.close_all()
This snippet implements a database connection pool manager for SQLite with context manager support. It efficiently manages a pool of database connections to avoid the overhead of creating new connections for each database operation while preventing resource exhaustion.
Key features include:
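- A `with` statement interface for automatic connection management

Connection pooling is essential for applications that perform frequent database operations because it avoids the overhead of opening a new connection for every operation and keeps the number of open connections bounded.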
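To run the example:

1. Save the code to a file (e.g. `db_pool.py`)
2. Run `python db_pool.py`
3. The script will create a SQLite database (`example.db`) and demonstrate basic usage

To integrate into your project: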
1. Import the `DatabasePool` class
2. Use the `get_connection()` context manager for database operations

The pool automatically handles connection lifecycle management, making database operations both efficient and safe.