ids.alfacom.it/python_ml/syslog_parser.py
marco370 0d34bf7d3c Update log parsing to better identify network traffic and DDoS events
Refactors the `SyslogParser` class in `python_ml/syslog_parser.py` to use a new, more comprehensive regex pattern (`main_pattern`) for parsing MikroTik logs. This includes improved identification of 'forward' and 'detected-ddos forward' actions, protocol details (UDP, TCP, ICMP), and associated IP addresses, ports, and lengths. The changes aim to accurately capture network traffic and potential DDoS events from MikroTik logs.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: b7377ada-e722-475a-86d2-07f21299ec70
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/MkBJZ0L
2025-11-17 17:35:37 +00:00

222 lines
7.5 KiB
Python

"""
Syslog Parser - Analizza log MikroTik e li salva in PostgreSQL
Legge da /var/log/mikrotik/raw.log e popola la tabella network_logs
"""
import re
import psycopg2
from datetime import datetime
from typing import Dict, Optional
import os
import time
from dotenv import load_dotenv
class SyslogParser:
    """Parse MikroTik syslog lines and store them in PostgreSQL.

    Each recognised firewall line is turned into a dict of network fields
    (addresses, ports, protocol, packet length, action) and inserted into
    the ``network_logs`` table.
    """

    def __init__(self, db_config: Dict[str, str]):
        """Store the connection settings and compile the line pattern.

        Args:
            db_config: keyword arguments forwarded to ``psycopg2.connect``.
        """
        self.db_config = db_config
        self.conn = None
        self.cursor = None

        # Pattern for real MikroTik firewall lines, e.g.
        #   forward: ... proto UDP, 185.203.26.34:55841->192.178.203.94:443, len 1280
        #   detected-ddos forward: ... proto TCP (SYN), 82.62.84.108:43863->185.203.26.34:8472, len 210
        #   forward: ... proto ICMP (type 8, code 0), 10.0.0.1->10.0.0.2, len 84
        # Ports are optional because ICMP lines carry none; with mandatory
        # ports the ICMP alternative could never match.
        self.main_pattern = re.compile(
            r'(?P<action>forward|detected-ddos forward):.*?'
            r'proto (?P<proto>UDP|TCP|ICMP)(?:\s+\((?P<tcp_flags>[^)]+)\))?.*?'
            r'(?P<src_ip>[\d.]+)(?::(?P<src_port>\d+))?->'
            r'(?P<dst_ip>[\d.]+)(?::(?P<dst_port>\d+))?.*?'
            r'len (?P<len>\d+)',
            re.IGNORECASE
        )

    def connect_db(self):
        """Open the PostgreSQL connection and cursor.

        Raises:
            Exception: whatever ``psycopg2.connect`` raised, after logging it.
        """
        try:
            self.conn = psycopg2.connect(**self.db_config)
            self.cursor = self.conn.cursor()
            print("[INFO] Connesso a PostgreSQL")
        except Exception as e:
            print(f"[ERROR] Connessione database fallita: {e}")
            raise

    def disconnect_db(self):
        """Close the cursor and the connection if they are open."""
        if self.cursor:
            self.cursor.close()
            self.cursor = None
        if self.conn:
            self.conn.close()
            self.conn = None
            print("[INFO] Disconnesso da PostgreSQL")

    @staticmethod
    def _resolve_timestamp(month: str, day: str, time_str: str) -> Optional[datetime]:
        """Build a datetime from a year-less syslog timestamp.

        Syslog omits the year, so the current year is tried first. If that
        lands more than a day in the future (e.g. a December line read in
        January) or is not a valid date (Feb 29 in a non-leap year), the
        previous year is tried.

        Returns:
            The resolved datetime, or None when the fields are not a valid
            date in either year.
        """
        now = datetime.now()
        fallback = None
        for year in (now.year, now.year - 1):
            try:
                candidate = datetime.strptime(
                    f"{year} {month} {day} {time_str}",
                    "%Y %b %d %H:%M:%S"
                )
            except ValueError:
                continue
            # timedelta.days < 1 means "less than one day ahead of now".
            if (candidate - now).days < 1:
                return candidate
            if fallback is None:
                fallback = candidate
        # Only a future date parsed: keep it rather than dropping the line.
        return fallback

    def parse_log_line(self, line: str) -> Optional[Dict]:
        """Parse one MikroTik syslog line.

        Expected layout::

            Nov 17 16:52:16 FIBRA forward: in:... proto UDP, 185.203.26.34:55841->192.178.203.94:443, len 1280
            Nov 17 16:52:16 FIBRA detected-ddos forward: ... proto TCP (SYN), 82.62.84.108:43863->185.203.26.34:8472, len 210

        Args:
            line: raw log line.

        Returns:
            A dict with the regex groups (``action``, ``proto``, ``tcp_flags``,
            ``src_ip``, ``src_port``, ``dst_ip``, ``dst_port``, ``len``) plus
            ``timestamp``, ``router_name`` and ``raw_message``; ``action`` is
            normalised to ``'ddos'`` or ``'forward'`` and numeric fields are
            ints (ports are None when the line has none). None when the line
            cannot be parsed.
        """
        # month, day, time, hostname, then the rest of the message.
        parts = line.split(None, 4)
        if len(parts) < 5:
            return None
        month, day, time_str, hostname, message = parts

        timestamp = self._resolve_timestamp(month, day, time_str)
        if timestamp is None:
            return None

        match = self.main_pattern.search(message)
        if match is None:
            return None

        data = match.groupdict()
        data['timestamp'] = timestamp
        data['router_name'] = hostname
        data['raw_message'] = line.strip()

        # The pattern is case-insensitive, so compare in lower case.
        if 'detected-ddos' in data['action'].lower():
            data['action'] = 'ddos'
        else:
            data['action'] = 'forward'

        for key in ('src_port', 'dst_port', 'len'):
            if data[key] is not None:
                data[key] = int(data[key])
        return data

    def save_to_db(self, log_data: Dict) -> bool:
        """Insert one parsed log entry into ``network_logs``.

        Errors are logged and the current transaction is rolled back.
        NOTE: the rollback also discards rows inserted earlier in the same
        uncommitted batch.

        Args:
            log_data: dict as returned by ``parse_log_line``.

        Returns:
            True when the INSERT was executed, False when it failed.
        """
        query = """
            INSERT INTO network_logs
            (timestamp, router_name, source_ip, source_port, destination_ip,
            destination_port, protocol, packet_length, action, raw_message)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT DO NOTHING
        """
        try:
            self.cursor.execute(query, (
                log_data.get('timestamp'),
                log_data.get('router_name'),
                log_data.get('src_ip'),
                log_data.get('src_port'),
                log_data.get('dst_ip'),
                log_data.get('dst_port'),
                (log_data.get('proto') or 'unknown').lower(),
                log_data.get('len') or log_data.get('bytes'),
                log_data.get('action', 'unknown'),
                log_data.get('raw_message')
            ))
            return True
        except Exception as e:
            print(f"[ERROR] Errore salvataggio log: {e}")
            if self.conn is not None:
                self.conn.rollback()
            return False

    def process_log_file(self, log_file: str, follow: bool = False):
        """Read a log file, parse every line and store the parsed entries.

        Commits every 100 processed lines, whenever the file is idle in
        follow mode, and once more on exit.

        Args:
            log_file: path of the file to read.
            follow: when True, start at the end of the file and keep waiting
                for new lines (like ``tail -f``) until interrupted.
        """
        print(f"[INFO] Processando {log_file} (follow={follow})")
        processed = 0
        saved = 0
        try:
            # errors='replace' keeps one undecodable byte from aborting the run.
            with open(log_file, 'r', errors='replace') as f:
                if follow:
                    f.seek(0, 2)  # start at end of file

                partial = ""
                while True:
                    chunk = f.readline()
                    if not chunk:
                        if not follow:
                            break  # end of file
                        time.sleep(0.1)  # wait for new lines
                        self.conn.commit()  # flush the pending batch
                        continue

                    # While following, the writer may not have finished the
                    # line yet: hold the fragment until its newline arrives.
                    if follow and not chunk.endswith("\n"):
                        partial += chunk
                        continue
                    line = partial + chunk
                    partial = ""

                    processed += 1
                    log_data = self.parse_log_line(line.strip())
                    # Count only rows whose INSERT actually succeeded.
                    if log_data and self.save_to_db(log_data):
                        saved += 1

                    if processed % 100 == 0:
                        self.conn.commit()
                        print(f"[INFO] Processate {processed} righe, salvate {saved} log")
        except KeyboardInterrupt:
            print("\n[INFO] Interrotto dall'utente")
        except Exception as e:
            print(f"[ERROR] Errore processamento file: {e}")
        finally:
            if self.conn is not None:
                self.conn.commit()
            print(f"[INFO] Totale: {processed} righe processate, {saved} log salvati")
def main():
    """Load settings, connect to PostgreSQL and tail the MikroTik log.

    Environment variables are read from ``/opt/ids/.env``; the connection
    is always closed on exit.
    """
    load_dotenv("/opt/ids/.env")

    # (connect keyword, environment variable, default value).
    # The host defaults to 127.0.0.1 rather than localhost to force IPv4.
    settings = (
        ('host', 'PGHOST', '127.0.0.1'),
        ('port', 'PGPORT', '5432'),
        ('database', 'PGDATABASE', 'ids_database'),
        ('user', 'PGUSER', 'ids_user'),
        ('password', 'PGPASSWORD', 'ids_password_change_me'),
    )
    db_config = {
        keyword: os.getenv(env_name, fallback)
        for keyword, env_name, fallback in settings
    }

    syslog_parser = SyslogParser(db_config)
    try:
        syslog_parser.connect_db()
        # Follow mode: keep reading new lines, like tail -f.
        syslog_parser.process_log_file('/var/log/mikrotik/raw.log', follow=True)
    finally:
        syslog_parser.disconnect_db()
# Script entry point: print the banner, then start the parser.
if __name__ == "__main__":
    for banner_line in (
        "=== SYSLOG PARSER PER ROUTER MIKROTIK ===",
        "Pressione Ctrl+C per interrompere\n",
    ):
        print(banner_line)
    main()