#!/usr/bin/env python3 import pandas as pd from sqlalchemy import create_engine from sqlalchemy.sql import text from joblib import load import logging import gc import os import time import sys from collections import defaultdict from datetime import datetime, timedelta, timezone import ipaddress import numpy as np from sklearn.ensemble import IsolationForest import threading import argparse import signal # Configurazione del logging avanzata per il debug logging.basicConfig( level=logging.INFO, # Modificato da DEBUG a INFO per default format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler('ddetect_debug.log') ] ) # Configurazione del database DB_USER = os.environ.get('DB_USER', 'root') DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-') DB_HOST = os.environ.get('DB_HOST', 'localhost') DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK') CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}' # Percorsi dei file MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models') os.makedirs(MODEL_DIR, exist_ok=True) MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib') PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib') WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt') LAST_ID_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'last_analyzed_id.txt') # Definizione dei livelli di rischio e soglie RISK_LEVELS = { 'NORMALE': 0.1, 'BASSO': 0.3, 'MEDIO': 0.6, 'ALTO': 0.8, 'CRITICO': 0.95 } logging.debug(f"Percorsi: MODEL_DIR={MODEL_DIR}, WHITELIST_PATH={WHITELIST_PATH}") logging.debug(f"Livelli di rischio configurati: {RISK_LEVELS}") # Variabili globali per il tracciamento dell'avanzamento progress_counters = { 'ip_whitelisted': 0, 'ip_analyzed': 0, 'ip_normal': 0, 'ip_low': 0, 'ip_medium': 0, 'ip_high': 0, 'ip_critical': 0, 'metrics_processed': 0, 'last_update': 0, 'in_progress': False, 'operation': '', 'start_time': None } def reset_counters(): """ Resetta i contatori per una nuova esecuzione """ global progress_counters progress_counters['ip_whitelisted'] = 0 progress_counters['ip_analyzed'] = 0 progress_counters['ip_normal'] = 0 progress_counters['ip_low'] = 0 progress_counters['ip_medium'] = 0 progress_counters['ip_high'] = 0 progress_counters['ip_critical'] = 0 progress_counters['metrics_processed'] = 0 progress_counters['last_update'] = 0 progress_counters['in_progress'] = False progress_counters['operation'] = '' progress_counters['start_time'] = None def start_progress_tracking(operation): """ Inizia il tracciamento dell'operazione """ global progress_counters reset_counters() progress_counters['in_progress'] = True progress_counters['operation'] = operation progress_counters['start_time'] = time.time() # Avvia un thread per il reporting threading.Thread(target=progress_reporter, daemon=True).start() logging.info(f"Avvio monitoraggio operazione: {operation}") def update_counter(counter_name, increment=1): """ Aggiorna un contatore specifico """ global progress_counters if counter_name in progress_counters: progress_counters[counter_name] += increment def end_progress_tracking(): """ Termina il tracciamento e mostra il report finale """ global progress_counters if not progress_counters['in_progress']: return progress_counters['in_progress'] = False report_progress(force=True) logging.info(f"Monitoraggio completato per: {progress_counters['operation']}") def report_progress(force=False): """ Riporta lo stato attuale dei contatori """ global progress_counters if not progress_counters['in_progress'] and not force: return current_time = time.time() if not force and (current_time - progress_counters['last_update']) < 10: # Aggiorna ogni 10 secondi return elapsed = current_time - progress_counters['start_time'] if progress_counters['start_time'] else 0 report = f""" ======== REPORT DI PROGRESSO - {progress_counters['operation']} ======== Tempo trascorso: {elapsed:.1f} secondi IP Whitelistati esclusi: {progress_counters['ip_whitelisted']} Metriche elaborate: {progress_counters['metrics_processed']} IP Analizzati: {progress_counters['ip_analyzed']} Classificazione rischio: - IP NORMALI: {progress_counters['ip_normal']} - IP BASSI: {progress_counters['ip_low']} - IP MEDI: {progress_counters['ip_medium']} - IP ALTI: {progress_counters['ip_high']} - IP CRITICI: {progress_counters['ip_critical']} ================================================================ """ logging.info(report) progress_counters['last_update'] = current_time def progress_reporter(): """ Thread che riporta periodicamente i progressi """ while progress_counters['in_progress']: report_progress() time.sleep(2) # Controlla ogni 2 secondi, ma riporta solo ogni 10 def test_database_connection(): """ Test di connessione al database """ try: logging.debug("Tentativo di connessione al database...") engine = create_engine(CONN_STRING) with engine.connect() as conn: result = conn.execute(text("SELECT 1")).fetchone() if result[0] == 1: logging.debug("Test connessione al database riuscito!") tables = conn.execute(text("SHOW TABLES")).fetchall() logging.debug(f"Tabelle disponibili: {[t[0] for t in tables]}") return True return False except Exception as e: logging.error(f"Errore nel test di connessione al database: {e}") return False def create_engine_with_retry(conn_string, max_retries=3, retry_delay=2): """ Crea una connessione al database con tentativi multipli """ for attempt in range(max_retries): try: # Configurazione ottimizzata per SQLAlchemy engine = create_engine( conn_string, pool_size=5, max_overflow=10, pool_recycle=3600, pool_pre_ping=True, pool_timeout=30, echo=False, isolation_level="READ COMMITTED" ) # Test di connessione with engine.connect() as conn: conn.execute(text("SELECT 1")).fetchone() logging.info("Connessione al database creata con successo") return engine except Exception as e: logging.error(f"Tentativo {attempt+1} fallito: {e}") if attempt < max_retries - 1: logging.info(f"Nuovo tentativo tra {retry_delay} secondi...") time.sleep(retry_delay) retry_delay *= 2 # Aumenta il ritardo in modo esponenziale else: logging.error("Impossibile connettersi al database dopo tutti i tentativi") raise def load_models(): """ Carica i modelli salvati e gli oggetti per il preprocessing """ try: # Carica il modello Isolation Forest model = load(MODEL_PATH) logging.info("Modello Isolation Forest caricato con successo") # Tenta di caricare il preprocessor try: preprocessor = load(PREPROCESSOR_PATH) # Verifica che il preprocessor abbia la struttura attesa if isinstance(preprocessor, dict) and 'feature_columns' in preprocessor: logging.info(f"Preprocessor caricato con successo: {len(preprocessor['feature_columns'])} feature") return model, preprocessor else: logging.error("Preprocessor non ha la struttura attesa. Utilizzo fallback.") except Exception as e: logging.error(f"Errore nel caricamento del preprocessor: {e}") # Se siamo qui, il preprocessor non è valido o ha dato errore logging.error("Preprocessor non valido o non contiene le informazioni sulle feature. Utilizzo metodo fallback.") # Crea un preprocessor fallback con 53 colonne (numero atteso dal modello) fallback_preprocessor = { 'feature_columns': [f'col_{i}' for i in range(53)], 'categorical_features': {}, 'text_vectorizer': None } logging.info("Creato preprocessor fallback con 53 feature") return model, fallback_preprocessor except Exception as e: logging.error(f"Errore nel caricamento dei modelli: {e}") return None, None def load_whitelist(whitelist_path=None): """ Carica la whitelist da file """ if whitelist_path is None: whitelist_path = '/root/whitelist.txt' try: if not os.path.exists(whitelist_path): logging.warning(f"File whitelist non trovato: {whitelist_path}") return {'exact_ips': set(), 'networks': []} with open(whitelist_path, 'r') as f: whitelist_entries = [line.strip() for line in f if line.strip() and not line.startswith('#')] exact_ips = set() networks = [] for entry in whitelist_entries: try: if '/' in entry: # È una rete network = ipaddress.ip_network(entry, strict=False) networks.append(network) else: # È un IP singolo exact_ips.add(entry) except Exception as e: logging.warning(f"Ignorata entry nella whitelist non valida: {entry} - {e}") whitelist = { 'exact_ips': exact_ips, 'networks': networks } logging.info(f"Whitelisted {len(exact_ips)} IP esatti e {len(networks)} network.") return whitelist except Exception as e: logging.error(f"Errore nel caricamento della whitelist: {e}") return {'exact_ips': set(), 'networks': []} def is_ip_whitelisted(ip, whitelist): """ Verifica se un IP è nella whitelist """ if pd.isna(ip) or not ip: return False try: # Ottimizzazione: utilizziamo un set per la verifica di appartenenza diretta if ip in whitelist.get('exact_ips', set()): update_counter('ip_whitelisted') return True # Per le reti dobbiamo convertire l'IP in un oggetto try: ip_obj = ipaddress.ip_address(ip) except ValueError: logging.debug(f"IP non valido: {ip}") return False # Limitiamo la verifica a massimo 5000 reti per evitare blocchi # Verifichiamo le reti solo se l'IP non è già stato trovato come esatto for i, network in enumerate(whitelist.get('networks', [])): if i >= 5000: # Limitiamo per evitare blocchi # Usiamo una variabile globale per loggare questo warning solo una volta per IP if not hasattr(is_ip_whitelisted, 'warned_ips'): is_ip_whitelisted.warned_ips = set() if ip not in is_ip_whitelisted.warned_ips: logging.debug(f"Limite di 5000 reti raggiunto nella verifica whitelist per IP {ip}") is_ip_whitelisted.warned_ips.add(ip) # Limita la dimensione del set per evitare crescita incontrollata if len(is_ip_whitelisted.warned_ips) > 100: is_ip_whitelisted.warned_ips.clear() break if ip_obj in network: update_counter('ip_whitelisted') return True return False except Exception as e: logging.error(f"Errore nel controllo whitelist per IP {ip}: {e}") return False def load_last_analyzed_id(): """ Carica l'ultimo ID analizzato dal file """ try: if os.path.exists(LAST_ID_PATH): with open(LAST_ID_PATH, 'r') as f: last_id = int(f.read().strip()) return last_id else: logging.info(f"File {LAST_ID_PATH} non trovato. Inizializzo last_analyzed_id a 0.") return 0 except Exception as e: logging.error(f"Errore nel caricamento dell'ultimo ID analizzato: {e}") return 0 def save_last_analyzed_id(last_id): """ Salva l'ultimo ID analizzato nel file """ try: with open(LAST_ID_PATH, 'w') as f: f.write(str(last_id)) logging.info(f"Ultimo ID analizzato salvato: {last_id}") except Exception as e: logging.error(f"Errore nel salvataggio dell'ultimo ID analizzato: {e}") def extract_data(engine, last_analyzed_id, batch_size=10000): """ Estrae i dati dal database a partire dall'ultimo ID analizzato Utilizzando la tabella Esterna """ try: query = text(""" SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3, Messaggio4 FROM Esterna WHERE ID > :last_id ORDER BY ID ASC LIMIT :batch_size """) new_data = pd.read_sql(query, engine, params={"last_id": last_analyzed_id, "batch_size": batch_size}) logging.info(f"Dati estratti: {len(new_data)} record dalla tabella Esterna.") return new_data except Exception as e: logging.error(f"Errore durante l'esecuzione della query SQL: {e}") return pd.DataFrame() def prepare_data(new_data, preprocessor=None): """ Prepara i dati per la predizione con un approccio compatibile con il modello addestrato in analisys_fixed.py, che si aspetta 83 feature """ logging.info("--DEBUG-- Inizio preparazione dati compatibile...") try: # Ottieni i nomi delle feature dal preprocessor se disponibile feature_names = [] if isinstance(preprocessor, dict) and 'feature_columns' in preprocessor: feature_names = preprocessor['feature_columns'] logging.info(f"Trovate {len(feature_names)} feature nel preprocessor") else: # Feature predefinite osservate nei log feature_names = [ 'time_since_last', 'events_last_hour', 'events_last_day', 'time_since_last_mean', 'time_since_last_std', 'time_since_last_min', 'time_since_last_max', 'events_last_hour_max', 'events_last_day_max' ] # Aggiungi feature TF-IDF del protocollo for i in range(21): feature_names.append(f'protocol_tfidf_{i}') # Aggiungi colonne numeriche for i in range(15): feature_names.append(f'col_{i}') # Aggiungi feature categoriche per Host feature_names.extend(['host_FIBRA', 'host_nan']) # Aggiungi altre colonne come osservato nei log for i in range(15): feature_names.append(f'col_{i}') # Aggiungi di nuovo le TF-IDF for i in range(21): feature_names.append(f'protocol_tfidf_{i}') logging.info(f"Usando {len(feature_names)} feature predefinite") # Crea un dataframe con tutte le feature richieste, inizializzate a zero import numpy as np X = pd.DataFrame(np.zeros((len(new_data), len(feature_names))), index=new_data.index) # Imposta i nomi delle colonne X.columns = feature_names # Estrai IP attaccante da Messaggio2 se disponibile if 'Messaggio2' in new_data.columns: new_data['IP_Attaccante'] = new_data['Messaggio2'].apply(lambda x: x.split(':')[0] if pd.notna(x) and ':' in str(x) else None) logging.info(f"Estratti {len(new_data[new_data['IP_Attaccante'].notna()])} IP attaccanti") # Se possibile, aggiungiamo alcune informazioni if 'IP_Attaccante' in new_data.columns and 'Host' in new_data.columns: try: from category_encoders import HashingEncoder # Converti i valori essenziali (IP e Host) logging.info("--DEBUG-- Encoding valori essenziali...") he = HashingEncoder(n_components=10, hash_method='md5') # Attento a come gestiamo i valori nulli encoded_values = he.fit_transform( new_data[['Host', 'IP_Attaccante']].fillna('unknown') ) # Popola le prime colonne disponibili con i valori codificati for i in range(min(10, encoded_values.shape[1])): col_name = f'col_{i}' if col_name in X.columns: X[col_name] = encoded_values.iloc[:, i].values # Imposta il valore della colonna host_FIBRA se presente if 'host_FIBRA' in X.columns: X['host_FIBRA'] = new_data['Host'].fillna('').str.contains('FIBRA').astype(int) # Imposta il valore della colonna host_nan se presente if 'host_nan' in X.columns: X['host_nan'] = new_data['Host'].isna().astype(int) except Exception as e: logging.error(f"Errore nell'encoding dei valori: {e}") logging.info(f"--DEBUG-- Preparazione dati completata: {X.shape} (dovrebbe essere {len(new_data)} righe, 83 colonne)") return X except Exception as e: logging.error(f"Errore nella preparazione dei dati: {e}") import traceback logging.error(f"Traceback: {traceback.format_exc()}") return None def predict_anomalies(model, features): """ Predice le anomalie utilizzando il modello caricato """ try: logging.info(f"--DEBUG-- Predizione su {features.shape[0]} esempi con {features.shape[1]} feature") # Verifica se i nomi delle feature sono corretti expected_names = [f'col_{i}' for i in range(53)] if not all(col in features.columns for col in expected_names): logging.error("Nomi delle colonne non compatibili con il modello") # Rinomina le colonne se necessario features.columns = [f'col_{i}' for i in range(features.shape[1])] logging.info("Colonne rinominate per compatibilità") # Aggiorna il contatore PRIMA di fare la predizione update_counter('metrics_processed', features.shape[0]) if hasattr(model, 'predict'): # Esegui la predizione con timeout start_time = time.time() max_time = 60 # Massimo 60 secondi # Imposta X_sparse per compatibilità con il modello try: # Sopprimiamo il warning sui nomi delle feature import warnings with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=UserWarning, message="X does not have valid feature names") from scipy import sparse # Salva i nomi delle colonne prima di convertire in matrice sparse feature_names = features.columns.tolist() X_sparse = sparse.csr_matrix(features.values) logging.info(f"--DEBUG-- Matrice sparse creata: {X_sparse.shape}") # Prova a fare la predizione con la matrice sparse predictions = model.predict(X_sparse) logging.info(f"--DEBUG-- Predizione su matrice sparse completata: {len(predictions)} risultati") except Exception as e: logging.warning(f"Errore sulla matrice sparse: {e}. Provo con DataFrame normale.") # Fallback: usa il DataFrame originale predictions = model.predict(features) logging.info(f"--DEBUG-- Predizione su DataFrame completata: {len(predictions)} risultati") # Conta le anomalie anomaly_count = sum(1 for p in predictions if p == -1) logging.info(f"Trovate {anomaly_count} anomalie su {len(predictions)} predizioni") return predictions else: logging.error("Il modello non ha il metodo predict") # Fallback - considera tutto normale return np.zeros(features.shape[0]) except Exception as e: logging.error(f"Errore durante la predizione: {e}") import traceback logging.error(traceback.format_exc()) # Fallback - considera tutto normale return np.zeros(features.shape[0]) def get_details(engine, ids): """ Ottieni i dettagli completi per gli ID specificati dalla tabella Esterna """ try: if not ids: return pd.DataFrame() id_list = ','.join(map(str, ids)) query = text(""" SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3, Messaggio4 FROM Esterna WHERE ID IN ({}) """.format(id_list)) details = pd.read_sql(query, engine) # Converti timestamp if 'Data' in details.columns and 'Ora' in details.columns: details['Data'] = pd.to_datetime(details['Data'], errors='coerce') details['Ora'] = pd.to_timedelta(details['Ora'].astype(str), errors='coerce') details['Timestamp'] = details['Data'] + details['Ora'] return details except Exception as e: logging.error(f"Errore nell'ottenere i dettagli dalla tabella Esterna: {e}") return pd.DataFrame() def classify_risk(anomaly_score): """ Classifica il livello di rischio in base allo score di anomalia """ # Normalizza lo score (potrebbe essere già normalizzato) score = abs(anomaly_score) if score < RISK_LEVELS['NORMALE']: update_counter('ip_normal') return 'NORMALE' elif score < RISK_LEVELS['BASSO']: update_counter('ip_low') return 'BASSO' elif score < RISK_LEVELS['MEDIO']: update_counter('ip_medium') return 'MEDIO' elif score < RISK_LEVELS['ALTO']: update_counter('ip_high') return 'ALTO' else: update_counter('ip_critical') return 'CRITICO' def is_known_attacker(engine, ip_address): """ Verifica se un IP è un attaccante noto Crea la tabella se non esiste """ try: with engine.connect() as conn: # Crea la tabella se non esiste logging.debug(f"Verifica tabella known_attackers per IP {ip_address}") create_table_query = text(""" CREATE TABLE IF NOT EXISTS known_attackers ( id INT AUTO_INCREMENT PRIMARY KEY, ip_address VARCHAR(45) NOT NULL, first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, attack_count INT DEFAULT 1, risk_level VARCHAR(20) DEFAULT 'NORMALE', ports_used TEXT DEFAULT NULL, attack_patterns TEXT DEFAULT NULL, is_blocked TINYINT(1) DEFAULT 0, UNIQUE KEY unique_ip (ip_address) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """) # Assicurati che la transazione sia valida conn.execute(create_table_query) # Verifica se l'IP esiste check_query = text("SELECT * FROM known_attackers WHERE ip_address = :ip") logging.debug(f"Esecuzione query: {check_query} con parametri: {{'ip': {ip_address}}}") result = conn.execute(check_query, {"ip": ip_address}).fetchone() exists = result is not None logging.debug(f"IP {ip_address} è un attaccante noto: {exists}") return exists except Exception as e: logging.error(f"Errore nel verificare se l'IP {ip_address} è un attaccante noto: {e}") return False def update_known_attacker(engine, ip_address, risk_level, port=None, message=None): """ Aggiorna o inserisce un attaccante noto Incrementa il livello di rischio se l'IP viene visto più volte """ try: conn = engine.connect() trans = conn.begin() # Inizia una transazione esplicita try: # Verifica se l'IP esiste già logging.debug(f"Aggiornamento attaccante noto: {ip_address}, rischio iniziale: {risk_level}") query = text("SELECT * FROM known_attackers WHERE ip_address = :ip") result = conn.execute(query, {"ip": ip_address}).fetchone() if result: # Ottieni il conteggio attacchi e il rischio attuale attack_count = result[4] + 1 # Indice 4 = attack_count current_risk = result[5] # Indice 5 = risk_level current_ports = result[6] # Indice 6 = ports_used logging.debug(f"IP {ip_address}: conteggio attuale={attack_count-1}, rischio attuale={current_risk}") # Aggiorna la lista delle porte new_ports = current_ports or "" if port and port not in new_ports: new_ports = f"{new_ports},{port}" if new_ports else port # Incrementa il livello di rischio basato sul numero di rilevamenti new_risk = risk_level old_risk = current_risk # Teniamo traccia del rischio precedente # Escalation del rischio in base al numero di rilevamenti if risk_level == 'NORMALE' and attack_count >= 10: new_risk = 'BASSO' logging.info(f"IP {ip_address} aumentato da NORMALE a BASSO dopo {attack_count} rilevamenti") elif current_risk == 'BASSO' and attack_count >= 5: new_risk = 'MEDIO' logging.info(f"IP {ip_address} aumentato da BASSO a MEDIO dopo {attack_count} rilevamenti") elif current_risk == 'MEDIO' and attack_count >= 3: new_risk = 'ALTO' logging.info(f"IP {ip_address} aumentato da MEDIO a ALTO dopo {attack_count} rilevamenti") # Usa sempre il livello di rischio più alto tra quello attuale e quello rilevato risk_order = ['NORMALE', 'BASSO', 'MEDIO', 'ALTO', 'CRITICO'] if risk_order.index(current_risk) > risk_order.index(new_risk): new_risk = current_risk # Mantiene il rischio più alto # Se il rischio è cambiato, aggiorna i contatori if new_risk != old_risk: # Decrementa il contatore del vecchio livello se possibile if old_risk == 'NORMALE': update_counter('ip_normal', -1) elif old_risk == 'BASSO': update_counter('ip_low', -1) elif old_risk == 'MEDIO': update_counter('ip_medium', -1) elif old_risk == 'ALTO': update_counter('ip_high', -1) elif old_risk == 'CRITICO': update_counter('ip_critical', -1) # Incrementa il contatore del nuovo livello if new_risk == 'NORMALE': update_counter('ip_normal') elif new_risk == 'BASSO': update_counter('ip_low') elif new_risk == 'MEDIO': update_counter('ip_medium') elif new_risk == 'ALTO': update_counter('ip_high') elif new_risk == 'CRITICO': update_counter('ip_critical') logging.debug(f"IP {ip_address}: nuovo conteggio={attack_count}, nuovo rischio={new_risk}, porte={new_ports}") # Aggiorna l'esistente update_query = text(""" UPDATE known_attackers SET last_seen = NOW(), attack_count = attack_count + 1, risk_level = :risk, ports_used = :ports WHERE ip_address = :ip """) conn.execute(update_query, {"ip": ip_address, "risk": new_risk, "ports": new_ports}) # Commit della transazione trans.commit() # Restituisci il nuovo livello di rischio return new_risk else: # Inserisci nuovo logging.debug(f"Inserimento nuovo attaccante: {ip_address}, rischio={risk_level}, porta={port}") insert_query = text(""" INSERT INTO known_attackers (ip_address, risk_level, ports_used, attack_patterns, is_blocked) VALUES (:ip, :risk, :port, :message, 0) """) result = conn.execute(insert_query, {"ip": ip_address, "risk": risk_level, "port": port, "message": message}) logging.debug(f"Risultato inserimento: {result.rowcount} righe inserite") # Commit della transazione trans.commit() # Inizializza il contatore al nuovo livello di rischio if risk_level == 'NORMALE': update_counter('ip_normal') elif risk_level == 'BASSO': update_counter('ip_low') elif risk_level == 'MEDIO': update_counter('ip_medium') elif risk_level == 'ALTO': update_counter('ip_high') elif risk_level == 'CRITICO': update_counter('ip_critical') return risk_level except Exception as e: # Rollback in caso di errore trans.rollback() logging.error(f"Errore nell'aggiornare l'attaccante noto {ip_address}: {e}") logging.error(f"Dettagli errore: {str(e)}") return risk_level finally: # Chiudi la connessione conn.close() except Exception as e: logging.error(f"Errore nel creare la connessione per {ip_address}: {e}") return risk_level def should_block_ip(risk_level): """ Determina se un IP dovrebbe essere bloccato in base al suo livello di rischio """ should_block = risk_level in ['ALTO', 'CRITICO'] logging.debug(f"IP con rischio {risk_level} dovrebbe essere bloccato: {should_block}") return should_block def insert_anomaly_to_db(engine, ip_address, risk_level, list_name): """ Inserisce o aggiorna un'anomalia nel database """ try: with engine.connect() as conn: # Verifica se la colonna risk_level esiste nella tabella ip_list try: # Verifica la struttura della tabella check_column_query = text(""" SELECT COUNT(*) AS column_exists FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = :db_name AND TABLE_NAME = 'ip_list' AND COLUMN_NAME = 'risk_level' """) db_name = os.environ.get('MYSQL_DATABASE', 'LOG_MIKROTIK') result = conn.execute(check_column_query, {"db_name": db_name}).fetchone() if result[0] == 0: # La colonna non esiste, aggiungiamola logging.info("La colonna risk_level non esiste nella tabella ip_list. Aggiunta in corso...") alter_query = text(""" ALTER TABLE ip_list ADD COLUMN risk_level VARCHAR(20) DEFAULT 'MEDIO' """) conn.execute(alter_query) logging.info("Colonna risk_level aggiunta con successo.") else: logging.debug("La colonna risk_level esiste già nella tabella ip_list.") except Exception as e: logging.error(f"Errore nella verifica/aggiunta della colonna risk_level: {e}") # Continua comunque, potrebbe fallire nell'insert ma almeno ci abbiamo provato # Ora procediamo con l'inserimento upsert_query = text(""" INSERT INTO ip_list (list_name, ip_address, retrieved_at, risk_level) VALUES (:list_name, :ip_address, NOW(), :risk_level) ON DUPLICATE KEY UPDATE retrieved_at = NOW(), risk_level = :risk_level """) conn.execute(upsert_query, { "list_name": list_name, "ip_address": ip_address, "risk_level": risk_level }) logging.info(f"IP {ip_address} inserito/aggiornato nella lista {list_name} con rischio {risk_level}") return True except Exception as e: logging.error(f"Errore nell'inserire l'IP {ip_address} nel database: {e}") return False def handle_anomaly(engine, ip_address, risk_level, port=None, message=None, list_name='ddos_ia'): """ Gestisce un'anomalia rilevata aggiornando le informazioni nel database """ logging.debug(f"Gestione anomalia per IP: {ip_address}, rischio: {risk_level}") try: # Aggiorna le informazioni sull'attaccante nel database final_risk = update_known_attacker(engine, ip_address, risk_level, port, message) # Se il rischio è ALTO o CRITICO, aggiungi IP alla lista di blocco if should_block_ip(final_risk): insert_anomaly_to_db(engine, ip_address, final_risk, list_name) except Exception as e: logging.error(f"Errore nella gestione dell'anomalia per {ip_address}: {e}") def main(): """ Funzione principale per il rilevamento DDoS """ # Parsing degli argomenti parser = argparse.ArgumentParser(description='Rilevamento DDoS in tempo reale') parser.add_argument('--debug', action='store_true', help='Abilita logging di debug dettagliato') parser.add_argument('--batch-size', type=int, default=10000, help='Dimensione del batch di dati da analizzare') parser.add_argument('--whitelist', type=str, default=WHITELIST_PATH, help='Percorso del file whitelist') parser.add_argument('--ciclo', action='store_true', help='Esegui in un ciclo infinito fino all\'interruzione') parser.add_argument('--pausa', type=int, default=60, help='Secondi di pausa tra un ciclo e l\'altro (con --ciclo)') args = parser.parse_args() # Imposta il livello di logging in base agli argomenti if args.debug: logging.getLogger().setLevel(logging.DEBUG) logging.debug("Modalità debug attivata") # Gestisce l'interruzione con CTRL+C def handle_interrupt(signum, frame): logging.info("Ricevuto segnale di interruzione. Chiusura del programma...") end_progress_tracking() exit(0) # Registra il gestore per SIGINT (CTRL+C) signal.signal(signal.SIGINT, handle_interrupt) # Ciclo infinito quando --ciclo è specificato ciclo_count = 0 def esegui_analisi(): # Visualizza informazioni di avvio if args.ciclo: ciclo_txt = f" (ciclo {ciclo_count})" else: ciclo_txt = "" logging.info(f"Avvio rilevamento DDoS{ciclo_txt}...") start_progress_tracking(f"rilevamento DDoS{ciclo_txt}") # Verifica percorsi e autorizzazioni dei file di modello logging.info("Verifica dei percorsi dei modelli...") model_files = { "Modello principale": MODEL_PATH, "Preprocessor": PREPROCESSOR_PATH } for name, path in model_files.items(): if os.path.exists(path): logging.info(f"✅ {name} trovato: {path}") try: # Verifica che il file sia leggibile with open(path, 'rb') as f: f.read(1) logging.info(f"✅ {name} è leggibile") except Exception as e: logging.error(f"❌ {name} esiste ma non è leggibile: {e}") else: logging.warning(f"❌ {name} non trovato: {path}") # Test connessione database if not test_database_connection(): logging.error("Impossibile connettersi al database. Verificare le credenziali e la disponibilità del server.") end_progress_tracking() return False try: # Connessione al database logging.info("Connessione al database...") engine = create_engine_with_retry(CONN_STRING) # Caricamento del modello model, preprocessor = load_models() if model is None: logging.error("Impossibile caricare il modello. Arresto del programma.") end_progress_tracking() return False # Verifica che il modello sia valido if not hasattr(model, 'predict'): logging.error("Il modello caricato non è valido (manca il metodo predict). Arresto del programma.") end_progress_tracking() return False # Carica la whitelist e processa i dati whitelist = load_whitelist(args.whitelist) # Verifica se ci sono dati nuovi last_id = load_last_analyzed_id() logging.info(f"Last analyzed ID: {last_id}") # SEMPLIFICA IL FLUSSO PRINCIPALE try: # 1. Estrai dati logging.info("--DEBUG-- Estrazione dati dal database...") new_data = extract_data(engine, last_id, args.batch_size) if new_data.empty: logging.info("Nessun nuovo dato da analizzare.") end_progress_tracking() return True # 2. Estrai e prepara gli IP attaccanti logging.info("--DEBUG-- Preparazione IP attaccanti...") # Converti in pochi passaggi per evitare blocchi new_data['IP_Attaccante'] = new_data['Messaggio2'].apply( lambda x: x.split(':')[0] if pd.notna(x) and isinstance(x, str) and ':' in x else None ) # 3. Conta solo un campione di IP whitelistati per evitare blocchi whitelisted_ips = set() if len(new_data) > 100: # Verifichiamo tutti gli IP per la whitelist all_ips = new_data['IP_Attaccante'].dropna().unique() logging.info(f"--DEBUG-- Verifica di {len(all_ips)} IP unici contro la whitelist...") for ip in all_ips: if is_ip_whitelisted(ip, whitelist): whitelisted_ips.add(ip) logging.info(f"Trovati {len(whitelisted_ips)} IP in whitelist su {len(all_ips)} IP unici") # 4. Prepara i dati con metodo minimalista logging.info("--DEBUG-- Preparazione dati per predizione...") features = prepare_data(new_data, preprocessor) if features is None or features.shape[0] == 0: logging.error("Nessuna feature generata per la predizione.") end_progress_tracking() return True # 5. Fai la predizione logging.info("--DEBUG-- Predizione anomalie...") predictions = predict_anomalies(model, features) if len(predictions) == 0: logging.warning("Nessuna predizione generata, uso array di zeri") predictions = np.zeros(new_data.shape[0]) # 6. Aggiusta le dimensioni se necessario if len(predictions) != len(new_data): logging.warning(f"Dimensioni differenti: predizioni {len(predictions)}, dati {len(new_data)}") if len(predictions) < len(new_data): # Estendi l'array delle predizioni predictions = np.append(predictions, np.zeros(len(new_data) - len(predictions))) else: # Tronca l'array delle predizioni predictions = predictions[:len(new_data)] # 7. Applica risultati e conta new_data['anomaly'] = predictions # 8. Aggiorna i contatori per ogni IP (senza duplicati) ip_counter = {} for idx, row in new_data.iterrows(): ip = row.get('IP_Attaccante') if pd.notna(ip) and ip not in ip_counter: ip_counter[ip] = True if ip not in whitelisted_ips: # Un nuovo IP analizzato update_counter('ip_analyzed') # Classifica in base al risultato della predizione if row['anomaly'] == -1: # Anomalo - non incrementiamo qui, lo farà update_known_attacker pass else: # Normale update_counter('ip_normal') logging.info(f"Analizzati {len(ip_counter)} IP unici, {len(whitelisted_ips)} in whitelist") # 9. Gestisci anomalie anomalies = new_data[new_data['anomaly'] == -1] logging.info(f"Rilevate {len(anomalies)} anomalie su {len(new_data)} eventi") if not anomalies.empty: # Mostra un esempio delle anomalie nei log sample_size = min(5, len(anomalies)) logging.info(f"Esempio di {sample_size} anomalie rilevate:") for idx, row in anomalies.head(sample_size).iterrows(): ip = row.get('IP_Attaccante') msg = row.get('Messaggio2', 'N/A') logging.info(f" - [{idx}] IP: {ip}, Messaggio: {msg}") # Conteggia gli IP anomali per IP anomaly_ips = {} for idx, row in anomalies.iterrows(): ip = row.get('IP_Attaccante') if pd.notna(ip): if ip not in anomaly_ips: anomaly_ips[ip] = 0 anomaly_ips[ip] += 1 logging.info(f"Trovati {len(anomaly_ips)} IP unici con anomalie") # Mostra i top 10 IP con più anomalie top_anomalies = sorted(anomaly_ips.items(), key=lambda x: x[1], reverse=True)[:10] if top_anomalies: logging.info("Top 10 IP con più anomalie:") for ip, count in top_anomalies: logging.info(f" - IP: {ip}, Anomalie: {count}") # Processa ciascun IP una sola volta processed_ips = set() for idx, row in anomalies.iterrows(): ip = row.get('IP_Attaccante') if pd.notna(ip) and ip not in processed_ips and not is_ip_whitelisted(ip, whitelist): processed_ips.add(ip) # Assegna un livello di rischio base risk_level = 'MEDIO' # Default port = None if 'Porta_Attaccante' in row and pd.notna(row['Porta_Attaccante']): port = row['Porta_Attaccante'] # Crea un messaggio informativo msg = f"Anomalia rilevata da {row.get('Messaggio2', 'N/A')}" # Gestisci l'anomalia handle_anomaly(engine, ip, risk_level, port, msg, 'ddos_ia') logging.info(f"Gestite anomalie per {len(processed_ips)} IP unici") # 10. Salva l'ultimo ID analizzato if not new_data.empty: last_analyzed_id = new_data['ID'].max() save_last_analyzed_id(last_analyzed_id) # 11. Segnala il completamento logging.info(f"Analisi completata: processati {len(new_data)} eventi, trovate {len(anomalies)} anomalie") end_progress_tracking() return True except Exception as e: logging.error(f"Errore durante l'analisi: {e}") import traceback logging.error(f"Traceback completo: {traceback.format_exc()}") end_progress_tracking() return False except Exception as e: logging.error(f"Errore generale: {e}") import traceback logging.error(f"Traceback completo: {traceback.format_exc()}") end_progress_tracking() return False # Esegui una singola analisi o in ciclo if args.ciclo: logging.info(f"Esecuzione in modalità ciclo. Per interrompere premere CTRL+C") while True: ciclo_count += 1 success = esegui_analisi() if success: logging.info(f"Ciclo {ciclo_count} completato. Pausa di {args.pausa} secondi...") time.sleep(args.pausa) else: logging.error(f"Errore nel ciclo {ciclo_count}. Pausa di {args.pausa*2} secondi prima di riprovare...") time.sleep(args.pausa * 2) # Pausa più lunga in caso di errore else: # Modalità singola esegui_analisi() if __name__ == "__main__": if len(sys.argv) > 1 and sys.argv[1] == "--test": logging.info("MODALITÀ TEST: verifica delle funzioni principali") try: engine = create_engine_with_retry(CONN_STRING) test_ip = "192.168.1.1" logging.info(f"Test 1: Verifica tabella known_attackers") is_known = is_known_attacker(engine, test_ip) logging.info(f"IP {test_ip} è un attaccante noto: {is_known}") logging.info(f"Test 2: Aggiornamento known_attacker") new_risk = update_known_attacker(engine, test_ip, "NORMALE", "80", "Test message") logging.info(f"Nuovo livello di rischio: {new_risk}") logging.info(f"Test 3: Verifica se ora è un attaccante noto") is_known = is_known_attacker(engine, test_ip) logging.info(f"IP {test_ip} è ora un attaccante noto: {is_known}") logging.info("MODALITÀ TEST completata") sys.exit(0) except Exception as e: logging.error(f"Errore nei test: {e}") sys.exit(1) else: main()