#!/usr/bin/env python3 import pandas as pd from sqlalchemy import create_engine from sqlalchemy.sql import text from joblib import load import logging import gc import os import time import sys from collections import defaultdict from datetime import datetime, timedelta, timezone import ipaddress import numpy as np from sklearn.ensemble import IsolationForest import threading import argparse import signal import multiprocessing from concurrent.futures import ThreadPoolExecutor, as_completed from category_encoders import HashingEncoder from sklearn.feature_extraction.text import TfidfVectorizer # Configurazione del logging avanzata per il debug logging.basicConfig( level=logging.WARNING, # Modificato da INFO a WARNING per default format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler('ddetect_debug.log') ] ) # Aggiungi un altro handler per il file con livello più dettagliato file_handler = logging.FileHandler('ddetect_full.log') file_handler.setLevel(logging.INFO) file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) logging.getLogger().addHandler(file_handler) # Configurazione del database DB_USER = os.environ.get('DB_USER', 'root') DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-') DB_HOST = os.environ.get('DB_HOST', 'localhost') DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK') CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}' # Percorsi dei file MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models') os.makedirs(MODEL_DIR, exist_ok=True) MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib') PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib') WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt') LAST_ID_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'last_analyzed_id.txt') # Definizione dei livelli di rischio e soglie RISK_LEVELS = { 'NORMALE': 0.1, 'BASSO': 0.3, 'MEDIO': 0.6, 'ALTO': 0.8, 'CRITICO': 0.95 } logging.debug(f"Percorsi: MODEL_DIR={MODEL_DIR}, WHITELIST_PATH={WHITELIST_PATH}") logging.debug(f"Livelli di rischio configurati: {RISK_LEVELS}") # Colori ANSI per i messaggi class Colors: HEADER = '\033[95m' BLUE = '\033[94m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BOLD = '\033[1m' UNDERLINE = '\033[4m' END = '\033[0m' def log_phase(message): """Evidenzia una nuova fase principale dell'esecuzione""" print(f"\n{Colors.BOLD}{Colors.GREEN}▶ FASE: {message}{Colors.END}\n") logging.info(f"FASE: {message}") def log_result(message): """Evidenzia un risultato importante""" print(f"{Colors.BLUE}✓ {message}{Colors.END}") logging.info(f"RISULTATO: {message}") def log_warning(message): """Evidenzia un avviso importante""" print(f"{Colors.YELLOW}⚠ {message}{Colors.END}") logging.warning(message) def log_error(message): """Evidenzia un errore importante""" print(f"{Colors.RED}✗ {message}{Colors.END}") logging.error(message) # Variabili globali per il tracciamento dell'avanzamento progress_counters = { 'ip_whitelisted': 0, 'ip_analyzed': 0, 'ip_normal': 0, 'ip_low': 0, 'ip_medium': 0, 'ip_high': 0, 'ip_critical': 0, 'metrics_processed': 0, 'last_update': 0, 'in_progress': False, 'operation': '', 'start_time': None } def reset_counters(): """ Resetta i contatori per una nuova esecuzione """ global progress_counters progress_counters['ip_whitelisted'] = 0 progress_counters['ip_analyzed'] = 0 progress_counters['ip_normal'] = 0 progress_counters['ip_low'] = 0 progress_counters['ip_medium'] = 0 progress_counters['ip_high'] = 0 progress_counters['ip_critical'] = 0 progress_counters['metrics_processed'] = 0 progress_counters['last_update'] = 0 progress_counters['in_progress'] = False progress_counters['operation'] = '' progress_counters['start_time'] = None def start_progress_tracking(operation): """ Inizia il tracciamento dell'operazione """ global progress_counters reset_counters() progress_counters['in_progress'] = True progress_counters['operation'] = operation progress_counters['start_time'] = time.time() # Avvia un thread per il reporting threading.Thread(target=progress_reporter, daemon=True).start() logging.info(f"Avvio monitoraggio operazione: {operation}") def update_counter(counter_name, increment=1): """ Aggiorna un contatore specifico """ global progress_counters if counter_name in progress_counters: progress_counters[counter_name] += increment def end_progress_tracking(): """ Termina il tracciamento e mostra il report finale """ global progress_counters if not progress_counters['in_progress']: return progress_counters['in_progress'] = False report_progress(force=True) logging.info(f"Monitoraggio completato per: {progress_counters['operation']}") def report_progress(force=False): """ Riporta lo stato attuale dei contatori """ global progress_counters if not progress_counters['in_progress'] and not force: return current_time = time.time() if not force and (current_time - progress_counters['last_update']) < 10: # Aggiorna ogni 10 secondi return elapsed = current_time - progress_counters['start_time'] if progress_counters['start_time'] else 0 report = f""" {Colors.BOLD}======== REPORT DI PROGRESSO - {progress_counters['operation']} ========{Colors.END} Tempo trascorso: {elapsed:.1f} secondi IP Whitelistati esclusi: {progress_counters['ip_whitelisted']} Metriche elaborate: {progress_counters['metrics_processed']} IP Analizzati: {progress_counters['ip_analyzed']} Classificazione rischio: - IP NORMALI: {progress_counters['ip_normal']} - IP BASSI: {progress_counters['ip_low']} - IP MEDI: {progress_counters['ip_medium']} - IP ALTI: {progress_counters['ip_high']} - IP CRITICI: {progress_counters['ip_critical']} {Colors.BOLD}================================================================{Colors.END} """ print(report) logging.info(report.replace(Colors.BOLD, '').replace(Colors.END, '')) progress_counters['last_update'] = current_time def progress_reporter(): """ Thread che riporta periodicamente i progressi """ while progress_counters['in_progress']: report_progress() time.sleep(2) # Controlla ogni 2 secondi, ma riporta solo ogni 10 def test_database_connection(): """ Test di connessione al database """ try: logging.debug("Tentativo di connessione al database...") engine = create_engine(CONN_STRING) with engine.connect() as conn: result = conn.execute(text("SELECT 1")).fetchone() if result[0] == 1: logging.debug("Test connessione al database riuscito!") tables = conn.execute(text("SHOW TABLES")).fetchall() logging.debug(f"Tabelle disponibili: {[t[0] for t in tables]}") return True return False except Exception as e: logging.error(f"Errore nel test di connessione al database: {e}") return False def create_engine_with_retry(conn_string, max_retries=3, retry_delay=2): """ Crea una connessione al database con tentativi multipli """ for attempt in range(max_retries): try: # Configurazione ottimizzata per SQLAlchemy engine = create_engine( conn_string, pool_size=5, max_overflow=10, pool_recycle=3600, pool_pre_ping=True, pool_timeout=30, echo=False, isolation_level="READ COMMITTED" ) # Test di connessione with engine.connect() as conn: conn.execute(text("SELECT 1")).fetchone() logging.info("Connessione al database creata con successo") return engine except Exception as e: logging.error(f"Tentativo {attempt+1} fallito: {e}") if attempt < max_retries - 1: logging.info(f"Nuovo tentativo tra {retry_delay} secondi...") time.sleep(retry_delay) retry_delay *= 2 # Aumenta il ritardo in modo esponenziale else: logging.error("Impossibile connettersi al database dopo tutti i tentativi") raise def load_models(): """ Carica i modelli di rilevamento delle anomalie addestrati """ try: # Carica il modello logging.info(f"Caricamento modello da {MODEL_PATH}...") if os.path.exists(MODEL_PATH): model = load(MODEL_PATH) logging.debug("Modello caricato con successo!") else: logging.error(f"File modello non trovato: {MODEL_PATH}") return None, None # Carica il preprocessor try: logging.info(f"Caricamento preprocessor da {PREPROCESSOR_PATH}...") if os.path.exists(PREPROCESSOR_PATH): preprocessor = load(PREPROCESSOR_PATH) # Verifica che il preprocessor abbia la struttura attesa if isinstance(preprocessor, dict) and 'feature_columns' in preprocessor: feature_count = len(preprocessor['feature_columns']) if feature_count < 125: logging.warning(f"Il modello si aspetta 125 feature, " f"ma il preprocessor ne ha {feature_count}") return model, preprocessor else: logging.error("Preprocessor non ha la struttura attesa. Utilizzo fallback.") else: logging.error(f"File preprocessor non trovato: {PREPROCESSOR_PATH}") except Exception as e: logging.error(f"Errore nel caricamento del preprocessor: {e}") # Se arriviamo qui, il preprocessor non è disponibile o non è valido # Crea un preprocessor di fallback preprocessor = {'feature_columns': [f'feature_{i}' for i in range(125)]} return model, preprocessor except Exception as e: logging.error(f"Errore nel caricamento dei modelli: {e}") return None, None def load_whitelist(whitelist_path=None): """ Carica la whitelist da file """ if whitelist_path is None: whitelist_path = '/root/whitelist.txt' try: if not os.path.exists(whitelist_path): logging.warning(f"File whitelist non trovato: {whitelist_path}") return {'exact_ips': set(), 'networks': []} with open(whitelist_path, 'r') as f: whitelist_entries = [line.strip() for line in f if line.strip() and not line.startswith('#')] exact_ips = set() networks = [] for entry in whitelist_entries: try: if '/' in entry: # È una rete network = ipaddress.ip_network(entry, strict=False) networks.append(network) else: # È un IP singolo exact_ips.add(entry) except Exception as e: logging.warning(f"Ignorata entry nella whitelist non valida: {entry} - {e}") whitelist = { 'exact_ips': exact_ips, 'networks': networks } logging.info(f"Whitelisted {len(exact_ips)} IP esatti e {len(networks)} network.") return whitelist except Exception as e: logging.error(f"Errore nel caricamento della whitelist: {e}") return {'exact_ips': set(), 'networks': []} def is_ip_whitelisted(ip, whitelist): """ Verifica se un IP è nella whitelist """ if pd.isna(ip) or not ip: return False try: # Ottimizzazione: utilizziamo un set per la verifica di appartenenza diretta if ip in whitelist.get('exact_ips', set()): update_counter('ip_whitelisted') return True # Per le reti dobbiamo convertire l'IP in un oggetto try: ip_obj = ipaddress.ip_address(ip) except ValueError: logging.debug(f"IP non valido: {ip}") return False # Limitiamo la verifica a massimo 5000 reti per evitare blocchi # Verifichiamo le reti solo se l'IP non è già stato trovato come esatto for i, network in enumerate(whitelist.get('networks', [])): if i >= 5000: # Limitiamo per evitare blocchi # Usiamo una variabile globale per loggare questo warning solo una volta per IP if not hasattr(is_ip_whitelisted, 'warned_ips'): is_ip_whitelisted.warned_ips = set() if ip not in is_ip_whitelisted.warned_ips: logging.debug(f"Limite di 5000 reti raggiunto nella verifica whitelist per IP {ip}") is_ip_whitelisted.warned_ips.add(ip) # Limita la dimensione del set per evitare crescita incontrollata if len(is_ip_whitelisted.warned_ips) > 100: is_ip_whitelisted.warned_ips.clear() break if ip_obj in network: update_counter('ip_whitelisted') return True return False except Exception as e: logging.error(f"Errore nel controllo whitelist per IP {ip}: {e}") return False def load_last_analyzed_id(): """ Carica l'ultimo ID analizzato dal file """ try: if os.path.exists(LAST_ID_PATH): with open(LAST_ID_PATH, 'r') as f: last_id = int(f.read().strip()) return last_id else: logging.info(f"File {LAST_ID_PATH} non trovato. Inizializzo last_analyzed_id a 0.") return 0 except Exception as e: logging.error(f"Errore nel caricamento dell'ultimo ID analizzato: {e}") return 0 def save_last_analyzed_id(last_id): """ Salva l'ultimo ID analizzato nel file """ try: with open(LAST_ID_PATH, 'w') as f: f.write(str(last_id)) logging.info(f"Ultimo ID analizzato salvato: {last_id}") except Exception as e: logging.error(f"Errore nel salvataggio dell'ultimo ID analizzato: {e}") def extract_data(engine, last_id=0, batch_size=1000, max_id=None): """ Estrae i dati dalla tabella Esterna per l'analisi Ora supporta estrazione ottimizzata e soglia massima """ try: # Se viene specificato max_id, limitiamo l'estrazione fino a quell'ID # Utile per evitare di analizzare log molto vecchi e concentrarsi sugli ultimi if max_id: logging.info(f"Limitazione estrazione fino a ID {max_id}") # Otteniamo il numero totale di record inserendo i parametri direttamente # Questa è una soluzione che evita problemi di compatibilità con i segnaposti if max_id: query_count = f"SELECT COUNT(*) FROM Esterna WHERE ID > {last_id} AND ID <= {max_id}" else: query_count = f"SELECT COUNT(*) FROM Esterna WHERE ID > {last_id}" with engine.connect() as conn: count_result = conn.execute(text(query_count)).scalar() if count_result == 0: logging.info("Nessun nuovo record da estrarre") return pd.DataFrame() logging.info(f"Trovati {count_result} nuovi record da estrarre") # Se ci sono troppi record (>50k), otteniamo solo i più recenti effective_batch_size = batch_size if count_result > 50000: logging.warning(f"Troppi record in coda ({count_result}). Limitando l'analisi ai più recenti.") # Troviamo l'ID limite per i record più recenti with engine.connect() as conn: latest_id_query = "SELECT MAX(ID) FROM Esterna" latest_id = conn.execute(text(latest_id_query)).scalar() if latest_id: # Estrarre gli ultimi N record più recenti max_id = latest_id first_id_query = f"SELECT ID FROM Esterna WHERE ID <= {latest_id} ORDER BY ID DESC LIMIT {batch_size*5}" result = conn.execute(text(first_id_query)).fetchall() if result and len(result) > 0: # Prendiamo l'ID più basso dei record recenti first_recent_id = result[-1][0] logging.info(f"Limitando l'analisi ai record con ID da {first_recent_id} a {latest_id}") last_id = first_recent_id - 1 # Aggiorniamo il conteggio effettivo count_result = latest_id - last_id # Limitiamo il batch_size in base al conteggio effettivo if count_result < effective_batch_size: effective_batch_size = count_result # Calcolo del numero di batch num_batches = (count_result + effective_batch_size - 1) // effective_batch_size logging.info(f"Estrazione suddivisa in {num_batches} batch di massimo {effective_batch_size} record ciascuno") # Evitare estrazione su ID troppo elevati max_extraction_limit = 100000 # Limita a 100k record per esecuzione if count_result > max_extraction_limit: logging.warning(f"Troppi record da analizzare ({count_result}). Limitando a {max_extraction_limit}.") count_result = max_extraction_limit # Prima verifichiamo quali colonne sono disponibili nella tabella Esterna try: with engine.connect() as conn: # Ottieni informazioni sulle colonne col_query = f"SHOW COLUMNS FROM Esterna" columns_info = conn.execute(text(col_query)).fetchall() available_columns = [col[0] for col in columns_info] logging.info(f"Colonne disponibili nella tabella Esterna: {available_columns}") except Exception as e: logging.error(f"Errore nel verificare le colonne disponibili: {e}") # Se non riusciamo a ottenere le colonne, usiamo un set predefinito di colonne base available_columns = ['ID', 'Data', 'Ora', 'Host', 'IndirizzoIP', 'Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4'] logging.info(f"Utilizzo colonne predefinite: {available_columns}") # Per ogni batch, costruiamo ed eseguiamo la query inserendo # i parametri direttamente nella stringa SQL e utilizzando solo le colonne disponibili current_id = last_id frames = [] # Costruiamo la lista di colonne per la query # Assicuriamoci che ID sia sempre incluso select_columns = ['ID'] # Aggiungiamo altre colonne solo se disponibili for col in ['Data', 'Ora', 'Host', 'IndirizzoIP', 'Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4', 'Topic', 'RouterOS', 'RouterIP', 'RouterName']: if col in available_columns: select_columns.append(col) # Costruiamo la stringa delle colonne columns_str = ', '.join(select_columns) for i in range(num_batches): with engine.connect() as conn: if max_id: query = f""" SELECT {columns_str} FROM Esterna WHERE ID > {current_id} AND ID <= {max_id} ORDER BY ID ASC LIMIT {effective_batch_size} """ else: query = f""" SELECT {columns_str} FROM Esterna WHERE ID > {current_id} ORDER BY ID ASC LIMIT {effective_batch_size} """ logging.info(f"Estraendo batch {i+1}/{num_batches}: ID > {current_id}") # Esecuzione della query senza parametri (sono già inseriti nella stringa) result = conn.execute(text(query)) # Convertiamo il risultato in DataFrame chunk = pd.DataFrame(result.fetchall(), columns=result.keys()) if chunk.empty: break # Aggiorna l'ID corrente per il prossimo batch current_id = chunk['ID'].max() # Aggiungiamo una colonna Timestamp calcolata se abbiamo Data e Ora if 'Data' in chunk.columns and 'Ora' in chunk.columns: try: chunk['Data'] = pd.to_datetime(chunk['Data'], errors='coerce') chunk['Ora'] = pd.to_timedelta(chunk['Ora'].astype(str), errors='coerce') chunk['Timestamp'] = chunk['Data'] + chunk['Ora'] except Exception as e: logging.warning(f"Impossibile creare colonna Timestamp: {e}") # Accumula il chunk frames.append(chunk) # Feedback sull'avanzamento logging.info(f"Estratti {len(chunk)} record, fino all'ID {current_id}") # Combina tutti i frame if not frames: return pd.DataFrame() result = pd.concat(frames, ignore_index=True) logging.info(f"Estrazione completata: {len(result)} record totali") return result except Exception as e: logging.error(f"Errore nell'estrazione dei dati: {e}") import traceback logging.error(traceback.format_exc()) return pd.DataFrame() def prepare_data(df, preprocessor): """ Prepara i dati per il modello, generando tutte le feature necessarie per garantire la compatibilità con ddetect_fixed.py e ridurre i placeholder """ try: # Crea una copia esplicita del dataframe per evitare SettingWithCopyWarning df = df.copy() import numpy as np # Numero atteso di feature (dal modello) expected_features = 125 # Prepara un dizionario per tutte le feature feature_data = {} feature_count = 0 # 1. Aggiungi le caratteristiche temporali essenziali (9 feature) time_features = [ 'time_since_last', 'events_last_hour', 'events_last_day', 'time_since_last_mean', 'time_since_last_std', 'time_since_last_min', 'time_since_last_max', 'events_last_hour_max', 'events_last_day_max' ] for feat in time_features: if feat in df.columns: feature_data[feat] = df[feat].fillna(0).values else: feature_data[feat] = np.zeros(len(df)) feature_count += 1 # 2. Estrai caratteristiche TF-IDF dal protocollo (21 feature) if 'Messaggio1' in df.columns: try: from sklearn.feature_extraction.text import TfidfVectorizer vectorizer = TfidfVectorizer(max_features=21) proto_data = df['Messaggio1'].fillna('').astype(str) # Fit e transform tfidf_matrix = vectorizer.fit_transform(proto_data) tfidf_features = tfidf_matrix.toarray() # Aggiungi al feature data for i in range(min(21, tfidf_features.shape[1])): feature_data[f'protocol_tfidf_{i}'] = tfidf_features[:, i] feature_count += 1 # Se abbiamo meno di 21 feature, riempi con zeri for i in range(tfidf_features.shape[1], 21): feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df)) feature_count += 1 except Exception as e: logging.error(f"Errore nell'estrazione TF-IDF: {e}") # Fallback: aggiungi 21 feature vuote for i in range(21): feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df)) feature_count += 1 else: # Se non c'è Messaggio1, aggiungi 21 feature vuote for i in range(21): feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df)) feature_count += 1 # 3. Aggiungi feature di Host (2 feature) if 'Host' in df.columns: feature_data['host_FIBRA'] = df['Host'].fillna('').str.contains('FIBRA').astype(int).values feature_data['host_nan'] = df['Host'].isna().astype(int).values else: feature_data['host_FIBRA'] = np.zeros(len(df)) feature_data['host_nan'] = np.zeros(len(df)) feature_count += 2 # 4. Estrai e codifica IP e Host (15 feature) # Estrai IP attaccante se non presente if 'IP_Attaccante' not in df.columns and 'Messaggio2' in df.columns: df['IP_Attaccante'] = df['Messaggio2'].apply( lambda x: x.split(':')[0] if pd.notna(x) and isinstance(x, str) and ':' in x else None ) # Usa HashingEncoder per IP e Host se disponibili try: hash_encoder = HashingEncoder(n_components=15) # Prepara i dati per l'encoding encode_cols = [] encode_data = [] if 'IP_Attaccante' in df.columns: encode_cols.append('IP_Attaccante') encode_data.append(df['IP_Attaccante'].fillna('unknown').astype(str)) if 'Host' in df.columns: encode_cols.append('Host') encode_data.append(df['Host'].fillna('unknown').astype(str)) if encode_cols: # Combina i dati per l'encoding encode_df = pd.DataFrame({ col: data for col, data in zip(encode_cols, encode_data) }, index=df.index) # Fai l'encoding encoded = hash_encoder.fit_transform(encode_df) # Aggiungi le feature codificate for i in range(min(15, encoded.shape[1])): feature_data[f'col_{i}'] = encoded.iloc[:, i].values feature_count += 1 # Aggiungi colonne mancanti se necessario for i in range(encoded.shape[1], 15): feature_data[f'col_{i}'] = np.zeros(len(df)) feature_count += 1 else: # Nessuna colonna da codificare, aggiungi feature vuote for i in range(15): feature_data[f'col_{i}'] = np.zeros(len(df)) feature_count += 1 except Exception as e: logging.error(f"Errore nell'encoding delle colonne: {e}") # Fallback: aggiungi 15 feature vuote for i in range(15): feature_data[f'col_{i}'] = np.zeros(len(df)) feature_count += 1 # 5. Caratteristiche aggiuntive da ddetect_fixed (36 feature = 15 + 21) # Aggiungi 15 colonne additional_col for i in range(15): feature_data[f'additional_col_{i}'] = np.zeros(len(df)) feature_count += 1 # Aggiungi 21 colonne additional_tfidf for i in range(21): feature_data[f'additional_tfidf_{i}'] = np.zeros(len(df)) feature_count += 1 # 6. Genera colonne rimanenti per arrivare a 125 remaining = expected_features - feature_count if remaining > 0: placeholder_data = {} for i in range(remaining): feature_data[f'extra_col_{i}'] = np.zeros(len(df)) feature_count += 1 logging.info(f"Aggiunte {remaining} colonne extra per raggiungere {expected_features} feature") # Controlla se abbiamo generato il numero corretto di feature assert feature_count == expected_features, f"Numero di feature generate ({feature_count}) != attese ({expected_features})" # Crea il DataFrame in un unico passaggio e converti in numpy array X = pd.DataFrame(feature_data, index=df.index) X_array = X.to_numpy() # Verifica finale logging.debug(f"Generate {feature_count} feature senza placeholder") return X_array except Exception as e: logging.error(f"Errore nella preparazione dei dati: {e}") import traceback logging.error(f"Traceback: {traceback.format_exc()}") return None def predict_anomalies(model, features, sensitivity=5): """ Predice le anomalie utilizzando il modello caricato Il parametro sensitivity (1-10) regola la sensibilità di rilevamento: - 1: massima sensibilità (più falsi positivi) - 10: minima sensibilità (più falsi negativi) """ try: logging.debug(f"Predizione su {features.shape[0]} esempi con {features.shape[1]} feature (sensibilità: {sensitivity})") # Verifica che il numero di feature corrisponda if features.shape[1] != 125: logging.error(f"Dimensione feature errata: trovate {features.shape[1]}, attese 125") return np.zeros(features.shape[0]) # Fallback sicuro # Aggiorna il contatore PRIMA di fare la predizione update_counter('metrics_processed', features.shape[0]) if hasattr(model, 'predict'): # Esegui la predizione con timeout per evitare blocchi start_time = time.time() max_time = 60 # Massimo 60 secondi try: # Sopprimiamo i warning sui nomi delle feature import warnings with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=UserWarning) # Crea un DataFrame con i nomi di feature corretti dal modello se disponibili try: if hasattr(model, 'feature_names_in_'): # Se il modello ha feature_names_in_, usa quelli feature_names = model.feature_names_in_ raw_predictions = model.predict(features) else: # Altrimenti, fai la predizione direttamente sull'array numpy raw_predictions = model.predict(features) except Exception as e: logging.warning(f"Errore con feature_names_in_: {e}, tentativo alternativo") # Fallback: predizione diretta su array numpy raw_predictions = model.predict(features) # Se il modello supporta il decision_function, usiamo quello per applicare la sensibilità if hasattr(model, 'decision_function'): try: # Ottieni gli score di decisione decision_scores = model.decision_function(features) # Normalizza gli score per la sensibilità # Un valore inferiore del threshold rende il modello più sensibile # La sensibilità è inversa al threshold: sensitivity 1 = threshold basso, quindi più anomalie threshold_multiplier = sensitivity / 5.0 # 5 è il valore neutro (1.0) custom_threshold = -0.2 * threshold_multiplier # Valore base regolabile # Applica il threshold personalizzato predictions = np.where(decision_scores < custom_threshold, -1, 1) num_anomalies = np.sum(predictions == -1) logging.debug(f"Trovate {num_anomalies} anomalie con sensibilità {sensitivity} (threshold: {custom_threshold:.3f})") except Exception as e: logging.warning(f"Errore nell'utilizzo di decision_function: {e}, usando predict standard") predictions = raw_predictions else: # Usa le predizioni standard predictions = raw_predictions logging.debug(f"Predizione completata: {len(predictions)} risultati") except Exception as e: # Se c'è un errore, registriamolo per debug logging.error(f"Errore durante la predizione: {e}") import traceback logging.error(traceback.format_exc()) # Aggiorniamo anche il timeout elapsed = time.time() - start_time if elapsed >= max_time: logging.error(f"Timeout durante la predizione ({elapsed:.1f} sec)") # Fallback: array di zeri (non anomali) predictions = np.zeros(features.shape[0]) return predictions else: logging.error("Modello non ha il metodo predict") return np.zeros(features.shape[0]) except Exception as e: logging.error(f"Errore generale nella predizione: {e}") import traceback logging.error(traceback.format_exc()) return np.zeros(features.shape[0]) def get_details(engine, ids): """ Ottieni i dettagli completi per gli ID specificati dalla tabella Esterna """ try: if not ids: return pd.DataFrame() id_list = ','.join(map(str, ids)) query = text(""" SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3, Messaggio4 FROM Esterna WHERE ID IN ({}) """.format(id_list)) details = pd.read_sql(query, engine) # Converti timestamp if 'Data' in details.columns and 'Ora' in details.columns: details['Data'] = pd.to_datetime(details['Data'], errors='coerce') details['Ora'] = pd.to_timedelta(details['Ora'].astype(str), errors='coerce') details['Timestamp'] = details['Data'] + details['Ora'] return details except Exception as e: logging.error(f"Errore nell'ottenere i dettagli dalla tabella Esterna: {e}") return pd.DataFrame() def classify_risk(anomaly_score): """ Classifica il livello di rischio in base allo score di anomalia """ # Normalizza lo score (potrebbe essere già normalizzato) score = abs(anomaly_score) if score < RISK_LEVELS['NORMALE']: update_counter('ip_normal') return 'NORMALE' elif score < RISK_LEVELS['BASSO']: update_counter('ip_low') return 'BASSO' elif score < RISK_LEVELS['MEDIO']: update_counter('ip_medium') return 'MEDIO' elif score < RISK_LEVELS['ALTO']: update_counter('ip_high') return 'ALTO' else: update_counter('ip_critical') return 'CRITICO' def is_known_attacker(engine, ip_address): """ Verifica se un IP è un attaccante noto Crea la tabella se non esiste """ try: with engine.connect() as conn: # Crea la tabella se non esiste logging.debug(f"Verifica tabella known_attackers per IP {ip_address}") create_table_query = text(""" CREATE TABLE IF NOT EXISTS known_attackers ( id INT AUTO_INCREMENT PRIMARY KEY, ip_address VARCHAR(45) NOT NULL, first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, attack_count INT DEFAULT 1, risk_level VARCHAR(20) DEFAULT 'NORMALE', ports_used TEXT DEFAULT NULL, attack_patterns TEXT DEFAULT NULL, is_blocked TINYINT(1) DEFAULT 0, UNIQUE KEY unique_ip (ip_address) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """) # Assicurati che la transazione sia valida conn.execute(create_table_query) # Verifica se l'IP esiste check_query = text("SELECT * FROM known_attackers WHERE ip_address = :ip") logging.debug(f"Esecuzione query: {check_query} con parametri: {{'ip': {ip_address}}}") result = conn.execute(check_query, {"ip": ip_address}).fetchone() exists = result is not None logging.debug(f"IP {ip_address} è un attaccante noto: {exists}") return exists except Exception as e: logging.error(f"Errore nel verificare se l'IP {ip_address} è un attaccante noto: {e}") return False def update_known_attacker(engine, ip_address, risk_level, port=None, message=None): """ Aggiorna o inserisce informazioni su un attaccante noto Ritorna il livello di rischio aggiornato """ try: # Inizializza porta e messaggio a valori accettabili per il DB port_str = str(port) if port is not None else None message_str = str(message) if message is not None else None with engine.connect() as conn: # Crea una transazione trans = conn.begin() # Verifica se l'IP esiste già check_query = text("SELECT * FROM known_attackers WHERE ip_address = :ip") result = conn.execute(check_query, {"ip": ip_address}).fetchone() if result: # Aggiorna esistente logging.debug(f"Aggiornamento attaccante esistente: {ip_address}, nuovo rischio={risk_level}") # Estrai le porte utilizzate e aggiungi quella nuova se non presente ports = result.ports_used or "" if port_str and port_str not in ports.split(','): if ports: ports += f",{port_str}" else: ports = port_str # Determina il nuovo livello di rischio (il massimo tra quello esistente e il nuovo) existing_risk = result.risk_level if existing_risk: risk_levels = ['BASSO', 'MEDIO', 'ALTO', 'CRITICO'] existing_idx = risk_levels.index(existing_risk) if existing_risk in risk_levels else 0 new_idx = risk_levels.index(risk_level) if risk_level in risk_levels else 1 final_risk = risk_levels[max(existing_idx, new_idx)] else: final_risk = risk_level # Aggiorna l'entry update_query = text(""" UPDATE known_attackers SET last_seen = NOW(), attack_count = attack_count + 1, risk_level = :risk, ports_used = :ports WHERE ip_address = :ip """) conn.execute(update_query, {"ip": ip_address, "risk": final_risk, "ports": ports}) trans.commit() return final_risk else: # Inserisci nuovo try: logging.debug(f"Inserimento nuovo attaccante: {ip_address}, rischio={risk_level}, porta={port}") insert_query = text(""" INSERT INTO known_attackers (ip_address, risk_level, ports_used, attack_patterns, is_blocked) VALUES (:ip, :risk, :port, :message, 0) """) conn.execute(insert_query, {"ip": ip_address, "risk": risk_level, "port": port_str, "message": message_str}) trans.commit() # Inizializza il contatore al nuovo livello di rischio if risk_level == 'BASSO': update_counter('ip_low') elif risk_level == 'MEDIO': update_counter('ip_medium') elif risk_level == 'ALTO': update_counter('ip_high') elif risk_level == 'CRITICO': update_counter('ip_critical') return risk_level except Exception as e: logging.error(f"Errore nell'inserimento del nuovo attaccante: {e}") trans.rollback() return risk_level except Exception as e: logging.error(f"Errore nell'aggiornamento dell'attaccante {ip_address}: {e}") return risk_level def should_block_ip(risk_level): """ Determina se un IP dovrebbe essere bloccato in base al suo livello di rischio """ should_block = risk_level in ['ALTO', 'CRITICO'] logging.debug(f"IP con rischio {risk_level} dovrebbe essere bloccato: {should_block}") return should_block def insert_anomaly_to_db(engine, ip_address, risk_level, list_name): """ Inserisce un'anomalia nel database Ritorna True se l'inserimento è avvenuto con successo, False altrimenti """ try: # Verifica la connessione al database if not engine: logging.error("Nessuna connessione al database disponibile") return False logging.debug(f"Tentativo di inserimento IP {ip_address} nella lista {list_name}") # Converti il livello di rischio in un valore numerico per il database risk_value = { 'BASSO': 1, 'MEDIO': 2, 'ALTO': 3, 'CRITICO': 4 }.get(risk_level, 2) # Default a MEDIO se non riconosciuto # Prepara la query di inserimento insert_query = text(""" INSERT INTO Fibra (IndirizzoIP, Data, Ora, Host, Attivo, Lista, NumeroAttacchi, LivelloDiRischio) VALUES (:ip, CURDATE(), CURTIME(), :host, 1, :lista, 1, :rischio) ON DUPLICATE KEY UPDATE Attivo = 1, NumeroAttacchi = NumeroAttacchi + 1, Data = CURDATE(), Ora = CURTIME(), LivelloDiRischio = GREATEST(LivelloDiRischio, :rischio) """) # Hostname dal database se disponibile hostname = "" # Tenta di ottenere l'hostname dal database se possibile try: hostname_query = text(""" SELECT Host FROM Esterna WHERE Messaggio2 LIKE :pattern AND Host IS NOT NULL AND Host != '' ORDER BY ID DESC LIMIT 1 """) with engine.connect() as conn: result = conn.execute(hostname_query, {"pattern": f"{ip_address}:%"}).fetchone() if result and result[0]: hostname = result[0] logging.debug(f"Hostname trovato per {ip_address}: {hostname}") except Exception as e: logging.warning(f"Impossibile recuperare l'hostname per {ip_address}: {e}") # Esegui la query di inserimento with engine.connect() as conn: result = conn.execute( insert_query, { "ip": ip_address, "host": hostname, "lista": list_name, "rischio": risk_value } ) conn.commit() # Esplicito commit per assicurarsi che la transazione venga completata logging.info(f"IP {ip_address} inserito/aggiornato nella lista {list_name} con rischio {risk_level}") # Verifica l'inserimento verify_query = text("SELECT COUNT(*) FROM Fibra WHERE IndirizzoIP = :ip AND Lista = :lista") count = conn.execute(verify_query, {"ip": ip_address, "lista": list_name}).scalar() if count > 0: logging.debug(f"Verifica inserimento: trovate {count} righe per IP {ip_address}") return True logging.error(f"Verifica inserimento fallita: nessuna riga trovata per IP {ip_address}") return False except Exception as e: logging.error(f"Errore nell'inserimento dell'anomalia nel database per IP {ip_address}: {e}") import traceback logging.error(traceback.format_exc()) return False def handle_anomaly(engine, ip_address, risk_level, port=None, message=None, list_name='ddos_ia'): """ Gestisce un'anomalia rilevata, aggiornando il database e generando avvisi """ try: # Verifico che l'indirizzo IP sia valido if not ip_address or pd.isna(ip_address): logging.warning(f"Indirizzo IP non valido: {ip_address}") return # Controllo se l'IP è già noto come attaccante is_known = is_known_attacker(engine, ip_address) # Se è già noto, aggiorno l'attacco, altrimenti lo creo if is_known: logging.debug(f"IP {ip_address} già noto, aggiornamento entry...") update_known_attacker(engine, ip_address, risk_level, port, message) else: logging.debug(f"IP {ip_address} nuovo attaccante, creazione entry...") # Inserisci l'anomalia nel database try: logging.debug(f"Inserimento anomalia nel DB per {ip_address} con rischio {risk_level}") insert_success = insert_anomaly_to_db(engine, ip_address, risk_level, list_name) if insert_success: # Aggiorna i contatori appropriati if risk_level == 'BASSO': update_counter('ip_low') elif risk_level == 'MEDIO': update_counter('ip_medium') elif risk_level == 'ALTO': update_counter('ip_high') elif risk_level == 'CRITICO': update_counter('ip_critical') # Conta per ogni tipo di lista update_counter(f'list_{list_name}') logging.info(f"Anomalia inserita con successo per IP {ip_address} (Rischio: {risk_level})") else: logging.error(f"Errore nell'inserimento dell'anomalia per IP {ip_address}") except Exception as insert_e: logging.error(f"Eccezione durante l'inserimento dell'anomalia: {insert_e}") import traceback logging.error(traceback.format_exc()) # Determina se l'IP dovrebbe essere bloccato if should_block_ip(risk_level): logging.warning(f"IP {ip_address} dovrebbe essere bloccato (Rischio: {risk_level})") # Qui potremmo implementare il blocco automatico dell'IP return True except Exception as e: logging.error(f"Errore nella gestione dell'anomalia per IP {ip_address}: {e}") import traceback logging.error(traceback.format_exc()) return False def process_batch(batch_data, engine, model, preprocessor, whitelist, sensitivity=5): """ Processa un batch di dati per rilevare anomalie Ritorna gli IP processati e il numero di anomalie rilevate """ try: # Prepara i dati X = prepare_data(batch_data, preprocessor) if X is None or X.shape[0] == 0: logging.error("Nessuna feature generata per la predizione nel batch") return set(), 0 # Fai la predizione predictions = predict_anomalies(model, X, sensitivity) # Aggiusta dimensioni se necessario if len(predictions) != len(batch_data): logging.warning(f"Dimensioni predizioni ({len(predictions)}) != dimensioni batch ({len(batch_data)})") if len(predictions) < len(batch_data): # Estendi l'array predictions = np.append(predictions, np.zeros(len(batch_data) - len(predictions))) else: # Tronca l'array predictions = predictions[:len(batch_data)] # Aggiungi il risultato al batch batch_data.loc[:, 'anomaly'] = predictions # Estrai gli IP e trova le anomalie processed_ips = set() anomaly_count = 0 # Gestisci le predizioni anomalies = batch_data[batch_data['anomaly'] == -1] anomaly_count = len(anomalies) if not anomalies.empty: # Estrai e prepara gli IP attaccanti if 'IP_Attaccante' not in batch_data.columns and 'Messaggio2' in batch_data.columns: batch_data['IP_Attaccante'] = batch_data['Messaggio2'].apply( lambda x: x.split(':')[0] if pd.notna(x) and isinstance(x, str) and ':' in x else None ) if 'Porta_Attaccante' not in batch_data.columns and 'Messaggio2' in batch_data.columns: batch_data['Porta_Attaccante'] = batch_data['Messaggio2'].apply( lambda x: x.split(':')[1] if pd.notna(x) and isinstance(x, str) and ':' in x else None ) # Processa ciascun IP anomalo for idx, row in anomalies.iterrows(): ip = row.get('IP_Attaccante') if pd.notna(ip) and not is_ip_whitelisted(ip, whitelist): processed_ips.add(ip) # Assegna un livello di rischio base risk_level = 'MEDIO' # Default port = None if 'Porta_Attaccante' in row and pd.notna(row['Porta_Attaccante']): port = row['Porta_Attaccante'] # Crea un messaggio informativo msg = f"Anomalia rilevata da {row.get('Messaggio2', 'N/A')}" try: # Gestisci anomalia con cattura delle eccezioni specifiche handle_anomaly(engine, ip, risk_level, port, msg, 'ddos_ia') except Exception as e: # Log dell'errore ma continua con altre IP logging.warning(f"Errore nel gestire l'anomalia per IP {ip}: {e}") # Restituisci IP elaborati e conteggio anomalie return processed_ips, len(anomalies) except Exception as e: logging.error(f"Errore nell'elaborazione del batch: {e}") return set(), 0 def esegui_analisi(args, ciclo_count=0): # Visualizza informazioni di avvio if args.ciclo: ciclo_txt = f" (ciclo {ciclo_count})" else: ciclo_txt = "" log_phase(f"Avvio rilevamento DDoS{ciclo_txt}") # Mostra informazioni sulla modalità di esecuzione if args.parallel: log_result(f"Modalità parallela attiva con {args.workers} worker") log_result(f"Dimensione batch: {args.batch_size} record") start_progress_tracking(f"rilevamento DDoS{ciclo_txt}") # Statistiche per questo ciclo stats = { 'records': 0, 'anomalies': 0, 'unique_ips': 0 } # Verifica percorsi e autorizzazioni dei file di modello logging.debug("Verifica dei percorsi dei modelli...") model_files = { "Modello principale": MODEL_PATH, "Preprocessor": PREPROCESSOR_PATH } all_models_ok = True for name, path in model_files.items(): if os.path.exists(path): try: # Verifica che il file sia leggibile with open(path, 'rb') as f: f.read(1) logging.debug(f"✅ {name} trovato e leggibile: {path}") except Exception as e: log_error(f"Il file {name} esiste ma non è leggibile: {e}") all_models_ok = False else: log_error(f"File {name} non trovato: {path}") all_models_ok = False if all_models_ok: log_result("Tutti i file modello sono presenti e leggibili") # Test connessione database if not test_database_connection(): log_error("Impossibile connettersi al database. Verificare le credenziali e la disponibilità del server.") end_progress_tracking() return False, stats log_result("Connessione al database stabilita") try: # Connessione al database logging.debug("Connessione al database...") engine = create_engine_with_retry(CONN_STRING) # Caricamento del modello log_phase("Caricamento dei modelli") model, preprocessor = load_models() if model is None: log_error("Impossibile caricare il modello. Arresto del programma.") end_progress_tracking() return False, stats # Verifica che il modello sia valido if not hasattr(model, 'predict'): log_error("Il modello caricato non è valido (manca il metodo predict). Arresto del programma.") end_progress_tracking() return False, stats log_result(f"Modello caricato correttamente con {len(preprocessor.get('feature_columns', []))} feature") # Carica la whitelist e processa i dati whitelist = load_whitelist(args.whitelist) # Verifica se ci sono dati nuovi last_id = load_last_analyzed_id() logging.debug(f"Last analyzed ID: {last_id}") # Se richiesto di saltare i vecchi record if args.skip_old: # Trova il record più recente try: with engine.connect() as conn: latest_id_query = "SELECT MAX(ID) FROM Esterna" latest_id = conn.execute(text(latest_id_query)).scalar() if latest_id: # Calcola un offset (es. ultimi 10k record) offset = min(latest_id - last_id, 10000) new_last_id = max(latest_id - offset, last_id) if new_last_id > last_id: log_warning(f"Saltando {new_last_id - last_id} record vecchi") last_id = new_last_id except Exception as e: log_error(f"Errore nel determinare l'ID più recente: {e}") # Usa max_id se specificato max_id = args.max_id # FLUSSO OTTIMIZZATO try: # 1. Estrai dati (ottimizzato) log_phase("Estrazione dati dal database") new_data = extract_data(engine, last_id, args.batch_size, max_id) if new_data.empty: log_result("Nessun nuovo dato da analizzare") end_progress_tracking() return True, stats total_records = len(new_data) stats['records'] = total_records last_analyzed_id = new_data['ID'].max() log_result(f"Estratti {total_records} record (ID da {last_id+1} a {last_analyzed_id})") # Elaborazione in parallelo o sequenziale if args.parallel and total_records > 1000: log_phase(f"Elaborazione parallela con {args.workers} worker") # Dividi i dati in batch più piccoli batch_size = min(1000, max(100, total_records // args.workers)) num_batches = (total_records + batch_size - 1) // batch_size batches = [] # Usa un metodo più sicuro per dividere i dati in batch # Evita warning di pandas utilizzando loc in modo esplicito for i in range(num_batches): start_idx = i * batch_size end_idx = min(start_idx + batch_size, total_records) # Crea una copia dei dati per ogni batch invece di una vista batch_indices = new_data.index[start_idx:end_idx] batch = new_data.loc[batch_indices].copy() batches.append(batch) logging.debug(f"Dati suddivisi in {len(batches)} batch per elaborazione parallela") # Processa batch in parallelo con gestione degli errori migliorata all_processed_ips = set() total_anomalies = 0 # Usa lock per aggiornamenti concorrenti più sicuri processed_lock = threading.Lock() with ThreadPoolExecutor(max_workers=args.workers) as executor: # Crea un dizionario di future future_to_batch = { executor.submit(process_batch, batch, engine, model, preprocessor, whitelist, args.sensibility): i for i, batch in enumerate(batches) } for future in as_completed(future_to_batch): batch_idx = future_to_batch[future] try: processed_ips, anomaly_count = future.result() # Usa lock per aggiornamenti thread-safe with processed_lock: if processed_ips: all_processed_ips.update(processed_ips) total_anomalies += anomaly_count logging.debug(f"Batch {batch_idx+1}/{len(batches)} completato: {anomaly_count} anomalie") except Exception as e: log_error(f"Errore nell'elaborazione del batch {batch_idx}: {e}") # Aggiorna le statistiche stats['anomalies'] = total_anomalies stats['unique_ips'] = len(all_processed_ips) log_result(f"Elaborazione completata: {len(all_processed_ips)} IP unici con anomalie, {total_anomalies} anomalie totali") else: # Elaborazione sequenziale (come prima) log_phase("Analisi dati e rilevamento anomalie") # Crea una copia esplicita per evitare SettingWithCopyWarning df = new_data.copy() # Estrai e prepara gli IP attaccanti logging.debug("Preparazione IP attaccanti...") # Converti in pochi passaggi per evitare blocchi df.loc[:, 'IP_Attaccante'] = df['Messaggio2'].apply( lambda x: x.split(':')[0] if pd.notna(x) and isinstance(x, str) and ':' in x else None ) # Conta solo un campione di IP whitelistati per evitare blocchi whitelisted_ips = set() if len(df) > 100: # Verifichiamo tutti gli IP per la whitelist all_ips = df['IP_Attaccante'].dropna().unique() logging.debug(f"Verifica di {len(all_ips)} IP unici contro la whitelist...") for ip in all_ips: if is_ip_whitelisted(ip, whitelist): whitelisted_ips.add(ip) logging.debug(f"Trovati {len(whitelisted_ips)} IP in whitelist su {len(all_ips)} IP unici") # Prepara i dati con metodo minimalista logging.debug("Preparazione dati per predizione...") features = prepare_data(df, preprocessor) if features is None or features.shape[0] == 0: log_error("Nessuna feature generata per la predizione.") end_progress_tracking() return True, stats # Fai la predizione con la sensibilità specificata logging.debug("Predizione anomalie...") predictions = predict_anomalies(model, features, args.sensibility) # Aggiusta le dimensioni se necessario if len(predictions) != len(df): log_warning(f"Dimensioni differenti: predizioni {len(predictions)}, dati {len(df)}") if len(predictions) < len(df): # Estendi l'array delle predizioni predictions = np.append(predictions, np.zeros(len(df) - len(predictions))) else: # Tronca l'array delle predizioni predictions = predictions[:len(df)] # Applica risultati usando .loc per evitare warning df.loc[:, 'anomaly'] = predictions # Aggiorna i contatori per ogni IP (senza duplicati) ip_counter = {} for idx, row in df.iterrows(): ip = row.get('IP_Attaccante') if pd.notna(ip) and ip not in ip_counter: ip_counter[ip] = True if ip not in whitelisted_ips: # Un nuovo IP analizzato update_counter('ip_analyzed') # Classifica in base al risultato della predizione if row['anomaly'] == -1: # Anomalo - non incrementiamo qui, lo farà update_known_attacker pass else: # Normale update_counter('ip_normal') # Aggiorna le statistiche stats['unique_ips'] = len(ip_counter) - len(whitelisted_ips) log_result(f"Analizzati {len(ip_counter)} IP unici, {len(whitelisted_ips)} in whitelist") # Gestisci anomalie anomalies = df[df['anomaly'] == -1] stats['anomalies'] = len(anomalies) if not anomalies.empty: log_phase(f"Gestione di {len(anomalies)} anomalie") # Mostra un esempio delle anomalie nei log sample_size = min(5, len(anomalies)) if sample_size > 0: logging.debug(f"Esempio di {sample_size} anomalie rilevate:") for idx, row in anomalies.head(sample_size).iterrows(): ip = row.get('IP_Attaccante') msg = row.get('Messaggio2', 'N/A') logging.debug(f" - [{idx}] IP: {ip}, Messaggio: {msg}") # Conteggia gli IP anomali per IP anomaly_ips = {} for idx, row in anomalies.iterrows(): ip = row.get('IP_Attaccante') if pd.notna(ip): if ip not in anomaly_ips: anomaly_ips[ip] = 0 anomaly_ips[ip] += 1 log_result(f"Trovati {len(anomaly_ips)} IP unici con anomalie") # Mostra i top 10 IP con più anomalie top_anomalies = sorted(anomaly_ips.items(), key=lambda x: x[1], reverse=True)[:10] if top_anomalies: logging.debug("Top 10 IP con più anomalie:") for ip, count in top_anomalies: logging.debug(f" - IP: {ip}, Anomalie: {count}") # Processa ciascun IP una sola volta processed_ips = set() for idx, row in anomalies.iterrows(): ip = row.get('IP_Attaccante') if pd.notna(ip) and ip not in processed_ips and not is_ip_whitelisted(ip, whitelist): processed_ips.add(ip) # Assegna un livello di rischio base risk_level = 'MEDIO' # Default port = None if 'Porta_Attaccante' in row and pd.notna(row['Porta_Attaccante']): port = row['Porta_Attaccante'] # Crea un messaggio informativo msg = f"Anomalia rilevata da {row.get('Messaggio2', 'N/A')}" try: # Gestisci l'anomalia con cattura dell'eccezione handle_anomaly(engine, ip, risk_level, port, msg, 'ddos_ia') except Exception as e: logging.error(f"Errore nella gestione dell'anomalia per IP {ip}: {e}") log_result(f"Elaborate {len(processed_ips)} anomalie uniche") else: log_result("Nessuna anomalia rilevata") # Salva l'ultimo ID analizzato try: save_last_analyzed_id(last_analyzed_id) except Exception as e: logging.error(f"Errore nel salvare l'ultimo ID analizzato: {e}") # Segnala il completamento log_phase(f"Analisi completata") log_result(f"Processati {len(new_data)} eventi, ID fino a {last_analyzed_id}") end_progress_tracking() # Forza la liberazione della memoria del new_data gc.collect() return True, stats except Exception as e: log_error(f"Errore durante l'analisi: {e}") import traceback logging.error(f"Traceback completo: {traceback.format_exc()}") end_progress_tracking() return False, stats except Exception as e: log_error(f"Errore generale: {e}") import traceback logging.error(f"Traceback completo: {traceback.format_exc()}") end_progress_tracking() return False, stats def main(): """ Funzione principale per il rilevamento DDoS """ # Parsing degli argomenti parser = argparse.ArgumentParser(description='Rilevamento DDoS in tempo reale') parser.add_argument('--debug', action='store_true', help='Abilita logging di debug dettagliato') parser.add_argument('--batch-size', type=int, default=10000, help='Dimensione del batch di dati da analizzare') parser.add_argument('--whitelist', type=str, default=WHITELIST_PATH, help='Percorso del file whitelist') parser.add_argument('--ciclo', action='store_true', help='Esegui in un ciclo infinito fino all\'interruzione') parser.add_argument('--pausa', type=int, default=60, help='Secondi di pausa tra un ciclo e l\'altro (con --ciclo)') parser.add_argument('--parallel', action='store_true', help='Abilita elaborazione parallela per migliorare performance') parser.add_argument('--workers', type=int, default=4, help='Numero di worker per elaborazione parallela') parser.add_argument('--max-id', type=int, default=None, help='ID massimo da elaborare, utile per limitare arretrati') parser.add_argument('--skip-old', action='store_true', help='Salta i record vecchi e analizza solo i più recenti') parser.add_argument('--quiet', action='store_true', help='Modalità silenziosa, mostra solo errori e risultati fondamentali') parser.add_argument('--max-records', type=int, default=5000, help='Numero massimo di record da analizzare per ciclo') parser.add_argument('--sensibility', type=int, default=5, choices=range(1, 11), help='Sensibilità di rilevamento anomalie (1-10): 1=massima sensibilità, 10=minima sensibilità') args = parser.parse_args() # Imposta il livello di logging in base agli argomenti if args.debug: logging.getLogger().setLevel(logging.DEBUG) logging.debug("Modalità debug attivata") elif args.quiet: logging.getLogger().setLevel(logging.ERROR) print("Modalità silenziosa attivata: verranno mostrati solo errori e risultati fondamentali") # Visualizza info sulla sensibilità if args.sensibility != 5: log_result(f"Sensibilità di rilevamento: {args.sensibility}/10 " + ("(più sensibile)" if args.sensibility < 5 else "(meno sensibile)" if args.sensibility > 5 else "(standard)")) # Statistiche globali per i cicli cicli_stats = { 'cicli_completati': 0, 'total_records': 0, 'total_anomalies': 0, 'total_unique_ips': 0, 'start_time': time.time() } # Gestisce l'interruzione con CTRL+C def handle_interrupt(signum, frame): elapsed = time.time() - cicli_stats['start_time'] print(f"\n{Colors.BOLD}{Colors.RED}Interruzione ricevuta. Chiusura del programma...{Colors.END}") print(f"\n{Colors.BOLD}{Colors.BLUE}STATISTICHE TOTALI{Colors.END}") print(f"Tempo totale di esecuzione: {elapsed:.1f} secondi") print(f"Cicli completati: {cicli_stats['cicli_completati']}") print(f"Record analizzati: {cicli_stats['total_records']}") print(f"Anomalie rilevate: {cicli_stats['total_anomalies']}") print(f"IP unici analizzati: {cicli_stats['total_unique_ips']}") end_progress_tracking() exit(0) # Registra il gestore per SIGINT (CTRL+C) signal.signal(signal.SIGINT, handle_interrupt) # Ciclo infinito quando --ciclo è specificato ciclo_count = 0 # Esegui una singola analisi o in ciclo if args.ciclo: log_phase(f"Esecuzione in modalità ciclo infinito") while True: ciclo_count += 1 # Limita il massimo di record da elaborare per ciclo args.batch_size = min(args.batch_size, args.max_records) # Esegui l'analisi e cattura i risultati success, stats = esegui_analisi(args, ciclo_count) # Aggiorna le statistiche complessive if success and stats: cicli_stats['cicli_completati'] += 1 cicli_stats['total_records'] += stats.get('records', 0) cicli_stats['total_anomalies'] += stats.get('anomalies', 0) cicli_stats['total_unique_ips'] += stats.get('unique_ips', 0) if success: log_result(f"Ciclo {ciclo_count} completato. Pausa di {args.pausa} secondi...") time.sleep(args.pausa) else: log_error(f"Errore nel ciclo {ciclo_count}. Pausa di {args.pausa*2} secondi prima di riprovare...") time.sleep(args.pausa * 2) # Pausa più lunga in caso di errore else: # Modalità singola esegui_analisi(args, 0) if __name__ == "__main__": if len(sys.argv) > 1 and sys.argv[1] == "--test": logging.info("MODALITÀ TEST: verifica delle funzioni principali") try: engine = create_engine_with_retry(CONN_STRING) test_ip = "192.168.1.1" logging.info(f"Test 1: Verifica tabella known_attackers") is_known = is_known_attacker(engine, test_ip) logging.info(f"IP {test_ip} è un attaccante noto: {is_known}") logging.info(f"Test 2: Aggiornamento known_attacker") new_risk = update_known_attacker(engine, test_ip, "NORMALE", "80", "Test message") logging.info(f"Nuovo livello di rischio: {new_risk}") logging.info(f"Test 3: Verifica se ora è un attaccante noto") is_known = is_known_attacker(engine, test_ip) logging.info(f"IP {test_ip} è ora un attaccante noto: {is_known}") logging.info("MODALITÀ TEST completata") sys.exit(0) except Exception as e: logging.error(f"Errore nei test: {e}") sys.exit(1) else: main()