""" Syslog Parser - Analizza log MikroTik e li salva in PostgreSQL Legge da /var/log/mikrotik/raw.log e popola la tabella network_logs """ import re import psycopg2 from datetime import datetime from typing import Dict, Optional import os import time from dotenv import load_dotenv class SyslogParser: """ Parser per log MikroTik in formato syslog Estrae informazioni di rete e salva in database """ def __init__(self, db_config: Dict[str, str]): self.db_config = db_config self.conn = None self.cursor = None # Pattern regex per parsare log MikroTik (formato reale) # Esempio: Nov 17 16:52:16 FIBRA forward: ... proto UDP, 185.203.26.34:55841->192.178.203.94:443, len 1280 # Esempio: Nov 17 16:52:16 FIBRA detected-ddos forward: ... proto TCP (SYN), 82.62.84.108:43863->185.203.26.34:8472, len 210 self.main_pattern = re.compile( r'(?Pforward|detected-ddos forward):.*?' r'proto (?PUDP|TCP|ICMP)(?:\s+\((?P[^)]+)\))?.*?' r'(?P[\d.]+):(?P\d+)->(?P[\d.]+):(?P\d+).*?' r'len (?P\d+)', re.IGNORECASE ) def connect_db(self): """Connessione al database PostgreSQL""" try: self.conn = psycopg2.connect(**self.db_config) self.cursor = self.conn.cursor() print("[INFO] Connesso a PostgreSQL") except Exception as e: print(f"[ERROR] Connessione database fallita: {e}") raise def disconnect_db(self): """Chiusura connessione database""" if self.cursor: self.cursor.close() if self.conn: self.conn.close() print("[INFO] Disconnesso da PostgreSQL") def parse_log_line(self, line: str) -> Optional[Dict]: """ Analizza una singola riga di log MikroTik Returns: Dict con dati parsati o None se non parsabile Formato reale: Nov 17 16:52:16 FIBRA forward: in:... proto UDP, 185.203.26.34:55841->192.178.203.94:443, len 1280 Nov 17 16:52:16 FIBRA detected-ddos forward: ... proto TCP (SYN), 82.62.84.108:43863->185.203.26.34:8472, len 210 """ # Estrai timestamp, hostname, messaggio # Formato: Nov 17 16:52:16 FIBRA forward: ... 
        parts = line.split(None, 4)
        if len(parts) < 5:
            return None

        month, day, time_str, hostname, message = parts

        # Parse the timestamp (syslog omits the year, so assume the current one;
        # lines logged right around New Year may end up dated into the wrong year)
        current_year = datetime.now().year
        try:
            timestamp = datetime.strptime(
                f"{current_year} {month} {day} {time_str}",
                "%Y %b %d %H:%M:%S"
            )
        except ValueError:
            return None

        # Match the main pattern
        match = self.main_pattern.search(message)
        if not match:
            return None

        data = match.groupdict()

        # Add metadata
        data['timestamp'] = timestamp
        data['router_name'] = hostname
        data['raw_message'] = line.strip()

        # Determine the final action
        if 'detected-ddos' in data['action']:
            data['action'] = 'ddos'
        else:
            data['action'] = 'forward'

        # Convert numeric fields
        for key in ['src_port', 'dst_port', 'len']:
            if data.get(key):
                data[key] = int(data[key])

        return data

    def save_to_db(self, log_data: Dict):
        """Save a parsed log entry to the database (committed in batches by the caller)"""
        try:
            query = """
                INSERT INTO network_logs
                (timestamp, router_name, source_ip, source_port,
                 destination_ip, destination_port, protocol,
                 packet_length, action, raw_message)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT DO NOTHING
            """
            self.cursor.execute(query, (
                log_data.get('timestamp'),
                log_data.get('router_name'),
                log_data.get('src_ip'),
                log_data.get('src_port'),
                log_data.get('dst_ip'),
                log_data.get('dst_port'),
                log_data.get('proto', 'unknown').lower(),
                log_data.get('len'),
                log_data.get('action', 'unknown'),
                log_data.get('raw_message')
            ))
        except Exception as e:
            print(f"[ERROR] Failed to save log entry: {e}")
            self.conn.rollback()

    def cleanup_old_logs(self, days_to_keep: int = 3):
        """
        Delete logs older than N days from the database

        Note: psycopg2 cannot bind a placeholder inside an INTERVAL string
        literal, so the interval is built by casting a concatenated string.
        """
        try:
            query = """
                DELETE FROM network_logs
                WHERE timestamp < NOW() - (%s || ' days')::interval
            """
            self.cursor.execute(query, (str(days_to_keep),))
            deleted_count = self.cursor.rowcount
            self.conn.commit()

            if deleted_count > 0:
                print(f"[CLEANUP] ✅ Deleted {deleted_count} logs older than {days_to_keep} days")

            return deleted_count
        except Exception as e:
            print(f"[ERROR] Cleanup of old logs failed: {e}")
            traceback.print_exc()
            self.conn.rollback()
            return 0

    def process_log_file(self, log_file: str, follow: bool = False):
        """
        Process a log file in streaming mode (safe alongside rsyslog)

        follow: if True, follow the file like 'tail -f'
        """
        print(f"[INFO] Processing {log_file} (follow={follow})")

        processed = 0
        saved = 0
        pending = 0          # Rows saved since the last commit
        cleanup_counter = 0  # Idle polls since the last cleanup

        try:
            with open(log_file, 'r') as f:
                # In follow mode, start from the end of the file
                if follow:
                    f.seek(0, os.SEEK_END)

                while True:
                    line = f.readline()

                    if not line:
                        if not follow:
                            break  # End of file

                        time.sleep(0.1)  # Wait for new lines

                        # Flush rows still waiting for a batch commit
                        if pending > 0:
                            self.conn.commit()
                            pending = 0

                        # Run the DB cleanup after ~10000 idle polls
                        # (0.1 s each, so roughly every ~16 minutes of idle time)
                        cleanup_counter += 1
                        if cleanup_counter >= 10000:
                            self.cleanup_old_logs(days_to_keep=3)
                            cleanup_counter = 0
                        continue

                    processed += 1

                    # Parse the line
                    log_data = self.parse_log_line(line.strip())
                    if log_data:
                        self.save_to_db(log_data)
                        saved += 1
                        pending += 1

                    # Commit every 100 lines
                    if processed % 100 == 0:
                        self.conn.commit()
                        pending = 0
                        if saved > 0:
                            print(f"[INFO] Processed {processed} lines, saved {saved} logs")

        except KeyboardInterrupt:
            print("\n[INFO] Interrupted by user")
        except Exception as e:
            print(f"[ERROR] Error while processing file: {e}")
            traceback.print_exc()
        finally:
            if self.conn:
                self.conn.commit()
            print(f"[INFO] Total: {processed} lines processed, {saved} logs saved")
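
# For reference, a minimal sketch of the network_logs table this parser writes
# to, inferred from the INSERT in save_to_db (an assumption, not the actual
# schema; note that ON CONFLICT DO NOTHING only deduplicates rows if a unique
# constraint covers the relevant columns):
#
#   CREATE TABLE IF NOT EXISTS network_logs (
#       id               BIGSERIAL PRIMARY KEY,
#       timestamp        TIMESTAMP NOT NULL,
#       router_name      TEXT,
#       source_ip        INET,
#       source_port      INTEGER,
#       destination_ip   INET,
#       destination_port INTEGER,
#       protocol         TEXT,
#       packet_length    INTEGER,
#       action           TEXT,
#       raw_message      TEXT
#   );
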
def main():
    """Main entry point"""
    print("[DEBUG] Starting syslog_parser...")

    # Load environment variables from .env
    print("[DEBUG] Loading .env from /opt/ids/.env...")
    load_dotenv("/opt/ids/.env")
    print("[DEBUG] .env loaded")

    # Database configuration from the environment
    # IMPORTANT: use 127.0.0.1 instead of localhost to force IPv4
    db_config = {
        'host': os.getenv('PGHOST', '127.0.0.1'),
        'port': os.getenv('PGPORT', '5432'),
        'database': os.getenv('PGDATABASE', 'ids_database'),
        'user': os.getenv('PGUSER', 'ids_user'),
        'password': os.getenv('PGPASSWORD', 'ids_password_change_me')
    }

    print("[DEBUG] Database configuration:")
    print(f"[DEBUG]   Host: {db_config['host']}")
    print(f"[DEBUG]   Port: {db_config['port']}")
    print(f"[DEBUG]   Database: {db_config['database']}")
    print(f"[DEBUG]   User: {db_config['user']}")

    # Log file to process
    log_file = '/var/log/mikrotik/raw.log'
    print(f"[DEBUG] Log file: {log_file}")

    # Check that the file exists
    if not os.path.exists(log_file):
        print(f"[ERROR] Log file not found: {log_file}")
        print("[ERROR] Check that rsyslog is configured correctly")
        return

    print(f"[INFO] Log file found: {log_file}")

    # Create the parser
    print("[DEBUG] Creating parser...")
    parser = SyslogParser(db_config)

    try:
        # Connect to the database
        print("[DEBUG] Connecting to database...")
        parser.connect_db()

        # Process the file in follow mode (tail -f) - safe alongside rsyslog
        # Automatic DB cleanup roughly every ~16 minutes of idle time
        print("[INFO] Starting log processing (safe streaming mode)...")
        print("[INFO] - Follow mode (tail -f), compatible with rsyslog")
        print("[INFO] - Auto-cleanup of logs > 3 days old from the database (~every 16 min)")
        print("[INFO] - Batch commit every 100 lines")
        parser.process_log_file(log_file, follow=True)

    except Exception as e:
        print(f"[ERROR] Critical error: {e}")
        traceback.print_exc()
    finally:
        print("[DEBUG] Closing database connection...")
        parser.disconnect_db()


if __name__ == "__main__":
    print("=== SYSLOG PARSER FOR MIKROTIK ROUTERS ===")
    print("Press Ctrl+C to stop\n")
    try:
        main()
    except KeyboardInterrupt:
        print("\n[INFO] Terminated by user (Ctrl+C)")
    except Exception as e:
        print(f"\n[ERROR] Fatal error: {e}")
        traceback.print_exc()
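
# A sample /opt/ids/.env matching the keys read in main() above (the values
# shown are just the fallback defaults from the code; change them for a real
# deployment, especially PGPASSWORD):
#
#   PGHOST=127.0.0.1
#   PGPORT=5432
#   PGDATABASE=ids_database
#   PGUSER=ids_user
#   PGPASSWORD=ids_password_change_me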