import pandas as pd from sqlalchemy import create_engine from joblib import load import logging import gc import os import time from collections import defaultdict from datetime import datetime, timedelta, timezone from scipy import sparse from scipy.sparse import hstack import paramiko import ipaddress from dotenv import load_dotenv import numpy as np import sys import pickle from sqlalchemy.sql import text # Carica le variabili d'ambiente load_dotenv() # Configurazione del logging avanzata per il debug logging.basicConfig( level=logging.DEBUG, # Cambiato da INFO a DEBUG per maggiori informazioni format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler('ddetect_debug.log') # Aggiunto file di log separato per debug ] ) # Configurazione del database DB_USER = os.getenv('DB_USER', 'root') DB_PASSWORD = os.getenv('DB_PASSWORD', 'Hdgtejskjjc0-') DB_HOST = os.getenv('DB_HOST', 'localhost') DB_NAME = os.getenv('DB_NAME', 'LOG_MIKROTIK') CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}' # Percorsi dei file del modello MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models') MODEL_PATH = os.path.join(MODEL_DIR, 'model.pkl') PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.pkl') # Percorso del file di whitelist WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt') # Percorso per tracciare l'ultimo ID analizzato LAST_ID_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'last_analyzed_id.txt') # Configura il file di blocco degli IP ip_block_file = 'ip_block.txt' ip_activity_tracker = defaultdict(list) # Traccia le attività degli IP ip_count_tracker = defaultdict(int) # Traccia il conteggio delle occorrenze degli IP ip_last_seen = {} # Traccia l'ultimo timestamp in cui l'IP è stato visto # Definizione dei livelli di rischio e soglie RISK_LEVELS = { 'NORMALE': 0.1, # Nuovo livello aggiunto 'BASSO': 0.3, 'MEDIO': 0.6, 'ALTO': 0.8, 'CRITICO': 0.95 } # Stampa di debug per le configurazioni logging.debug(f"Percorsi: MODEL_DIR={MODEL_DIR}, WHITELIST_PATH={WHITELIST_PATH}") logging.debug(f"Livelli di rischio configurati: {RISK_LEVELS}") # Funzioni per la sincronizzazione con il router MikroTik def load_ip_block(file_path): """ Carica gli IP bloccati dal file di blocco. Formato per ogni riga: IP:count:last_seen_timestamp """ ip_set = set() if not os.path.exists(file_path): logging.info(f"Il file {file_path} non esiste. Verrà creato uno nuovo.") return ip_set with open(file_path, 'r') as f: for line in f: stripped_line = line.strip() if not stripped_line: logging.warning("Linea vuota trovata, saltata.") continue parts = stripped_line.split(':', 2) # Split solo sui primi due due punti if len(parts) == 3: ip, count, last_seen_str = parts logging.debug(f"Parsing IP: {ip}, Count: {count}, Timestamp: {last_seen_str}") try: # Verifica se il count è un intero count = int(count) # Verifica se il timestamp è nel formato corretto con fuso orario last_seen = datetime.fromisoformat(last_seen_str) ip_set.add(ip) except ValueError as ve: logging.warning(f"Formato non valido per l'IP {ip}: count='{count}', timestamp='{last_seen_str}'. Errore: {ve}") else: logging.warning(f"Linea non valida nel file di blocco: {stripped_line}") return ip_set def get_current_ddos_ia(ssh, list_name='ddos_ia'): """ Recupera gli IP attualmente presenti nella lista ddos_ia del router. """ command = f"/ip firewall address-list print where list={list_name}" stdin, stdout, stderr = ssh.exec_command(command) output = stdout.read().decode('utf-8') error = stderr.read().decode('utf-8') if error: logging.error(f"Errore durante l'esecuzione del comando: {error}") return set() current_ips = set() for line in output.splitlines(): line = line.strip() # Ignora linee non-dati if not line or line.startswith('Flags:') or line.startswith('Columns:') or line.startswith(';;;'): continue parts = line.split() if len(parts) >= 4: ip_candidate = parts[3] try: # Verifica se il candidato è un IP valido ip = ipaddress.ip_address(ip_candidate) current_ips.add(str(ip)) logging.debug(f"IP valido trovato: {ip}") except ValueError: logging.warning(f"Non è un IP valido: {ip_candidate}") return current_ips def add_ip_to_ddos_ia(ssh, ip, timeout='480:00:00', comment='DDoS Attacker', list_name='ddos_ia'): """ Aggiunge un IP alla lista ddos_ia del router. """ command = f"/ip firewall address-list add list={list_name} address={ip} timeout={timeout} comment=\"{comment}\"" logging.info(f"Aggiungo {ip} alla lista {list_name}...") stdin, stdout, stderr = ssh.exec_command(command, timeout=10) output = stdout.read().decode('utf-8').strip() error = stderr.read().decode('utf-8').strip() if output: logging.debug(f"Output aggiunta IP {ip}: {output}") if error: logging.error(f"Errore durante l'aggiunta di {ip}: {error}") def is_ip_whitelisted(ip, whitelist): """ Verifica se un IP è presente nella whitelist. """ try: ip_obj = ipaddress.ip_address(ip) except ValueError: logging.warning(f"IP non valido durante la verifica whitelist: {ip}") return False for entry in whitelist: if isinstance(entry, ipaddress.IPv4Network) or isinstance(entry, ipaddress.IPv6Network): if ip_obj in entry: return True elif isinstance(entry, ipaddress.IPv4Address) or isinstance(entry, ipaddress.IPv6Address): if ip_obj == entry: return True return False def run_ssh_commands(router_ip, username, password, port, ip_block_file, whitelist, list_name='ddos_ia', max_retries=3, retry_delay=5): """ Sincronizza gli IP bloccati con la lista ddos_ia sul router MikroTik. Aggiunge solo gli IP mancanti senza rimuovere alcuno e verifica la whitelist. """ retries = 0 while retries < max_retries: try: # Crea una connessione SSH logging.info(f"Connettendo a {router_ip} sulla porta {port}... (Tentativo {retries + 1})") ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(router_ip, username=username, password=password, port=int(port)) logging.info("Connessione SSH riuscita.") # Carica gli IP dal file di blocco blocked_ips = load_ip_block(ip_block_file) logging.info(f"IP bloccati da aggiungere: {len(blocked_ips)}") # Ottieni gli IP attuali nella lista ddos_ia current_ddos_ips = get_current_ddos_ia(ssh, list_name) logging.info(f"IP attualmente nella lista {list_name}: {len(current_ddos_ips)}") # Determina quali IP aggiungere, escludendo quelli nella whitelist ips_to_add = {ip for ip in blocked_ips - current_ddos_ips if not is_ip_whitelisted(ip, whitelist)} logging.info(f"IP da aggiungere dopo verifica whitelist: {len(ips_to_add)}") # Aggiungi gli IP mancanti for ip in ips_to_add: add_ip_to_ddos_ia(ssh, ip, list_name=list_name) # **Non rimuovere alcun IP** # Chiudi la connessione SSH ssh.close() logging.info("Connessione SSH chiusa.") logging.info("Aggiornamento della lista ddos_ia completato.") # Successo, esci dalla funzione return except paramiko.SSHException as ssh_error: logging.error(f"Errore SSH: {ssh_error}") except FileNotFoundError as fnf_error: logging.error(f"Errore file non trovato: {fnf_error}") except Exception as e: logging.error(f"Errore durante l'esecuzione del comando SSH: {e}") # Incrementa il contatore dei ritentativi retries += 1 logging.info(f"Ritentativo {retries} di {max_retries} dopo {retry_delay} secondi...") time.sleep(retry_delay) logging.error("Massimo numero di ritentativi raggiunto. Operazione fallita.") def save_ip_block(file_path): """ Salva gli IP bloccati nel file di blocco con il formato IP:count:last_seen_timestamp. """ try: with open(file_path, 'w') as f: for ip, count in ip_count_tracker.items(): last_seen = ip_last_seen.get(ip) if last_seen: # Assicurati di salvare il timestamp in formato ISO con fuso orario last_seen_str = last_seen.astimezone(timezone.utc).isoformat() f.write(f"{ip}:{count}:{last_seen_str}\n") logging.info(f"Salvati {len(ip_count_tracker)} IP nel file di blocco.") except Exception as e: logging.error(f"Errore durante il salvataggio del file di blocco: {e}") def load_models(): """ Carica il modello e gli oggetti di preprocessing """ logging.info("Caricamento del modello e degli oggetti di preprocessing...") try: model_exists = os.path.exists(MODEL_PATH) preprocessor_exists = os.path.exists(PREPROCESSOR_PATH) logging.debug(f"File modello esiste: {model_exists}, file preprocessor esiste: {preprocessor_exists}") if model_exists and preprocessor_exists: model = joblib.load(MODEL_PATH) preprocessor = joblib.load(PREPROCESSOR_PATH) logging.info("Modello e preprocessor caricati con successo.") else: logging.warning("Modello o preprocessor non trovati. Utilizzo di un modello semplice per test.") # Crea un modello dummy per scopi di test from sklearn.ensemble import IsolationForest model = IsolationForest(contamination=0.05, random_state=42) preprocessor = None logging.info("Caricamento completato.") return model, preprocessor except Exception as e: logging.error(f"Errore nel caricamento del modello: {e}") # Crea un modello dummy per scopi di test from sklearn.ensemble import IsolationForest model = IsolationForest(contamination=0.05, random_state=42) preprocessor = None logging.warning("Utilizzando un modello di fallback a causa dell'errore.") return model, preprocessor def classify_risk(anomaly_score): """ Classifica il livello di rischio in base allo score di anomalia """ logging.debug(f"Classificazione rischio per score: {anomaly_score}") if anomaly_score < RISK_LEVELS['NORMALE']: return 'NORMALE' elif anomaly_score < RISK_LEVELS['BASSO']: return 'BASSO' elif anomaly_score < RISK_LEVELS['MEDIO']: return 'MEDIO' elif anomaly_score < RISK_LEVELS['ALTO']: return 'ALTO' else: return 'CRITICO' def is_known_attacker(engine, ip_address): """ Verifica se un IP è un attaccante noto Crea la tabella se non esiste """ try: with engine.connect() as conn: # Crea la tabella se non esiste logging.debug(f"Verifica tabella known_attackers per IP {ip_address}") create_table_query = text(""" CREATE TABLE IF NOT EXISTS known_attackers ( id INT AUTO_INCREMENT PRIMARY KEY, ip_address VARCHAR(45) NOT NULL, first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, attack_count INT DEFAULT 1, risk_level VARCHAR(20) DEFAULT 'NORMALE', ports_used TEXT DEFAULT NULL, attack_patterns TEXT DEFAULT NULL, is_blocked TINYINT(1) DEFAULT 0, UNIQUE KEY unique_ip (ip_address) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """) # Assicurati che la transazione sia valida conn.execute(create_table_query) # Verifica se l'IP esiste check_query = text("SELECT * FROM known_attackers WHERE ip_address = :ip") logging.debug(f"Esecuzione query: {check_query} con parametri: {{'ip': {ip_address}}}") result = conn.execute(check_query, {"ip": ip_address}).fetchone() exists = result is not None logging.debug(f"IP {ip_address} è un attaccante noto: {exists}") return exists except Exception as e: logging.error(f"Errore nel verificare se l'IP {ip_address} è un attaccante noto: {e}") return False def update_known_attacker(engine, ip_address, risk_level, port=None, message=None): """ Aggiorna o inserisce un attaccante noto Incrementa il livello di rischio se l'IP viene visto più volte """ try: conn = engine.connect() trans = conn.begin() # Inizia una transazione esplicita try: # Verifica se l'IP esiste già logging.debug(f"Aggiornamento attaccante noto: {ip_address}, rischio iniziale: {risk_level}") query = text("SELECT * FROM known_attackers WHERE ip_address = :ip") result = conn.execute(query, {"ip": ip_address}).fetchone() if result: # Ottieni il conteggio attacchi e il rischio attuale attack_count = result[4] + 1 # Indice 4 = attack_count current_risk = result[5] # Indice 5 = risk_level current_ports = result[6] # Indice 6 = ports_used logging.debug(f"IP {ip_address}: conteggio attuale={attack_count-1}, rischio attuale={current_risk}") # Aggiorna la lista delle porte new_ports = current_ports or "" if port and port not in new_ports: new_ports = f"{new_ports},{port}" if new_ports else port # Incrementa il livello di rischio basato sul numero di rilevamenti new_risk = risk_level # Escalation del rischio in base al numero di rilevamenti if risk_level == 'NORMALE' and attack_count >= 10: new_risk = 'BASSO' logging.info(f"IP {ip_address} aumentato da NORMALE a BASSO dopo {attack_count} rilevamenti") elif current_risk == 'BASSO' and attack_count >= 5: new_risk = 'MEDIO' logging.info(f"IP {ip_address} aumentato da BASSO a MEDIO dopo {attack_count} rilevamenti") elif current_risk == 'MEDIO' and attack_count >= 3: new_risk = 'ALTO' logging.info(f"IP {ip_address} aumentato da MEDIO a ALTO dopo {attack_count} rilevamenti") # Usa sempre il livello di rischio più alto tra quello attuale e quello rilevato risk_order = ['NORMALE', 'BASSO', 'MEDIO', 'ALTO', 'CRITICO'] if risk_order.index(current_risk) > risk_order.index(new_risk): new_risk = current_risk # Mantiene il rischio più alto logging.debug(f"IP {ip_address}: nuovo conteggio={attack_count}, nuovo rischio={new_risk}, porte={new_ports}") # Aggiorna l'esistente update_query = text(""" UPDATE known_attackers SET last_seen = NOW(), attack_count = attack_count + 1, risk_level = :risk, ports_used = :ports WHERE ip_address = :ip """) conn.execute(update_query, {"ip": ip_address, "risk": new_risk, "ports": new_ports}) # Commit della transazione trans.commit() # Restituisci il nuovo livello di rischio return new_risk else: # Inserisci nuovo logging.debug(f"Inserimento nuovo attaccante: {ip_address}, rischio={risk_level}, porta={port}") insert_query = text(""" INSERT INTO known_attackers (ip_address, risk_level, ports_used, attack_patterns, is_blocked) VALUES (:ip, :risk, :port, :message, 0) """) result = conn.execute(insert_query, {"ip": ip_address, "risk": risk_level, "port": port, "message": message}) logging.debug(f"Risultato inserimento: {result.rowcount} righe inserite") # Commit della transazione trans.commit() return risk_level except Exception as e: # Rollback in caso di errore trans.rollback() logging.error(f"Errore nell'aggiornare l'attaccante noto {ip_address}: {e}") logging.error(f"Dettagli errore: {str(e)}") return risk_level finally: # Chiudi la connessione conn.close() except Exception as e: logging.error(f"Errore nel creare la connessione per {ip_address}: {e}") return risk_level def should_block_ip(risk_level): """ Determina se un IP dovrebbe essere bloccato in base al suo livello di rischio """ should_block = risk_level in ['ALTO', 'CRITICO'] logging.debug(f"IP con rischio {risk_level} dovrebbe essere bloccato: {should_block}") return should_block def handle_anomaly(engine, ip_address, risk_level, port=None, message=None, list_name='ddos_ia'): """ Gestisce un'anomalia decidendo se bloccare l'IP e aggiornare le tabelle """ try: # Prima aggiorna l'attaccante noto, il che potrebbe aumentare il livello di rischio logging.debug(f"Gestione anomalia per IP {ip_address} con rischio {risk_level}, porta {port}") updated_risk_level = update_known_attacker(engine, ip_address, risk_level, port, message) # Verifica se l'IP dovrebbe essere bloccato if should_block_ip(updated_risk_level): # Se è ALTO o CRITICO, inserisci nella tabella ip_list per il blocco insert_anomaly_to_db(engine, ip_address, updated_risk_level, list_name) logging.info(f"IP {ip_address} con rischio {updated_risk_level} aggiunto alla lista di blocco") return True else: logging.info(f"IP {ip_address} con rischio {updated_risk_level} monitorato ma non bloccato") return False except Exception as e: logging.error(f"Errore nella gestione dell'anomalia per IP {ip_address}: {e}") return False def insert_anomaly_to_db(engine, ip_address, risk_level, list_name): """ Inserisce o aggiorna un'anomalia nel database """ try: with engine.connect() as conn: # Verifica se la colonna risk_level esiste nella tabella ip_list try: # Verifica la struttura della tabella check_column_query = text(""" SELECT COUNT(*) AS column_exists FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = :db_name AND TABLE_NAME = 'ip_list' AND COLUMN_NAME = 'risk_level' """) db_name = os.environ.get('MYSQL_DATABASE', 'LOG_MIKROTIK') result = conn.execute(check_column_query, {"db_name": db_name}).fetchone() if result[0] == 0: # La colonna non esiste, aggiungiamola logging.info("La colonna risk_level non esiste nella tabella ip_list. Aggiunta in corso...") alter_query = text(""" ALTER TABLE ip_list ADD COLUMN risk_level VARCHAR(20) DEFAULT 'MEDIO' """) conn.execute(alter_query) logging.info("Colonna risk_level aggiunta con successo.") else: logging.debug("La colonna risk_level esiste già nella tabella ip_list.") except Exception as e: logging.error(f"Errore nella verifica/aggiunta della colonna risk_level: {e}") # Continua comunque, potrebbe fallire nell'insert ma almeno ci abbiamo provato # Ora procediamo con l'inserimento upsert_query = text(""" INSERT INTO ip_list (list_name, ip_address, retrieved_at, risk_level) VALUES (:list_name, :ip_address, NOW(), :risk_level) ON DUPLICATE KEY UPDATE retrieved_at = NOW(), risk_level = :risk_level """) conn.execute(upsert_query, { "list_name": list_name, "ip_address": ip_address, "risk_level": risk_level }) logging.info(f"IP {ip_address} inserito/aggiornato nella lista {list_name} con rischio {risk_level}") return True except Exception as e: logging.error(f"Errore nell'inserire l'IP {ip_address} nel database: {e}") return False def check_ip_in_db(engine, ip_address, list_name='ddos_ia'): """ Verifica se un IP è già presente nel database """ try: with engine.connect() as conn: query = text(""" SELECT COUNT(*) as count FROM ip_list WHERE ip_address = :ip_address AND list_name = :list_name """) result = conn.execute(query, {"ip_address": ip_address, "list_name": list_name}).fetchone() return result[0] > 0 except Exception as e: logging.error(f"Errore nella verifica dell'IP {ip_address}: {e}") return False def load_whitelist(file_path): """ Carica la whitelist dal file. """ whitelist = [] if not os.path.exists(file_path): logging.warning(f"Il file whitelist {file_path} non esiste. Nessun IP sarà escluso.") return whitelist with open(file_path, 'r') as f: for line in f: stripped_line = line.strip() if not stripped_line or stripped_line.startswith('#'): continue # Salta linee vuote o commenti try: if '/' in stripped_line: network = ipaddress.ip_network(stripped_line, strict=False) whitelist.append(network) else: ip = ipaddress.ip_address(stripped_line) whitelist.append(ip) logging.debug(f"Aggiunto alla whitelist: {stripped_line}") except ValueError as ve: logging.warning(f"Formato non valido nella whitelist: {stripped_line}. Errore: {ve}") logging.info(f"Whitelisted {len(whitelist)} IP o network.") return whitelist def test_database_connection(): """ Test di connessione al database """ try: logging.debug("Tentativo di connessione al database...") engine = create_engine(f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}') with engine.connect() as conn: result = conn.execute(text("SELECT 1")).fetchone() if result[0] == 1: logging.debug("Test connessione al database riuscito!") tables = conn.execute(text("SHOW TABLES")).fetchall() logging.debug(f"Tabelle disponibili: {[t[0] for t in tables]}") return True return False except Exception as e: logging.error(f"Errore nel test di connessione al database: {e}") return False def load_last_analyzed_id(): """ Carica l'ultimo ID analizzato dal file """ try: if os.path.exists(LAST_ID_PATH): with open(LAST_ID_PATH, 'r') as f: last_id = int(f.read().strip()) return last_id else: logging.info(f"File {LAST_ID_PATH} non trovato. Inizializzo last_analyzed_id a 0.") return 0 except Exception as e: logging.error(f"Errore nel caricamento dell'ultimo ID analizzato: {e}") return 0 def save_last_analyzed_id(last_id): """ Salva l'ultimo ID analizzato nel file """ try: with open(LAST_ID_PATH, 'w') as f: f.write(str(last_id)) logging.info(f"Ultimo ID analizzato salvato: {last_id}") except Exception as e: logging.error(f"Errore nel salvataggio dell'ultimo ID analizzato: {e}") def extract_data(engine, last_analyzed_id, batch_size=10000): """ Estrae i dati dal database a partire dall'ultimo ID analizzato """ try: query = text(""" SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3, Messaggio4 FROM Fibra WHERE ID > :last_id ORDER BY ID ASC LIMIT :batch_size """) new_data = pd.read_sql(query, engine, params={"last_id": last_analyzed_id, "batch_size": batch_size}) logging.info(f"Dati estratti: {len(new_data)} record.") return new_data except Exception as e: logging.error(f"Errore durante l'esecuzione della query SQL: {e}") return pd.DataFrame() def prepare_data(new_data, preprocessor=None): """ Prepara i dati per la predizione """ try: # Minimo preprocessing per i dati if 'Data' in new_data.columns and 'Ora' in new_data.columns: new_data['Data'] = pd.to_datetime(new_data['Data'], errors='coerce') new_data['Ora'] = pd.to_timedelta(new_data['Ora'].astype(str), errors='coerce') new_data.dropna(subset=['Data', 'Ora'], inplace=True) new_data['Timestamp'] = new_data['Data'] + new_data['Ora'] # Se preprocessor è None, creiamo una matrice di features semplificata if preprocessor is None: from category_encoders import HashingEncoder from sklearn.feature_extraction.text import TfidfVectorizer # Encoder per Host e IndirizzoIP he_host = HashingEncoder(n_components=8, hash_method='md5') X_host = he_host.fit_transform(new_data['Host'].astype(str)) he_ip = HashingEncoder(n_components=8, hash_method='md5') X_ip = he_ip.fit_transform(new_data['IndirizzoIP'].astype(str)) # Unione dei messaggi new_data['Messaggio'] = new_data[['Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4']].fillna('').agg(' '.join, axis=1) # TF-IDF per i messaggi vectorizer = TfidfVectorizer(max_features=100) X_messages = vectorizer.fit_transform(new_data['Messaggio']) # Combinazione delle features from scipy.sparse import hstack X = hstack([X_host, X_ip, X_messages]).tocsr() return X else: # Usa il preprocessor fornito return preprocessor.transform(new_data) except Exception as e: logging.error(f"Errore nella preparazione dei dati: {e}") return None def predict_anomalies(model, features): """ Predice le anomalie usando il modello fornito """ try: if features is None: logging.error("Impossibile predire anomalie: features è None") return [] predictions = model.predict(features) return predictions except Exception as e: logging.error(f"Errore nella predizione delle anomalie: {e}") return [] def get_details(engine, ids): """ Ottieni i dettagli completi per gli ID specificati """ try: if not ids: return pd.DataFrame() id_list = ','.join(map(str, ids)) query = text(""" SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3, Messaggio4 FROM Fibra WHERE ID IN ({}) """.format(id_list)) details = pd.read_sql(query, engine) # Converti timestamp if 'Data' in details.columns and 'Ora' in details.columns: details['Data'] = pd.to_datetime(details['Data'], errors='coerce') details['Ora'] = pd.to_timedelta(details['Ora'].astype(str), errors='coerce') details['Timestamp'] = details['Data'] + details['Ora'] return details except Exception as e: logging.error(f"Errore nell'ottenere i dettagli: {e}") return pd.DataFrame() def create_engine_with_retry(conn_string, max_retries=3, retry_delay=2): """ Crea una connessione al database con tentativi multipli """ from sqlalchemy import create_engine, event from sqlalchemy.pool import QueuePool for attempt in range(max_retries): try: # Configurazione ottimizzata per SQLAlchemy engine = create_engine( conn_string, pool_size=5, max_overflow=10, pool_recycle=3600, pool_pre_ping=True, pool_timeout=30, echo=False, # Assicuriamo che le transazioni siano esplicite isolation_level="READ COMMITTED" ) # Test di connessione with engine.connect() as conn: conn.execute(text("SELECT 1")).fetchone() logging.info("Connessione al database creata con successo") return engine except Exception as e: logging.error(f"Tentativo {attempt+1} fallito: {e}") if attempt < max_retries - 1: logging.info(f"Nuovo tentativo tra {retry_delay} secondi...") time.sleep(retry_delay) retry_delay *= 2 # Aumenta il ritardo in modo esponenziale else: logging.error("Impossibile connettersi al database dopo tutti i tentativi") raise def main(): # Test connessione database if not test_database_connection(): logging.error("Impossibile connettersi al database. Verificare le credenziali e la disponibilità del server.") return try: # Connessione al database logging.info("Connessione al database...") engine = create_engine_with_retry(CONN_STRING) # Caricamento del modello model, preprocessor = load_models() # Caricamento della whitelist whitelist = load_whitelist(WHITELIST_PATH) logging.info(f"Whitelisted {len(whitelist)} IP o network.") # Caricamento dell'ultimo ID analizzato last_analyzed_id = load_last_analyzed_id() logging.info(f"Last analyzed ID: {last_analyzed_id}") # Caricamento degli IP blacklistati blocked_ips = load_ip_block(ip_block_file) logging.info(f"Caricati {len(blocked_ips)} IP bloccati dal file di blocco.") # Estrazione dei dati logging.info(f"Estrazione dei nuovi dati a partire da ID > {last_analyzed_id}...") try: new_data = extract_data(engine, last_analyzed_id) if new_data.empty: logging.info("Nessun nuovo dato da analizzare.") return # Preparazione dei dati logging.info("Preparazione dei dati...") features = prepare_data(new_data, preprocessor) # Predizione di anomalie logging.info("Predizione di anomalie...") predictions = predict_anomalies(model, features) # Applica le predizioni new_data['anomaly'] = predictions new_data['anomaly_score'] = 0.0 # Default # Verifica quali sono le anomalie anomalies = new_data[new_data['anomaly'] == -1].copy() # Stampa il numero di anomalie e alcuni esempi logging.info(f"Rilevate {len(anomalies)} anomalie su {len(new_data)} eventi.") if not anomalies.empty: # Calcola lo score di anomalia (può essere omesso se non disponibile) if hasattr(model, 'decision_function'): anomaly_scores = model.decision_function(features) anomaly_scores = (anomaly_scores - anomaly_scores.min()) / (anomaly_scores.max() - anomaly_scores.min()) new_data['anomaly_score'] = anomaly_scores # Classifica il rischio new_data['risk_level'] = new_data['anomaly_score'].apply(classify_risk) # Conta i livelli di rischio risk_distribution = new_data['risk_level'].value_counts().to_dict() logging.info(f"Distribuzione livelli di rischio: {risk_distribution}") # Estrae i dettagli anomalies_details = get_details(engine, anomalies['ID'].tolist()) if 'Timestamp' not in anomalies.columns and 'Timestamp' in anomalies_details.columns: anomalies['Timestamp'] = anomalies_details['Timestamp'] logging.info(f"Dettaglio anomalie:\n{anomalies.head(10)}") # Gestione delle anomalie anomalies = new_data[new_data['anomaly'] == -1].copy() if not anomalies.empty: logging.info(f"Anomalie rilevate nel blocco corrente: {len(anomalies)}") # Stampa le colonne disponibili per debug logging.info(f"Colonne disponibili: {anomalies.columns.tolist()}") # Merge con i dettagli anomalies['ID'] = anomalies['ID'].astype(int) anomalies_details['ID'] = anomalies_details['ID'].astype(int) anomalies = anomalies.merge(anomalies_details, on='ID', how='left', suffixes=('', '_details')) if 'Timestamp_details' in anomalies.columns: anomalies['Timestamp'] = anomalies['Timestamp_details'] anomalies.drop(columns=['Timestamp_details'], inplace=True) # Estrae la porta da 'Messaggio2' se esiste if 'Messaggio2' in anomalies.columns: try: # Estrazione più robusta di IP e porta anomalies['source_port'] = None anomalies['dest_port'] = None # Crea una funzione per estrarre le porte def extract_ports(row): if pd.isna(row['Messaggio2']): return None, None try: parts = row['Messaggio2'].split(':') logging.debug(f"Messaggio2: {row['Messaggio2']}, parti: {parts}") if len(parts) >= 3: # Formato IP:PORTA:IP:PORTA source_port = parts[1].split(' ')[-1] dest_port = parts[-1] return source_port, dest_port except Exception as e: logging.error(f"Errore nell'estrazione porte: {e} per messaggio: {row['Messaggio2']}") return None, None # Applica la funzione ports_data = anomalies.apply(extract_ports, axis=1, result_type='expand') anomalies['source_port'] = ports_data[0] anomalies['dest_port'] = ports_data[1] # Debug delle porte estratte logging.debug(f"Estratto {len(anomalies[anomalies['dest_port'].notna()])} porte di destinazione") # Filtra le anomalie escludendo quelle con porta 443 anomalies_filtered = anomalies[anomalies['dest_port'] != '443'].copy() logging.info(f"Anomalie dopo esclusione porta 443: {len(anomalies_filtered)}") except Exception as e: logging.error(f"Errore nell'estrazione della porta: {e}") anomalies_filtered = anomalies.copy() else: anomalies_filtered = anomalies.copy() logging.warning("Colonna 'Messaggio2' non trovata, non posso filtrare per porta.") # Processa gli IP sospetti ip_column = 'IndirizzoIP' if 'IndirizzoIP' in anomalies_filtered.columns else 'ip_address' if ip_column not in anomalies_filtered.columns: logging.error(f"Colonna IP non trovata. Colonne disponibili: {anomalies_filtered.columns.tolist()}") else: # Debug delle colonne chiave prima del processing logging.debug(f"Sample IP: {anomalies_filtered[ip_column].iloc[0] if not anomalies_filtered.empty else 'N/A'}") logging.debug(f"Colonne utilizzate per il processing: {ip_column}, rischio, porta, messaggio") for idx, row in anomalies_filtered.iterrows(): ip = row[ip_column] risk_level = row['risk_level'] if 'risk_level' in row else 'NORMALE' port = row['dest_port'] if 'dest_port' in row and not pd.isna(row['dest_port']) else None message = None if 'Messaggio1' in row and not pd.isna(row['Messaggio1']): message = row['Messaggio1'] if 'Messaggio2' in row and not pd.isna(row['Messaggio2']): message += " " + row['Messaggio2'] # Verifica se è nella whitelist if not is_ip_whitelisted(ip, whitelist): # Gestisci l'anomalia (aggiorna known_attackers e, se necessario, ip_list) handle_anomaly(engine, ip, risk_level, port, message, list_name) if risk_level in ['ALTO', 'CRITICO']: logging.warning(f"ALLARME: IP ad alto rischio {ip} (Livello: {risk_level})") # Salva l'ultimo ID analizzato save_last_analyzed_id(new_data['ID'].max()) except Exception as e: logging.error(f"Errore durante l'esecuzione: {e}") import traceback logging.error(traceback.format_exc()) except Exception as e: logging.error(f"Errore generale: {e}") import traceback logging.error(traceback.format_exc()) # Esecuzione del main se lo script è eseguito direttamente if __name__ == "__main__": logging.info("Avvio dell'analisi DDoS Detection...") # Opzione per test manuale delle funzioni principali if len(sys.argv) > 1 and sys.argv[1] == "--test": logging.info("MODALITÀ TEST: verifica delle funzioni principali") try: engine = create_engine_with_retry(CONN_STRING) test_ip = "192.168.1.1" logging.info(f"Test 1: Verifica tabella known_attackers") is_known = is_known_attacker(engine, test_ip) logging.info(f"IP {test_ip} è un attaccante noto: {is_known}") logging.info(f"Test 2: Aggiornamento known_attacker") new_risk = update_known_attacker(engine, test_ip, "NORMALE", "80", "Test message") logging.info(f"Nuovo livello di rischio: {new_risk}") logging.info(f"Test 3: Verifica se ora è un attaccante noto") is_known = is_known_attacker(engine, test_ip) logging.info(f"IP {test_ip} è ora un attaccante noto: {is_known}") logging.info("MODALITÀ TEST completata") sys.exit(0) except Exception as e: logging.error(f"Errore nei test: {e}") sys.exit(1) main()