""" Syslog Parser - Analizza log MikroTik e li salva in PostgreSQL Legge da /var/log/mikrotik/raw.log e popola la tabella network_logs """ import re import psycopg2 from datetime import datetime from typing import Dict, Optional import os import time from dotenv import load_dotenv class SyslogParser: """ Parser per log MikroTik in formato syslog Estrae informazioni di rete e salva in database """ def __init__(self, db_config: Dict[str, str]): self.db_config = db_config self.conn = None self.cursor = None # Pattern regex per parsare log MikroTik # Formato: timestamp hostname tag: message self.patterns = { # Firewall connection 'firewall': re.compile( r'(?Paccept|drop|reject).*' r'src-address=(?P[\d.]+):(?P\d+).*' r'dst-address=(?P[\d.]+):(?P\d+).*' r'proto=(?P\w+).*' r'(?:len=(?P\d+))?' ), # Connection tracking 'connection': re.compile( r'(?P[\d.]+):(?P\d+)->(?P[\d.]+):(?P\d+).*' r'proto (?P\w+).*' r'(?:packets: (?P\d+))?.*' r'(?:bytes: (?P\d+))?' ), } def connect_db(self): """Connessione al database PostgreSQL""" try: self.conn = psycopg2.connect(**self.db_config) self.cursor = self.conn.cursor() print("[INFO] Connesso a PostgreSQL") except Exception as e: print(f"[ERROR] Connessione database fallita: {e}") raise def disconnect_db(self): """Chiusura connessione database""" if self.cursor: self.cursor.close() if self.conn: self.conn.close() print("[INFO] Disconnesso da PostgreSQL") def parse_log_line(self, line: str) -> Optional[Dict]: """ Analizza una singola riga di log MikroTik Returns: Dict con dati parsati o None se non parsabile """ # Estrai timestamp, hostname, tag e messaggio # Formato: Jan 15 10:30:45 router1 firewall,info: drop src-address=... parts = line.split(None, 4) if len(parts) < 5: return None month, day, time_str, hostname, message = parts # Parse timestamp (usa anno corrente) current_year = datetime.now().year try: timestamp = datetime.strptime( f"{current_year} {month} {day} {time_str}", "%Y %b %d %H:%M:%S" ) except ValueError: return None # Prova pattern firewall for pattern_name, pattern in self.patterns.items(): match = pattern.search(message) if match: data = match.groupdict() # Aggiungi metadati data['timestamp'] = timestamp data['router_name'] = hostname data['log_type'] = pattern_name data['raw_message'] = message.strip() # Converti numeri for key in ['src_port', 'dst_port', 'len', 'packets', 'bytes']: if key in data and data[key]: data[key] = int(data[key]) return data return None def save_to_db(self, log_data: Dict): """Salva log parsato nel database""" try: query = """ INSERT INTO network_logs (timestamp, router_name, source_ip, source_port, destination_ip, destination_port, protocol, packet_length, action, raw_message) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING """ self.cursor.execute(query, ( log_data.get('timestamp'), log_data.get('router_name'), log_data.get('src_ip'), log_data.get('src_port'), log_data.get('dst_ip'), log_data.get('dst_port'), log_data.get('proto', 'unknown').lower(), log_data.get('len') or log_data.get('bytes'), log_data.get('action', 'unknown'), log_data.get('raw_message') )) except Exception as e: print(f"[ERROR] Errore salvataggio log: {e}") self.conn.rollback() def process_log_file(self, log_file: str, follow: bool = False): """ Processa file di log follow: se True, segue il file come 'tail -f' """ print(f"[INFO] Processando {log_file} (follow={follow})") processed = 0 saved = 0 try: with open(log_file, 'r') as f: # Se follow, vai alla fine del file if follow: f.seek(0, 2) # Seek to end while True: line = f.readline() if not line: if follow: time.sleep(0.1) # Attendi nuove righe self.conn.commit() # Commit batch continue else: break # Fine file processed += 1 # Parsa riga log_data = self.parse_log_line(line.strip()) if log_data: self.save_to_db(log_data) saved += 1 # Commit ogni 100 righe if processed % 100 == 0: self.conn.commit() print(f"[INFO] Processate {processed} righe, salvate {saved} log") except KeyboardInterrupt: print("\n[INFO] Interrotto dall'utente") except Exception as e: print(f"[ERROR] Errore processamento file: {e}") finally: self.conn.commit() print(f"[INFO] Totale: {processed} righe processate, {saved} log salvati") def main(): """Main entry point""" # Carica variabili d'ambiente da .env load_dotenv("/opt/ids/.env") # Configurazione database da environment # IMPORTANTE: Usa 127.0.0.1 invece di localhost per forzare IPv4 db_config = { 'host': os.getenv('PGHOST', '127.0.0.1'), 'port': os.getenv('PGPORT', '5432'), 'database': os.getenv('PGDATABASE', 'ids_database'), 'user': os.getenv('PGUSER', 'ids_user'), 'password': os.getenv('PGPASSWORD', 'ids_password_change_me') } # File log da processare log_file = '/var/log/mikrotik/raw.log' # Crea parser parser = SyslogParser(db_config) try: # Connetti al database parser.connect_db() # Processa file in modalità follow (come tail -f) parser.process_log_file(log_file, follow=True) finally: parser.disconnect_db() if __name__ == "__main__": print("=== SYSLOG PARSER PER ROUTER MIKROTIK ===") print("Pressione Ctrl+C per interrompere\n") main()