#!/usr/bin/env python3
"""DDoS detection (v03) over log records stored in a MySQL database.

Workflow of a single run (see ``run_detection``):

1. connect to the database and optionally purge old entries from ``ip_list``;
2. load the model (and optional preprocessor) from the ``models`` directory;
3. read the rows of table ``Esterna`` whose ``ID`` is greater than the last
   analyzed ID stored in ``last_analyzed_id.txt``;
4. build a 50-column feature matrix and score it with the model;
5. insert/update in ``ip_list`` the source IP of every anomalous row that is
   not whitelisted, then persist the new last analyzed ID.

User-facing messages are intentionally kept in Italian.
"""
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from joblib import load
import logging
import gc
import os
import time
import sys
from collections import defaultdict
from datetime import datetime, timedelta
import numpy as np
import threading
import argparse
import signal
from concurrent.futures import ThreadPoolExecutor, as_completed
import warnings

warnings.filterwarnings('ignore')

# Simplified logging configuration: everything goes both to stdout and to a
# log file in the current working directory.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('detect_debug.log')
    ]
)

# Database configuration
try:
    from config_database import DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD
    print(f"✅ Config caricata: {DB_HOST}:{DB_PORT}/{DB_NAME}")
except ImportError:
    # Fallback used when config_database.py does not exist.
    DB_USER = os.environ.get('DB_USER', 'root')
    # NOTE(security): a real-looking password is hard-coded as the default.
    # It is kept only for backward compatibility; prefer DB_PASSWORD from the
    # environment (or config_database.py) and rotate this credential.
    DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
    DB_HOST = os.environ.get('DB_HOST', 'localhost')
    DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
    # The port is now overridable like the other settings (same default).
    DB_PORT = os.environ.get('DB_PORT', '3306')

# NOTE: user/password are interpolated verbatim; credentials containing URL
# special characters (e.g. '@', '/', ':') must already be URL-encoded.
CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'

# File paths (all relative to the directory containing this script)
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt')
LAST_ID_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'last_analyzed_id.txt')

# Simplified parameters (not referenced by the functions in this file)
RISK_LEVELS = {
    'NORMALE': 0.1,
    'BASSO': 0.3,
    'MEDIO': 0.6,
    'ALTO': 0.8,
    'CRITICO': 0.95
}


# ANSI escape sequences used to colorize console output
class Colors:
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    CYAN = '\033[96m'
    MAGENTA = '\033[95m'
    WHITE = '\033[97m'
    END = '\033[0m'


def log_phase(message):
    """Print a highlighted phase header and log it at INFO level."""
    print(f"\n{Colors.BOLD}{Colors.GREEN}▶ FASE: {message}{Colors.END}\n")
    logging.info(f"FASE: {message}")


def log_result(message):
    """Print a result line and log it at INFO level."""
    print(f"{Colors.BLUE}✓ {message}{Colors.END}")
    logging.info(f"RISULTATO: {message}")


def log_warning(message):
    """Print a warning line and log it at WARNING level."""
    print(f"{Colors.YELLOW}⚠ {message}{Colors.END}")
    logging.warning(message)


def log_error(message):
    """Print an error line and log it at ERROR level."""
    print(f"{Colors.RED}✗ {message}{Colors.END}")
    logging.error(message)


def log_info(message):
    """Print an informational line and log it at INFO level."""
    print(f"{Colors.CYAN}i {message}{Colors.END}")
    logging.info(message)


def log_anomaly(message):
    """Print an anomaly alert and log it at WARNING level."""
    print(f"{Colors.BOLD}{Colors.RED}! {message}{Colors.END}")
    logging.warning(message)


def log_success(message):
    """Print a success line and log it at INFO level."""
    print(f"{Colors.BOLD}{Colors.GREEN}* {message}{Colors.END}")
    logging.info(message)


# Module-level counters backing the live statistics panel
live_stats = {
    'records_processed': 0,
    'anomalies_found': 0,
    'ips_analyzed': 0,
    'ips_blocked': 0,
    'start_time': None,
    'last_update': 0,
    'current_batch': 0,
    'total_batches': 0,
    'processing_rate': 0,
    'anomaly_rate': 0
}


def reset_stats():
    """Zero every counter in ``live_stats`` and restart the run clock."""
    live_stats.update({
        'records_processed': 0,
        'anomalies_found': 0,
        'ips_analyzed': 0,
        'ips_blocked': 0,
        'start_time': time.time(),
        'last_update': 0,
        'current_batch': 0,
        'total_batches': 0,
        'processing_rate': 0,
        'anomaly_rate': 0,
    })


def update_stats(records=0, anomalies=0, ips=0, blocked=0):
    """Add the given increments to ``live_stats`` and refresh derived rates.

    Args:
        records: Number of records processed since the last call.
        anomalies: Number of anomalies found since the last call.
        ips: Number of IPs analyzed since the last call.
        blocked: Number of IPs blocked since the last call.
    """
    live_stats['records_processed'] += records
    live_stats['anomalies_found'] += anomalies
    live_stats['ips_analyzed'] += ips
    live_stats['ips_blocked'] += blocked

    # Derived rates. Without a start time a 1-second window is assumed; the
    # lower bound prevents a ZeroDivisionError when the clock has not advanced.
    elapsed = time.time() - live_stats['start_time'] if live_stats['start_time'] else 1
    elapsed = max(elapsed, 1e-9)
    live_stats['processing_rate'] = live_stats['records_processed'] / elapsed
    live_stats['anomaly_rate'] = (live_stats['anomalies_found'] / max(1, live_stats['records_processed'])) * 100


def show_live_stats(force=False):
    """Print the live statistics panel.

    The panel is printed at most once every 3 seconds unless ``force`` is
    true.

    Args:
        force: Print regardless of when the panel was last shown.
    """
    current_time = time.time()

    # Throttle: refresh every 3 seconds unless forced
    if not force and (current_time - live_stats['last_update']) < 3:
        return

    elapsed = current_time - live_stats['start_time'] if live_stats['start_time'] else 0

    # Estimate the remaining time when batch progress is known
    eta_str = "N/A"
    if live_stats['total_batches'] > 0 and live_stats['current_batch'] > 0:
        progress = live_stats['current_batch'] / live_stats['total_batches']
        if progress > 0:
            remaining_time = (elapsed / progress) - elapsed
            if remaining_time > 0:
                eta_str = f"{remaining_time:.0f}s"

    # Colored header
    print(f"\n{Colors.BOLD}{Colors.WHITE}{'='*70}{Colors.END}")
    print(f"{Colors.BOLD}{Colors.CYAN}📊 STATISTICHE RILEVAMENTO DDoS - TEMPO REALE{Colors.END}")
    print(f"{Colors.BOLD}{Colors.WHITE}{'='*70}{Colors.END}")

    # Main counters
    print(f"{Colors.BOLD}⏱️ Tempo trascorso:{Colors.END} {Colors.GREEN}{elapsed:.1f}s{Colors.END}")
    print(f"{Colors.BOLD}📈 Record processati:{Colors.END} {Colors.BLUE}{live_stats['records_processed']:,}{Colors.END}")
    print(f"{Colors.BOLD}🚨 Anomalie trovate:{Colors.END} {Colors.RED}{live_stats['anomalies_found']:,}{Colors.END}")
    print(f"{Colors.BOLD}🔍 IP analizzati:{Colors.END} {Colors.CYAN}{live_stats['ips_analyzed']:,}{Colors.END}")
    print(f"{Colors.BOLD}🛡️ IP bloccati:{Colors.END} {Colors.YELLOW}{live_stats['ips_blocked']:,}{Colors.END}")

    # Performance metrics
    print(f"{Colors.BOLD}⚡ Velocità:{Colors.END} {Colors.MAGENTA}{live_stats['processing_rate']:.1f} record/sec{Colors.END}")
    print(f"{Colors.BOLD}📊 Tasso anomalie:{Colors.END} {Colors.RED}{live_stats['anomaly_rate']:.2f}%{Colors.END}")

    # Batch progress, when available
    if live_stats['total_batches'] > 0:
        batch_progress = (live_stats['current_batch'] / live_stats['total_batches']) * 100
        print(f"{Colors.BOLD}📦 Batch:{Colors.END} {Colors.GREEN}{live_stats['current_batch']}/{live_stats['total_batches']}{Colors.END} ({batch_progress:.1f}%) - ETA: {eta_str}")

    print(f"{Colors.BOLD}{Colors.WHITE}{'='*70}{Colors.END}\n")

    live_stats['last_update'] = current_time


def show_spinner(message, duration=1):
    """Show an animated spinner for ``duration`` seconds, then a check mark.

    The animation is purely cosmetic, so it is skipped when stdout is not an
    interactive terminal (cron, pipes, redirected logs); in that case only the
    final check-mark line is printed, without any delay.

    Args:
        message: Text displayed next to the spinner.
        duration: Length of the animation in seconds.
    """
    spinner_chars = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']

    if sys.stdout.isatty():
        end_time = time.time() + duration
        i = 0
        while time.time() < end_time:
            print(f"\r{Colors.CYAN}{spinner_chars[i % len(spinner_chars)]} {message}{Colors.END}", end='')
            sys.stdout.flush()
            time.sleep(0.1)
            i += 1

    print(f"\r{Colors.GREEN}✓ {message}{Colors.END}")


def create_engine_simple():
    """Create the SQLAlchemy engine and verify it with ``SELECT 1``.

    Returns:
        The engine, or ``None`` when creation or the test query fails.
    """
    try:
        log_info("Configurazione connessione database...")
        log_info(f"Host: {DB_HOST}, Database: {DB_NAME}, User: {DB_USER}")

        show_spinner("Creazione engine database...", 1)

        engine = create_engine(
            CONN_STRING,
            pool_size=3,
            max_overflow=5,
            pool_recycle=1800,
            pool_pre_ping=True,
            pool_timeout=30,
            echo=False
        )

        show_spinner("Test connessione...", 1)

        # Connection smoke test
        with engine.connect() as conn:
            conn.execute(text("SELECT 1")).fetchone()

        log_result("Connessione database stabilita")
        return engine
    except Exception as e:
        log_error(f"Errore connessione database: {e}")
        return None


def load_models_simple():
    """Load the model and, if present, the preprocessor from disk.

    When the preprocessor file is missing, a placeholder dict listing 50
    generic feature names is returned in its place.

    Returns:
        ``(model, preprocessor)``, or ``(None, None)`` when the model file is
        missing or loading fails.
    """
    try:
        log_phase("Caricamento modelli di machine learning")

        # Check that the model file exists
        log_info(f"Verifica file modello: {MODEL_PATH}")
        if not os.path.exists(MODEL_PATH):
            log_error(f"Modello non trovato: {MODEL_PATH}")
            return None, None

        file_size = os.path.getsize(MODEL_PATH) / 1024
        log_info(f"Dimensione file modello: {file_size:.1f} KB")

        show_spinner("Caricamento Isolation Forest...", 2)
        # NOTE(security): joblib.load unpickles the file and can execute
        # arbitrary code; the model files must come from a trusted source.
        model = load(MODEL_PATH)
        log_result("✓ Isolation Forest caricato")

        # Load the preprocessor when available
        preprocessor = None
        if os.path.exists(PREPROCESSOR_PATH):
            show_spinner("Caricamento preprocessor...", 1)
            preprocessor = load(PREPROCESSOR_PATH)
            n_features = len(preprocessor.get('feature_columns', []))
            log_result(f"✓ Preprocessor caricato ({n_features} feature)")
        else:
            log_warning("Preprocessor non trovato, usando fallback")
            preprocessor = {'feature_columns': [f'feature_{i}' for i in range(50)]}

        log_success("Tutti i modelli caricati con successo")
        return model, preprocessor
    except Exception as e:
        log_error(f"Errore caricamento modelli: {e}")
        return None, None


def load_whitelist_simple():
    """Read the IP whitelist file.

    Blank lines and lines starting with ``#`` are ignored; every other line
    is stripped and stored as-is.

    Returns:
        A set of whitelist entries; empty when the file is missing or cannot
        be read.
    """
    try:
        log_info("Caricamento whitelist IP...")

        if not os.path.exists(WHITELIST_PATH):
            log_warning(f"File whitelist non trovato: {WHITELIST_PATH}")
            log_info("Creazione whitelist vuota")
            return set()

        show_spinner("Lettura file whitelist...", 1)

        with open(WHITELIST_PATH, 'r') as f:
            stripped_lines = (line.strip() for line in f)
            whitelist = {
                line for line in stripped_lines
                if line and not line.startswith('#')
            }

        log_result(f"Whitelist caricata: {len(whitelist)} IP")

        # Show a few sample entries when available
        if whitelist:
            examples = list(whitelist)[:3]
            log_info(f"Esempi IP whitelistati: {', '.join(examples)}")

        return whitelist
    except Exception as e:
        log_warning(f"Errore caricamento whitelist: {e}")
        return set()


def load_last_analyzed_id():
    """Return the last analyzed record ID stored on disk.

    Returns:
        The stored ID, or 0 when the file is missing or unreadable.
    """
    try:
        if os.path.exists(LAST_ID_PATH):
            with open(LAST_ID_PATH, 'r') as f:
                last_id = int(f.read().strip())
            log_info(f"Ultimo ID analizzato: {last_id:,}")
            return last_id
        else:
            log_info("Nessun ID precedente trovato, partendo da 0")
            return 0
    except Exception as e:
        log_warning(f"Errore caricamento ultimo ID: {e}")
        return 0


def save_last_analyzed_id(last_id):
    """Persist the last analyzed record ID; failures are logged, not raised.

    Args:
        last_id: ID of the most recent record that has been analyzed.
    """
    try:
        with open(LAST_ID_PATH, 'w') as f:
            f.write(str(last_id))
        log_info(f"Ultimo ID salvato: {last_id:,}")
    except Exception as e:
        log_warning(f"Errore salvataggio ultimo ID: {e}")


def extract_data_simple(engine, last_id=0, batch_size=10000):
    """Fetch the next batch of records from table ``Esterna``.

    Args:
        engine: SQLAlchemy engine used to run the query.
        last_id: Only rows with ``ID`` greater than this value are returned.
        batch_size: Maximum number of rows to fetch.

    Returns:
        A DataFrame ordered by ascending ``ID``; empty when there are no new
        rows or when an error occurs.
    """
    try:
        log_phase(f"Estrazione dati da ID {last_id:,}")

        log_info(f"Parametri: batch_size={batch_size:,}, last_id={last_id:,}")

        show_spinner("Preparazione query di estrazione...", 1)

        # Plain incremental query
        query = text("""
            SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3
            FROM Esterna
            WHERE ID > :last_id
            ORDER BY ID ASC
            LIMIT :batch_size
        """)

        log_info("Esecuzione query sul database...")
        start_time = time.time()

        df = pd.read_sql(query, engine, params={
            "last_id": last_id,
            "batch_size": batch_size
        })

        elapsed = time.time() - start_time

        if df.empty:
            log_warning("Nessun nuovo record trovato")
        else:
            log_result(f"Estratti {len(df):,} record in {elapsed:.1f} secondi")
            log_info(f"Range ID: {df['ID'].min():,} - {df['ID'].max():,}")
            log_info(f"Colonne disponibili: {list(df.columns)}")

            # Quick look at the data: the IP is the part of Messaggio2
            # before the first ':'
            if 'Messaggio2' in df.columns:
                unique_ips = df['Messaggio2'].str.split(':').str[0].nunique()
                log_info(f"IP unici nel batch: {unique_ips:,}")

        return df
    except Exception as e:
        log_error(f"Errore estrazione dati: {e}")
        return pd.DataFrame()


def prepare_data_simple(df):
    """Build the 50-column feature matrix for a batch of records.

    The layout follows the original comment stating it mirrors the logic of
    ``analisys_02.py``: 10 time, 15 protocol, 5 host, 10 IP and 10 ID
    features. Columns are finally ordered by sorted feature name. The input
    DataFrame is not modified.

    Args:
        df: Batch of records as returned by ``extract_data_simple``.

    Returns:
        A 2-D numpy array with one row per record, or ``None`` when ``df`` is
        empty or feature extraction fails.
    """
    try:
        if df.empty:
            return None

        log_info(f"Preparazione feature per {len(df):,} record...")

        feature_data = {}
        n_samples = len(df)

        show_spinner("Estrazione feature temporali...", 1)

        # 1. Time features (10 features)
        if 'Data' in df.columns and 'Ora' in df.columns:
            try:
                # Work on local series so the caller's DataFrame is untouched.
                dates = pd.to_datetime(df['Data'], errors='coerce')
                times = pd.to_timedelta(df['Ora'].astype(str), errors='coerce')
                timestamps = dates + times
                feature_data['hour'] = timestamps.dt.hour.fillna(0).values
                feature_data['day'] = timestamps.dt.dayofweek.fillna(0).values
                feature_data['minute'] = timestamps.dt.minute.fillna(0).values
                log_info("✓ Feature temporali estratte")
            except Exception:
                # Not a bare except: SystemExit/KeyboardInterrupt must propagate.
                feature_data['hour'] = np.zeros(n_samples)
                feature_data['day'] = np.zeros(n_samples)
                feature_data['minute'] = np.zeros(n_samples)
                log_warning("⚠ Fallback feature temporali")
        else:
            feature_data['hour'] = np.zeros(n_samples)
            feature_data['day'] = np.zeros(n_samples)
            feature_data['minute'] = np.zeros(n_samples)

        # 7 additional time features, filled with small random noise
        for i in range(7):
            feature_data[f'time_{i}'] = np.random.random(n_samples) * 0.1

        show_spinner("Analisi protocolli di rete...", 1)

        # 2. Protocol features (15 features)
        if 'Messaggio1' in df.columns:
            proto_data = df['Messaggio1'].fillna('').astype(str)
            protocols = ['TCP', 'UDP', 'ICMP', 'HTTP', 'SSH', 'FTP', 'DNS']

            protocol_counts = {}
            for i, protocol in enumerate(protocols):
                matches = proto_data.str.contains(protocol, case=False).astype(int)
                feature_data[f'proto_{i}'] = matches.values
                protocol_counts[protocol] = matches.sum()

            if any(protocol_counts.values()):
                log_info(f"✓ Protocolli rilevati: {protocol_counts}")

            # Pad up to 15 protocol columns
            for i in range(len(protocols), 15):
                feature_data[f'proto_{i}'] = np.zeros(n_samples)
        else:
            for i in range(15):
                feature_data[f'proto_{i}'] = np.zeros(n_samples)

        show_spinner("Elaborazione host e IP...", 1)

        # 3. Host features (5 features)
        if 'Host' in df.columns:
            host_data = df['Host'].fillna('').astype(str)
            feature_data['host_fibra'] = host_data.str.contains('FIBRA', case=False).astype(int).values
            feature_data['host_empty'] = df['Host'].isna().astype(int).values
            feature_data['host_len'] = host_data.str.len().values / 100.0
        else:
            feature_data['host_fibra'] = np.zeros(n_samples)
            feature_data['host_empty'] = np.zeros(n_samples)
            feature_data['host_len'] = np.zeros(n_samples)

        for i in range(3, 5):
            feature_data[f'host_{i}'] = np.zeros(n_samples)

        # 4. IP features (10 features): the IP hash reduced modulo growing
        # powers of two and scaled to [0, 1)
        if 'Messaggio2' in df.columns:
            ip_data = df['Messaggio2'].str.split(':').str[0].fillna('unknown').astype(str)
            ip_hashes = pd.util.hash_array(ip_data.values)  # hashed once, reused below
            for i in range(10):
                feature_data[f'ip_{i}'] = (ip_hashes % (2**(i+3))) / (2**(i+3))
        else:
            for i in range(10):
                feature_data[f'ip_{i}'] = np.zeros(n_samples)

        # 5. ID features (10 features): normalized ID, rolled and damped
        if 'ID' in df.columns:
            id_values = df['ID'].fillna(0).values
            # The +1 keeps the denominator non-zero for a single-ID batch
            id_normalized = (id_values - id_values.min()) / (id_values.max() - id_values.min() + 1)

            for i in range(10):
                feature_data[f'id_{i}'] = np.roll(id_normalized, i) * (0.9 ** i)
        else:
            for i in range(10):
                feature_data[f'id_{i}'] = np.zeros(n_samples)

        # Guarantee exactly 50 features in total
        total_features = len(feature_data)
        if total_features < 50:
            for i in range(total_features, 50):
                feature_data[f'extra_{i}'] = np.zeros(n_samples)
        elif total_features > 50:
            keys_to_remove = list(feature_data.keys())[50:]
            for key in keys_to_remove:
                del feature_data[key]

        # Assemble the numpy matrix with columns in sorted-name order
        feature_names = sorted(feature_data.keys())
        X = np.column_stack([feature_data[name] for name in feature_names])

        log_result(f"Matrice feature preparata: {X.shape[0]:,} × {X.shape[1]}")
        return X
    except Exception as e:
        log_error(f"Errore preparazione dati: {e}")
        return None


def predict_anomalies_simple(model, features, sensitivity=5):
    """Classify each feature row as anomalous (-1) or normal (1).

    When the model exposes ``decision_function``, rows whose score falls
    below ``-0.2 * (sensitivity / 5.0)`` are flagged, so lower sensitivity
    values flag more rows. Otherwise, or if scoring fails, ``model.predict``
    is used.

    Args:
        model: Estimator providing ``predict`` and optionally
            ``decision_function``.
        features: 2-D feature matrix, or ``None``.
        sensitivity: Value on a 1-10 scale controlling the threshold.

    Returns:
        A numpy array of -1/1 labels; empty when there is no input or
        prediction fails.
    """
    try:
        if features is None or features.shape[0] == 0:
            return np.array([])

        log_info(f"Predizione su {features.shape[0]:,} campioni (sensibilità: {sensitivity}/10)")

        show_spinner("Esecuzione predizione ML...", 1)

        predictions = None

        # Preferred path: threshold the raw scores with the custom sensitivity
        if hasattr(model, 'decision_function'):
            try:
                scores = model.decision_function(features)
                threshold = -0.2 * (sensitivity / 5.0)
                predictions = np.where(scores < threshold, -1, 1)
                log_info(f"✓ Applicata sensibilità personalizzata (threshold: {threshold:.3f})")
            except Exception:
                # Not a bare except: SystemExit/KeyboardInterrupt must propagate.
                log_warning("⚠ Fallback a predizione standard")

        # Standard prediction, computed only when it is actually needed
        if predictions is None:
            predictions = model.predict(features)

        anomaly_count = np.sum(predictions == -1)
        normal_count = np.sum(predictions == 1)

        log_result(f"Predizione completata: {anomaly_count:,} anomalie, {normal_count:,} normali")
        return predictions
    except Exception as e:
        log_error(f"Errore predizione: {e}")
        return np.array([])


def handle_anomaly_simple(engine, ip_address, risk_level='ALTO'):
    """Insert or refresh an IP in table ``ip_list`` (list ``ddos_detect_v03``).

    Args:
        engine: SQLAlchemy engine used for the write.
        ip_address: IP to record; empty or NaN values are rejected.
        risk_level: Risk label stored with the IP.

    Returns:
        ``True`` when the row was written, ``False`` otherwise.
    """
    try:
        if not ip_address or pd.isna(ip_address):
            return False

        log_anomaly(f"Gestione anomalia per IP: {ip_address}")

        # Upsert into ip_list
        with engine.connect() as conn:
            insert_query = text("""
                INSERT INTO ip_list (list_name, ip_address, risk_level)
                VALUES ('ddos_detect_v03', :ip, :risk_level)
                ON DUPLICATE KEY UPDATE
                    retrieved_at = CURRENT_TIMESTAMP,
                    risk_level = :risk_level
            """)

            conn.execute(insert_query, {
                "ip": ip_address,
                "risk_level": risk_level
            })
            conn.commit()

        log_success(f"IP {ip_address} inserito/aggiornato nella tabella ip_list")
        return True
    except Exception as e:
        log_warning(f"Errore gestione anomalia per {ip_address}: {e}")
        return False


def process_batch_simple(df, engine, model, whitelist, sensitivity=5):
    """Score a batch and record the source IPs of anomalous rows.

    The IP is the part of ``Messaggio2`` before the first ``':'``. Each
    distinct IP is written to the database at most once per batch, while
    the returned count still includes every anomalous row whose IP was
    recorded.

    Args:
        df: Batch of records.
        engine: SQLAlchemy engine used to record IPs.
        model: Estimator passed to ``predict_anomalies_simple``.
        whitelist: Collection of IPs that must never be recorded.
        sensitivity: Forwarded to ``predict_anomalies_simple``.

    Returns:
        ``(records_processed, anomalies_handled)``; ``(0, 0)`` when the batch
        is empty or processing fails.
    """
    try:
        if df.empty:
            return 0, 0

        log_info(f"Processamento batch di {len(df):,} record...")

        # Prepare the data
        X = prepare_data_simple(df)
        if X is None:
            log_warning("Preparazione dati fallita")
            return 0, 0

        # Predict
        predictions = predict_anomalies_simple(model, X, sensitivity)
        if len(predictions) == 0:
            log_warning("Nessuna predizione ottenuta")
            return 0, 0

        # Locate the anomalies
        anomaly_indices = np.where(predictions == -1)[0]
        anomaly_count = len(anomaly_indices)

        log_info(f"Anomalie rilevate nel batch: {anomaly_count:,}")

        if anomaly_count == 0:
            return len(df), 0

        # Without the message column no IP can be extracted
        if 'Messaggio2' not in df.columns:
            return len(df), 0

        messages = df['Messaggio2']
        processed_ips = 0
        blocked_ips = {}      # insertion-ordered set of IPs already recorded
        whitelisted_seen = set()

        for idx in anomaly_indices:
            msg2 = messages.iloc[idx]
            if pd.isna(msg2) or ':' not in str(msg2):
                continue

            ip = str(msg2).split(':')[0]

            # Already recorded in this batch: count the row, skip the DB call
            if ip in blocked_ips:
                processed_ips += 1
                continue

            # Whitelist check (logged once per IP)
            if ip in whitelist:
                if ip not in whitelisted_seen:
                    whitelisted_seen.add(ip)
                    log_info(f"IP {ip} in whitelist, ignorato")
                continue

            # Failed writes are not cached, so a later row retries the IP
            if handle_anomaly_simple(engine, ip, 'ALTO'):
                processed_ips += 1
                blocked_ips[ip] = None

        if blocked_ips:
            log_anomaly(f"IP bloccati in questo batch: {len(blocked_ips)}")
            # Show a few examples
            examples = list(blocked_ips)[:3]
            log_info(f"Esempi IP bloccati: {', '.join(examples)}")

        return len(df), processed_ips
    except Exception as e:
        log_error(f"Errore processamento batch: {e}")
        return 0, 0


def run_detection(args):
    """Run one full detection pass.

    Args:
        args: Parsed command-line arguments (``cleanup``, ``retention_days``,
            ``batch_size`` and ``sensibility`` are read here).

    Returns:
        ``True`` when the pass completed (including "no new data"), ``False``
        when initialization failed or an unexpected error occurred.
    """
    try:
        log_phase("Avvio sistema di rilevamento DDoS v03")

        reset_stats()

        # Load the components
        engine = create_engine_simple()
        if engine is None:
            return False

        # Automatic purge of old IPs (when requested)
        if args.cleanup:
            log_phase("🧹 PULIZIA AUTOMATICA IP VECCHI")
            removed_count = cleanup_old_ips(engine, args.retention_days)
            if removed_count > 0:
                log_result(f"Pulizia completata: {removed_count} IP rimossi")
            elif removed_count == 0:
                log_info("Pulizia completata: nessun IP da rimuovere")

        model, preprocessor = load_models_simple()
        # Explicit None check: truthiness of an estimator object is not a
        # reliable "was it loaded" signal.
        if model is None:
            return False

        whitelist = load_whitelist_simple()
        last_id = load_last_analyzed_id()

        log_success(f"Sistema inizializzato - Rilevamento da ID {last_id:,}")

        # Extract and process the data
        df = extract_data_simple(engine, last_id, args.batch_size)

        if df.empty:
            log_result("Nessun nuovo dato da analizzare")
            show_live_stats(force=True)
            return True

        # Batch info for the statistics panel
        live_stats['total_batches'] = 1
        live_stats['current_batch'] = 1

        # Process the batch
        log_phase("Analisi anomalie in corso")

        records_processed, anomalies_found = process_batch_simple(
            df, engine, model, whitelist, args.sensibility
        )

        # Update the statistics
        unique_ips = 0
        if 'Messaggio2' in df.columns:
            unique_ips = df['Messaggio2'].str.split(':').str[0].nunique()

        update_stats(records_processed, anomalies_found, unique_ips, anomalies_found)

        # Save the last ID (df is known to be non-empty here).
        # NOTE(review): the checkpoint advances even when the batch failed
        # and process_batch_simple returned (0, 0); confirm this is intended.
        last_analyzed_id = df['ID'].max()
        save_last_analyzed_id(last_analyzed_id)

        # Show the final results
        show_live_stats(force=True)

        log_phase("Rilevamento completato")
        log_success(f"Risultati: {anomalies_found} anomalie su {records_processed:,} record")

        if anomalies_found > 0:
            anomaly_percentage = (anomalies_found / records_processed) * 100
            log_anomaly(f"Tasso di anomalie: {anomaly_percentage:.2f}%")

        return True
    except Exception as e:
        log_error(f"Errore rilevamento: {e}")
        return False


def main():
    """Parse the command line and run detection once or in a loop.

    In single-run mode the process exits with status 0 on success and 1 on
    failure. In loop mode (``--ciclo``) it runs until interrupted; SIGINT
    prints the statistics panel and exits with status 0.
    """
    parser = argparse.ArgumentParser(description='Rilevamento DDoS v03 - Con feedback dettagliato')
    parser.add_argument('--batch-size', type=int, default=10000, help='Dimensione batch (default: 10k)')
    parser.add_argument('--sensibility', type=int, default=5, choices=range(1, 11), help='Sensibilità 1-10 (1=più sensibile)')
    parser.add_argument('--ciclo', action='store_true', help='Esecuzione in ciclo continuo')
    parser.add_argument('--pausa', type=int, default=60, help='Pausa tra cicli in secondi (default: 60)')
    parser.add_argument('--debug', action='store_true', help='Debug logging')
    parser.add_argument('--cleanup', action='store_true', help='Esegui pulizia IP vecchi prima del rilevamento')
    parser.add_argument('--retention-days', type=int, default=7, help='Giorni di ritenzione IP bloccati (default: 7)')

    args = parser.parse_args()

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    # Banner with the effective configuration
    print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}")
    print(f"{Colors.BOLD}{Colors.CYAN}🛡️ SISTEMA RILEVAMENTO DDoS v03 - FEEDBACK DETTAGLIATO{Colors.END}")
    print(f"{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}")

    log_info(f"Configurazione batch: {args.batch_size:,} record")
    log_info(f"Sensibilita rilevamento: {args.sensibility}/10")
    log_info(f"Debug mode: {'ON' if args.debug else 'OFF'}")
    log_info(f"Modalita ciclo: {'ON' if args.ciclo else 'OFF'}")
    log_info(f"Pulizia automatica: {'ON' if args.cleanup else 'OFF'}")

    if args.cleanup:
        log_info(f"Ritenzione IP: {args.retention_days} giorni")

    if args.ciclo:
        log_info(f"Pausa tra cicli: {args.pausa} secondi")

    # Interrupt handling: show the final statistics, then exit cleanly
    def signal_handler(signum, frame):
        print(f"\n{Colors.BOLD}{Colors.YELLOW}⚠ Interruzione ricevuta{Colors.END}")
        show_live_stats(force=True)
        log_warning("Sistema arrestato dall'utente")
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    # Execution
    if args.ciclo:
        log_success("🔄 Modalità ciclo continuo attivata")
        ciclo = 0

        while True:
            ciclo += 1

            print(f"\n{Colors.BOLD}{Colors.MAGENTA}{'='*50}{Colors.END}")
            print(f"{Colors.BOLD}{Colors.MAGENTA}🔄 CICLO {ciclo}{Colors.END}")
            print(f"{Colors.BOLD}{Colors.MAGENTA}{'='*50}{Colors.END}")

            success = run_detection(args)

            if success:
                log_success(f"Ciclo {ciclo} completato con successo")
                log_info(f"Pausa di {args.pausa} secondi prima del prossimo ciclo...")
            else:
                log_error(f"Errore nel ciclo {ciclo}")
                log_warning(f"Pausa estesa di {args.pausa * 2} secondi...")
                # Extra pause; together with the countdown below it adds up
                # to the announced doubled pause.
                time.sleep(args.pausa)

            # Visual countdown
            for remaining in range(args.pausa, 0, -1):
                print(f"\r{Colors.CYAN}⏳ Prossimo ciclo tra: {remaining:02d}s{Colors.END}", end='')
                sys.stdout.flush()
                time.sleep(1)
            print()  # New line
    else:
        # Single run
        success = run_detection(args)

        if success:
            print(f"\n{Colors.BOLD}{Colors.GREEN}🎉 RILEVAMENTO COMPLETATO CON SUCCESSO!{Colors.END}")
        else:
            print(f"\n{Colors.BOLD}{Colors.RED}❌ RILEVAMENTO FALLITO!{Colors.END}")

        sys.exit(0 if success else 1)


def cleanup_old_ips(engine, retention_days=7):
    """Delete old ``ddos_detect_v03`` entries from table ``ip_list``.

    Rows whose ``retrieved_at`` is older than ``retention_days`` days are
    removed and the deletion is committed explicitly.

    Args:
        engine: SQLAlchemy engine used for the cleanup.
        retention_days: Age in days after which an IP is removed (default: 7).

    Returns:
        The number of rows removed, 0 when nothing was old enough, or -1 on
        error.
    """
    try:
        log_info(f"Pulizia IP più vecchi di {retention_days} giorni...")

        with engine.connect() as conn:
            # Count the IPs that are due for removal
            count_query = text("""
                SELECT COUNT(*) as count FROM ip_list
                WHERE list_name = 'ddos_detect_v03'
                AND retrieved_at < DATE_SUB(NOW(), INTERVAL :days DAY)
            """)

            old_count = conn.execute(count_query, {"days": retention_days}).fetchone()[0]

            if old_count > 0:
                # Remove the old IPs
                cleanup_query = text("""
                    DELETE FROM ip_list
                    WHERE list_name = 'ddos_detect_v03'
                    AND retrieved_at < DATE_SUB(NOW(), INTERVAL :days DAY)
                """)

                result = conn.execute(cleanup_query, {"days": retention_days})
                removed_count = result.rowcount
                # Commit explicitly, as handle_anomaly_simple does; without
                # it the DELETE is rolled back when the connection closes.
                conn.commit()

                log_result(f"Rimossi {removed_count} IP vecchi dalla lista ddos_detect_v03")
                return removed_count
            else:
                log_info("Nessun IP vecchio da rimuovere")
                return 0
    except Exception as e:
        log_anomaly(f"Errore pulizia IP vecchi: {e}")
        return -1


if __name__ == "__main__":
    main()