ids.alfacom.it/python_ml/syslog_parser.py
marco370 14d67c63a3 Improve syslog parser reliability and add monitoring
Enhance the syslog parser with auto-reconnect, error recovery, and integrated health metrics logging. Add a cron job for automated health checks and restarts.

2025-11-25 09:09:21 +00:00


"""
Syslog Parser - Analizza log MikroTik e li salva in PostgreSQL
Legge da /var/log/mikrotik/raw.log e popola la tabella network_logs
"""
import re
import psycopg2
from datetime import datetime
from typing import Dict, Optional
import os
import time
from dotenv import load_dotenv

class SyslogParser:
    """
    Parser for MikroTik logs in syslog format.
    Extracts network information and stores it in the database.
    """

    def __init__(self, db_config: Dict[str, str]):
        self.db_config = db_config
        self.conn = None
        self.cursor = None
        # Regex pattern for parsing MikroTik logs (real-world format)
        # Example: Nov 17 16:52:16 FIBRA forward: ... proto UDP, 185.203.26.34:55841->192.178.203.94:443, len 1280
        # Example: Nov 17 16:52:16 FIBRA detected-ddos forward: ... proto TCP (SYN), 82.62.84.108:43863->185.203.26.34:8472, len 210
        self.main_pattern = re.compile(
            r'(?P<action>forward|detected-ddos forward):.*?'
            r'proto (?P<proto>UDP|TCP|ICMP)(?:\s+\((?P<tcp_flags>[^)]+)\))?.*?'
            r'(?P<src_ip>[\d.]+):(?P<src_port>\d+)->(?P<dst_ip>[\d.]+):(?P<dst_port>\d+).*?'
            r'len (?P<len>\d+)',
            re.IGNORECASE
        )
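
    # A successful match yields the named groups: action, proto,
    # tcp_flags (optional, e.g. "SYN"), src_ip, src_port, dst_ip,
    # dst_port, and len.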

    def connect_db(self):
        """Open the PostgreSQL database connection."""
        try:
            self.conn = psycopg2.connect(**self.db_config)
            self.cursor = self.conn.cursor()
            print("[INFO] Connected to PostgreSQL")
        except Exception as e:
            print(f"[ERROR] Database connection failed: {e}")
            raise

    def disconnect_db(self):
        """Close the database connection."""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
        print("[INFO] Disconnected from PostgreSQL")

    def parse_log_line(self, line: str) -> Optional[Dict]:
        """
        Parse a single MikroTik log line.
        Returns: dict with parsed fields, or None if the line is not parseable.
        Real-world format:
        Nov 17 16:52:16 FIBRA forward: in:... proto UDP, 185.203.26.34:55841->192.178.203.94:443, len 1280
        Nov 17 16:52:16 FIBRA detected-ddos forward: ... proto TCP (SYN), 82.62.84.108:43863->185.203.26.34:8472, len 210
        """
        # Extract timestamp, hostname, and message
        # Format: Nov 17 16:52:16 FIBRA forward: ...
        parts = line.split(None, 4)
        if len(parts) < 5:
            return None
        month, day, time_str, hostname, message = parts
        # Parse the timestamp (syslog omits the year, so assume the current one)
        current_year = datetime.now().year
        try:
            timestamp = datetime.strptime(
                f"{current_year} {month} {day} {time_str}",
                "%Y %b %d %H:%M:%S"
            )
        except ValueError:
            return None
        # Match the main pattern
        match = self.main_pattern.search(message)
        if match:
            data = match.groupdict()
            # Attach metadata
            data['timestamp'] = timestamp
            data['router_name'] = hostname
            data['raw_message'] = line.strip()
            # Normalize the action value
            if 'detected-ddos' in data['action']:
                data['action'] = 'ddos'
            else:
                data['action'] = 'forward'
            # Convert numeric fields
            for key in ['src_port', 'dst_port', 'len']:
                if key in data and data[key]:
                    data[key] = int(data[key])
            return data
        return None
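
    # Illustrative usage (the sample line below is synthetic, modeled on the
    # formats documented in the docstring above):
    #   parser.parse_log_line(
    #       "Nov 17 16:52:16 FIBRA forward: proto UDP, 1.2.3.4:55841->5.6.7.8:443, len 1280"
    #   )
    #   -> {'action': 'forward', 'proto': 'UDP', 'tcp_flags': None,
    #       'src_ip': '1.2.3.4', 'src_port': 55841,
    #       'dst_ip': '5.6.7.8', 'dst_port': 443, 'len': 1280,
    #       'timestamp': datetime(...), 'router_name': 'FIBRA',
    #       'raw_message': '...'}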

    def save_to_db(self, log_data: Dict):
        """Insert a parsed log entry into the database."""
        try:
            query = """
                INSERT INTO network_logs
                (timestamp, router_name, source_ip, source_port, destination_ip,
                 destination_port, protocol, packet_length, action, raw_message)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT DO NOTHING
            """
            self.cursor.execute(query, (
                log_data.get('timestamp'),
                log_data.get('router_name'),
                log_data.get('src_ip'),
                log_data.get('src_port'),
                log_data.get('dst_ip'),
                log_data.get('dst_port'),
                log_data.get('proto', 'unknown').lower(),
                # 'bytes' is a fallback for an alternate key; the current
                # pattern only ever sets 'len'
                log_data.get('len') or log_data.get('bytes'),
                log_data.get('action', 'unknown'),
                log_data.get('raw_message')
            ))
        except Exception as e:
            print(f"[ERROR] Failed to save log entry: {e}")
            self.conn.rollback()
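
    # The INSERT above assumes a table shaped roughly like this (a sketch;
    # the actual DDL lives outside this file and the column types are
    # assumptions):
    #   CREATE TABLE network_logs (
    #       timestamp        timestamptz,
    #       router_name      text,
    #       source_ip        inet,
    #       source_port      integer,
    #       destination_ip   inet,
    #       destination_port integer,
    #       protocol         text,
    #       packet_length    integer,
    #       action           text,
    #       raw_message      text
    #   );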

    def cleanup_old_logs(self, days_to_keep: int = 3):
        """
        Delete logs older than the given number of days from the database.
        psycopg2 cannot bind a placeholder inside an INTERVAL literal, so the
        interval is built by casting a parameterized string instead.
        """
        try:
            query = """
                DELETE FROM network_logs
                WHERE timestamp < NOW() - (%s || ' days')::interval
            """
            self.cursor.execute(query, (str(days_to_keep),))
            deleted_count = self.cursor.rowcount
            self.conn.commit()
            if deleted_count > 0:
                print(f"[CLEANUP] ✅ Deleted {deleted_count} logs older than {days_to_keep} days")
            return deleted_count
        except Exception as e:
            print(f"[ERROR] Failed to clean up old logs: {e}")
            import traceback
            traceback.print_exc()
            self.conn.rollback()
            return 0
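
    # For example, days_to_keep=3 makes the WHERE clause evaluate as
    #   WHERE timestamp < NOW() - '3 days'::interval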

    def process_log_file(self, log_file: str, follow: bool = False):
        """
        Process a log file in streaming mode (safe alongside rsyslog).
        follow: if True, follows the file like 'tail -f'.
        Resilience features v2.0:
        - Auto-reconnect on DB timeout
        - Error recovery (continues after exceptions)
        - Health metrics logging
        """
        print(f"[INFO] Processing {log_file} (follow={follow})")
        processed = 0
        saved = 0
        cleanup_counter = 0
        errors = 0
        last_health_check = time.time()
        try:
            with open(log_file, 'r') as f:
                # In follow mode, start from the end of the file
                if follow:
                    f.seek(0, 2)  # Seek to end
                while True:
                    try:
                        line = f.readline()
                        if not line:
                            if follow:
                                time.sleep(0.1)  # Wait for new lines
                                # Health check every 5 minutes
                                if time.time() - last_health_check > 300:
                                    print(f"[HEALTH] Parser alive: {processed} lines processed, {saved} saved, {errors} errors")
                                    last_health_check = time.time()
                                # Batch commit every 100 processed lines
                                if processed > 0 and processed % 100 == 0:
                                    try:
                                        self.conn.commit()
                                    except Exception as commit_err:
                                        print(f"[ERROR] Commit failed, reconnecting: {commit_err}")
                                        self.reconnect_db()
                                # DB cleanup roughly every 16 minutes
                                # (10000 idle iterations x 0.1 s sleep)
                                cleanup_counter += 1
                                if cleanup_counter >= 10000:
                                    self.cleanup_old_logs(days_to_keep=3)
                                    cleanup_counter = 0
                                continue
                            else:
                                break  # End of file
                        processed += 1
                        # Parse the line
                        log_data = self.parse_log_line(line.strip())
                        if log_data:
                            try:
                                self.save_to_db(log_data)
                                saved += 1
                            except Exception as save_err:
                                errors += 1
                                print(f"[ERROR] Save failed: {save_err}")
                                # Try to reconnect and continue
                                try:
                                    self.reconnect_db()
                                except Exception:
                                    pass
                        # Commit every 100 lines
                        if processed % 100 == 0:
                            try:
                                self.conn.commit()
                                if saved > 0:
                                    print(f"[INFO] Processed {processed} lines, saved {saved} logs, {errors} errors")
                            except Exception as commit_err:
                                print(f"[ERROR] Commit failed: {commit_err}")
                                self.reconnect_db()
                    except Exception as line_err:
                        errors += 1
                        if errors % 100 == 0:
                            print(f"[ERROR] Error processing line ({errors} total errors): {line_err}")
                        # Continue processing instead of crashing
                        continue
        except KeyboardInterrupt:
            print("\n[INFO] Interrupted by user")
        except Exception as e:
            print(f"[ERROR] Critical error while processing file: {e}")
            import traceback
            traceback.print_exc()
        finally:
            try:
                self.conn.commit()
            except Exception:
                pass
            print(f"[INFO] Total: {processed} lines processed, {saved} logs saved, {errors} errors")

    def reconnect_db(self):
        """
        Reconnect to the database (auto-recovery after connection timeouts).
        """
        print("[INFO] Attempting database reconnection...")
        try:
            self.disconnect_db()
        except Exception:
            pass
        time.sleep(2)
        try:
            self.connect_db()
            print("[INFO] ✅ Database reconnection successful")
        except Exception as e:
            print(f"[ERROR] ❌ Reconnection failed: {e}")
            raise


def main():
    """Main entry point"""
    print("[DEBUG] Starting syslog_parser...")
    # Load environment variables from .env
    print("[DEBUG] Loading .env from /opt/ids/.env...")
    load_dotenv("/opt/ids/.env")
    print("[DEBUG] .env loaded")
    # Database configuration from the environment
    # IMPORTANT: use 127.0.0.1 instead of localhost to force IPv4
    db_config = {
        'host': os.getenv('PGHOST', '127.0.0.1'),
        'port': os.getenv('PGPORT', '5432'),
        'database': os.getenv('PGDATABASE', 'ids_database'),
        'user': os.getenv('PGUSER', 'ids_user'),
        'password': os.getenv('PGPASSWORD', 'ids_password_change_me')
    }
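
    # Expected /opt/ids/.env layout (a sketch; the values below are
    # illustrative, only the variable names are read by this script):
    #   PGHOST=127.0.0.1
    #   PGPORT=5432
    #   PGDATABASE=ids_database
    #   PGUSER=ids_user
    #   PGPASSWORD=<secret>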
print(f"[DEBUG] Configurazione database:")
print(f"[DEBUG] Host: {db_config['host']}")
print(f"[DEBUG] Port: {db_config['port']}")
print(f"[DEBUG] Database: {db_config['database']}")
print(f"[DEBUG] User: {db_config['user']}")
# File log da processare
log_file = '/var/log/mikrotik/raw.log'
print(f"[DEBUG] File log: {log_file}")
# Verifica esistenza file
if not os.path.exists(log_file):
print(f"[ERROR] File log non trovato: {log_file}")
print(f"[ERROR] Verifica che rsyslog sia configurato correttamente")
return
print(f"[INFO] File log trovato: {log_file}")
# Crea parser
print("[DEBUG] Creazione parser...")
parser = SyslogParser(db_config)
try:
# Connetti al database
print("[DEBUG] Connessione database...")
parser.connect_db()
# Processa file in modalità follow (tail -f) - SICURO con rsyslog
# Cleanup automatico DB ogni ~16 minuti
print("[INFO] Avvio processamento log (modalità streaming sicura)...")
print("[INFO] - Modalità follow (tail -f) compatibile con rsyslog")
print("[INFO] - Auto-cleanup log > 3 giorni dal database (ogni ~16 min)")
print("[INFO] - Commit batch ogni 100 righe")
parser.process_log_file(log_file, follow=True)
except Exception as e:
print(f"[ERROR] Errore critico: {e}")
import traceback
traceback.print_exc()
finally:
print("[DEBUG] Chiusura connessione database...")
parser.disconnect_db()


if __name__ == "__main__":
    print("=== SYSLOG PARSER FOR MIKROTIK ROUTERS ===")
    print("Press Ctrl+C to stop\n")
    try:
        main()
    except KeyboardInterrupt:
        print("\n[INFO] Terminated by user (Ctrl+C)")
    except Exception as e:
        print(f"\n[ERROR] Fatal error: {e}")
        import traceback
        traceback.print_exc()