#!/usr/bin/env python3 import pandas as pd from sqlalchemy import create_engine from sqlalchemy.sql import text from joblib import load import logging import gc import os import time import sys from collections import defaultdict from datetime import datetime, timedelta, timezone import ipaddress import numpy as np from sklearn.ensemble import IsolationForest import threading import argparse import signal import multiprocessing from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor from category_encoders import HashingEncoder from sklearn.feature_extraction.text import TfidfVectorizer import psutil import warnings warnings.filterwarnings('ignore') # Configurazione del logging ottimizzata logging.basicConfig( level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler('ddetect_debug.log') ] ) # Configurazione del database DB_USER = os.environ.get('DB_USER', 'root') DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-') DB_HOST = os.environ.get('DB_HOST', 'localhost') DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK') CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}' # Percorsi dei file MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models') os.makedirs(MODEL_DIR, exist_ok=True) MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib') PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib') WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt') LAST_ID_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'last_analyzed_id.txt') # Definizione dei livelli di rischio e soglie RISK_LEVELS = { 'NORMALE': 0.1, 'BASSO': 0.3, 'MEDIO': 0.6, 'ALTO': 0.8, 'CRITICO': 0.95 } # Ottimizzazioni per grandi volumi di dati CHUNK_SIZE = 5000 # Dimensione ottimale per chunk di dati MAX_MEMORY_USAGE = 80 # Percentuale massima di memoria utilizzabile CACHE_SIZE = 10000 # Dimensione cache per IP whitelistati # Cache globale per ottimizzazioni ip_whitelist_cache = {} model_cache = None preprocessor_cache = None class Colors: HEADER = '\033[95m' BLUE = '\033[94m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BOLD = '\033[1m' UNDERLINE = '\033[4m' END = '\033[0m' def log_phase(message): """Evidenzia una nuova fase principale dell'esecuzione""" print(f"\n{Colors.BOLD}{Colors.GREEN}▶ FASE: {message}{Colors.END}\n") logging.info(f"FASE: {message}") def log_result(message): """Evidenzia un risultato importante""" print(f"{Colors.BLUE}✓ {message}{Colors.END}") logging.info(f"RISULTATO: {message}") def log_warning(message): """Evidenzia un avviso importante""" print(f"{Colors.YELLOW}⚠ {message}{Colors.END}") logging.warning(message) def log_error(message): """Evidenzia un errore importante""" print(f"{Colors.RED}✗ {message}{Colors.END}") logging.error(message) # Variabili globali per il tracciamento dell'avanzamento progress_counters = { 'ip_whitelisted': 0, 'ip_analyzed': 0, 'ip_normal': 0, 'ip_low': 0, 'ip_medium': 0, 'ip_high': 0, 'ip_critical': 0, 'metrics_processed': 0, 'last_update': 0, 'in_progress': False, 'operation': '', 'start_time': None } def check_memory_usage(): """Controlla l'utilizzo della memoria e forza garbage collection se necessario""" memory_percent = psutil.virtual_memory().percent if memory_percent > MAX_MEMORY_USAGE: logging.warning(f"Utilizzo memoria alto: {memory_percent}%. Forzando garbage collection...") gc.collect() return True return False def reset_counters(): """Resetta i contatori per una nuova esecuzione""" global progress_counters progress_counters.update({ 'ip_whitelisted': 0, 'ip_analyzed': 0, 'ip_normal': 0, 'ip_low': 0, 'ip_medium': 0, 'ip_high': 0, 'ip_critical': 0, 'metrics_processed': 0, 'last_update': 0, 'in_progress': False, 'operation': '', 'start_time': None }) def start_progress_tracking(operation): """Inizia il tracciamento dell'operazione""" global progress_counters reset_counters() progress_counters['in_progress'] = True progress_counters['operation'] = operation progress_counters['start_time'] = time.time() threading.Thread(target=progress_reporter, daemon=True).start() logging.info(f"Avvio monitoraggio operazione: {operation}") def update_counter(counter_name, increment=1): """Aggiorna un contatore specifico""" global progress_counters if counter_name in progress_counters: progress_counters[counter_name] += increment def end_progress_tracking(): """Termina il tracciamento e mostra il report finale""" global progress_counters if not progress_counters['in_progress']: return progress_counters['in_progress'] = False report_progress(force=True) logging.info(f"Monitoraggio completato per: {progress_counters['operation']}") def report_progress(force=False): """Riporta lo stato attuale dei contatori""" global progress_counters if not progress_counters['in_progress'] and not force: return current_time = time.time() if not force and (current_time - progress_counters['last_update']) < 10: return elapsed = current_time - progress_counters['start_time'] if progress_counters['start_time'] else 0 report = f""" {Colors.BOLD}======== REPORT DI PROGRESSO - {progress_counters['operation']} ========{Colors.END} Tempo trascorso: {elapsed:.1f} secondi IP Whitelistati esclusi: {progress_counters['ip_whitelisted']} Metriche elaborate: {progress_counters['metrics_processed']} IP Analizzati: {progress_counters['ip_analyzed']} Classificazione rischio: - IP NORMALI: {progress_counters['ip_normal']} - IP BASSI: {progress_counters['ip_low']} - IP MEDI: {progress_counters['ip_medium']} - IP ALTI: {progress_counters['ip_high']} - IP CRITICI: {progress_counters['ip_critical']} Memoria utilizzata: {psutil.virtual_memory().percent:.1f}% {Colors.BOLD}================================================================{Colors.END} """ print(report) logging.info(report.replace(Colors.BOLD, '').replace(Colors.END, '')) progress_counters['last_update'] = current_time def progress_reporter(): """Thread che riporta periodicamente i progressi""" while progress_counters['in_progress']: report_progress() time.sleep(2) def test_database_connection(): """Test di connessione al database""" try: logging.debug("Tentativo di connessione al database...") engine = create_engine(CONN_STRING) with engine.connect() as conn: result = conn.execute(text("SELECT 1")).fetchone() if result[0] == 1: logging.debug("Test connessione al database riuscito!") return True return False except Exception as e: logging.error(f"Errore nel test di connessione al database: {e}") return False def create_engine_with_retry(conn_string, max_retries=3, retry_delay=2): """Crea una connessione al database con tentativi multipli e ottimizzazioni""" for attempt in range(max_retries): try: # Configurazione ottimizzata per grandi volumi di dati engine = create_engine( conn_string, pool_size=10, # Aumentato per parallelismo max_overflow=20, # Aumentato per picchi di carico pool_recycle=1800, # Ridotto per evitare timeout pool_pre_ping=True, pool_timeout=60, # Aumentato per operazioni lunghe echo=False, isolation_level="READ COMMITTED", connect_args={ 'charset': 'utf8mb4', 'use_unicode': True, 'autocommit': True, 'sql_mode': 'TRADITIONAL' } ) with engine.connect() as conn: conn.execute(text("SELECT 1")).fetchone() logging.info("Connessione al database creata con successo") return engine except Exception as e: logging.error(f"Tentativo {attempt+1} fallito: {e}") if attempt < max_retries - 1: logging.info(f"Nuovo tentativo tra {retry_delay} secondi...") time.sleep(retry_delay) retry_delay *= 2 else: logging.error("Impossibile connettersi al database dopo tutti i tentativi") raise def load_models(): """Carica i modelli di rilevamento delle anomalie addestrati con cache""" global model_cache, preprocessor_cache # Usa cache se disponibile if model_cache is not None and preprocessor_cache is not None: return model_cache, preprocessor_cache try: # Carica il modello logging.info(f"Caricamento modello da {MODEL_PATH}...") if os.path.exists(MODEL_PATH): model_cache = load(MODEL_PATH) logging.debug("Modello caricato con successo!") else: logging.error(f"File modello non trovato: {MODEL_PATH}") return None, None # Carica il preprocessor try: logging.info(f"Caricamento preprocessor da {PREPROCESSOR_PATH}...") if os.path.exists(PREPROCESSOR_PATH): preprocessor_cache = load(PREPROCESSOR_PATH) if isinstance(preprocessor_cache, dict) and 'feature_columns' in preprocessor_cache: feature_count = len(preprocessor_cache['feature_columns']) if feature_count < 125: logging.warning(f"Il modello si aspetta 125 feature, " f"ma il preprocessor ne ha {feature_count}") return model_cache, preprocessor_cache else: logging.error("Preprocessor non ha la struttura attesa. Utilizzo fallback.") else: logging.error(f"File preprocessor non trovato: {PREPROCESSOR_PATH}") except Exception as e: logging.error(f"Errore nel caricamento del preprocessor: {e}") # Crea un preprocessor di fallback preprocessor_cache = {'feature_columns': [f'feature_{i}' for i in range(125)]} return model_cache, preprocessor_cache except Exception as e: logging.error(f"Errore nel caricamento dei modelli: {e}") return None, None def load_whitelist(whitelist_path=None): """Carica la whitelist da file con ottimizzazioni""" if whitelist_path is None: whitelist_path = '/root/whitelist.txt' try: if not os.path.exists(whitelist_path): logging.warning(f"File whitelist non trovato: {whitelist_path}") return {'exact_ips': set(), 'networks': []} with open(whitelist_path, 'r') as f: whitelist_entries = [line.strip() for line in f if line.strip() and not line.startswith('#')] exact_ips = set() networks = [] for entry in whitelist_entries: try: if '/' in entry: network = ipaddress.ip_network(entry, strict=False) networks.append(network) else: exact_ips.add(entry) except Exception as e: logging.warning(f"Ignorata entry nella whitelist non valida: {entry} - {e}") whitelist = { 'exact_ips': exact_ips, 'networks': networks } logging.info(f"Whitelisted {len(exact_ips)} IP esatti e {len(networks)} network.") return whitelist except Exception as e: logging.error(f"Errore nel caricamento della whitelist: {e}") return {'exact_ips': set(), 'networks': []} def is_ip_whitelisted(ip, whitelist): """Verifica se un IP è nella whitelist con cache ottimizzata""" global ip_whitelist_cache if pd.isna(ip) or not ip: return False # Controlla cache if ip in ip_whitelist_cache: if ip_whitelist_cache[ip]: update_counter('ip_whitelisted') return ip_whitelist_cache[ip] try: # Verifica diretta negli IP esatti if ip in whitelist.get('exact_ips', set()): ip_whitelist_cache[ip] = True # Limita dimensione cache if len(ip_whitelist_cache) > CACHE_SIZE: # Rimuovi il 20% più vecchio items_to_remove = list(ip_whitelist_cache.keys())[:CACHE_SIZE//5] for key in items_to_remove: del ip_whitelist_cache[key] update_counter('ip_whitelisted') return True # Verifica nelle reti (limitata per performance) try: ip_obj = ipaddress.ip_address(ip) except ValueError: ip_whitelist_cache[ip] = False return False # Limita verifica reti per performance for i, network in enumerate(whitelist.get('networks', [])): if i >= 1000: # Limite per evitare blocchi break if ip_obj in network: ip_whitelist_cache[ip] = True update_counter('ip_whitelisted') return True ip_whitelist_cache[ip] = False return False except Exception as e: logging.error(f"Errore nel controllo whitelist per IP {ip}: {e}") ip_whitelist_cache[ip] = False return False def load_last_analyzed_id(): """Carica l'ultimo ID analizzato dal file""" try: if os.path.exists(LAST_ID_PATH): with open(LAST_ID_PATH, 'r') as f: last_id = int(f.read().strip()) return last_id else: logging.info(f"File {LAST_ID_PATH} non trovato. Inizializzo last_analyzed_id a 0.") return 0 except Exception as e: logging.error(f"Errore nel caricamento dell'ultimo ID analizzato: {e}") return 0 def save_last_analyzed_id(last_id): """Salva l'ultimo ID analizzato nel file""" try: with open(LAST_ID_PATH, 'w') as f: f.write(str(last_id)) logging.info(f"Ultimo ID analizzato salvato: {last_id}") except Exception as e: logging.error(f"Errore nel salvataggio dell'ultimo ID analizzato: {e}") def extract_data_optimized(engine, last_id=0, batch_size=1000, max_id=None): """Estrazione dati ottimizzata per grandi volumi""" try: if max_id: logging.info(f"Limitazione estrazione fino a ID {max_id}") # Query di conteggio ottimizzata if max_id: query_count = f"SELECT COUNT(*) FROM Esterna WHERE ID > {last_id} AND ID <= {max_id}" else: query_count = f"SELECT COUNT(*) FROM Esterna WHERE ID > {last_id}" with engine.connect() as conn: count_result = conn.execute(text(query_count)).scalar() if count_result == 0: logging.info("Nessun nuovo record da estrarre") return pd.DataFrame() logging.info(f"Trovati {count_result} nuovi record da estrarre") # Ottimizzazione per grandi dataset effective_batch_size = min(batch_size, CHUNK_SIZE) if count_result > 100000: logging.warning(f"Dataset molto grande ({count_result}). Limitando a ultimi 50k record.") with engine.connect() as conn: latest_id_query = "SELECT MAX(ID) FROM Esterna" latest_id = conn.execute(text(latest_id_query)).scalar() if latest_id: max_id = latest_id last_id = max(last_id, latest_id - 50000) count_result = min(count_result, 50000) # Estrazione ottimizzata con chunking frames = [] current_id = last_id # Colonne essenziali per ridurre memoria essential_columns = ['ID', 'Data', 'Ora', 'Host', 'IndirizzoIP', 'Messaggio1', 'Messaggio2', 'Messaggio3'] columns_str = ', '.join(essential_columns) num_batches = (count_result + effective_batch_size - 1) // effective_batch_size for i in range(num_batches): # Controllo memoria if check_memory_usage(): logging.warning("Memoria alta, riducendo batch size") effective_batch_size = max(100, effective_batch_size // 2) with engine.connect() as conn: if max_id: query = f""" SELECT {columns_str} FROM Esterna WHERE ID > {current_id} AND ID <= {max_id} ORDER BY ID ASC LIMIT {effective_batch_size} """ else: query = f""" SELECT {columns_str} FROM Esterna WHERE ID > {current_id} ORDER BY ID ASC LIMIT {effective_batch_size} """ result = conn.execute(text(query)) chunk = pd.DataFrame(result.fetchall(), columns=result.keys()) if chunk.empty: break current_id = chunk['ID'].max() # Ottimizzazione timestamp if 'Data' in chunk.columns and 'Ora' in chunk.columns: try: chunk['Data'] = pd.to_datetime(chunk['Data'], errors='coerce') chunk['Ora'] = pd.to_timedelta(chunk['Ora'].astype(str), errors='coerce') chunk['Timestamp'] = chunk['Data'] + chunk['Ora'] except Exception as e: logging.warning(f"Impossibile creare colonna Timestamp: {e}") frames.append(chunk) logging.info(f"Estratti {len(chunk)} record, batch {i+1}/{num_batches}") if not frames: return pd.DataFrame() result = pd.concat(frames, ignore_index=True) logging.info(f"Estrazione completata: {len(result)} record totali") return result except Exception as e: logging.error(f"Errore nell'estrazione dei dati: {e}") return pd.DataFrame() def prepare_data_optimized(df, preprocessor): """Preparazione dati ottimizzata per grandi volumi""" try: if df.empty: return None # Crea copia ottimizzata df = df.copy() # Numero atteso di feature expected_features = 125 feature_data = {} feature_count = 0 # 1. Feature temporali essenziali (9 feature) - versione semplificata time_features = [ 'time_since_last', 'events_last_hour', 'events_last_day', 'time_since_last_mean', 'time_since_last_std', 'time_since_last_min', 'time_since_last_max', 'events_last_hour_max', 'events_last_day_max' ] for feat in time_features: feature_data[feat] = np.zeros(len(df)) feature_count += 1 # 2. Feature TF-IDF semplificate (21 feature) if 'Messaggio1' in df.columns: try: # Versione semplificata per performance proto_data = df['Messaggio1'].fillna('').astype(str) # Usa solo i protocolli più comuni per velocità common_protocols = ['TCP', 'UDP', 'ICMP', 'HTTP', 'HTTPS', 'SSH', 'FTP'] for i, protocol in enumerate(common_protocols[:21]): feature_data[f'protocol_tfidf_{i}'] = proto_data.str.contains(protocol, case=False).astype(int) feature_count += 1 # Riempi rimanenti for i in range(len(common_protocols), 21): feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df)) feature_count += 1 except Exception as e: logging.error(f"Errore nell'estrazione TF-IDF: {e}") for i in range(21): feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df)) feature_count += 1 else: for i in range(21): feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df)) feature_count += 1 # 3. Feature Host semplificate (2 feature) if 'Host' in df.columns: feature_data['host_FIBRA'] = df['Host'].fillna('').str.contains('FIBRA', case=False).astype(int) feature_data['host_nan'] = df['Host'].isna().astype(int) else: feature_data['host_FIBRA'] = np.zeros(len(df)) feature_data['host_nan'] = np.zeros(len(df)) feature_count += 2 # 4. Encoding IP ottimizzato (15 feature) if 'IP_Attaccante' not in df.columns and 'Messaggio2' in df.columns: df['IP_Attaccante'] = df['Messaggio2'].str.split(':').str[0] try: # Versione semplificata dell'encoding per performance if 'IP_Attaccante' in df.columns: ip_data = df['IP_Attaccante'].fillna('unknown').astype(str) # Hash semplice per IP for i in range(15): feature_data[f'col_{i}'] = pd.util.hash_array(ip_data.values) % (2**16) / (2**16) feature_count += 1 else: for i in range(15): feature_data[f'col_{i}'] = np.zeros(len(df)) feature_count += 1 except Exception as e: logging.error(f"Errore nell'encoding delle colonne: {e}") for i in range(15): feature_data[f'col_{i}'] = np.zeros(len(df)) feature_count += 1 # 5. Feature aggiuntive (36 feature) for i in range(15): feature_data[f'additional_col_{i}'] = np.zeros(len(df)) feature_count += 1 for i in range(21): feature_data[f'additional_tfidf_{i}'] = np.zeros(len(df)) feature_count += 1 # 6. Riempi fino a 125 remaining = expected_features - feature_count if remaining > 0: for i in range(remaining): feature_data[f'extra_col_{i}'] = np.zeros(len(df)) feature_count += 1 # Crea array numpy direttamente per efficienza X = np.column_stack([feature_data[col] for col in sorted(feature_data.keys())]) logging.debug(f"Generate {feature_count} feature ottimizzate") return X except Exception as e: logging.error(f"Errore nella preparazione dei dati: {e}") return None def predict_anomalies_optimized(model, features, sensitivity=5): """Predizione anomalie ottimizzata""" try: if features.shape[1] != 125: logging.error(f"Dimensione feature errata: trovate {features.shape[1]}, attese 125") return np.zeros(features.shape[0]) update_counter('metrics_processed', features.shape[0]) if hasattr(model, 'predict'): with warnings.catch_warnings(): warnings.filterwarnings("ignore") # Predizione ottimizzata predictions = model.predict(features) # Applica sensibilità se supportata if hasattr(model, 'decision_function'): try: decision_scores = model.decision_function(features) threshold_multiplier = sensitivity / 5.0 custom_threshold = -0.2 * threshold_multiplier predictions = np.where(decision_scores < custom_threshold, -1, 1) num_anomalies = np.sum(predictions == -1) logging.debug(f"Trovate {num_anomalies} anomalie con sensibilità {sensitivity}") except Exception as e: logging.warning(f"Errore con decision_function: {e}") return predictions else: logging.error("Modello non ha il metodo predict") return np.zeros(features.shape[0]) except Exception as e: logging.error(f"Errore nella predizione: {e}") return np.zeros(features.shape[0]) def process_batch_optimized(batch_data, engine, model, preprocessor, whitelist, sensitivity=5): """Processamento batch ottimizzato""" try: # Prepara i dati X = prepare_data_optimized(batch_data, preprocessor) if X is None or X.shape[0] == 0: return set(), 0 # Predizione predictions = predict_anomalies_optimized(model, X, sensitivity) # Trova anomalie anomaly_indices = np.where(predictions == -1)[0] anomaly_count = len(anomaly_indices) if anomaly_count == 0: return set(), 0 # Estrai IP anomali processed_ips = set() # Estrai IP attaccanti se necessario if 'IP_Attaccante' not in batch_data.columns and 'Messaggio2' in batch_data.columns: batch_data['IP_Attaccante'] = batch_data['Messaggio2'].str.split(':').str[0] for idx in anomaly_indices: if idx < len(batch_data): row = batch_data.iloc[idx] ip = row.get('IP_Attaccante') if pd.notna(ip) and not is_ip_whitelisted(ip, whitelist): processed_ips.add(ip) # Gestione anomalia semplificata try: handle_anomaly_optimized(engine, ip, 'MEDIO', None, 'Anomalia rilevata', 'ddos_ia') except Exception as e: logging.warning(f"Errore gestione anomalia per IP {ip}: {e}") return processed_ips, anomaly_count except Exception as e: logging.error(f"Errore nell'elaborazione del batch: {e}") return set(), 0 def handle_anomaly_optimized(engine, ip_address, risk_level, port=None, message=None, list_name='ddos_ia'): """Gestione anomalia ottimizzata""" try: if not ip_address or pd.isna(ip_address): return False # Inserimento semplificato per performance with engine.connect() as conn: insert_query = text(""" INSERT INTO Fibra (IndirizzoIP, Data, Ora, Attivo, Lista, NumeroAttacchi, LivelloDiRischio) VALUES (:ip, CURDATE(), CURTIME(), 1, :lista, 1, 2) ON DUPLICATE KEY UPDATE NumeroAttacchi = NumeroAttacchi + 1, Data = CURDATE(), Ora = CURTIME() """) conn.execute(insert_query, {"ip": ip_address, "lista": list_name}) # Aggiorna contatori update_counter('ip_medium') return True except Exception as e: logging.error(f"Errore nella gestione dell'anomalia per IP {ip_address}: {e}") return False def main(): """Funzione principale ottimizzata""" parser = argparse.ArgumentParser(description='Rilevamento DDoS ottimizzato per grandi volumi') parser.add_argument('--debug', action='store_true', help='Abilita logging di debug') parser.add_argument('--batch-size', type=int, default=5000, help='Dimensione batch ottimizzata') parser.add_argument('--whitelist', type=str, default=WHITELIST_PATH, help='Percorso whitelist') parser.add_argument('--ciclo', action='store_true', help='Esecuzione in ciclo') parser.add_argument('--pausa', type=int, default=60, help='Pausa tra cicli') parser.add_argument('--parallel', action='store_true', help='Elaborazione parallela') parser.add_argument('--workers', type=int, default=min(4, multiprocessing.cpu_count()), help='Numero worker') parser.add_argument('--max-id', type=int, default=None, help='ID massimo') parser.add_argument('--skip-old', action='store_true', help='Salta record vecchi') parser.add_argument('--sensibility', type=int, default=5, choices=range(1, 11), help='Sensibilità 1-10') args = parser.parse_args() if args.debug: logging.getLogger().setLevel(logging.DEBUG) log_phase("Avvio sistema di rilevamento DDoS ottimizzato") # Test connessione if not test_database_connection(): log_error("Impossibile connettersi al database") sys.exit(1) log_result("Connessione database stabilita") try: engine = create_engine_with_retry(CONN_STRING) # Carica modelli log_phase("Caricamento modelli") model, preprocessor = load_models() if model is None: log_error("Impossibile caricare il modello") sys.exit(1) log_result("Modelli caricati con successo") # Carica whitelist whitelist = load_whitelist(args.whitelist) def esegui_analisi_ottimizzata(): start_progress_tracking("rilevamento DDoS ottimizzato") try: last_id = load_last_analyzed_id() # Estrazione dati ottimizzata log_phase("Estrazione dati ottimizzata") new_data = extract_data_optimized(engine, last_id, args.batch_size, args.max_id) if new_data.empty: log_result("Nessun nuovo dato da analizzare") end_progress_tracking() return True total_records = len(new_data) last_analyzed_id = new_data['ID'].max() log_result(f"Estratti {total_records} record") # Elaborazione ottimizzata if args.parallel and total_records > 1000: log_phase(f"Elaborazione parallela con {args.workers} worker") # Dividi in batch ottimizzati batch_size = min(CHUNK_SIZE, max(500, total_records // args.workers)) batches = [new_data[i:i+batch_size].copy() for i in range(0, total_records, batch_size)] all_processed_ips = set() total_anomalies = 0 with ThreadPoolExecutor(max_workers=args.workers) as executor: futures = [ executor.submit(process_batch_optimized, batch, engine, model, preprocessor, whitelist, args.sensibility) for batch in batches ] for future in as_completed(futures): try: processed_ips, anomaly_count = future.result() all_processed_ips.update(processed_ips) total_anomalies += anomaly_count except Exception as e: log_error(f"Errore nell'elaborazione batch: {e}") log_result(f"Elaborazione completata: {len(all_processed_ips)} IP anomali, {total_anomalies} anomalie totali") else: # Elaborazione sequenziale ottimizzata log_phase("Elaborazione sequenziale ottimizzata") processed_ips, anomaly_count = process_batch_optimized(new_data, engine, model, preprocessor, whitelist, args.sensibility) log_result(f"Elaborate {len(processed_ips)} IP anomali, {anomaly_count} anomalie") # Salva ultimo ID save_last_analyzed_id(last_analyzed_id) log_phase("Analisi completata") end_progress_tracking() # Garbage collection del new_data gc.collect() return True except Exception as e: log_error(f"Errore durante l'analisi: {e}") end_progress_tracking() return False # Esecuzione if args.ciclo: log_phase("Modalità ciclo infinito") ciclo_count = 0 def handle_interrupt(signum, frame): log_warning("Interruzione ricevuta. Chiusura...") sys.exit(0) signal.signal(signal.SIGINT, handle_interrupt) while True: ciclo_count += 1 log_result(f"Inizio ciclo {ciclo_count}") success = esegui_analisi_ottimizzata() if success: log_result(f"Ciclo {ciclo_count} completato. Pausa di {args.pausa} secondi...") time.sleep(args.pausa) else: log_error(f"Errore nel ciclo {ciclo_count}. Pausa di {args.pausa*2} secondi...") time.sleep(args.pausa * 2) else: esegui_analisi_ottimizzata() except Exception as e: log_error(f"Errore generale: {e}") sys.exit(1) if __name__ == "__main__": main()