#!/usr/bin/env python3 import pandas as pd from sqlalchemy import create_engine from sqlalchemy.sql import text from sklearn.ensemble import IsolationForest from sklearn.neighbors import LocalOutlierFactor from sklearn.svm import OneClassSVM from sklearn.feature_extraction.text import TfidfVectorizer from joblib import dump, load import logging import gc import os import time from datetime import datetime, timedelta import numpy as np import argparse import sys import traceback import threading import psutil import warnings import signal from concurrent.futures import ThreadPoolExecutor, TimeoutError import multiprocessing warnings.filterwarnings('ignore') # Configurazione del logging ultra-ottimizzata logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler('analisys_debug.log') ] ) # Configurazione del database DB_USER = os.environ.get('DB_USER', 'root') DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-') DB_HOST = os.environ.get('DB_HOST', 'localhost') DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK') # Cartella per i modelli MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models') try: os.makedirs(MODEL_DIR, exist_ok=True) except Exception as e: logging.error(f"Errore nella creazione della directory models: {e}") MODEL_DIR = '.' # Percorsi dei modelli IF_MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib') LOF_MODEL_PATH = os.path.join(MODEL_DIR, 'lof.joblib') SVM_MODEL_PATH = os.path.join(MODEL_DIR, 'svm.joblib') ENSEMBLE_MODEL_PATH = os.path.join(MODEL_DIR, 'ensemble_weights.joblib') PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib') # Parametri ultra-ottimizzati TRAINING_FREQUENCY_HOURS = 12 MAX_MEMORY_USAGE = 80 CHUNK_SIZE = 5000 MAX_TRAINING_SAMPLES = 100000 # Ridotto per velocità MIN_TRAINING_SAMPLES = 500 QUERY_TIMEOUT = 300 # 5 minuti timeout per query CONNECTION_TIMEOUT = 30 # 30 secondi timeout connessione # Colori per output class Colors: HEADER = '\033[95m' BLUE = '\033[94m' GREEN = '\033[92m' YELLOW = '\033[93m' RED = '\033[91m' BOLD = '\033[1m' END = '\033[0m' def log_phase(message): print(f"\n{Colors.BOLD}{Colors.GREEN}▶ FASE: {message}{Colors.END}\n") logging.info(f"FASE: {message}") def log_result(message): print(f"{Colors.BLUE}✓ {message}{Colors.END}") logging.info(f"RISULTATO: {message}") def log_warning(message): print(f"{Colors.YELLOW}⚠ {message}{Colors.END}") logging.warning(message) def log_error(message): print(f"{Colors.RED}✗ {message}{Colors.END}") logging.error(message) # Variabili globali per timeout interrupted = False def signal_handler(signum, frame): global interrupted interrupted = True log_warning("Interruzione ricevuta, terminando...") sys.exit(1) signal.signal(signal.SIGINT, signal_handler) def check_memory_usage(): """Controlla l'utilizzo della memoria""" memory_percent = psutil.virtual_memory().percent if memory_percent > MAX_MEMORY_USAGE: log_warning(f"Memoria alta: {memory_percent}%. Forzando garbage collection...") gc.collect() return True return False def connect_to_database_ultra(): """Connessione database ultra-ottimizzata con timeout""" try: log_phase("Connessione database ultra-ottimizzata") connection_string = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}" # Configurazione ultra-ottimizzata engine = create_engine( connection_string, pool_size=3, # Ridotto per velocità max_overflow=5, pool_recycle=1800, pool_pre_ping=True, pool_timeout=CONNECTION_TIMEOUT, echo=False, connect_args={ 'charset': 'utf8mb4', 'use_unicode': True, 'autocommit': True, # True per velocità 'connection_timeout': CONNECTION_TIMEOUT, 'sql_mode': 'TRADITIONAL' } ) # Test connessione con timeout with engine.connect() as conn: result = conn.execute(text("SELECT 1")).fetchone() if result[0] == 1: log_result("Connessione database stabilita") return engine return None except Exception as e: log_error(f"Errore connessione database: {e}") return None def extract_data_ultra_fast(engine, window_hours=12, max_records=100000, batch_size=5000): """Estrazione dati ultra-veloce con timeout e query ottimizzate""" try: log_phase(f"Estrazione ultra-veloce - ultimi {window_hours} ore") # Query di conteggio veloce con LIMIT per evitare scan completo count_query = text(""" SELECT COUNT(*) FROM ( SELECT 1 FROM Esterna WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR) LIMIT :max_limit ) as subq """) total_count = 0 try: with engine.connect() as conn: # Timeout per la query result = conn.execute(count_query, { "window": window_hours, "max_limit": max_records * 2 }).fetchone() total_count = result[0] if result else 0 except Exception as e: log_error(f"Errore nel conteggio: {e}") return pd.DataFrame() if total_count == 0: log_warning("Nessun dato trovato") return pd.DataFrame() # Limita al massimo total_count = min(total_count, max_records) log_result(f"Trovati {total_count} record, estraendo max {max_records}") # Estrazione diretta con LIMIT per velocità massima if total_count <= max_records: log_result("Estrazione diretta con query singola") # Query ultra-ottimizzata - solo colonne essenziali query = text(""" SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3 FROM Esterna WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR) ORDER BY ID DESC LIMIT :max_records """) try: df = pd.read_sql(query, engine, params={ "window": window_hours, "max_records": max_records }) log_result(f"Estratti {len(df)} record in modalità diretta") return df except Exception as e: log_error(f"Errore nell'estrazione diretta: {e}") return pd.DataFrame() else: # Campionamento casuale per dataset grandi log_warning(f"Dataset grande, usando campionamento casuale") query = text(""" SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3 FROM Esterna WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR) AND RAND() < 0.5 ORDER BY RAND() LIMIT :max_records """) try: df = pd.read_sql(query, engine, params={ "window": window_hours, "max_records": max_records }) log_result(f"Estratti {len(df)} record con campionamento") return df except Exception as e: log_error(f"Errore nel campionamento: {e}") return pd.DataFrame() except Exception as e: log_error(f"Errore generale nell'estrazione: {e}") return pd.DataFrame() def prepare_data_ultra_fast(df): """Preparazione dati ultra-veloce""" try: log_phase("Preparazione dati ultra-veloce") if df.empty: log_error("DataFrame vuoto") return None, None # Feature engineering minimalista per massima velocità feature_data = {} # 1. Feature temporali base (9 feature) if 'Data' in df.columns and 'Ora' in df.columns: try: df['Data'] = pd.to_datetime(df['Data'], errors='coerce') df['Ora'] = pd.to_timedelta(df['Ora'].astype(str), errors='coerce') df['Timestamp'] = df['Data'] + df['Ora'] feature_data['hour_of_day'] = df['Timestamp'].dt.hour.fillna(0) feature_data['day_of_week'] = df['Timestamp'].dt.dayofweek.fillna(0) except: feature_data['hour_of_day'] = np.zeros(len(df)) feature_data['day_of_week'] = np.zeros(len(df)) else: feature_data['hour_of_day'] = np.zeros(len(df)) feature_data['day_of_week'] = np.zeros(len(df)) # Aggiungi 7 feature temporali semplici for i in range(7): feature_data[f'time_feature_{i}'] = np.zeros(len(df)) # 2. Feature protocollo semplici (21 feature) if 'Messaggio1' in df.columns: proto_data = df['Messaggio1'].fillna('').astype(str) # Protocolli comuni protocols = ['TCP', 'UDP', 'ICMP', 'HTTP', 'HTTPS', 'SSH', 'FTP', 'DNS', 'SMTP', 'POP3'] for i, protocol in enumerate(protocols[:10]): feature_data[f'protocol_{i}'] = proto_data.str.contains(protocol, case=False).astype(int) # Riempi rimanenti 11 feature for i in range(10, 21): feature_data[f'protocol_{i}'] = np.zeros(len(df)) else: for i in range(21): feature_data[f'protocol_{i}'] = np.zeros(len(df)) # 3. Feature Host semplici (2 feature) if 'Host' in df.columns: feature_data['host_fibra'] = df['Host'].fillna('').str.contains('FIBRA', case=False).astype(int) feature_data['host_empty'] = df['Host'].isna().astype(int) else: feature_data['host_fibra'] = np.zeros(len(df)) feature_data['host_empty'] = np.zeros(len(df)) # 4. Feature IP semplici (15 feature) if 'Messaggio2' in df.columns: ip_data = df['Messaggio2'].str.split(':').str[0].fillna('unknown') # Hash semplice per IP for i in range(15): feature_data[f'ip_hash_{i}'] = pd.util.hash_array(ip_data.values) % (2**(i+5)) / (2**(i+5)) else: for i in range(15): feature_data[f'ip_hash_{i}'] = np.zeros(len(df)) # 5. Riempi fino a 125 feature current_features = len(feature_data) remaining = 125 - current_features for i in range(remaining): feature_data[f'extra_{i}'] = np.zeros(len(df)) # Crea array numpy direttamente X = np.column_stack([feature_data[col] for col in sorted(feature_data.keys())]) # Preprocessor minimalista preprocessor = { 'feature_columns': sorted(feature_data.keys()), 'text_vectorizer': None, 'categorical_features': {} } # Salva preprocessor try: dump(preprocessor, PREPROCESSOR_PATH) log_result(f"Preprocessor salvato: {X.shape[1]} feature") except Exception as e: log_warning(f"Errore salvataggio preprocessor: {e}") log_result(f"Dati preparati: {X.shape[0]} esempi, {X.shape[1]} feature") return X, preprocessor except Exception as e: log_error(f"Errore nella preparazione dati: {e}") return None, None def train_models_ultra_fast(X): """Addestramento ultra-veloce""" try: log_phase("Addestramento modelli ultra-veloce") if X.shape[0] < MIN_TRAINING_SAMPLES: log_error(f"Troppo pochi campioni: {X.shape[0]} < {MIN_TRAINING_SAMPLES}") return None, None, None, None # Campionamento aggressivo per velocità if X.shape[0] > MAX_TRAINING_SAMPLES: log_warning(f"Campionamento da {X.shape[0]} a {MAX_TRAINING_SAMPLES} esempi") indices = np.random.choice(X.shape[0], MAX_TRAINING_SAMPLES, replace=False) X_train = X[indices] else: X_train = X log_result(f"Addestramento su {X_train.shape[0]} esempi") # 1. Isolation Forest ultra-veloce log_result("Addestramento Isolation Forest...") if_model = IsolationForest( n_estimators=50, # Ridotto drasticamente contamination=0.05, random_state=42, n_jobs=-1, max_samples=min(1000, X_train.shape[0]), # Limita campioni max_features=0.8 ) if_model.fit(X_train) log_result("✓ Isolation Forest completato") # 2. LOF ultra-veloce (solo su campione piccolo) log_result("Addestramento LOF...") lof_sample_size = min(10000, X_train.shape[0]) if X_train.shape[0] > lof_sample_size: lof_indices = np.random.choice(X_train.shape[0], lof_sample_size, replace=False) lof_sample = X_train[lof_indices] else: lof_sample = X_train lof_model = LocalOutlierFactor( n_neighbors=min(10, lof_sample.shape[0] // 20), contamination=0.05, novelty=True, n_jobs=-1 ) lof_model.fit(lof_sample) log_result("✓ LOF completato") # 3. SVM ultra-veloce (solo su campione molto piccolo) log_result("Addestramento SVM...") svm_sample_size = min(5000, X_train.shape[0]) if X_train.shape[0] > svm_sample_size: svm_indices = np.random.choice(X_train.shape[0], svm_sample_size, replace=False) svm_sample = X_train[svm_indices] else: svm_sample = X_train svm_model = OneClassSVM( kernel='rbf', gamma='scale', nu=0.05 ) svm_model.fit(svm_sample) log_result("✓ SVM completato") # 4. Salvataggio veloce log_result("Salvataggio modelli...") try: dump(if_model, IF_MODEL_PATH) dump(lof_model, LOF_MODEL_PATH) dump(svm_model, SVM_MODEL_PATH) ensemble_weights = { 'isolation_forest': 0.60, # Peso maggiore per IF 'lof': 0.25, 'svm': 0.15 } dump(ensemble_weights, ENSEMBLE_MODEL_PATH) log_result("✓ Tutti i modelli salvati") except Exception as e: log_error(f"Errore nel salvataggio: {e}") return None, None, None, None return if_model, lof_model, svm_model, ensemble_weights except Exception as e: log_error(f"Errore nell'addestramento: {e}") return None, None, None, None def save_model_timestamp(): """Salva timestamp addestramento""" try: engine = connect_to_database_ultra() if not engine: return False with engine.connect() as conn: create_table_query = text(""" CREATE TABLE IF NOT EXISTS model_metadata ( id INT AUTO_INCREMENT PRIMARY KEY, model_name VARCHAR(50) NOT NULL, last_trained TIMESTAMP DEFAULT CURRENT_TIMESTAMP, model_path VARCHAR(255), UNIQUE KEY unique_model (model_name) ) """) conn.execute(create_table_query) upsert_query = text(""" INSERT INTO model_metadata (model_name, last_trained, model_path) VALUES ('ensemble', NOW(), :model_path) ON DUPLICATE KEY UPDATE last_trained = NOW(), model_path = :model_path """) conn.execute(upsert_query, {"model_path": ENSEMBLE_MODEL_PATH}) log_result("Timestamp salvato") return True except Exception as e: log_warning(f"Errore salvataggio timestamp: {e}") return False def needs_training(force_training=False): """Verifica necessità addestramento""" if force_training: log_result("Riaddestramento forzato") return True try: engine = connect_to_database_ultra() if not engine: return True with engine.connect() as conn: query = text(""" SELECT last_trained FROM model_metadata WHERE model_name = 'ensemble' """) result = conn.execute(query).fetchone() if not result: log_result("Nessun addestramento precedente") return True last_trained = result[0] now = datetime.now() hours_diff = (now - last_trained).total_seconds() / 3600 if hours_diff >= TRAINING_FREQUENCY_HOURS: log_result(f"Ultimo addestramento: {hours_diff:.1f} ore fa - necessario") return True else: log_result(f"Ultimo addestramento: {hours_diff:.1f} ore fa - non necessario") return False except Exception as e: log_warning(f"Errore verifica: {e}") return True def test_database_connection(): """Test connessione veloce""" try: engine = connect_to_database_ultra() if not engine: return False with engine.connect() as conn: result = conn.execute(text("SELECT 1")).fetchone() if result and result[0] == 1: log_result("Test database superato") # Verifica tabella Esterna try: count = conn.execute(text("SELECT COUNT(*) FROM Esterna LIMIT 1")).fetchone()[0] log_result(f"Tabella Esterna accessibile") except: log_error("Tabella Esterna non accessibile") return False return True return False except Exception as e: log_error(f"Errore test: {e}") return False def main(): """Funzione principale ultra-ottimizzata""" parser = argparse.ArgumentParser(description='Addestramento ultra-veloce per grandi volumi') parser.add_argument('--force-training', action='store_true', help='Forza riaddestramento') parser.add_argument('--test', action='store_true', help='Modalità test') parser.add_argument('--time-window', type=int, default=12, help='Finestra temporale ore (default: 12)') parser.add_argument('--max-records', type=int, default=100000, help='Max record (default: 100k)') parser.add_argument('--batch-size', type=int, default=5000, help='Batch size (default: 5k)') parser.add_argument('--debug', action='store_true', help='Debug logging') args = parser.parse_args() if args.debug: logging.getLogger().setLevel(logging.DEBUG) log_phase("Sistema addestramento ULTRA-VELOCE") log_result(f"Config: {args.time_window}h, max {args.max_records}, batch {args.batch_size}") start_time = time.time() # Test veloce if args.test: log_phase("Test veloce") if test_database_connection(): log_result("Test OK") sys.exit(0) else: log_error("Test FAILED") sys.exit(1) # Test connessione if not test_database_connection(): log_error("Database non raggiungibile") sys.exit(1) # Verifica necessità if not needs_training(args.force_training): log_result("Addestramento non necessario") sys.exit(0) try: # Connessione engine = connect_to_database_ultra() if not engine: log_error("Connessione fallita") sys.exit(1) # Estrazione ultra-veloce df = extract_data_ultra_fast(engine, args.time_window, args.max_records, args.batch_size) if df.empty: log_error("Nessun dato estratto") sys.exit(1) # Preparazione ultra-veloce X, preprocessor = prepare_data_ultra_fast(df) if X is None: log_error("Preparazione dati fallita") sys.exit(1) # Addestramento ultra-veloce models = train_models_ultra_fast(X) if all(m is not None for m in models): log_phase("SUCCESSO!") # Salva timestamp save_model_timestamp() # Statistiche finali elapsed = time.time() - start_time memory_usage = psutil.virtual_memory().percent log_result(f"Tempo totale: {elapsed:.1f} secondi") log_result(f"Memoria finale: {memory_usage:.1f}%") log_result(f"Campioni addestramento: {X.shape[0]}") log_result(f"Feature utilizzate: {X.shape[1]}") else: log_error("Addestramento fallito") sys.exit(1) except Exception as e: log_error(f"Errore generale: {e}") logging.error(traceback.format_exc()) sys.exit(1) if __name__ == "__main__": main()