ids.alfacom.it/python_ml/syslog_parser.py
marco370 46ab780e60 Add detailed logging to track the syslog parser's execution and state
Enhance syslog_parser.py with debug, info, and error logging statements, including checks for log file existence and robust error handling for database connections and parsing.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 1663254c-86fe-4c1e-966a-168eb8cd8f97
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/MkBJZ0L
2025-11-17 17:46:37 +00:00

257 lines
8.7 KiB
Python

"""
Syslog Parser - Analizza log MikroTik e li salva in PostgreSQL
Legge da /var/log/mikrotik/raw.log e popola la tabella network_logs
"""
import re
import psycopg2
from datetime import datetime
from typing import Dict, Optional
import os
import time
from dotenv import load_dotenv
class SyslogParser:
    """
    Parser for MikroTik firewall logs written in BSD-syslog form
    (``Mmm dd HH:MM:SS host message``).

    Extracts connection details (addresses, ports, protocol, packet length,
    action) from each line and stores them in the PostgreSQL table
    ``network_logs``.
    """

    def __init__(self, db_config: Dict[str, str]):
        """
        Args:
            db_config: keyword arguments passed verbatim to ``psycopg2.connect``
                (host, port, database, user, password, ...).

        No connection is opened here; call ``connect_db`` first.
        """
        self.db_config = db_config
        self.conn = None
        self.cursor = None
        # Regex for the MikroTik firewall message (real-world format). Examples:
        #   forward: ... proto UDP, 185.203.26.34:55841->192.178.203.94:443, len 1280
        #   detected-ddos forward: ... proto TCP (SYN), 82.62.84.108:43863->185.203.26.34:8472, len 210
        #   forward: ... proto ICMP (type 8, code 0), 10.0.0.1->8.8.8.8, len 84
        # Ports are optional: ICMP entries carry bare addresses, and with
        # mandatory ports the ICMP alternative could never match. For ICMP the
        # parenthesised "(type, code)" text is captured by the tcp_flags group.
        self.main_pattern = re.compile(
            r'(?P<action>forward|detected-ddos forward):.*?'
            r'proto (?P<proto>UDP|TCP|ICMP)(?:\s+\((?P<tcp_flags>[^)]+)\))?.*?'
            r'(?P<src_ip>[\d.]+)(?::(?P<src_port>\d+))?'
            r'->(?P<dst_ip>[\d.]+)(?::(?P<dst_port>\d+))?.*?'
            r'len (?P<len>\d+)',
            re.IGNORECASE
        )

    def connect_db(self):
        """Open the PostgreSQL connection and a cursor; re-raises on failure."""
        try:
            self.conn = psycopg2.connect(**self.db_config)
            self.cursor = self.conn.cursor()
            print("[INFO] Connesso a PostgreSQL")
        except Exception as e:
            print(f"[ERROR] Connessione database fallita: {e}")
            raise

    def disconnect_db(self):
        """Close cursor and connection; safe to call repeatedly or when never connected."""
        if self.cursor:
            self.cursor.close()
            self.cursor = None
        if self.conn:
            self.conn.close()
            self.conn = None
            print("[INFO] Disconnesso da PostgreSQL")

    def parse_log_line(self, line: str) -> Optional[Dict]:
        """
        Parse a single MikroTik syslog line.

        Real-world layout:
            Nov 17 16:52:16 FIBRA forward: in:... proto UDP, 185.203.26.34:55841->192.178.203.94:443, len 1280
            Nov 17 16:52:16 FIBRA detected-ddos forward: ... proto TCP (SYN), 82.62.84.108:43863->185.203.26.34:8472, len 210

        Returns:
            A dict with the regex groups (action, proto, tcp_flags, src_ip,
            src_port, dst_ip, dst_port, len) plus timestamp, router_name and
            raw_message. Ports and len are ints; ports are None for ICMP.
            action is normalised to 'ddos' or 'forward'. Returns None when the
            line is not a recognised firewall entry.
        """
        # Header: month, day, time, hostname, then the free-form message.
        parts = line.split(None, 4)
        if len(parts) < 5:
            return None
        month, day, time_str, hostname, message = parts

        # Syslog omits the year: assume the current one ...
        now = datetime.now()
        try:
            timestamp = datetime.strptime(
                f"{now.year} {month} {day} {time_str}",
                "%Y %b %d %H:%M:%S"
            )
        except ValueError:
            return None
        # ... unless that puts the entry more than a day in the future, which
        # means it was written last year (e.g. a Dec 31 line read on Jan 1).
        if (timestamp - now).days >= 1:
            try:
                timestamp = timestamp.replace(year=now.year - 1)
            except ValueError:
                pass  # Feb 29 has no counterpart last year; keep parsed date

        match = self.main_pattern.search(message)
        if not match:
            return None

        data = match.groupdict()
        data['timestamp'] = timestamp
        data['router_name'] = hostname
        data['raw_message'] = line.strip()
        # The pattern is case-insensitive, so compare case-insensitively too.
        data['action'] = 'ddos' if 'detected-ddos' in data['action'].lower() else 'forward'

        # Numeric fields come out of the regex as strings (or None if absent).
        for key in ('src_port', 'dst_port', 'len'):
            if data.get(key):
                data[key] = int(data[key])
        return data

    def save_to_db(self, log_data: Dict) -> bool:
        """
        Insert one parsed entry into ``network_logs`` (does not commit).

        Each INSERT runs inside its own SAVEPOINT, so a failing row is rolled
        back on its own instead of discarding every uncommitted row of the
        current batch.

        Returns:
            True if the statement succeeded (including rows skipped by
            ON CONFLICT DO NOTHING), False if it failed.
        """
        query = """
            INSERT INTO network_logs
            (timestamp, router_name, source_ip, source_port, destination_ip,
             destination_port, protocol, packet_length, action, raw_message)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT DO NOTHING
        """
        length = log_data.get('len')
        if length is None:  # 0 is a valid length; only fall back when missing
            length = log_data.get('bytes')
        params = (
            log_data.get('timestamp'),
            log_data.get('router_name'),
            log_data.get('src_ip'),
            log_data.get('src_port'),
            log_data.get('dst_ip'),
            log_data.get('dst_port'),
            (log_data.get('proto') or 'unknown').lower(),
            length,
            log_data.get('action', 'unknown'),
            log_data.get('raw_message'),
        )
        try:
            self.cursor.execute("SAVEPOINT log_row")
            self.cursor.execute(query, params)
            self.cursor.execute("RELEASE SAVEPOINT log_row")
            return True
        except Exception as e:
            print(f"[ERROR] Errore salvataggio log: {e}")
            try:
                # Undo only this row; earlier rows of the batch survive.
                self.cursor.execute("ROLLBACK TO SAVEPOINT log_row")
            except Exception:
                # Savepoint never set or connection unusable: abort the transaction.
                self.conn.rollback()
            return False

    @staticmethod
    def _open_log(log_file: str):
        """Open the log as UTF-8 text; undecodable bytes become U+FFFD instead of raising."""
        return open(log_file, 'r', encoding='utf-8', errors='replace')

    @staticmethod
    def _log_replaced(f, log_file: str) -> bool:
        """
        Return True if ``log_file`` now names a different file than ``f``
        (rename-style rotation), or if the open file shrank below the current
        read offset (copytruncate-style rotation).
        """
        try:
            on_disk = os.stat(log_file)
        except OSError:
            return False  # path missing mid-rotation: keep the current handle for now
        opened = os.fstat(f.fileno())
        if (on_disk.st_ino, on_disk.st_dev) != (opened.st_ino, opened.st_dev):
            return True
        return opened.st_size < os.lseek(f.fileno(), 0, os.SEEK_CUR)

    def process_log_file(self, log_file: str, follow: bool = False):
        """
        Parse ``log_file`` line by line and store every recognised entry.

        Args:
            log_file: path of the syslog file to read.
            follow: if True, behave like ``tail -F``. Start at the current end
                of the file, wait for new lines, and reopen the file (from its
                start) when it is rotated or truncated. Lines written to the
                old file after the switch are not read. If False, read the
                whole file once and return.

        Commits every 100 lines, whenever the reader is idle in follow mode,
        and once more on exit. KeyboardInterrupt and other exceptions stop
        processing after printing a message; they are not re-raised.
        """
        print(f"[INFO] Processando {log_file} (follow={follow})")
        processed = 0
        saved = 0
        f = None
        try:
            f = self._open_log(log_file)
            if follow:
                f.seek(0, os.SEEK_END)  # only lines written from now on
            pending = ''  # partial line read while the writer was mid-write
            while True:
                chunk = f.readline()
                if not chunk:
                    if not follow:
                        break  # end of file
                    self.conn.commit()  # flush the batch while idle
                    if self._log_replaced(f, log_file):
                        print(f"[INFO] Rotazione log rilevata, riapertura {log_file}")
                        f.close()
                        f = self._open_log(log_file)  # new file: read from its start
                        pending = ''
                        continue
                    time.sleep(0.1)  # wait for new lines
                    continue
                if follow and not chunk.endswith('\n'):
                    # Incomplete line: keep it until the rest is written.
                    pending += chunk
                    continue
                line = pending + chunk
                pending = ''
                processed += 1

                log_data = self.parse_log_line(line.strip())
                if log_data and self.save_to_db(log_data):
                    saved += 1

                # Commit every 100 lines
                if processed % 100 == 0:
                    self.conn.commit()
                    print(f"[INFO] Processate {processed} righe, salvate {saved} log")
        except KeyboardInterrupt:
            print("\n[INFO] Interrotto dall'utente")
        except Exception as e:
            print(f"[ERROR] Errore processamento file: {e}")
        finally:
            if f is not None:
                f.close()
            try:
                self.conn.commit()
            except Exception as e:
                # Do not let a failing final commit mask the original error.
                print(f"[ERROR] Commit finale fallito: {e}")
            print(f"[INFO] Totale: {processed} righe processate, {saved} log salvati")
def main():
    """
    Script entry point.

    Loads /opt/ids/.env and builds the PostgreSQL settings from the PG*
    environment variables, falling back to local defaults. It then checks that
    the MikroTik log file exists and processes it in follow mode.

    Exceptions raised while connecting or processing are printed with a
    traceback instead of being re-raised. The parser is always disconnected at
    the end.
    """
    print("[DEBUG] Avvio syslog_parser...")

    print("[DEBUG] Caricamento .env da /opt/ids/.env...")
    load_dotenv("/opt/ids/.env")
    print("[DEBUG] .env caricato")

    # (config key, environment variable, fallback value).
    # The host fallback is 127.0.0.1 rather than "localhost" to force IPv4.
    # NOTE(review): the password fallback is only a placeholder; real
    # deployments should set PGPASSWORD in /opt/ids/.env.
    settings = (
        ('host', 'PGHOST', '127.0.0.1'),
        ('port', 'PGPORT', '5432'),
        ('database', 'PGDATABASE', 'ids_database'),
        ('user', 'PGUSER', 'ids_user'),
        ('password', 'PGPASSWORD', 'ids_password_change_me'),
    )
    db_config = {name: os.getenv(env_var, fallback) for name, env_var, fallback in settings}

    # Echo everything except the password.
    print(f"[DEBUG] Configurazione database:")
    print(f"[DEBUG] Host: {db_config['host']}")
    print(f"[DEBUG] Port: {db_config['port']}")
    print(f"[DEBUG] Database: {db_config['database']}")
    print(f"[DEBUG] User: {db_config['user']}")

    log_file = '/var/log/mikrotik/raw.log'
    print(f"[DEBUG] File log: {log_file}")

    # Nothing to tail without the file (presumably produced by rsyslog).
    if not os.path.exists(log_file):
        print(f"[ERROR] File log non trovato: {log_file}")
        print(f"[ERROR] Verifica che rsyslog sia configurato correttamente")
        return
    print(f"[INFO] File log trovato: {log_file}")

    print("[DEBUG] Creazione parser...")
    syslog_parser = SyslogParser(db_config)

    try:
        print("[DEBUG] Connessione database...")
        syslog_parser.connect_db()

        # Follow mode: keep reading new lines like `tail -f`.
        print("[INFO] Avvio processamento log (modalità follow)...")
        syslog_parser.process_log_file(log_file, follow=True)
    except Exception as e:
        print(f"[ERROR] Errore critico: {e}")
        import traceback
        traceback.print_exc()
    finally:
        print("[DEBUG] Chiusura connessione database...")
        syslog_parser.disconnect_db()
if __name__ == "__main__":
    # Banner printed only when run as a script, never on import.
    for banner_line in ("=== SYSLOG PARSER PER ROUTER MIKROTIK ===",
                        "Pressione Ctrl+C per interrompere\n"):
        print(banner_line)

    try:
        main()
    except KeyboardInterrupt:
        print("\n[INFO] Terminazione da utente (Ctrl+C)")
    except Exception as e:
        # Last-resort handler for anything main() lets escape.
        print(f"\n[ERROR] Errore fatale: {e}")
        import traceback
        traceback.print_exc()