Refactors the `SyslogParser` class in `python_ml/syslog_parser.py` to use a new, more comprehensive regex pattern (`main_pattern`) for parsing MikroTik logs. This includes improved identification of 'forward' and 'detected-ddos forward' actions, protocol details (UDP, TCP, ICMP), and associated IP addresses, ports, and lengths. The changes aim to accurately capture network traffic and potential DDoS events from MikroTik logs. Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: b7377ada-e722-475a-86d2-07f21299ec70 Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/MkBJZ0L
222 lines
7.5 KiB
Python
222 lines
7.5 KiB
Python
"""
|
|
Syslog Parser - Analizza log MikroTik e li salva in PostgreSQL
|
|
Legge da /var/log/mikrotik/raw.log e popola la tabella network_logs
|
|
"""
|
|
|
|
import re
|
|
import psycopg2
|
|
from datetime import datetime
|
|
from typing import Dict, Optional
|
|
import os
|
|
import time
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
class SyslogParser:
    """
    Parser for MikroTik firewall log lines in syslog format.

    Extracts network information (protocol, endpoints, packet length,
    action) from each line and stores it in the ``network_logs`` table.
    """

    def __init__(self, db_config: Dict[str, str]):
        """
        Store the connection settings and compile the parsing pattern.

        db_config: keyword arguments forwarded to ``psycopg2.connect``.
        No database connection is opened here; call ``connect_db`` first.
        """
        self.db_config = db_config
        self.conn = None
        self.cursor = None

        # Regex pattern for MikroTik log messages (real-world format).
        # Example: Nov 17 16:52:16 FIBRA forward: ... proto UDP, 185.203.26.34:55841->192.178.203.94:443, len 1280
        # Example: Nov 17 16:52:16 FIBRA detected-ddos forward: ... proto TCP (SYN), 82.62.84.108:43863->185.203.26.34:8472, len 210
        # NOTE(review): the pattern requires `ip:port` on both sides of `->`;
        # ICMP entries that are logged without ports would not match --
        # confirm against real router output before relying on ICMP capture.
        self.main_pattern = re.compile(
            r'(?P<action>forward|detected-ddos forward):.*?'
            r'proto (?P<proto>UDP|TCP|ICMP)(?:\s+\((?P<tcp_flags>[^)]+)\))?.*?'
            r'(?P<src_ip>[\d.]+):(?P<src_port>\d+)->(?P<dst_ip>[\d.]+):(?P<dst_port>\d+).*?'
            r'len (?P<len>\d+)',
            re.IGNORECASE
        )

    def connect_db(self):
        """
        Open the PostgreSQL connection and create a cursor.

        Raises: re-raises whatever ``psycopg2.connect`` raised, after
        printing an error message.
        """
        try:
            self.conn = psycopg2.connect(**self.db_config)
            self.cursor = self.conn.cursor()
            print("[INFO] Connesso a PostgreSQL")
        except Exception as e:
            print(f"[ERROR] Connessione database fallita: {e}")
            raise

    def disconnect_db(self):
        """
        Close the cursor and the connection, if they are open.

        The connection is closed even when closing the cursor fails, and
        both attributes are reset to None so a second call is a no-op.
        """
        try:
            if self.cursor:
                self.cursor.close()
        finally:
            self.cursor = None
            if self.conn:
                self.conn.close()
            self.conn = None
        print("[INFO] Disconnesso da PostgreSQL")

    def parse_log_line(self, line: str) -> Optional[Dict]:
        """
        Parse a single MikroTik log line.

        Returns: dict with the parsed fields (``action``, ``proto``,
        ``tcp_flags``, ``src_ip``, ``src_port``, ``dst_ip``, ``dst_port``,
        ``len``, ``timestamp``, ``router_name``, ``raw_message``), or None
        when the line cannot be parsed.

        Real-world format:
        Nov 17 16:52:16 FIBRA forward: in:... proto UDP, 185.203.26.34:55841->192.178.203.94:443, len 1280
        Nov 17 16:52:16 FIBRA detected-ddos forward: ... proto TCP (SYN), 82.62.84.108:43863->185.203.26.34:8472, len 210
        """
        # Split into month, day, time, hostname and the remaining message.
        # Format: Nov 17 16:52:16 FIBRA forward: ...
        parts = line.split(None, 4)
        if len(parts) < 5:
            return None

        month, day, time_str, hostname, message = parts

        # Syslog timestamps carry no year, so start from the current one.
        now = datetime.now()
        try:
            timestamp = datetime.strptime(
                f"{now.year} {month} {day} {time_str}",
                "%Y %b %d %H:%M:%S"
            )
        except ValueError:
            return None

        # A timestamp more than a day ahead of the clock cannot be from this
        # year (e.g. a Dec 31 line processed on Jan 1): it belongs to the
        # previous one. The one-day slack tolerates small clock differences.
        if (timestamp - now).total_seconds() > 86400:
            try:
                timestamp = timestamp.replace(year=timestamp.year - 1)
            except ValueError:
                # Feb 29 does not exist in the previous year; keep as parsed.
                pass

        # Match the main pattern against the message part only.
        match = self.main_pattern.search(message)
        if match is None:
            return None

        data = match.groupdict()

        # Add metadata.
        data['timestamp'] = timestamp
        data['router_name'] = hostname
        data['raw_message'] = line.strip()

        # Normalise the action. The pattern is case-insensitive, so the
        # comparison has to be case-insensitive as well.
        if 'detected-ddos' in data['action'].lower():
            data['action'] = 'ddos'
        else:
            data['action'] = 'forward'

        # Convert numeric fields.
        for key in ('src_port', 'dst_port', 'len'):
            if data.get(key) is not None:
                data[key] = int(data[key])

        return data

    def save_to_db(self, log_data: Dict) -> bool:
        """
        Insert one parsed log entry into ``network_logs``.

        Returns: True when the INSERT was executed, False when it failed.
        Failures are printed and the open transaction is rolled back; the
        exception is not propagated.
        """
        query = """
            INSERT INTO network_logs
            (timestamp, router_name, source_ip, source_port, destination_ip,
            destination_port, protocol, packet_length, action, raw_message)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT DO NOTHING
        """

        try:
            # A present-but-None protocol must not crash on .lower().
            protocol = (log_data.get('proto') or 'unknown').lower()

            # Fall back to 'bytes' only when 'len' is missing, so that a
            # legitimate length of 0 is preserved.
            packet_length = log_data.get('len')
            if packet_length is None:
                packet_length = log_data.get('bytes')

            self.cursor.execute(query, (
                log_data.get('timestamp'),
                log_data.get('router_name'),
                log_data.get('src_ip'),
                log_data.get('src_port'),
                log_data.get('dst_ip'),
                log_data.get('dst_port'),
                protocol,
                packet_length,
                log_data.get('action', 'unknown'),
                log_data.get('raw_message')
            ))
            return True

        except Exception as e:
            print(f"[ERROR] Errore salvataggio log: {e}")
            # NOTE: rollback() undoes the whole open transaction, so rows
            # inserted since the last commit are discarded as well.
            self.conn.rollback()
            return False

    def process_log_file(self, log_file: str, follow: bool = False):
        """
        Read a log file, parse every line and store the parsed entries.

        log_file: path of the file to read.
        follow: if True, start at the end of the file and keep waiting for
            new lines like 'tail -f'; otherwise stop at end of file.

        Errors while processing are printed, not raised. Pending rows are
        committed every 100 processed lines, whenever follow mode runs out
        of input, and once more on exit.
        """
        print(f"[INFO] Processando {log_file} (follow={follow})")

        processed = 0
        saved = 0

        try:
            # errors='replace' keeps one undecodable byte in the log from
            # aborting the whole run.
            with open(log_file, 'r', errors='replace') as f:
                # In follow mode, skip what is already in the file.
                if follow:
                    f.seek(0, 2)  # Seek to end

                # Holds a line the writer has not finished yet (follow mode).
                pending = ''

                while True:
                    chunk = f.readline()

                    if not chunk:
                        if not follow:
                            break  # End of file
                        self.conn.commit()  # Commit batch
                        time.sleep(0.1)  # Wait for new lines
                        continue

                    # In follow mode a chunk without a trailing newline is a
                    # half-written line: keep it until the rest arrives.
                    if follow and not chunk.endswith('\n'):
                        pending += chunk
                        continue

                    line = pending + chunk
                    pending = ''
                    processed += 1

                    # Parse the line; count it only if it was really stored.
                    log_data = self.parse_log_line(line.strip())
                    if log_data and self.save_to_db(log_data):
                        saved += 1

                    # Commit every 100 lines.
                    if processed % 100 == 0:
                        self.conn.commit()
                        print(f"[INFO] Processate {processed} righe, salvate {saved} log")

        except KeyboardInterrupt:
            print("\n[INFO] Interrotto dall'utente")
        except Exception as e:
            print(f"[ERROR] Errore processamento file: {e}")
        finally:
            # Guard against being called without a prior connect_db().
            if self.conn is not None:
                self.conn.commit()
            print(f"[INFO] Totale: {processed} righe processate, {saved} log salvati")
|
|
|
|
|
|
def main():
    """
    Script entry point.

    Loads the environment file, builds the database settings, then runs
    the parser on the MikroTik log in follow mode. The database connection
    is always released on the way out.
    """
    # Pull environment variables from the .env file.
    load_dotenv("/opt/ids/.env")

    # (connection keyword, environment variable, fallback value)
    # IMPORTANT: 127.0.0.1 is used instead of localhost to force IPv4.
    settings = (
        ('host', 'PGHOST', '127.0.0.1'),
        ('port', 'PGPORT', '5432'),
        ('database', 'PGDATABASE', 'ids_database'),
        ('user', 'PGUSER', 'ids_user'),
        ('password', 'PGPASSWORD', 'ids_password_change_me'),
    )
    db_config = {
        keyword: os.getenv(env_name, fallback)
        for keyword, env_name, fallback in settings
    }

    # Build the parser for the log file to process.
    log_file = '/var/log/mikrotik/raw.log'
    parser = SyslogParser(db_config)

    try:
        parser.connect_db()
        # Follow the file for new lines (like tail -f).
        parser.process_log_file(log_file, follow=True)
    finally:
        parser.disconnect_db()
|
|
|
|
|
|
if __name__ == "__main__":
    # Print the start-up banner, then hand control to main().
    for banner_line in (
        "=== SYSLOG PARSER PER ROUTER MIKROTIK ===",
        "Pressione Ctrl+C per interrompere\n",
    ):
        print(banner_line)
    main()
|