Modify syslog_parser.py to load environment variables from .env file and force IPv4 connection to the database. Update replit.md to reflect recent fixes and workflow changes. Increment version in version.json. Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: 727221f9-ad54-4498-b2e4-e87a951b4308 Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/c9ITWqD
220 lines
7.3 KiB
Python
220 lines
7.3 KiB
Python
"""
|
|
Syslog Parser - Analizza log MikroTik e li salva in PostgreSQL
|
|
Legge da /var/log/mikrotik/raw.log e popola la tabella network_logs
|
|
"""
|
|
|
|
import re
|
|
import psycopg2
|
|
from datetime import datetime
|
|
from typing import Dict, Optional
|
|
import os
|
|
import time
|
|
|
|
|
|
class SyslogParser:
    """Parse MikroTik syslog lines and store them in PostgreSQL.

    Each recognised line is turned into a dict of network fields
    (addresses, ports, protocol, counters) and inserted into the
    ``network_logs`` table.
    """

    # Syslog timestamps carry no year. A parsed timestamp further in the
    # future than this many seconds is assumed to belong to the previous year.
    _MAX_FUTURE_SKEW_SECONDS = 86400

    # Number of processed lines between two commits / progress messages.
    _COMMIT_EVERY = 100

    # Seconds to wait for new data when following a file.
    _POLL_INTERVAL = 0.1

    def __init__(self, db_config: Dict[str, str]):
        """Store the connection settings and compile the message patterns.

        Args:
            db_config: Keyword arguments forwarded to ``psycopg2.connect``.
        """
        self.db_config = db_config
        self.conn = None
        self.cursor = None

        # Patterns are searched, in insertion order, in the message part of a
        # line shaped like "timestamp hostname tag: message".
        #
        # The trailing counters are optional. They are written as
        # "(?:.*?name...)?" on purpose: a greedy ".*" placed in front of an
        # optional group swallows the rest of the line, so the group would
        # always match empty and the counter would never be captured.
        self.patterns = {
            # Firewall connection
            'firewall': re.compile(
                r'(?P<action>accept|drop|reject).*'
                r'src-address=(?P<src_ip>[\d.]+):(?P<src_port>\d+).*'
                r'dst-address=(?P<dst_ip>[\d.]+):(?P<dst_port>\d+).*'
                r'proto=(?P<proto>\w+)'
                r'(?:.*?len=(?P<len>\d+))?'
            ),
            # Connection tracking
            'connection': re.compile(
                r'(?P<src_ip>[\d.]+):(?P<src_port>\d+)->(?P<dst_ip>[\d.]+):(?P<dst_port>\d+).*'
                r'proto (?P<proto>\w+)'
                r'(?:.*?packets: (?P<packets>\d+))?'
                r'(?:.*?bytes: (?P<bytes>\d+))?'
            ),
        }

    def connect_db(self):
        """Open the PostgreSQL connection and create a cursor.

        Raises:
            Exception: Whatever the connection attempt raised; it is logged
                and re-raised unchanged.
        """
        try:
            self.conn = psycopg2.connect(**self.db_config)
            self.cursor = self.conn.cursor()
            print("[INFO] Connesso a PostgreSQL")
        except Exception as e:
            print(f"[ERROR] Connessione database fallita: {e}")
            raise

    def disconnect_db(self):
        """Close the cursor and the connection if they are open."""
        if self.cursor:
            self.cursor.close()
            self.cursor = None  # makes a second call harmless
        if self.conn:
            self.conn.close()
            self.conn = None
        print("[INFO] Disconnesso da PostgreSQL")

    @staticmethod
    def _parse_timestamp(month: str, day: str, time_str: str) -> Optional[datetime]:
        """Build a datetime from the year-less syslog timestamp fields.

        The current year is tried first. The previous year is used when the
        current year gives an invalid date or a timestamp more than
        ``_MAX_FUTURE_SKEW_SECONDS`` in the future (e.g. a December line read
        in January).

        Returns:
            The parsed datetime, or None when the fields are not a valid
            "%b %d %H:%M:%S" timestamp in either year.
        """
        now = datetime.now()
        fallback = None
        for year in (now.year, now.year - 1):
            try:
                parsed = datetime.strptime(
                    f"{year} {month} {day} {time_str}",
                    "%Y %b %d %H:%M:%S"
                )
            except ValueError:
                continue
            if (parsed - now).total_seconds() <= SyslogParser._MAX_FUTURE_SKEW_SECONDS:
                return parsed
            if fallback is None:
                # Keep the current-year value in case the previous year
                # cannot represent this date at all.
                fallback = parsed
        return fallback

    def parse_log_line(self, line: str) -> Optional[Dict]:
        """Parse a single MikroTik syslog line.

        Expected layout:
        ``Jan 15 10:30:45 router1 firewall,info: drop src-address=...``

        Args:
            line: One log line.

        Returns:
            A dict with the named groups of the first matching pattern plus
            ``timestamp``, ``router_name``, ``log_type`` and ``raw_message``,
            or None when the line has fewer than five fields, the timestamp
            is invalid, or no pattern matches. Port and counter fields are
            converted to int when present.
        """
        parts = line.split(None, 4)
        if len(parts) < 5:
            return None

        month, day, time_str, hostname, message = parts

        timestamp = self._parse_timestamp(month, day, time_str)
        if timestamp is None:
            return None

        for pattern_name, pattern in self.patterns.items():
            match = pattern.search(message)
            if not match:
                continue

            data = match.groupdict()

            # Metadata shared by every log type
            data['timestamp'] = timestamp
            data['router_name'] = hostname
            data['log_type'] = pattern_name
            data['raw_message'] = message.strip()

            # Numeric fields arrive as strings (or None when not captured)
            for key in ('src_port', 'dst_port', 'len', 'packets', 'bytes'):
                if data.get(key):
                    data[key] = int(data[key])

            return data

        return None

    def save_to_db(self, log_data: Dict):
        """Insert one parsed log entry into ``network_logs``.

        Args:
            log_data: Dict produced by ``parse_log_line``.

        Returns:
            True when the INSERT was executed, False when it failed. The
            statement is not committed here; the caller commits in batches.
        """
        query = """
            INSERT INTO network_logs
            (timestamp, router_name, source_ip, source_port, destination_ip,
             destination_port, protocol, packet_length, action, raw_message)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT DO NOTHING
        """

        # Prefer the packet length; fall back to the byte counter only when
        # no length was captured (a length of 0 is a real value).
        packet_length = log_data.get('len')
        if packet_length is None:
            packet_length = log_data.get('bytes')

        try:
            self.cursor.execute(query, (
                log_data.get('timestamp'),
                log_data.get('router_name'),
                log_data.get('src_ip'),
                log_data.get('src_port'),
                log_data.get('dst_ip'),
                log_data.get('dst_port'),
                (log_data.get('proto') or 'unknown').lower(),
                packet_length,
                log_data.get('action', 'unknown'),
                log_data.get('raw_message')
            ))
        except Exception as e:
            print(f"[ERROR] Errore salvataggio log: {e}")
            # NOTE: the rollback is needed to leave the failed transaction,
            # but it also discards rows inserted since the last commit.
            if self.conn is not None:
                self.conn.rollback()
            return False

        return True

    def process_log_file(self, log_file: str, follow: bool = False):
        """Read a log file and store every line that can be parsed.

        Args:
            log_file: Path of the file to read.
            follow: When True, start at the end of the file and keep waiting
                for new lines (like ``tail -f``) until interrupted.
        """
        print(f"[INFO] Processando {log_file} (follow={follow})")

        processed = 0
        saved = 0
        uncommitted = 0
        partial = ""

        try:
            # Router logs may contain stray bytes; replace them instead of
            # aborting the whole run on a decode error.
            with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                if follow:
                    f.seek(0, 2)  # Seek to end

                while True:
                    chunk = f.readline()

                    if not chunk:
                        if not follow:
                            break  # End of file
                        # Idle: flush what is pending, then wait for data
                        if uncommitted:
                            self.conn.commit()
                            uncommitted = 0
                        time.sleep(self._POLL_INTERVAL)
                        continue

                    if follow and not chunk.endswith('\n'):
                        # The writer has not finished this line yet; keep the
                        # fragment and join it with the rest when it arrives.
                        partial += chunk
                        continue

                    line = partial + chunk
                    partial = ""
                    processed += 1

                    log_data = self.parse_log_line(line.strip())
                    if log_data and self.save_to_db(log_data):
                        saved += 1
                        uncommitted += 1

                    if processed % self._COMMIT_EVERY == 0:
                        if uncommitted:
                            self.conn.commit()
                            uncommitted = 0
                        print(f"[INFO] Processate {processed} righe, salvate {saved} log")

        except KeyboardInterrupt:
            print("\n[INFO] Interrotto dall'utente")
        except Exception as e:
            print(f"[ERROR] Errore processamento file: {e}")
        finally:
            try:
                # Nothing to commit when the database was never connected
                if self.conn is not None:
                    self.conn.commit()
            finally:
                print(f"[INFO] Totale: {processed} righe processate, {saved} log salvati")
|
|
|
|
|
|
def _load_env_file(path: str) -> bool:
    """Minimal stdlib reader for ``KEY=VALUE`` lines in a .env file.

    Blank lines, ``#`` comments and lines without ``=`` are skipped. An
    optional leading ``export`` and one pair of surrounding quotes are
    removed. Variables that are already set in the environment are left
    untouched.

    Returns:
        True when the file was read, False when it could not be opened.
    """
    try:
        with open(path, "r", encoding="utf-8") as env_file:
            entries = env_file.read().splitlines()
    except OSError:
        return False

    for raw in entries:
        entry = raw.strip()
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        if entry.startswith("export "):
            entry = entry[len("export "):].lstrip()
        key, _, value = entry.partition("=")
        key = key.strip()
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        if key:
            os.environ.setdefault(key, value)
    return True


def main():
    """Main entry point: load the settings, connect and follow the log file."""
    # Load environment variables from the .env file. The import is done here
    # because the module never imported load_dotenv at file level; when
    # python-dotenv is not installed the small stdlib reader is used instead.
    env_path = "/opt/ids/.env"
    try:
        from dotenv import load_dotenv
    except ImportError:
        _load_env_file(env_path)
    else:
        load_dotenv(env_path)

    # Database configuration from the environment
    # IMPORTANT: use 127.0.0.1 instead of localhost to force IPv4
    db_config = {
        'host': os.getenv('PGHOST', '127.0.0.1'),
        'port': os.getenv('PGPORT', '5432'),
        'database': os.getenv('PGDATABASE', 'ids_database'),
        'user': os.getenv('PGUSER', 'ids_user'),
        'password': os.getenv('PGPASSWORD', 'ids_password_change_me')
    }

    # Log file to process
    log_file = '/var/log/mikrotik/raw.log'

    # Create the parser
    parser = SyslogParser(db_config)

    try:
        # Connect to the database
        parser.connect_db()

        # Process the file in follow mode (like tail -f)
        parser.process_log_file(log_file, follow=True)

    finally:
        parser.disconnect_db()
|
|
|
|
|
|
if __name__ == "__main__":
    # Show the banner and the stop hint, then hand control to main().
    for banner_line in (
        "=== SYSLOG PARSER PER ROUTER MIKROTIK ===",
        "Pressione Ctrl+C per interrompere\n",
    ):
        print(banner_line)
    main()
|