ids.alfacom.it/python_ml/syslog_parser.py
marco370 fcd4bbf2b2 Add comprehensive deployment and configuration guides for the IDS system
This commit introduces detailed documentation for deploying the Intrusion Detection System (IDS) on AlmaLinux 9, including setup scripts, MikroTik router configuration, and update procedures via git. It also includes the syslog parser script for processing router logs and saving them to PostgreSQL.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: b2b01a4a-55da-4f33-9143-6bf0399e0a03
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/c9ITWqD
2025-11-15 11:30:55 +00:00

216 lines
7.2 KiB
Python

"""
Syslog Parser - Analizza log MikroTik e li salva in PostgreSQL
Legge da /var/log/mikrotik/raw.log e popola la tabella network_logs
"""
import re
import psycopg2
from datetime import datetime
from typing import Dict, Optional
import os
import time
class SyslogParser:
"""
Parser per log MikroTik in formato syslog
Estrae informazioni di rete e salva in database
"""
def __init__(self, db_config: Dict[str, str]):
self.db_config = db_config
self.conn = None
self.cursor = None
# Pattern regex per parsare log MikroTik
# Formato: timestamp hostname tag: message
self.patterns = {
# Firewall connection
'firewall': re.compile(
r'(?P<action>accept|drop|reject).*'
r'src-address=(?P<src_ip>[\d.]+):(?P<src_port>\d+).*'
r'dst-address=(?P<dst_ip>[\d.]+):(?P<dst_port>\d+).*'
r'proto=(?P<proto>\w+).*'
r'(?:len=(?P<len>\d+))?'
),
# Connection tracking
'connection': re.compile(
r'(?P<src_ip>[\d.]+):(?P<src_port>\d+)->(?P<dst_ip>[\d.]+):(?P<dst_port>\d+).*'
r'proto (?P<proto>\w+).*'
r'(?:packets: (?P<packets>\d+))?.*'
r'(?:bytes: (?P<bytes>\d+))?'
),
}
def connect_db(self):
"""Connessione al database PostgreSQL"""
try:
self.conn = psycopg2.connect(**self.db_config)
self.cursor = self.conn.cursor()
print("[INFO] Connesso a PostgreSQL")
except Exception as e:
print(f"[ERROR] Connessione database fallita: {e}")
raise
def disconnect_db(self):
"""Chiusura connessione database"""
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
print("[INFO] Disconnesso da PostgreSQL")
def parse_log_line(self, line: str) -> Optional[Dict]:
"""
Analizza una singola riga di log MikroTik
Returns: Dict con dati parsati o None se non parsabile
"""
# Estrai timestamp, hostname, tag e messaggio
# Formato: Jan 15 10:30:45 router1 firewall,info: drop src-address=...
parts = line.split(None, 4)
if len(parts) < 5:
return None
month, day, time_str, hostname, message = parts
# Parse timestamp (usa anno corrente)
current_year = datetime.now().year
try:
timestamp = datetime.strptime(
f"{current_year} {month} {day} {time_str}",
"%Y %b %d %H:%M:%S"
)
except ValueError:
return None
# Prova pattern firewall
for pattern_name, pattern in self.patterns.items():
match = pattern.search(message)
if match:
data = match.groupdict()
# Aggiungi metadati
data['timestamp'] = timestamp
data['router_name'] = hostname
data['log_type'] = pattern_name
data['raw_message'] = message.strip()
# Converti numeri
for key in ['src_port', 'dst_port', 'len', 'packets', 'bytes']:
if key in data and data[key]:
data[key] = int(data[key])
return data
return None
def save_to_db(self, log_data: Dict):
"""Salva log parsato nel database"""
try:
query = """
INSERT INTO network_logs
(timestamp, router_name, source_ip, source_port, destination_ip,
destination_port, protocol, packet_length, action, raw_message)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT DO NOTHING
"""
self.cursor.execute(query, (
log_data.get('timestamp'),
log_data.get('router_name'),
log_data.get('src_ip'),
log_data.get('src_port'),
log_data.get('dst_ip'),
log_data.get('dst_port'),
log_data.get('proto', 'unknown').lower(),
log_data.get('len') or log_data.get('bytes'),
log_data.get('action', 'unknown'),
log_data.get('raw_message')
))
except Exception as e:
print(f"[ERROR] Errore salvataggio log: {e}")
self.conn.rollback()
def process_log_file(self, log_file: str, follow: bool = False):
"""
Processa file di log
follow: se True, segue il file come 'tail -f'
"""
print(f"[INFO] Processando {log_file} (follow={follow})")
processed = 0
saved = 0
try:
with open(log_file, 'r') as f:
# Se follow, vai alla fine del file
if follow:
f.seek(0, 2) # Seek to end
while True:
line = f.readline()
if not line:
if follow:
time.sleep(0.1) # Attendi nuove righe
self.conn.commit() # Commit batch
continue
else:
break # Fine file
processed += 1
# Parsa riga
log_data = self.parse_log_line(line.strip())
if log_data:
self.save_to_db(log_data)
saved += 1
# Commit ogni 100 righe
if processed % 100 == 0:
self.conn.commit()
print(f"[INFO] Processate {processed} righe, salvate {saved} log")
except KeyboardInterrupt:
print("\n[INFO] Interrotto dall'utente")
except Exception as e:
print(f"[ERROR] Errore processamento file: {e}")
finally:
self.conn.commit()
print(f"[INFO] Totale: {processed} righe processate, {saved} log salvati")
def main():
"""Main entry point"""
# Configurazione database da environment
db_config = {
'host': os.getenv('PGHOST', 'localhost'),
'port': os.getenv('PGPORT', '5432'),
'database': os.getenv('PGDATABASE', 'ids_database'),
'user': os.getenv('PGUSER', 'ids_user'),
'password': os.getenv('PGPASSWORD', 'ids_password_change_me')
}
# File log da processare
log_file = '/var/log/mikrotik/raw.log'
# Crea parser
parser = SyslogParser(db_config)
try:
# Connetti al database
parser.connect_db()
# Processa file in modalità follow (come tail -f)
parser.process_log_file(log_file, follow=True)
finally:
parser.disconnect_db()
if __name__ == "__main__":
print("=== SYSLOG PARSER PER ROUTER MIKROTIK ===")
print("Pressione Ctrl+C per interrompere\n")
main()