"""
|
|
Syslog Parser - Analizza log MikroTik e li salva in PostgreSQL
|
|
Legge da /var/log/mikrotik/raw.log e popola la tabella network_logs
|
|
"""
|
|
|
|
import re
import psycopg2
from datetime import datetime
from typing import Dict, Optional
import os
import time
from dotenv import load_dotenv


class SyslogParser:
    """
    Parser for MikroTik logs in syslog format.
    Extracts network information and stores it in the database.
    """

    def __init__(self, db_config: Dict[str, str]):
        self.db_config = db_config
        self.conn = None
        self.cursor = None

        # Regex pattern for parsing MikroTik logs (real-world format).
        # Example: Nov 17 16:52:16 FIBRA forward: ... proto UDP, 185.203.26.34:55841->192.178.203.94:443, len 1280
        # Example: Nov 17 16:52:16 FIBRA detected-ddos forward: ... proto TCP (SYN), 82.62.84.108:43863->185.203.26.34:8472, len 210
        self.main_pattern = re.compile(
            r'(?P<action>forward|detected-ddos forward):.*?'
            r'proto (?P<proto>UDP|TCP|ICMP)(?:\s+\((?P<tcp_flags>[^)]+)\))?.*?'
            r'(?P<src_ip>[\d.]+):(?P<src_port>\d+)->(?P<dst_ip>[\d.]+):(?P<dst_port>\d+).*?'
            r'len (?P<len>\d+)',
            re.IGNORECASE
        )
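        # A quick illustration of what main_pattern captures from the first
        # sample line above (doctest-style sketch, shown as a comment only):
        #   >>> m = self.main_pattern.search(
        #   ...     "forward: proto UDP, 185.203.26.34:55841->192.178.203.94:443, len 1280")
        #   >>> m.groupdict()
        #   {'action': 'forward', 'proto': 'UDP', 'tcp_flags': None,
        #    'src_ip': '185.203.26.34', 'src_port': '55841',
        #    'dst_ip': '192.178.203.94', 'dst_port': '443', 'len': '1280'}
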
    def connect_db(self):
        """Connect to the PostgreSQL database."""
        try:
            self.conn = psycopg2.connect(**self.db_config)
            self.cursor = self.conn.cursor()
            print("[INFO] Connected to PostgreSQL")
        except Exception as e:
            print(f"[ERROR] Database connection failed: {e}")
            raise

    def disconnect_db(self):
        """Close the database connection."""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
        print("[INFO] Disconnected from PostgreSQL")

    def parse_log_line(self, line: str) -> Optional[Dict]:
        """
        Parse a single MikroTik log line.
        Returns: dict with the parsed data, or None if the line cannot be parsed.

        Real-world format:
        Nov 17 16:52:16 FIBRA forward: in:... proto UDP, 185.203.26.34:55841->192.178.203.94:443, len 1280
        Nov 17 16:52:16 FIBRA detected-ddos forward: ... proto TCP (SYN), 82.62.84.108:43863->185.203.26.34:8472, len 210
        """
        # Extract timestamp, hostname, and message.
        # Format: Nov 17 16:52:16 FIBRA forward: ...
        parts = line.split(None, 4)
        if len(parts) < 5:
            return None

        month, day, time_str, hostname, message = parts

        # Parse the timestamp (syslog omits the year, so assume the current one)
        current_year = datetime.now().year
        try:
            timestamp = datetime.strptime(
                f"{current_year} {month} {day} {time_str}",
                "%Y %b %d %H:%M:%S"
            )
        except ValueError:
            return None

        # Match against the main pattern
        match = self.main_pattern.search(message)
        if match:
            data = match.groupdict()

            # Add metadata
            data['timestamp'] = timestamp
            data['router_name'] = hostname
            data['raw_message'] = line.strip()

            # Normalize the final action
            if 'detected-ddos' in data['action']:
                data['action'] = 'ddos'
            else:
                data['action'] = 'forward'

            # Convert numeric fields
            for key in ['src_port', 'dst_port', 'len']:
                if key in data and data[key]:
                    data[key] = int(data[key])

            return data

        return None

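    # For the second sample line in parse_log_line's docstring, the returned
    # dict looks like this (illustrative; the year comes from datetime.now(),
    # ports and length are converted to int, and the action is normalized):
    #   {'action': 'ddos', 'proto': 'TCP', 'tcp_flags': 'SYN',
    #    'src_ip': '82.62.84.108', 'src_port': 43863,
    #    'dst_ip': '185.203.26.34', 'dst_port': 8472, 'len': 210,
    #    'timestamp': datetime(<current year>, 11, 17, 16, 52, 16),
    #    'router_name': 'FIBRA', 'raw_message': '<the original line>'}
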
    def save_to_db(self, log_data: Dict):
        """Store a parsed log entry in the database."""
        try:
            query = """
                INSERT INTO network_logs
                    (timestamp, router_name, source_ip, source_port, destination_ip,
                     destination_port, protocol, packet_length, action, raw_message)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT DO NOTHING
            """

            self.cursor.execute(query, (
                log_data.get('timestamp'),
                log_data.get('router_name'),
                log_data.get('src_ip'),
                log_data.get('src_port'),
                log_data.get('dst_ip'),
                log_data.get('dst_port'),
                log_data.get('proto', 'unknown').lower(),
                log_data.get('len') or log_data.get('bytes'),
                log_data.get('action', 'unknown'),
                log_data.get('raw_message')
            ))

        except Exception as e:
            print(f"[ERROR] Failed to save log entry: {e}")
            try:
                self.conn.rollback()
            except Exception:
                pass  # the connection may already be dead
            raise  # surface the failure so the caller can reconnect

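    # A minimal sketch of the network_logs table the INSERT above assumes
    # (column types inferred from the parsed values; the real schema may
    # differ, and ON CONFLICT DO NOTHING only deduplicates if the table
    # defines a matching unique constraint):
    #
    #   CREATE TABLE IF NOT EXISTS network_logs (
    #       timestamp        TIMESTAMP,
    #       router_name      TEXT,
    #       source_ip        INET,
    #       source_port      INTEGER,
    #       destination_ip   INET,
    #       destination_port INTEGER,
    #       protocol         TEXT,
    #       packet_length    INTEGER,
    #       action           TEXT,
    #       raw_message      TEXT
    #   );
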
    def cleanup_old_logs(self, days_to_keep: int = 3):
        """
        Delete logs older than the given number of days.
        Note: the interval is built by concatenating a bound parameter with
        ' days' and casting to ::interval, because psycopg2 placeholders
        cannot appear inside an INTERVAL literal.
        """
        try:
            query = """
                DELETE FROM network_logs
                WHERE timestamp < NOW() - (%s || ' days')::interval
            """
            self.cursor.execute(query, (str(days_to_keep),))
            deleted_count = self.cursor.rowcount
            self.conn.commit()

            if deleted_count > 0:
                print(f"[CLEANUP] ✅ Deleted {deleted_count} logs older than {days_to_keep} days")

            return deleted_count

        except Exception as e:
            print(f"[ERROR] Failed to clean up old logs: {e}")
            import traceback
            traceback.print_exc()
            self.conn.rollback()
            return 0

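    # With the default days_to_keep=3, the bound parameter expands server-side
    # to ('3' || ' days')::interval, i.e. NOW() - INTERVAL '3 days'.
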
    def process_log_file(self, log_file: str, follow: bool = False):
        """
        Process a log file in streaming mode (safe alongside rsyslog).
        follow: if True, follows the file like 'tail -f'.

        Resilient features v2.0:
        - Auto-reconnect on DB timeout
        - Error recovery (keeps going after exceptions)
        - Health metrics logging
        """
        print(f"[INFO] Processing {log_file} (follow={follow})")

        processed = 0
        saved = 0
        cleanup_counter = 0
        errors = 0
        last_health_check = time.time()

        try:
            with open(log_file, 'r') as f:
                # In follow mode, start from the end of the file
                if follow:
                    f.seek(0, 2)  # seek to end

                while True:
                    try:
                        line = f.readline()

                        if not line:
                            if follow:
                                time.sleep(0.1)  # wait for new lines

                                # Health check every 5 minutes
                                if time.time() - last_health_check > 300:
                                    print(f"[HEALTH] Parser alive: {processed} lines processed, {saved} saved, {errors} errors")
                                    last_health_check = time.time()

                                # Commit any pending batch (every 100 processed lines)
                                if processed > 0 and processed % 100 == 0:
                                    try:
                                        self.conn.commit()
                                    except Exception as commit_err:
                                        print(f"[ERROR] Commit failed, reconnecting: {commit_err}")
                                        self.reconnect_db()

                                # Clean up the DB after ~10000 idle iterations
                                # (0.1 s sleep each, i.e. roughly every 16 minutes of idle time)
                                cleanup_counter += 1
                                if cleanup_counter >= 10000:
                                    self.cleanup_old_logs(days_to_keep=3)
                                    cleanup_counter = 0

                                continue
                            else:
                                break  # end of file

                        processed += 1

                        # Parse the line
                        log_data = self.parse_log_line(line.strip())
                        if log_data:
                            try:
                                self.save_to_db(log_data)
                                saved += 1
                            except Exception as save_err:
                                errors += 1
                                print(f"[ERROR] Save failed: {save_err}")
                                # Try to reconnect and keep going
                                try:
                                    self.reconnect_db()
                                except Exception:
                                    pass

                        # Commit every 100 lines
                        if processed % 100 == 0:
                            try:
                                self.conn.commit()
                                if saved > 0:
                                    print(f"[INFO] Processed {processed} lines, saved {saved} logs, {errors} errors")
                            except Exception as commit_err:
                                print(f"[ERROR] Commit failed: {commit_err}")
                                self.reconnect_db()

                    except Exception as line_err:
                        errors += 1
                        if errors % 100 == 0:
                            print(f"[ERROR] Error processing line ({errors} total errors): {line_err}")
                        # Keep processing instead of crashing
                        continue

        except KeyboardInterrupt:
            print("\n[INFO] Interrupted by user")
        except Exception as e:
            print(f"[ERROR] Critical error while processing file: {e}")
            import traceback
            traceback.print_exc()
        finally:
            try:
                self.conn.commit()
            except Exception:
                pass
            print(f"[INFO] Total: {processed} lines processed, {saved} logs saved, {errors} errors")

    def reconnect_db(self):
        """Reconnect to the database (auto-recovery after a connection timeout)."""
        print("[INFO] Attempting database reconnection...")
        try:
            self.disconnect_db()
        except Exception:
            pass

        time.sleep(2)

        try:
            self.connect_db()
            print("[INFO] ✅ Database reconnection succeeded")
        except Exception as e:
            print(f"[ERROR] ❌ Reconnection failed: {e}")
            raise


def main():
    """Main entry point."""
    print("[DEBUG] Starting syslog_parser...")

    # Load environment variables from .env
    print("[DEBUG] Loading .env from /opt/ids/.env...")
    load_dotenv("/opt/ids/.env")
    print("[DEBUG] .env loaded")

    # Database configuration from the environment.
    # IMPORTANT: use 127.0.0.1 instead of localhost to force IPv4.
    db_config = {
        'host': os.getenv('PGHOST', '127.0.0.1'),
        'port': os.getenv('PGPORT', '5432'),
        'database': os.getenv('PGDATABASE', 'ids_database'),
        'user': os.getenv('PGUSER', 'ids_user'),
        'password': os.getenv('PGPASSWORD', 'ids_password_change_me')
    }

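    # The expected contents of /opt/ids/.env, matching the defaults above
    # (illustrative values only):
    #   PGHOST=127.0.0.1
    #   PGPORT=5432
    #   PGDATABASE=ids_database
    #   PGUSER=ids_user
    #   PGPASSWORD=<your password>
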
print(f"[DEBUG] Configurazione database:")
|
|
print(f"[DEBUG] Host: {db_config['host']}")
|
|
print(f"[DEBUG] Port: {db_config['port']}")
|
|
print(f"[DEBUG] Database: {db_config['database']}")
|
|
print(f"[DEBUG] User: {db_config['user']}")
|
|
|
|
# File log da processare
|
|
log_file = '/var/log/mikrotik/raw.log'
|
|
print(f"[DEBUG] File log: {log_file}")
|
|
|
|
# Verifica esistenza file
|
|
if not os.path.exists(log_file):
|
|
print(f"[ERROR] File log non trovato: {log_file}")
|
|
print(f"[ERROR] Verifica che rsyslog sia configurato correttamente")
|
|
return
|
|
|
|
print(f"[INFO] File log trovato: {log_file}")
|
|
|
|
# Crea parser
|
|
print("[DEBUG] Creazione parser...")
|
|
parser = SyslogParser(db_config)
|
|
|
|
try:
|
|
# Connetti al database
|
|
print("[DEBUG] Connessione database...")
|
|
parser.connect_db()
|
|
|
|
# Processa file in modalità follow (tail -f) - SICURO con rsyslog
|
|
# Cleanup automatico DB ogni ~16 minuti
|
|
print("[INFO] Avvio processamento log (modalità streaming sicura)...")
|
|
print("[INFO] - Modalità follow (tail -f) compatibile con rsyslog")
|
|
print("[INFO] - Auto-cleanup log > 3 giorni dal database (ogni ~16 min)")
|
|
print("[INFO] - Commit batch ogni 100 righe")
|
|
parser.process_log_file(log_file, follow=True)
|
|
|
|
except Exception as e:
|
|
print(f"[ERROR] Errore critico: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
finally:
|
|
print("[DEBUG] Chiusura connessione database...")
|
|
parser.disconnect_db()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print("=== SYSLOG PARSER PER ROUTER MIKROTIK ===")
|
|
print("Pressione Ctrl+C per interrompere\n")
|
|
|
|
try:
|
|
main()
|
|
except KeyboardInterrupt:
|
|
print("\n[INFO] Terminazione da utente (Ctrl+C)")
|
|
except Exception as e:
|
|
print(f"\n[ERROR] Errore fatale: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
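
# Ops note: this parser is meant to be paired with a cron job that performs
# automated health checks and restarts it if it dies. A minimal sketch of such
# an entry (the service name and check command are assumptions, not part of
# this file):
#
#   # /etc/cron.d/ids-syslog-parser
#   */5 * * * * root pgrep -f syslog_parser.py > /dev/null || systemctl restart ids-syslog-parser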