This commit introduces detailed documentation for deploying the Intrusion Detection System (IDS) on AlmaLinux 9, including setup scripts, MikroTik router configuration, and update procedures via git. It also includes the syslog parser script for processing router logs and saving them to PostgreSQL. Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: b2b01a4a-55da-4f33-9143-6bf0399e0a03 Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/c9ITWqD
216 lines
7.2 KiB
Python
216 lines
7.2 KiB
Python
"""
|
|
Syslog Parser - Analizza log MikroTik e li salva in PostgreSQL
|
|
Legge da /var/log/mikrotik/raw.log e popola la tabella network_logs
|
|
"""
|
|
|
|
import re
|
|
import psycopg2
|
|
from datetime import datetime
|
|
from typing import Dict, Optional
|
|
import os
|
|
import time
|
|
|
|
|
|
class SyslogParser:
|
|
"""
|
|
Parser per log MikroTik in formato syslog
|
|
Estrae informazioni di rete e salva in database
|
|
"""
|
|
|
|
def __init__(self, db_config: Dict[str, str]):
|
|
self.db_config = db_config
|
|
self.conn = None
|
|
self.cursor = None
|
|
|
|
# Pattern regex per parsare log MikroTik
|
|
# Formato: timestamp hostname tag: message
|
|
self.patterns = {
|
|
# Firewall connection
|
|
'firewall': re.compile(
|
|
r'(?P<action>accept|drop|reject).*'
|
|
r'src-address=(?P<src_ip>[\d.]+):(?P<src_port>\d+).*'
|
|
r'dst-address=(?P<dst_ip>[\d.]+):(?P<dst_port>\d+).*'
|
|
r'proto=(?P<proto>\w+).*'
|
|
r'(?:len=(?P<len>\d+))?'
|
|
),
|
|
# Connection tracking
|
|
'connection': re.compile(
|
|
r'(?P<src_ip>[\d.]+):(?P<src_port>\d+)->(?P<dst_ip>[\d.]+):(?P<dst_port>\d+).*'
|
|
r'proto (?P<proto>\w+).*'
|
|
r'(?:packets: (?P<packets>\d+))?.*'
|
|
r'(?:bytes: (?P<bytes>\d+))?'
|
|
),
|
|
}
|
|
|
|
def connect_db(self):
|
|
"""Connessione al database PostgreSQL"""
|
|
try:
|
|
self.conn = psycopg2.connect(**self.db_config)
|
|
self.cursor = self.conn.cursor()
|
|
print("[INFO] Connesso a PostgreSQL")
|
|
except Exception as e:
|
|
print(f"[ERROR] Connessione database fallita: {e}")
|
|
raise
|
|
|
|
def disconnect_db(self):
|
|
"""Chiusura connessione database"""
|
|
if self.cursor:
|
|
self.cursor.close()
|
|
if self.conn:
|
|
self.conn.close()
|
|
print("[INFO] Disconnesso da PostgreSQL")
|
|
|
|
def parse_log_line(self, line: str) -> Optional[Dict]:
|
|
"""
|
|
Analizza una singola riga di log MikroTik
|
|
Returns: Dict con dati parsati o None se non parsabile
|
|
"""
|
|
# Estrai timestamp, hostname, tag e messaggio
|
|
# Formato: Jan 15 10:30:45 router1 firewall,info: drop src-address=...
|
|
parts = line.split(None, 4)
|
|
if len(parts) < 5:
|
|
return None
|
|
|
|
month, day, time_str, hostname, message = parts
|
|
|
|
# Parse timestamp (usa anno corrente)
|
|
current_year = datetime.now().year
|
|
try:
|
|
timestamp = datetime.strptime(
|
|
f"{current_year} {month} {day} {time_str}",
|
|
"%Y %b %d %H:%M:%S"
|
|
)
|
|
except ValueError:
|
|
return None
|
|
|
|
# Prova pattern firewall
|
|
for pattern_name, pattern in self.patterns.items():
|
|
match = pattern.search(message)
|
|
if match:
|
|
data = match.groupdict()
|
|
|
|
# Aggiungi metadati
|
|
data['timestamp'] = timestamp
|
|
data['router_name'] = hostname
|
|
data['log_type'] = pattern_name
|
|
data['raw_message'] = message.strip()
|
|
|
|
# Converti numeri
|
|
for key in ['src_port', 'dst_port', 'len', 'packets', 'bytes']:
|
|
if key in data and data[key]:
|
|
data[key] = int(data[key])
|
|
|
|
return data
|
|
|
|
return None
|
|
|
|
def save_to_db(self, log_data: Dict):
|
|
"""Salva log parsato nel database"""
|
|
try:
|
|
query = """
|
|
INSERT INTO network_logs
|
|
(timestamp, router_name, source_ip, source_port, destination_ip,
|
|
destination_port, protocol, packet_length, action, raw_message)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
ON CONFLICT DO NOTHING
|
|
"""
|
|
|
|
self.cursor.execute(query, (
|
|
log_data.get('timestamp'),
|
|
log_data.get('router_name'),
|
|
log_data.get('src_ip'),
|
|
log_data.get('src_port'),
|
|
log_data.get('dst_ip'),
|
|
log_data.get('dst_port'),
|
|
log_data.get('proto', 'unknown').lower(),
|
|
log_data.get('len') or log_data.get('bytes'),
|
|
log_data.get('action', 'unknown'),
|
|
log_data.get('raw_message')
|
|
))
|
|
|
|
except Exception as e:
|
|
print(f"[ERROR] Errore salvataggio log: {e}")
|
|
self.conn.rollback()
|
|
|
|
def process_log_file(self, log_file: str, follow: bool = False):
|
|
"""
|
|
Processa file di log
|
|
follow: se True, segue il file come 'tail -f'
|
|
"""
|
|
print(f"[INFO] Processando {log_file} (follow={follow})")
|
|
|
|
processed = 0
|
|
saved = 0
|
|
|
|
try:
|
|
with open(log_file, 'r') as f:
|
|
# Se follow, vai alla fine del file
|
|
if follow:
|
|
f.seek(0, 2) # Seek to end
|
|
|
|
while True:
|
|
line = f.readline()
|
|
|
|
if not line:
|
|
if follow:
|
|
time.sleep(0.1) # Attendi nuove righe
|
|
self.conn.commit() # Commit batch
|
|
continue
|
|
else:
|
|
break # Fine file
|
|
|
|
processed += 1
|
|
|
|
# Parsa riga
|
|
log_data = self.parse_log_line(line.strip())
|
|
if log_data:
|
|
self.save_to_db(log_data)
|
|
saved += 1
|
|
|
|
# Commit ogni 100 righe
|
|
if processed % 100 == 0:
|
|
self.conn.commit()
|
|
print(f"[INFO] Processate {processed} righe, salvate {saved} log")
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n[INFO] Interrotto dall'utente")
|
|
except Exception as e:
|
|
print(f"[ERROR] Errore processamento file: {e}")
|
|
finally:
|
|
self.conn.commit()
|
|
print(f"[INFO] Totale: {processed} righe processate, {saved} log salvati")
|
|
|
|
|
|
def main():
|
|
"""Main entry point"""
|
|
# Configurazione database da environment
|
|
db_config = {
|
|
'host': os.getenv('PGHOST', 'localhost'),
|
|
'port': os.getenv('PGPORT', '5432'),
|
|
'database': os.getenv('PGDATABASE', 'ids_database'),
|
|
'user': os.getenv('PGUSER', 'ids_user'),
|
|
'password': os.getenv('PGPASSWORD', 'ids_password_change_me')
|
|
}
|
|
|
|
# File log da processare
|
|
log_file = '/var/log/mikrotik/raw.log'
|
|
|
|
# Crea parser
|
|
parser = SyslogParser(db_config)
|
|
|
|
try:
|
|
# Connetti al database
|
|
parser.connect_db()
|
|
|
|
# Processa file in modalità follow (come tail -f)
|
|
parser.process_log_file(log_file, follow=True)
|
|
|
|
finally:
|
|
parser.disconnect_db()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print("=== SYSLOG PARSER PER ROUTER MIKROTIK ===")
|
|
print("Pressione Ctrl+C per interrompere\n")
|
|
main()
|