import time
import re
from pathlib import Path
from typing import Callable, List, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class LogEntry:
    """A single log line that matched at least one watched pattern."""
    # Timestamp parsed out of the line itself, or the time it was observed.
    timestamp: datetime
    # The raw log line (without its trailing newline).
    line: str
    # Source strings of the regex patterns that matched this line.
    matched_patterns: List[str]
class LogMonitor:
    """Tail a log file and invoke a callback for lines matching watched patterns.

    The monitor polls the file for appended content, survives log rotation
    (detected as the file shrinking), and reports only lines added after
    monitoring starts.
    """

    def __init__(self, log_file: Path, patterns: List[str], callback: Callable[["LogEntry"], None]):
        """
        Args:
            log_file: Path of the log file to watch (need not exist yet).
            patterns: Regex source strings; a line matching any is reported.
            callback: Invoked with a LogEntry for every matching line.
        """
        self.log_file = log_file
        # Compile once up front; the same patterns are tested on every line.
        self.patterns = [re.compile(pattern) for pattern in patterns]
        self.callback = callback
        # Byte offset up to which the file has already been processed.
        self.last_position = 0

    def start(self, poll_interval: float = 1.0):
        """Start monitoring the log file for new entries matching patterns.

        Blocks forever; returns only on Ctrl+C (KeyboardInterrupt).
        """
        print(f"Monitoring {self.log_file} for patterns: {[p.pattern for p in self.patterns]}")
        # If file doesn't exist yet, wait for it
        while not self.log_file.exists():
            print(f"Waiting for log file {self.log_file} to be created...")
            time.sleep(poll_interval)
        # Start at the end of the existing file so only new lines are reported.
        self.last_position = self.log_file.stat().st_size
        try:
            while True:
                self._check_for_updates()
                time.sleep(poll_interval)
        except KeyboardInterrupt:
            print("\nStopping log monitor...")

    def _check_for_updates(self):
        """Read content appended since the last poll and process it line by line."""
        # The file can vanish briefly during rotation; skip this poll if so,
        # instead of letting stat() raise FileNotFoundError.
        if not self.log_file.exists():
            return
        current_size = self.log_file.stat().st_size
        # File was truncated (log rotation): re-read from the beginning.
        if current_size < self.last_position:
            self.last_position = 0
        # New content was added since the last poll.
        if current_size > self.last_position:
            with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
                f.seek(self.last_position)
                new_content = f.read()
                self.last_position = f.tell()
            # NOTE(review): a line still being written (no trailing newline yet)
            # may be delivered in two pieces across consecutive polls.
            for line in new_content.splitlines():
                if line.strip():  # Skip empty lines
                    self._process_line(line)

    def _process_line(self, line: str):
        """Invoke the callback if the line matches any watched pattern."""
        matched_patterns = [p.pattern for p in self.patterns if p.search(line)]
        if matched_patterns:
            # Try to extract timestamp from line (common formats)
            entry = LogEntry(
                timestamp=self._extract_timestamp(line),
                line=line,
                matched_patterns=matched_patterns,
            )
            self.callback(entry)

    def _extract_timestamp(self, line: str) -> datetime:
        """Extract a timestamp from the log line, or return the current time.

        Each regex is paired with the strptime format that parses its match,
        so a US-style MM/DD/YYYY timestamp is parsed with the matching format
        rather than being force-fed the ISO one. Bracketed forms like
        "[2023-01-01 12:00:00]" are found by the bare pattern via search(),
        so they need no separate entry.
        """
        timestamp_formats = [
            (r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', '%Y-%m-%d %H:%M:%S'),  # 2023-01-01 12:00:00
            (r'\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}', '%m/%d/%Y %H:%M:%S'),  # 01/01/2023 12:00:00
        ]
        for pattern, fmt in timestamp_formats:
            match = re.search(pattern, line)
            if match:
                try:
                    return datetime.strptime(match.group(0), fmt)
                except ValueError:
                    continue
        # If no timestamp found, return current time
        return datetime.now()
# Example usage
def alert_callback(entry: "LogEntry"):
    """Print an alert line (and the matched log line) for a matched entry."""
    when = entry.timestamp.strftime('%Y-%m-%d %H:%M:%S')
    print(f"[ALERT] {when} - Matched patterns: {entry.matched_patterns}")
    print(f"    {entry.line}\n")
if __name__ == "__main__":
    # Patterns worth alerting on; compiled by LogMonitor at construction time.
    watch_list = [
        r'ERROR',                    # Any line containing ERROR
        r'Exception',                # Any line containing Exception
        r'failed',                   # Any line containing failed
        r'\b500\b',                  # HTTP 500 errors (word boundary)
        r'Authentication.*failed',   # Authentication failures
    ]
    # Watch application.log and print an alert for every matching line.
    target = Path("application.log")
    watcher = LogMonitor(
        log_file=target,
        patterns=watch_list,
        callback=alert_callback,
    )
    # Runs until interrupted with Ctrl+C.
    watcher.start(poll_interval=2.0)
This Log Monitor is a practical tool that watches log files in real-time and alerts you when specific patterns appear. It’s particularly useful for system administrators, developers, and DevOps engineers who need to respond quickly to critical events in application logs.
To use it: the LogEntry dataclass organizes the matched log information; create a LogMonitor instance with your log file, patterns, and callback, then call start() to begin watching the file. The monitor will run continuously until stopped, making it suitable for long-running operations. It is especially valuable in production environments where immediate response to errors or security events is critical.