Python Snippets

Real-time Log File Monitor with Pattern Matching

import time
import re
from pathlib import Path
from typing import Callable, List, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class LogEntry:
    timestamp: datetime
    line: str
    matched_patterns: List[str]

class LogMonitor:
    def __init__(self, log_file: Path, patterns: List[str], callback: Callable[[LogEntry], None]):
        self.log_file = log_file
        self.patterns = [re.compile(pattern) for pattern in patterns]
        self.callback = callback
        self.last_position = 0
        
    def start(self, poll_interval: float = 1.0):
        """Start monitoring the log file for new entries matching patterns."""
        print(f"Monitoring {self.log_file} for patterns: {[p.pattern for p in self.patterns]}")
        
        # If file doesn't exist yet, wait for it
        while not self.log_file.exists():
            print(f"Waiting for log file {self.log_file} to be created...")
            time.sleep(poll_interval)
        
        # Initialize position to end of existing file
        self.last_position = self.log_file.stat().st_size
        
        try:
            while True:
                self._check_for_updates()
                time.sleep(poll_interval)
        except KeyboardInterrupt:
            print("\nStopping log monitor...")
            
    def _check_for_updates(self):
        """Check for new content in the log file."""
        # The file may briefly disappear during rotation; skip this poll and
        # start from the beginning of the new file once it reappears
        if not self.log_file.exists():
            self.last_position = 0
            return
        
        current_size = self.log_file.stat().st_size
        
        # File was truncated (log rotation)
        if current_size < self.last_position:
            self.last_position = 0
            
        # New content was added
        if current_size > self.last_position:
            with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
                f.seek(self.last_position)
                new_content = f.read()
                self.last_position = f.tell()
                
                # Process each line
                for line in new_content.splitlines():
                    if line.strip():  # Skip empty lines
                        self._process_line(line)
    
    def _process_line(self, line: str):
        """Process a single line and check for pattern matches."""
        matched_patterns = []
        for pattern in self.patterns:
            if pattern.search(line):
                matched_patterns.append(pattern.pattern)
                
        if matched_patterns:
            # Try to extract timestamp from line (common formats)
            timestamp = self._extract_timestamp(line)
            entry = LogEntry(timestamp=timestamp, line=line, matched_patterns=matched_patterns)
            self.callback(entry)
    
    def _extract_timestamp(self, line: str) -> datetime:
        """Extract timestamp from log line or return current time."""
        # Common timestamp patterns paired with their strptime formats
        timestamp_formats = [
            (r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', '%Y-%m-%d %H:%M:%S'),     # 2023-01-01 12:00:00
            (r'\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2}', '%m/%d/%Y %H:%M:%S'),     # 01/01/2023 12:00:00
            (r'\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\]', '%Y-%m-%d %H:%M:%S')  # [2023-01-01 12:00:00]
        ]
        
        for pattern, fmt in timestamp_formats:
            match = re.search(pattern, line)
            if match:
                try:
                    ts_str = match.group(0).strip('[]')
                    return datetime.strptime(ts_str, fmt)
                except ValueError:
                    continue
                    
        # If no timestamp found, return current time
        return datetime.now()

# Example usage
def alert_callback(entry: LogEntry):
    """Callback function to handle matched log entries."""
    print(f"[ALERT] {entry.timestamp.strftime('%Y-%m-%d %H:%M:%S')} - Matched patterns: {entry.matched_patterns}")
    print(f"        {entry.line}\n")

if __name__ == "__main__":
    # Define patterns to watch for
    patterns_to_watch = [
        r'ERROR',           # Any line containing ERROR
        r'Exception',       # Any line containing Exception
        r'failed',          # Any line containing failed
        r'\b500\b',         # HTTP 500 errors (word boundary)
        r'Authentication.*failed'  # Authentication failures
    ]
    
    # Create and start monitor
    log_file_path = Path("application.log")
    monitor = LogMonitor(
        log_file=log_file_path,
        patterns=patterns_to_watch,
        callback=alert_callback
    )
    
    # Start monitoring (will run until interrupted with Ctrl+C)
    monitor.start(poll_interval=2.0)

Explanation

This Log Monitor is a practical tool that watches log files in real time and alerts you when specific patterns appear. It’s particularly useful for system administrators, developers, and DevOps engineers who need to respond quickly to critical events in application logs.

What It Does

  1. Real-time Monitoring: Continuously watches a specified log file for new entries
  2. Pattern Matching: Checks each new line against user-defined regular expressions
  3. Smart Timestamp Detection: Automatically parses common timestamp formats from log entries
  4. Log Rotation Handling: Gracefully handles log files that are rotated or truncated (exercised in the sketch after this list)
  5. Customizable Alerts: Triggers a callback function for each matching entry with detailed information
  6. Robust File Handling: Works with existing files, waits for files to be created, and handles encoding issues
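
The short driver below is a sketch, not part of the snippet above: the demo.log file name and the feed_log helper are made up for illustration, and it assumes the LogMonitor class and alert_callback from the code above are in scope. It exercises points 3 and 4 by appending a line in one timestamp format, then simulating rotation by replacing the file with shorter content so the truncation check resets the read position.

import threading
import time
from pathlib import Path

# Hypothetical demo file; start empty so the monitor begins at position 0
demo_log = Path("demo.log")
demo_log.write_text("")

def feed_log():
    time.sleep(1)
    with open(demo_log, "a", encoding="utf-8") as f:
        f.write("2023-01-01 12:00:00 ERROR something broke\n")
    time.sleep(2)
    # Simulate log rotation: the replacement content is shorter than what was
    # already read, so the monitor sees current_size < last_position and resets
    demo_log.write_text("01/01/2023 13:30:00 login failed\n")

threading.Thread(target=feed_log, daemon=True).start()

monitor = LogMonitor(demo_log, [r'ERROR', r'failed'], alert_callback)
monitor.start(poll_interval=0.5)  # stop with Ctrl+C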

How to Use

  1. Define Your Patterns: Specify the regular expressions for the events you want to catch
  2. Create a Callback: Implement a function to handle matched entries (email alerts, logging, etc.); a file-writing variant is sketched after this list
  3. Initialize Monitor: Create a LogMonitor instance with your log file, patterns, and callback
  4. Start Monitoring: Call start() to begin watching the file
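
For step 2, any callable that accepts a LogEntry will do. Below is a minimal alternative callback sketch, assuming the LogEntry and LogMonitor classes defined above; the alerts.jsonl path and the record fields are arbitrary choices for illustration, not part of the original snippet.

import json
from pathlib import Path

def file_alert_callback(entry: LogEntry):
    """Append each matched entry to an alerts file as one JSON object per line."""
    record = {
        "timestamp": entry.timestamp.isoformat(),
        "patterns": entry.matched_patterns,
        "line": entry.line,
    }
    with open("alerts.jsonl", "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")

monitor = LogMonitor(Path("application.log"), [r'ERROR', r'Exception'], file_alert_callback)
monitor.start(poll_interval=2.0)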

Example Use Cases

  1. Application Errors: Alert on lines containing ERROR or Exception in an application log
  2. HTTP Failures: Watch a web server access log for 500 responses
  3. Security Events: Catch failed authentication attempts as they happen

The monitor will run continuously until stopped, making it suitable for long-running operations. It’s especially valuable in production environments where an immediate response to errors or security events is critical.
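
Because start() blocks until interrupted, one way to embed the monitor in a larger program is to run it on a daemon thread. The sketch below rests on that assumption, is not part of the snippet above, and again assumes the LogMonitor class and alert_callback are in scope; note that the KeyboardInterrupt handler in start() only fires on the main thread, so here the loop simply runs until the process exits.

import threading
from pathlib import Path

# Run the blocking start() loop in the background so the main program stays free
monitor = LogMonitor(
    log_file=Path("application.log"),
    patterns=[r'ERROR', r'Exception'],
    callback=alert_callback
)
watcher = threading.Thread(target=monitor.start, kwargs={"poll_interval": 2.0}, daemon=True)
watcher.start()

# ... the main application continues here; the daemon thread dies with the process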