Python Snippets

Real-time Log File Monitor with Pattern Matching and Alert System

import time
import re
from pathlib import Path
from typing import Callable, List, Optional
from dataclasses import dataclass
from threading import Thread, Event
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

@dataclass
class LogAlert:
    """One alert rule: a pattern to look for and what to do when it matches."""

    # Regular expression passed to re.search() against each log line.
    pattern: str
    # Human-readable label used in console output and e-mail subjects.
    description: str
    # Optional hook invoked as callback(line, line_number) on a match.
    callback: Optional[Callable] = None
    # When True, a match also triggers an e-mail notification.
    email_notification: bool = False

class LogMonitor:
    """Follow a log file and raise alerts for lines matching configured patterns.

    The monitor polls the file, reads the complete lines appended since the
    previous poll and checks each of them against every alert rule. A match
    is printed to the console, handed to the rule's callback (if any) and
    e-mailed when the rule requests it.

    Line numbers reported in alerts count the lines processed since
    monitoring started. They are not positions within the file, because
    monitoring begins at the current end of an existing file.

    NOTE: rotation is detected when the file shrinks below the last read
    offset or disappears. A replacement file that has already grown past the
    old offset by the next poll is not recognised as rotated.
    """

    # Upper bound, in seconds, for SMTP network operations, so an unreachable
    # mail server cannot stall the monitoring loop indefinitely.
    SMTP_TIMEOUT_SECONDS = 10

    def __init__(self, log_file_path: str, alerts: List["LogAlert"]):
        """Create a monitor for ``log_file_path`` using the given alert rules.

        Args:
            log_file_path: Path of the log file to follow. It may not exist yet.
            alerts: Rules checked, in order, against every new line.
        """
        self.log_file_path = Path(log_file_path)
        self.alerts = alerts
        self.stop_event = Event()
        # Byte offset of the first unread byte in the log file.
        self.last_position = 0
        # Number of complete lines processed since monitoring started.
        self._line_number = 0

        # Email configuration (configure these)
        self.smtp_server = "smtp.gmail.com"
        self.smtp_port = 587
        self.email_user = "your_email@gmail.com"
        self.email_password = "your_app_password"
        self.alert_recipient = "alert_recipient@gmail.com"

    def _send_email_alert(self, subject: str, message: str):
        """Send a plain-text e-mail notification; failures are reported, not raised.

        Args:
            subject: Subject line of the e-mail.
            message: Plain-text body of the e-mail.
        """
        try:
            msg = MIMEMultipart()
            msg['From'] = self.email_user
            msg['To'] = self.alert_recipient
            msg['Subject'] = subject
            msg.attach(MIMEText(message, 'plain'))

            # The context manager closes the connection even when starttls,
            # login or send_message raises.
            with smtplib.SMTP(
                self.smtp_server,
                self.smtp_port,
                timeout=self.SMTP_TIMEOUT_SECONDS,
            ) as server:
                server.starttls()
                server.login(self.email_user, self.email_password)
                server.send_message(msg)
            print(f"📧 Email alert sent: {subject}")
        except Exception as e:
            # Best effort: a mail failure must never stop log monitoring.
            print(f"❌ Failed to send email: {e}")

    def _process_line(self, line: str, line_number: int):
        """Check one log line against every alert rule and fire the matches.

        Args:
            line: The log line, including its trailing newline.
            line_number: Count of lines processed so far, this one included.
        """
        content = line.strip()
        for alert in self.alerts:
            if not re.search(alert.pattern, line):
                continue

            timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
            alert_message = f"[{timestamp}] ALERT: {alert.description} at line {line_number}"
            print(f"🚨 {alert_message}")
            print(f"   Content: {content}")

            # Execute custom callback if provided; a failing callback must not
            # prevent the remaining alerts from being evaluated.
            if alert.callback:
                try:
                    alert.callback(line, line_number)
                except Exception as e:
                    print(f"❌ Callback error: {e}")

            # Send email notification if configured
            if alert.email_notification:
                self._send_email_alert(
                    f"Log Alert: {alert.description}",
                    f"Pattern matched in {self.log_file_path}\n"
                    f"Line {line_number}: {content}\n"
                    f"Pattern: {alert.pattern}"
                )

    def _get_file_size(self) -> int:
        """Return the log file's size in bytes, or 0 if it does not exist."""
        try:
            return self.log_file_path.stat().st_size
        except FileNotFoundError:
            return 0

    @staticmethod
    def _decode_line(raw_line: bytes) -> str:
        """Decode one raw log line, dropping undecodable bytes and normalising CRLF."""
        text = raw_line.decode('utf-8', errors='ignore')
        if text.endswith('\r\n'):
            text = text[:-2] + '\n'
        return text

    def _read_new_lines(self):
        """Process every complete line appended since the previous poll.

        The file is read in binary mode so that ``last_position`` is a real
        byte offset, directly comparable with the size reported by ``stat``.
        A trailing line without a newline is still being written; it is left
        unread and picked up by a later poll once it is complete.
        """
        current_size = self._get_file_size()

        # Handle file rotation (when file becomes smaller)
        if current_size < self.last_position:
            print("🔄 Log file rotation detected")
            self.last_position = 0

        if current_size <= self.last_position:
            return

        with open(self.log_file_path, 'rb') as log_file:
            log_file.seek(self.last_position)
            for raw_line in log_file:
                if self.stop_event.is_set():
                    break
                if not raw_line.endswith(b'\n'):
                    break
                # Advance before processing so that a line whose processing
                # raises is not re-read on every subsequent poll.
                self.last_position += len(raw_line)
                self._line_number += 1
                self._process_line(self._decode_line(raw_line), self._line_number)

    def start_monitoring(self):
        """Poll the log file until ``stop_monitoring`` is called.

        An existing file is followed from its current end; a file that does
        not exist yet is read from its beginning once it appears. This call
        blocks, so it is normally run in its own thread.
        """
        print(f"🔍 Starting log monitor for: {self.log_file_path}")

        # Initialize file position
        if self.log_file_path.exists():
            self.last_position = self._get_file_size()
            print(f"📄 Log file found. Starting from position: {self.last_position}")
        else:
            self.last_position = 0
            print("⚠️  Log file not found. Waiting for it to be created...")

        self._line_number = 0

        while not self.stop_event.is_set():
            try:
                if not self.log_file_path.exists():
                    # Whatever appears at this path next is a new file.
                    self.last_position = 0
                    self.stop_event.wait(1)
                    continue

                self._read_new_lines()

                # Event.wait returns as soon as stop is requested, so
                # shutdown is not delayed by the polling interval.
                self.stop_event.wait(0.5)  # Check every 500ms

            except Exception as e:
                print(f"❌ Error monitoring log file: {e}")
                self.stop_event.wait(1)

    def stop_monitoring(self):
        """Ask the monitoring loop to stop at its next check."""
        print("🛑 Stopping log monitor...")
        self.stop_event.set()

# Example usage and custom alert handlers
def handle_error_alert(line: str, line_number: int) -> None:
    """Example callback for error alerts; only reports that it was triggered.

    Args:
        line: The matching log line (unused by this example).
        line_number: Line counter supplied by the monitor.
    """
    # Could log to database, trigger webhook, etc.
    print(f"🔧 Custom error handler triggered for line {line_number}")

def handle_security_alert(line: str, line_number: int) -> None:
    """Example callback for security alerts; only reports that it was triggered.

    Args:
        line: The matching log line (unused by this example).
        line_number: Line counter supplied by the monitor.
    """
    # Could block IP, notify security team, etc.
    print(f"🔒 Security alert handler triggered for line {line_number}")

# Alert rules, checked in this order against every new log line.
log_alerts = [
    # Application errors: run the error handler and send an e-mail.
    LogAlert(
        r"ERROR|CRITICAL|FATAL",
        "Application error detected",
        callback=handle_error_alert,
        email_notification=True,
    ),
    # Security events: run the security handler and send an e-mail.
    LogAlert(
        r"SECURITY|UNAUTHORIZED|INVALID_LOGIN",
        "Security breach attempt",
        callback=handle_security_alert,
        email_notification=True,
    ),
    # Warnings: console output only (no callback, no e-mail by default).
    LogAlert(r"WARNING|WARN", "Warning message"),
]

# Example usage: monitor a log file in the background until Ctrl+C is pressed.
if __name__ == "__main__":
    # Create log monitor instance
    monitor = LogMonitor("/var/log/application.log", log_alerts)

    # Run the monitor in a daemon thread so it can never keep the
    # interpreter alive after the main thread has finished.
    monitor_thread = Thread(target=monitor.start_monitoring, daemon=True)
    monitor_thread.start()

    try:
        # Keep the main thread alive while the monitor runs; the short join
        # timeout keeps it responsive to Ctrl+C and lets the program exit if
        # the monitor thread ever ends on its own.
        while monitor_thread.is_alive():
            monitor_thread.join(timeout=1)
    except KeyboardInterrupt:
        print("\n🛑 Keyboard interrupt received")
        monitor.stop_monitoring()
        monitor_thread.join(timeout=2)
        print("👋 Log monitoring stopped")

What This Code Does

This real-time log monitoring system watches log files for specific patterns and triggers alerts when those patterns are detected. It's particularly useful for system administrators, DevOps engineers, and developers who need to be notified immediately when critical events occur in their applications.

Key Features

  1. Real-time Monitoring: Continuously watches log files for new content
  2. Pattern Matching: Uses regular expressions to detect specific log entries
  3. Custom Callbacks: Execute custom functions when patterns are matched
  4. Email Notifications: Send email alerts for critical issues
  5. Log Rotation Handling: Automatically handles when log files are rotated
  6. Background Execution: Runs in a background thread without blocking the main application

How It Works

  1. The LogMonitor class takes a log file path and a list of LogAlert configurations
  2. It continuously checks the log file for new content
  3. When new lines are detected, it checks each against the defined patterns
  4. If a match is found, it triggers alerts, executes callbacks, and optionally sends emails
  5. The system handles file rotation and continues monitoring seamlessly

Configuration

Before using this code, you'll need to:

  1. Set up email configuration:
    • Update smtp_server, email_user, email_password, and alert_recipient
    • For Gmail, use an app password instead of your regular password
  2. Define your alerts:
    • Create LogAlert objects with patterns and descriptions
    • Add custom callbacks for specific handling logic
    • Enable email notifications for critical alerts
  3. Set the log file path:
    • Point to the log file you want to monitor

Use Cases

The system provides immediate feedback through console output and can notify you via email, making it an essential tool for maintaining system reliability and security.