#!/usr/bin/env python3
"""Simplified DDoS detection (v02).

Reads new records from the Esterna log table, scores them with a pre-trained
Isolation Forest model and records anomalous source IPs in the Fibra table.
"""
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from joblib import load
import logging
import gc
import os
import time
import sys
from collections import defaultdict
from datetime import datetime, timedelta
import numpy as np
import threading
import argparse
import signal
from concurrent.futures import ThreadPoolExecutor, as_completed
import warnings

warnings.filterwarnings('ignore')

# Simplified logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('detect_debug.log')
    ]
)

# Database configuration
DB_USER = os.environ.get('DB_USER', 'root')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}'

# File paths
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt')
LAST_ID_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'last_analyzed_id.txt')

# Simplified parameters
RISK_LEVELS = {
    'NORMALE': 0.1,
    'BASSO': 0.3,
    'MEDIO': 0.6,
    'ALTO': 0.8,
    'CRITICO': 0.95
}

# Colors for terminal output
class Colors:
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    END = '\033[0m'

def log_phase(message):
    print(f"\n{Colors.BOLD}{Colors.GREEN}▶ {message}{Colors.END}\n")
    logging.info(f"FASE: {message}")

def log_result(message):
    print(f"{Colors.BLUE}✓ {message}{Colors.END}")
    logging.info(f"RISULTATO: {message}")

def log_warning(message):
    print(f"{Colors.YELLOW}⚠ {message}{Colors.END}")
    logging.warning(message)

def log_error(message):
    print(f"{Colors.RED}✗ {message}{Colors.END}")
    logging.error(message)

# Global variables for statistics
stats = {
    'records_processed': 0,
    'anomalies_found': 0,
    'ips_analyzed': 0,
    'start_time': None
}

def reset_stats():
    global stats
    stats['records_processed'] = 0
    stats['anomalies_found'] = 0
    stats['ips_analyzed'] = 0
    stats['start_time'] = time.time()

def update_stats(records=0, anomalies=0, ips=0):
    global stats
    stats['records_processed'] += records
    stats['anomalies_found'] += anomalies
    stats['ips_analyzed'] += ips

def print_stats():
    global stats
    elapsed = time.time() - stats['start_time'] if stats['start_time'] else 0
    print(f"""
{Colors.BOLD}======== STATISTICHE RILEVAMENTO ========{Colors.END}
Tempo trascorso: {elapsed:.1f} secondi
Record processati: {stats['records_processed']}
Anomalie trovate: {stats['anomalies_found']}
IP analizzati: {stats['ips_analyzed']}
{Colors.BOLD}========================================={Colors.END}
""")

def create_engine_simple():
    """Create a simplified database connection."""
    try:
        engine = create_engine(
            CONN_STRING,
            pool_size=3,
            max_overflow=5,
            pool_recycle=1800,
            pool_pre_ping=True,
            pool_timeout=30,
            echo=False
        )
        # Connection test
        with engine.connect() as conn:
            conn.execute(text("SELECT 1")).fetchone()
        return engine
    except Exception as e:
        log_error(f"Errore connessione database: {e}")
        return None

def load_models_simple():
    """Load the models in a simplified way."""
    try:
        log_phase("Caricamento modelli")
        if not os.path.exists(MODEL_PATH):
            log_error(f"Modello non trovato: {MODEL_PATH}")
            return None, None

        model = load(MODEL_PATH)
        log_result("Modello caricato")

        # Load the preprocessor if available
        preprocessor = None
        if os.path.exists(PREPROCESSOR_PATH):
            preprocessor = load(PREPROCESSOR_PATH)
            log_result("Preprocessor caricato")
        else:
            log_warning("Preprocessor non trovato, usando fallback")
            preprocessor = {'feature_columns': [f'feature_{i}' for i in range(50)]}

        return model, preprocessor
    except Exception as e:
        log_error(f"Errore caricamento modelli: {e}")
        return None, None

def load_whitelist_simple():
    """Load the whitelist (one IP per line, '#' lines are comments)."""
    try:
        if not os.path.exists(WHITELIST_PATH):
            log_warning("Whitelist non trovata, usando lista vuota")
            return set()
        with open(WHITELIST_PATH, 'r') as f:
            whitelist = set(line.strip() for line in f
                            if line.strip() and not line.startswith('#'))
        log_result(f"Whitelist caricata: {len(whitelist)} IP")
        return whitelist
    except Exception as e:
        log_warning(f"Errore caricamento whitelist: {e}")
        return set()

def load_last_analyzed_id():
    """Load the last analyzed ID."""
    try:
        if os.path.exists(LAST_ID_PATH):
            with open(LAST_ID_PATH, 'r') as f:
                return int(f.read().strip())
        return 0
    except Exception as e:
        log_warning(f"Errore caricamento ultimo ID: {e}")
        return 0

def save_last_analyzed_id(last_id):
    """Save the last analyzed ID."""
    try:
        with open(LAST_ID_PATH, 'w') as f:
            f.write(str(last_id))
    except Exception as e:
        log_warning(f"Errore salvataggio ultimo ID: {e}")

def extract_data_simple(engine, last_id=0, batch_size=10000):
    """Simplified data extraction."""
    try:
        log_phase(f"Estrazione dati da ID {last_id}")

        # Simple incremental query
        query = text("""
            SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3
            FROM Esterna
            WHERE ID > :last_id
            ORDER BY ID ASC
            LIMIT :batch_size
        """)
        df = pd.read_sql(query, engine, params={
            "last_id": last_id,
            "batch_size": batch_size
        })
        log_result(f"Estratti {len(df)} record")
        return df
    except Exception as e:
        log_error(f"Errore estrazione dati: {e}")
        return pd.DataFrame()

def prepare_data_simple(df):
    """Prepare the feature matrix, compatible with analisys_01.py."""
    try:
        if df.empty:
            return None

        # Same logic as analisys_01.py for compatibility
        feature_data = {}
        n_samples = len(df)

        # 1. Temporal features (10 features)
        if 'Data' in df.columns and 'Ora' in df.columns:
            try:
                df['Data'] = pd.to_datetime(df['Data'], errors='coerce')
                df['Ora'] = pd.to_timedelta(df['Ora'].astype(str), errors='coerce')
                df['Timestamp'] = df['Data'] + df['Ora']
                feature_data['hour'] = df['Timestamp'].dt.hour.fillna(0).values
                feature_data['day'] = df['Timestamp'].dt.dayofweek.fillna(0).values
                feature_data['minute'] = df['Timestamp'].dt.minute.fillna(0).values
            except Exception:
                feature_data['hour'] = np.zeros(n_samples)
                feature_data['day'] = np.zeros(n_samples)
                feature_data['minute'] = np.zeros(n_samples)
        else:
            feature_data['hour'] = np.zeros(n_samples)
            feature_data['day'] = np.zeros(n_samples)
            feature_data['minute'] = np.zeros(n_samples)

        # 7 additional temporal features (low-amplitude random filler)
        for i in range(7):
            feature_data[f'time_{i}'] = np.random.random(n_samples) * 0.1

        # 2. Protocol features (15 features)
        if 'Messaggio1' in df.columns:
            proto_data = df['Messaggio1'].fillna('').astype(str)
            protocols = ['TCP', 'UDP', 'ICMP', 'HTTP', 'SSH', 'FTP', 'DNS']
            for i, protocol in enumerate(protocols):
                feature_data[f'proto_{i}'] = proto_data.str.contains(protocol, case=False).astype(int).values
            for i in range(len(protocols), 15):
                feature_data[f'proto_{i}'] = np.zeros(n_samples)
        else:
            for i in range(15):
                feature_data[f'proto_{i}'] = np.zeros(n_samples)

        # 3. Host features (5 features)
        if 'Host' in df.columns:
            host_data = df['Host'].fillna('').astype(str)
            feature_data['host_fibra'] = host_data.str.contains('FIBRA', case=False).astype(int).values
            feature_data['host_empty'] = df['Host'].isna().astype(int).values
            feature_data['host_len'] = host_data.str.len().values / 100.0
        else:
            feature_data['host_fibra'] = np.zeros(n_samples)
            feature_data['host_empty'] = np.zeros(n_samples)
            feature_data['host_len'] = np.zeros(n_samples)
        for i in range(3, 5):
            feature_data[f'host_{i}'] = np.zeros(n_samples)

        # 4. IP features (10 features)
        if 'Messaggio2' in df.columns:
            ip_data = df['Messaggio2'].str.split(':').str[0].fillna('unknown').astype(str)
            for i in range(10):
                feature_data[f'ip_{i}'] = (pd.util.hash_array(ip_data.values) % (2**(i+3))) / (2**(i+3))
        else:
            for i in range(10):
                feature_data[f'ip_{i}'] = np.zeros(n_samples)

        # 5. ID features (10 features)
        if 'ID' in df.columns:
            id_values = df['ID'].fillna(0).values
            id_normalized = (id_values - id_values.min()) / (id_values.max() - id_values.min() + 1)
            for i in range(10):
                feature_data[f'id_{i}'] = np.roll(id_normalized, i) * (0.9 ** i)
        else:
            for i in range(10):
                feature_data[f'id_{i}'] = np.zeros(n_samples)

        # Ensure exactly 50 features in total
        total_features = len(feature_data)
        if total_features < 50:
            for i in range(total_features, 50):
                feature_data[f'extra_{i}'] = np.zeros(n_samples)
        elif total_features > 50:
            keys_to_remove = list(feature_data.keys())[50:]
            for key in keys_to_remove:
                del feature_data[key]

        # Build the numpy feature matrix (columns in sorted name order)
        feature_names = sorted(feature_data.keys())
        X = np.column_stack([feature_data[name] for name in feature_names])
        return X
    except Exception as e:
        log_error(f"Errore preparazione dati: {e}")
        return None

def predict_anomalies_simple(model, features, sensitivity=5):
    """Simplified anomaly prediction."""
    try:
        if features is None or features.shape[0] == 0:
            return np.array([])

        # Base prediction (-1 = anomaly, 1 = normal)
        predictions = model.predict(features)

        # Apply the sensitivity threshold if the model exposes decision_function
        if hasattr(model, 'decision_function'):
            try:
                scores = model.decision_function(features)
                threshold = -0.2 * (sensitivity / 5.0)
                predictions = np.where(scores < threshold, -1, 1)
            except Exception:
                pass  # Fall back to the standard predictions
        return predictions
    except Exception as e:
        log_error(f"Errore predizione: {e}")
        return np.array([])

def handle_anomaly_simple(engine, ip_address, risk_level='MEDIO'):
    """Simplified anomaly handling."""
    try:
        if not ip_address or pd.isna(ip_address):
            return False

        # Simplified insert into the Fibra table
        with engine.connect() as conn:
            insert_query = text("""
                INSERT INTO Fibra (IndirizzoIP, Data, Ora, Host, Attivo, Lista, NumeroAttacchi, LivelloDiRischio)
                VALUES (:ip, CURDATE(), CURTIME(), '', 1, 'ddos_detect', 1, 2)
                ON DUPLICATE KEY UPDATE
                    Attivo = 1,
                    NumeroAttacchi = NumeroAttacchi + 1,
                    Data = CURDATE(),
                    Ora = CURTIME()
            """)
            conn.execute(insert_query, {"ip": ip_address})
            conn.commit()
        return True
    except Exception as e:
        log_warning(f"Errore gestione anomalia per {ip_address}: {e}")
        return False

def process_batch_simple(df, engine, model, whitelist, sensitivity=5):
    """Simplified batch processing."""
    try:
        if df.empty:
            return 0, 0

        # Prepare the feature matrix
        X = prepare_data_simple(df)
        if X is None:
            return 0, 0

        # Prediction
        predictions = predict_anomalies_simple(model, X, sensitivity)
        if len(predictions) == 0:
            return 0, 0

        # Find anomalies
        anomaly_indices = np.where(predictions == -1)[0]
        anomaly_count = len(anomaly_indices)
        if anomaly_count == 0:
            return len(df), 0

        # Extract the source IP from each anomalous row
        processed_ips = 0
        for idx in anomaly_indices:
            if 'Messaggio2' in df.columns:
                msg2 = df.iloc[idx]['Messaggio2']
                if pd.notna(msg2) and ':' in str(msg2):
                    ip = str(msg2).split(':')[0]
                    # Check the whitelist before acting on the IP
                    if ip not in whitelist:
                        if handle_anomaly_simple(engine, ip):
                            processed_ips += 1

        return len(df), processed_ips
    except Exception as e:
        log_error(f"Errore processamento batch: {e}")
        return 0, 0

def run_detection(args):
    """Run the main detection pass."""
    try:
        reset_stats()

        # Load components
        engine = create_engine_simple()
        if not engine:
            return False
        model, preprocessor = load_models_simple()  # preprocessor kept for compatibility, not used below
        if not model:
            return False
        whitelist = load_whitelist_simple()
        last_id = load_last_analyzed_id()
        log_result(f"Avvio rilevamento da ID {last_id}")

        # Extract and process data
        df = extract_data_simple(engine, last_id, args.batch_size)
        if df.empty:
            log_result("Nessun nuovo dato da analizzare")
            return True

        # Process the batch
        records_processed, anomalies_found = process_batch_simple(
            df, engine, model, whitelist, args.sensibility
        )

        # Update statistics
        update_stats(records_processed, anomalies_found,
                     len(df['Messaggio2'].dropna().unique()) if 'Messaggio2' in df.columns else 0)

        # Save the last analyzed ID
        if not df.empty:
            last_analyzed_id = df['ID'].max()
            save_last_analyzed_id(last_analyzed_id)

        # Show results
        print_stats()
        log_result(f"Rilevamento completato: {anomalies_found} anomalie su {records_processed} record")
        return True
    except Exception as e:
        log_error(f"Errore rilevamento: {e}")
        return False

def main():
    """Simplified entry point."""
    parser = argparse.ArgumentParser(description='Rilevamento DDoS semplificato v02')
    parser.add_argument('--batch-size', type=int, default=10000, help='Dimensione batch (default: 10k)')
    parser.add_argument('--sensibility', type=int, default=5, choices=range(1, 11), help='Sensibilità 1-10 (default: 5)')
    parser.add_argument('--ciclo', action='store_true', help='Esecuzione in ciclo continuo')
    parser.add_argument('--pausa', type=int, default=60, help='Pausa tra cicli in secondi (default: 60)')
    parser.add_argument('--debug', action='store_true', help='Debug logging')
    args = parser.parse_args()

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    log_phase("Sistema rilevamento DDoS SEMPLIFICATO v02")
    log_result(f"Config: batch {args.batch_size}, sensibilità {args.sensibility}")

    # Interrupt handling
    def signal_handler(signum, frame):
        log_warning("Interruzione ricevuta")
        sys.exit(0)
    signal.signal(signal.SIGINT, signal_handler)

    # Execution
    if args.ciclo:
        log_result("Modalità ciclo continuo attivata")
        ciclo = 0
        while True:
            ciclo += 1
            log_phase(f"Ciclo {ciclo}")
            success = run_detection(args)
            if success:
                log_result(f"Ciclo {ciclo} completato. Pausa {args.pausa} secondi...")
            else:
                log_error(f"Errore nel ciclo {ciclo}. Pausa {args.pausa * 2} secondi...")
                time.sleep(args.pausa)  # extra pause after a failed cycle (2x total)
            time.sleep(args.pausa)
    else:
        # Single run
        success = run_detection(args)
        sys.exit(0 if success else 1)

if __name__ == "__main__":
    main()
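# A minimal usage sketch: the filename detect_02.py is an assumption (it is not
# defined by this script), and the trained isolation_forest.joblib is expected
# under ./models (see MODEL_PATH above). The flags come from the argparse
# definitions in main().
#
#   # one-off pass over the next batch of at most 5000 rows
#   python3 detect_02.py --batch-size 5000 --sensibility 7
#
#   # continuous mode, pausing 120 seconds between cycles, with debug logging
#   python3 detect_02.py --ciclo --pausa 120 --debug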