ids.alfacom.it/python_ml/syslog_parser.py
marco370 e201c0e770 Add ability to load environment variables from a file
Imported the load_dotenv function from the dotenv library in syslog_parser.py.

2025-11-17 17:31:57 +00:00


"""
Syslog Parser - Analizza log MikroTik e li salva in PostgreSQL
Legge da /var/log/mikrotik/raw.log e popola la tabella network_logs
"""
import re
import psycopg2
from datetime import datetime
from typing import Dict, Optional
import os
import time
from dotenv import load_dotenv
class SyslogParser:
"""
Parser per log MikroTik in formato syslog
Estrae informazioni di rete e salva in database
"""
def __init__(self, db_config: Dict[str, str]):
self.db_config = db_config
self.conn = None
self.cursor = None
# Regex patterns for parsing MikroTik log messages
# Format: timestamp hostname tag: message
# (illustrative sample messages follow the pattern definitions below)
self.patterns = {
# Firewall connection
'firewall': re.compile(
r'(?P<action>accept|drop|reject).*'
r'src-address=(?P<src_ip>[\d.]+):(?P<src_port>\d+).*'
r'dst-address=(?P<dst_ip>[\d.]+):(?P<dst_port>\d+).*'
r'proto=(?P<proto>\w+).*'
r'(?:len=(?P<len>\d+))?'
),
# Connection tracking
'connection': re.compile(
r'(?P<src_ip>[\d.]+):(?P<src_port>\d+)->(?P<dst_ip>[\d.]+):(?P<dst_port>\d+).*'
r'proto (?P<proto>\w+).*'
r'(?:packets: (?P<packets>\d+))?.*'
r'(?:bytes: (?P<bytes>\d+))?'
),
}
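# Illustrative (made-up) message fragments that these patterns are meant to
# match; real MikroTik output may differ depending on firewall log prefixes:
#   firewall:   "drop src-address=192.168.1.10:51234 dst-address=8.8.8.8:53 proto=udp len=76"
#   connection: "192.168.1.10:51234->8.8.8.8:53 proto udp packets: 4 bytes: 312"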
def connect_db(self):
"""Connessione al database PostgreSQL"""
try:
self.conn = psycopg2.connect(**self.db_config)
self.cursor = self.conn.cursor()
print("[INFO] Connesso a PostgreSQL")
except Exception as e:
print(f"[ERROR] Connessione database fallita: {e}")
raise
def disconnect_db(self):
"""Chiusura connessione database"""
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
print("[INFO] Disconnesso da PostgreSQL")
def parse_log_line(self, line: str) -> Optional[Dict]:
"""
Analizza una singola riga di log MikroTik
Returns: Dict con dati parsati o None se non parsabile
"""
# Extract timestamp, hostname, tag and message
# Format: Jan 15 10:30:45 router1 firewall,info: drop src-address=...
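# For the example above, line.split(None, 4) yields:
#   ['Jan', '15', '10:30:45', 'router1', 'firewall,info: drop src-address=...']
# i.e. the tag stays attached to the message, which is fine because the
# regex patterns only search within the message text.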
parts = line.split(None, 4)
if len(parts) < 5:
return None
month, day, time_str, hostname, message = parts
# Parse the timestamp (syslog lines omit the year, so use the current year)
current_year = datetime.now().year
try:
timestamp = datetime.strptime(
f"{current_year} {month} {day} {time_str}",
"%Y %b %d %H:%M:%S"
)
except ValueError:
return None
# Try each known pattern against the message
for pattern_name, pattern in self.patterns.items():
match = pattern.search(message)
if match:
data = match.groupdict()
# Add metadata
data['timestamp'] = timestamp
data['router_name'] = hostname
data['log_type'] = pattern_name
data['raw_message'] = message.strip()
# Convert numeric fields to integers
for key in ['src_port', 'dst_port', 'len', 'packets', 'bytes']:
if key in data and data[key]:
data[key] = int(data[key])
return data
return None
def save_to_db(self, log_data: Dict):
"""Salva log parsato nel database"""
try:
query = """
INSERT INTO network_logs
(timestamp, router_name, source_ip, source_port, destination_ip,
destination_port, protocol, packet_length, action, raw_message)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT DO NOTHING
"""
self.cursor.execute(query, (
log_data.get('timestamp'),
log_data.get('router_name'),
log_data.get('src_ip'),
log_data.get('src_port'),
log_data.get('dst_ip'),
log_data.get('dst_port'),
log_data.get('proto', 'unknown').lower(),
log_data.get('len') or log_data.get('bytes'),
log_data.get('action', 'unknown'),
log_data.get('raw_message')
))
except Exception as e:
print(f"[ERROR] Errore salvataggio log: {e}")
self.conn.rollback()
def process_log_file(self, log_file: str, follow: bool = False):
"""
Processa file di log
follow: se True, segue il file come 'tail -f'
"""
print(f"[INFO] Processando {log_file} (follow={follow})")
processed = 0
saved = 0
try:
with open(log_file, 'r') as f:
# In follow mode, start from the end of the file
if follow:
f.seek(0, 2) # Seek to end
while True:
line = f.readline()
if not line:
if follow:
time.sleep(0.1)  # Wait for new lines
self.conn.commit() # Commit batch
continue
else:
break  # End of file
processed += 1
# Parse the line
log_data = self.parse_log_line(line.strip())
if log_data:
self.save_to_db(log_data)
saved += 1
# Commit every 100 lines
if processed % 100 == 0:
self.conn.commit()
print(f"[INFO] Processate {processed} righe, salvate {saved} log")
except KeyboardInterrupt:
print("\n[INFO] Interrotto dall'utente")
except Exception as e:
print(f"[ERROR] Errore processamento file: {e}")
finally:
self.conn.commit()
print(f"[INFO] Totale: {processed} righe processate, {saved} log salvati")
def main():
"""Main entry point"""
# Load environment variables from .env
load_dotenv("/opt/ids/.env")
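# Example content of /opt/ids/.env (illustrative values only):
#   PGHOST=127.0.0.1
#   PGPORT=5432
#   PGDATABASE=ids_database
#   PGUSER=ids_user
#   PGPASSWORD=your_password_here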
# Database configuration from environment variables
# IMPORTANT: use 127.0.0.1 instead of localhost to force IPv4
db_config = {
'host': os.getenv('PGHOST', '127.0.0.1'),
'port': os.getenv('PGPORT', '5432'),
'database': os.getenv('PGDATABASE', 'ids_database'),
'user': os.getenv('PGUSER', 'ids_user'),
'password': os.getenv('PGPASSWORD', 'ids_password_change_me')
}
# Log file to process
log_file = '/var/log/mikrotik/raw.log'
# Create the parser
parser = SyslogParser(db_config)
try:
# Connect to the database
parser.connect_db()
# Process the file in follow mode (like tail -f)
parser.process_log_file(log_file, follow=True)
finally:
parser.disconnect_db()
if __name__ == "__main__":
print("=== SYSLOG PARSER PER ROUTER MIKROTIK ===")
print("Pressione Ctrl+C per interrompere\n")
main()
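# Hypothetical one-shot usage (e.g. backfilling from a rotated log instead of
# following the live file):
#   parser = SyslogParser(db_config)
#   parser.connect_db()
#   parser.process_log_file('/var/log/mikrotik/raw.log.1', follow=False)
#   parser.disconnect_db()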