import time
import re
from pathlib import Path
from typing import Callable, List, Optional
from dataclasses import dataclass
from threading import Thread, Event
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
@dataclass
class LogAlert:
pattern: str
description: str
callback: Optional[Callable] = None
email_notification: bool = False
class LogMonitor:
def __init__(self, log_file_path: str, alerts: List[LogAlert]):
self.log_file_path = Path(log_file_path)
self.alerts = alerts
self.stop_event = Event()
self.last_position = 0
# Email configuration (configure these)
self.smtp_server = "smtp.gmail.com"
self.smtp_port = 587
self.email_user = "your_email@gmail.com"
self.email_password = "your_app_password"
self.alert_recipient = "alert_recipient@gmail.com"
def _send_email_alert(self, subject: str, message: str):
"""Send email notification for critical alerts"""
try:
msg = MIMEMultipart()
msg['From'] = self.email_user
msg['To'] = self.alert_recipient
msg['Subject'] = subject
msg.attach(MIMEText(message, 'plain'))
server = smtplib.SMTP(self.smtp_server, self.smtp_port)
server.starttls()
server.login(self.email_user, self.email_password)
server.send_message(msg)
server.quit()
print(f"π§ Email alert sent: {subject}")
except Exception as e:
print(f"β Failed to send email: {e}")
def _process_line(self, line: str, line_number: int):
"""Process each log line and check for pattern matches"""
for alert in self.alerts:
if re.search(alert.pattern, line):
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
alert_message = f"[{timestamp}] ALERT: {alert.description} at line {line_number}"
print(f"π¨ {alert_message}")
print(f" Content: {line.strip()}")
# Execute custom callback if provided
if alert.callback:
try:
alert.callback(line, line_number)
except Exception as e:
print(f"β Callback error: {e}")
# Send email notification if configured
if alert.email_notification:
self._send_email_alert(
f"Log Alert: {alert.description}",
f"Pattern matched in {self.log_file_path}\n"
f"Line {line_number}: {line.strip()}\n"
f"Pattern: {alert.pattern}"
)
def _get_file_size(self) -> int:
"""Get current file size"""
try:
return self.log_file_path.stat().st_size
except FileNotFoundError:
return 0
def start_monitoring(self):
"""Start monitoring the log file in real-time"""
print(f"π Starting log monitor for: {self.log_file_path}")
# Initialize file position
if self.log_file_path.exists():
self.last_position = self._get_file_size()
print(f"π Log file found. Starting from position: {self.last_position}")
else:
print("β οΈ Log file not found. Waiting for it to be created...")
line_number = 0
while not self.stop_event.is_set():
try:
# Check if file exists
if not self.log_file_path.exists():
time.sleep(1)
continue
# Check file size
current_size = self._get_file_size()
# Handle file rotation (when file becomes smaller)
if current_size < self.last_position:
print("π Log file rotation detected")
self.last_position = 0
# Read new content if file grew
if current_size > self.last_position:
with open(self.log_file_path, 'r', encoding='utf-8', errors='ignore') as file:
file.seek(self.last_position)
while not self.stop_event.is_set():
line = file.readline()
if not line:
break
line_number += 1
self._process_line(line, line_number)
self.last_position = file.tell()
time.sleep(0.5) # Check every 500ms
except Exception as e:
print(f"β Error monitoring log file: {e}")
time.sleep(1)
def stop_monitoring(self):
"""Stop the log monitoring"""
print("π Stopping log monitor...")
self.stop_event.set()
# Example usage and custom alert handlers
def handle_error_alert(line: str, line_number: int):
"""Custom handler for error alerts"""
# Could log to database, trigger webhook, etc.
print(f"π§ Custom error handler triggered for line {line_number}")
def handle_security_alert(line: str, line_number: int):
"""Custom handler for security alerts"""
# Could block IP, notify security team, etc.
print(f"π Security alert handler triggered for line {line_number}")
# Define alert patterns
log_alerts = [
LogAlert(
pattern=r"ERROR|CRITICAL|FATAL",
description="Application error detected",
callback=handle_error_alert,
email_notification=True
),
LogAlert(
pattern=r"SECURITY|UNAUTHORIZED|INVALID_LOGIN",
description="Security breach attempt",
callback=handle_security_alert,
email_notification=True
),
LogAlert(
pattern=r"WARNING|WARN",
description="Warning message",
email_notification=False
)
]
# Example usage
if __name__ == "__main__":
# Create log monitor instance
monitor = LogMonitor("/var/log/application.log", log_alerts)
# Start monitoring in a separate thread
monitor_thread = Thread(target=monitor.start_monitoring)
monitor_thread.daemon = True
monitor_thread.start()
try:
# Keep main thread alive
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nπ Keyboard interrupt received")
monitor.stop_monitoring()
monitor_thread.join(timeout=2)
print("π Log monitoring stopped")
This real-time log monitoring system watches log files for specific patterns and triggers alerts when those patterns are detected. Itβs particularly useful for system administrators, DevOps engineers, and developers who need to be notified immediately when critical events occur in their applications.
LogMonitor
class takes a log file path and a list of LogAlert
configurationsBefore using this code, youβll need to:
smtp_server
, email_user
, email_password
, and alert_recipient
LogAlert
objects with patterns and descriptionsThe system provides immediate feedback through console output and can notify you via email, making it an essential tool for maintaining system reliability and security.