ids.alfacom.it/extracted_idf/analisys_02.py
marco370 0bfe3258b5 Saved progress at the end of the loop
Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
2025-11-11 09:15:10 +00:00

#!/usr/bin/env python3
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from joblib import dump, load
import logging
import gc
import os
import time
from datetime import datetime, timedelta
import numpy as np
import argparse
import sys
import traceback
import warnings
import threading
warnings.filterwarnings('ignore')
# Simplified logging configuration, without emoji
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('analisys_debug.log', encoding='utf-8')
]
)
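# With this configuration a log line looks roughly like the following
# (illustrative timestamp and message, produced by the format string above):
#   2025-11-11 09:15:10,123 - INFO - FASE: Connessione al database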
# Database configuration (can be overridden for a remote server)
try:
    # Try to import an external configuration module
    from config_database import DB_USER, DB_PASSWORD, DB_HOST, DB_NAME, DB_PORT
    logging.info("✓ Configurazione caricata da config_database.py")
except ImportError:
    # Fall back to the built-in configuration / environment variables
    DB_USER = os.environ.get('DB_USER', 'root')
    DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
    DB_HOST = os.environ.get('DB_HOST', 'localhost')  # Change this for a remote server
    DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
    DB_PORT = os.environ.get('DB_PORT', '3306')
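# A minimal config_database.py matching the import above might look like this
# (illustrative sketch; values are placeholders, not real credentials):
#
#     DB_USER = 'root'
#     DB_PASSWORD = 'change-me'
#     DB_HOST = '203.0.113.10'   # hypothetical remote host
#     DB_NAME = 'LOG_MIKROTIK'
#     DB_PORT = '3306'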
# Directory where trained models are stored
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
try:
os.makedirs(MODEL_DIR, exist_ok=True)
except Exception as e:
logging.error(f"Errore nella creazione della directory models: {e}")
MODEL_DIR = '.'
# Model file paths
IF_MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
LOF_MODEL_PATH = os.path.join(MODEL_DIR, 'lof.joblib')
SVM_MODEL_PATH = os.path.join(MODEL_DIR, 'svm.joblib')
ENSEMBLE_MODEL_PATH = os.path.join(MODEL_DIR, 'ensemble_weights.joblib')
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
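# The artifacts written to the paths above can be reloaded elsewhere with joblib,
# e.g. in a separate detection script (illustrative sketch; no such loader exists
# in this file and the helper is never called here):
def _load_saved_artifacts():
    """Illustrative helper: reload every model artifact saved by this script."""
    return {
        'isolation_forest': load(IF_MODEL_PATH),
        'lof': load(LOF_MODEL_PATH),
        'svm': load(SVM_MODEL_PATH),
        'ensemble_weights': load(ENSEMBLE_MODEL_PATH),
        'preprocessor': load(PREPROCESSOR_PATH),
    }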
# Simplified parameters, with more aggressive timeouts for remote servers
TRAINING_FREQUENCY_HOURS = 12
MAX_TRAINING_SAMPLES = 50000
MIN_TRAINING_SAMPLES = 500
CONNECTION_TIMEOUT = 5  # Timeout reduced from 10 to 5 seconds for remote servers
# ANSI colors for terminal output
class Colors:
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
CYAN = '\033[96m'
MAGENTA = '\033[95m'
END = '\033[0m'
def log_phase(message):
    print(f"\n{Colors.BOLD}{Colors.GREEN}▶ FASE: {message}{Colors.END}\n")
    # Log without emoji for Windows compatibility
    logging.info(f"FASE: {message.strip()}")
def log_result(message):
    print(f"{Colors.BLUE}{message}{Colors.END}")
    # Log without emoji for Windows compatibility
    clean_message = message.replace('🎉', '').replace('💾', '').replace('⚙️', '').replace('🔍', '').replace('🌲', '').replace('⏱️', '').replace('🎯', '').strip()
    logging.info(f"RISULTATO: {clean_message}")
def log_warning(message):
    print(f"{Colors.YELLOW}{message}{Colors.END}")
    # Log without emoji for Windows compatibility
    logging.warning(message.strip())
def log_error(message):
    print(f"{Colors.RED}{message}{Colors.END}")
    # Log without emoji for Windows compatibility
    logging.error(message.strip())
def log_info(message):
    print(f"{Colors.CYAN} {message}{Colors.END}")
    # Log without emoji for Windows compatibility
    clean_message = message.replace('📊', '').replace('🔧', '').replace('🔄', '').replace('📅', '').replace('📋', '').replace('🔢', '').replace('🆕', '').replace('⏭️', '').strip()
    logging.info(clean_message)
def log_progress(message):
    print(f"{Colors.MAGENTA}{message}{Colors.END}")
# Global state for progress tracking
progress_data = {
'current_step': 0,
'total_steps': 0,
'step_name': '',
'start_time': None,
'last_update': 0,
'details': ''
}
def start_progress(total_steps, operation_name):
"""Inizia il tracking del progresso"""
global progress_data
progress_data['current_step'] = 0
progress_data['total_steps'] = total_steps
progress_data['step_name'] = operation_name
progress_data['start_time'] = time.time()
progress_data['last_update'] = 0
progress_data['details'] = ''
print(f"\n{Colors.BOLD}{Colors.CYAN}🚀 AVVIO: {operation_name}{Colors.END}")
print(f"{Colors.CYAN}📊 Passi totali: {total_steps}{Colors.END}")
show_progress_bar()
def update_progress(step_increment=1, details=''):
"""Aggiorna il progresso"""
global progress_data
progress_data['current_step'] += step_increment
if details:
progress_data['details'] = details
current_time = time.time()
    # Show an update every 2 seconds, or immediately when details are provided
if details or (current_time - progress_data['last_update']) >= 2:
show_progress_bar()
progress_data['last_update'] = current_time
def show_progress_bar():
"""Mostra una progress bar colorata"""
global progress_data
if progress_data['total_steps'] == 0:
return
current = min(progress_data['current_step'], progress_data['total_steps'])
percentage = (current / progress_data['total_steps']) * 100
    # Compute elapsed and estimated remaining time
elapsed = time.time() - progress_data['start_time'] if progress_data['start_time'] else 0
if percentage > 0:
estimated_total = (elapsed / percentage) * 100
remaining = max(0, estimated_total - elapsed)
else:
remaining = 0
    # Build the progress bar
bar_length = 30
filled_length = int(bar_length * percentage // 100)
    bar = '█' * filled_length + '░' * (bar_length - filled_length)
    # Format the timings
elapsed_str = f"{elapsed:.1f}s"
remaining_str = f"{remaining:.1f}s" if remaining > 0 else "N/A"
    # Print the bar
print(f"\r{Colors.BOLD}[{Colors.GREEN}{bar}{Colors.END}{Colors.BOLD}] {percentage:.1f}% "
f"({current}/{progress_data['total_steps']}) "
f"⏱️ {elapsed_str}{remaining_str}{Colors.END}", end='')
    # Show details when available
if progress_data['details']:
print(f"\n{Colors.CYAN} └─ {progress_data['details']}{Colors.END}")
sys.stdout.flush()
def end_progress(success=True):
"""Termina il tracking del progresso"""
global progress_data
elapsed = time.time() - progress_data['start_time'] if progress_data['start_time'] else 0
if success:
print(f"\n{Colors.BOLD}{Colors.GREEN}✅ COMPLETATO: {progress_data['step_name']} in {elapsed:.1f} secondi{Colors.END}\n")
else:
print(f"\n{Colors.BOLD}{Colors.RED}❌ FALLITO: {progress_data['step_name']} dopo {elapsed:.1f} secondi{Colors.END}\n")
def show_spinner(message, duration=1):
"""Mostra uno spinner animato"""
    spinner_chars = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
end_time = time.time() + duration
i = 0
while time.time() < end_time:
print(f"\r{Colors.CYAN}{spinner_chars[i % len(spinner_chars)]} {message}{Colors.END}", end='')
sys.stdout.flush()
time.sleep(0.1)
i += 1
print(f"\r{Colors.GREEN}{message}{Colors.END}")
def connect_to_database():
"""Connessione database con feedback dettagliato e timeout aggressivi per server remoti"""
try:
log_phase("Connessione al database")
log_info(f"Host: {DB_HOST}:{DB_PORT}")
log_info(f"Database: {DB_NAME}")
log_info(f"User: {DB_USER}")
show_spinner("Creazione connessione...", 1)
connection_string = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
        # Engine configuration tuned for slow remote servers
engine = create_engine(
connection_string,
pool_size=1, # Pool ridotto per server remoti
max_overflow=1, # Overflow minimo
pool_recycle=900, # Recycle più frequente (15 min)
pool_pre_ping=True, # Pre-ping abilitato
pool_timeout=CONNECTION_TIMEOUT,
echo=False,
connect_args={
'connection_timeout': CONNECTION_TIMEOUT,
'autocommit': True,
'raise_on_warnings': False,
'use_unicode': True,
'charset': 'utf8mb4',
'sql_mode': 'TRADITIONAL'
}
)
show_spinner("Test connessione...", 1)
        # Very fast connection test with a reduced timeout
try:
with engine.connect() as conn:
# Query più semplice possibile
result = conn.execute(text("SELECT 1 AS test")).fetchone()
if result and result[0] == 1:
log_result("Database connesso con successo")
return engine
else:
log_error("Test connessione fallito - risposta non valida")
return None
except Exception as e:
log_error(f"Test connessione fallito: {e}")
if "Can't connect" in str(e):
log_info("💡 Suggerimenti:")
log_info(f" - Verifica che MySQL/MariaDB sia attivo su {DB_HOST}:{DB_PORT}")
log_info(" - Per server remoto, modifica DB_HOST nel codice")
log_info(" - Oppure usa variabili d'ambiente: set DB_HOST=ip_server_remoto")
log_info(" - Per test senza database usa: --demo")
return None
except Exception as e:
log_error(f"Errore connessione: {e}")
return None
def extract_data_simple(engine, max_records=50000):
"""Estrazione dati con feedback dettagliato e timeout ottimizzati per server remoti"""
try:
log_phase(f"Estrazione dati da server remoto (max {max_records:,} record)")
start_progress(5, "Estrazione Dati Remoti")
# Step 1: Test di connessione veloce
update_progress(1, "Test connessione server remoto...")
try:
with engine.connect() as conn:
conn.execute(text("SELECT 1")).fetchone()
log_info("✓ Connessione server remoto confermata")
except Exception as e:
end_progress(False)
log_error(f"Connessione server remoto fallita: {e}")
return pd.DataFrame()
# Step 2: Verifica rapida tabella
update_progress(1, "Verifica tabella Esterna su server remoto...")
try:
with engine.connect() as conn:
# Query molto veloce per contare record
count_query = text("SELECT COUNT(*) FROM Esterna WHERE ID > (SELECT MAX(ID) - 50000 FROM Esterna)")
recent_count = conn.execute(count_query).fetchone()[0]
log_info(f"✓ Record recenti disponibili: {recent_count:,}")
if recent_count == 0:
log_warning("Nessun record recente trovato")
# Prova un conteggio generale più piccolo
total_count = conn.execute(text("SELECT COUNT(*) FROM Esterna LIMIT 1")).fetchone()[0]
log_info(f"Record totali nella tabella: {total_count:,}")
except Exception as e:
log_warning(f"Verifica tabella: {e}")
# Step 3: Preparazione query ottimizzata per server remoto
update_progress(1, "Preparazione query ottimizzata...")
        # Use a more aggressive LIMIT to avoid timeouts on the remote server
safe_limit = min(max_records, 10000) # Limite massimo sicuro per server remoto
query = text("""
SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3
FROM Esterna
WHERE ID > (SELECT MAX(ID) - :safe_limit FROM Esterna)
ORDER BY ID DESC
LIMIT :max_records
""")
# Step 4: Esecuzione query con gestione timeout
update_progress(1, f"Esecuzione query remota ({safe_limit:,} record max)...")
start_time = time.time()
log_info("Invio query al server remoto...")
try:
# Esecuzione con parametri di timeout specifici per server remoto
df = pd.read_sql(
query,
engine,
params={"safe_limit": safe_limit, "max_records": max_records}
)
elapsed = time.time() - start_time
# Step 5: Verifica e finalizzazione risultati
update_progress(1, f"Elaborazione {len(df):,} record ricevuti...")
if df.empty:
log_warning("Nessun record estratto dal server remoto")
# Prova una query di fallback più semplice
log_info("Tentativo query di fallback...")
try:
fallback_query = text("SELECT * FROM Esterna ORDER BY ID DESC LIMIT 1000")
df = pd.read_sql(fallback_query, engine)
if not df.empty:
log_info(f"✓ Query di fallback riuscita: {len(df)} record")
except Exception as fallback_e:
log_error(f"Anche la query di fallback è fallita: {fallback_e}")
else:
log_info(f"✓ Record estratti: {len(df):,}")
log_info(f"✓ Colonne disponibili: {list(df.columns)}")
if len(df) > 0:
log_info(f"✓ Range ID: {df['ID'].min()} - {df['ID'].max()}")
log_info(f"✓ Velocità estrazione: {len(df)/elapsed:.0f} record/sec")
end_progress(True)
log_result(f"Estratti {len(df):,} record in {elapsed:.1f} secondi dal server remoto")
return df
except Exception as e:
end_progress(False)
log_error(f"Errore query server remoto: {e}")
# Tentativo di recovery con query più semplice
log_info("🔄 Tentativo di recovery con query semplificata...")
try:
simple_query = text("SELECT ID, Messaggio1, Messaggio2 FROM Esterna ORDER BY ID DESC LIMIT 500")
df_simple = pd.read_sql(simple_query, engine)
if not df_simple.empty:
log_info(f"✓ Recovery riuscito: {len(df_simple)} record con colonne base")
# Aggiungi colonne mancanti con valori di default
for col in ['Data', 'Ora', 'Host', 'IndirizzoIP', 'Messaggio3']:
if col not in df_simple.columns:
if col in ['Data']:
df_simple[col] = pd.Timestamp.now()
elif col in ['Ora']:
df_simple[col] = "12:00:00"
else:
df_simple[col] = "N/A"
return df_simple
except Exception as recovery_e:
log_error(f"Recovery fallito: {recovery_e}")
return pd.DataFrame()
except Exception as e:
end_progress(False)
log_error(f"Errore generale estrazione server remoto: {e}")
return pd.DataFrame()
def prepare_data_simple(df):
"""Preparazione dati con feedback dettagliato"""
try:
log_phase("Preparazione dati per addestramento")
if df.empty:
log_error("DataFrame vuoto")
return None, None
start_progress(6, "Preparazione Features")
        # Minimal feature set - only 50 features, for maximum speed
feature_data = {}
n_samples = len(df)
log_info(f"Preparazione feature per {n_samples:,} campioni")
# Step 1: Feature temporali base (10 feature)
update_progress(1, "Estrazione feature temporali...")
if 'Data' in df.columns and 'Ora' in df.columns:
try:
df['Data'] = pd.to_datetime(df['Data'], errors='coerce')
df['Ora'] = pd.to_timedelta(df['Ora'].astype(str), errors='coerce')
df['Timestamp'] = df['Data'] + df['Ora']
feature_data['hour'] = df['Timestamp'].dt.hour.fillna(0).values
feature_data['day'] = df['Timestamp'].dt.dayofweek.fillna(0).values
feature_data['minute'] = df['Timestamp'].dt.minute.fillna(0).values
log_info("✓ Feature temporali estratte da Data/Ora")
            except Exception:
feature_data['hour'] = np.zeros(n_samples)
feature_data['day'] = np.zeros(n_samples)
feature_data['minute'] = np.zeros(n_samples)
log_warning("⚠ Fallback per feature temporali")
else:
feature_data['hour'] = np.zeros(n_samples)
feature_data['day'] = np.zeros(n_samples)
feature_data['minute'] = np.zeros(n_samples)
log_warning("⚠ Colonne Data/Ora non trovate")
        # Add 7 simple filler time features (low-amplitude random values)
for i in range(7):
feature_data[f'time_{i}'] = np.random.random(n_samples) * 0.1
# Step 2: Feature protocollo semplici (15 feature)
update_progress(1, "Analisi protocolli di rete...")
if 'Messaggio1' in df.columns:
proto_data = df['Messaggio1'].fillna('').astype(str)
protocols = ['TCP', 'UDP', 'ICMP', 'HTTP', 'SSH', 'FTP', 'DNS']
protocol_counts = {}
for i, protocol in enumerate(protocols):
matches = proto_data.str.contains(protocol, case=False).astype(int)
feature_data[f'proto_{i}'] = matches.values
protocol_counts[protocol] = matches.sum()
log_info(f"✓ Protocolli rilevati: {protocol_counts}")
# Riempi rimanenti
for i in range(len(protocols), 15):
feature_data[f'proto_{i}'] = np.zeros(n_samples)
else:
for i in range(15):
feature_data[f'proto_{i}'] = np.zeros(n_samples)
log_warning("⚠ Colonna Messaggio1 non trovata")
# Step 3: Feature Host semplici (5 feature)
update_progress(1, "Analisi host di rete...")
if 'Host' in df.columns:
host_data = df['Host'].fillna('').astype(str)
fibra_count = host_data.str.contains('FIBRA', case=False).sum()
empty_count = df['Host'].isna().sum()
feature_data['host_fibra'] = host_data.str.contains('FIBRA', case=False).astype(int).values
feature_data['host_empty'] = df['Host'].isna().astype(int).values
feature_data['host_len'] = host_data.str.len().values / 100.0
log_info(f"✓ Host FIBRA: {fibra_count}, Host vuoti: {empty_count}")
else:
feature_data['host_fibra'] = np.zeros(n_samples)
feature_data['host_empty'] = np.zeros(n_samples)
feature_data['host_len'] = np.zeros(n_samples)
log_warning("⚠ Colonna Host non trovata")
# Riempi fino a 5
for i in range(3, 5):
feature_data[f'host_{i}'] = np.zeros(n_samples)
# Step 4: Feature IP semplici (10 feature)
update_progress(1, "Elaborazione indirizzi IP...")
if 'Messaggio2' in df.columns:
ip_data = df['Messaggio2'].str.split(':').str[0].fillna('unknown').astype(str)
unique_ips = len(ip_data.unique())
# Hash semplice per IP
for i in range(10):
feature_data[f'ip_{i}'] = (pd.util.hash_array(ip_data.values) % (2**(i+3))) / (2**(i+3))
log_info(f"✓ IP unici elaborati: {unique_ips:,}")
else:
for i in range(10):
feature_data[f'ip_{i}'] = np.zeros(n_samples)
log_warning("⚠ Colonna Messaggio2 non trovata")
# Step 5: Feature ID semplici (10 feature)
update_progress(1, "Elaborazione ID record...")
if 'ID' in df.columns:
id_values = df['ID'].fillna(0).values
id_min, id_max = id_values.min(), id_values.max()
id_normalized = (id_values - id_min) / (id_max - id_min + 1)
for i in range(10):
feature_data[f'id_{i}'] = np.roll(id_normalized, i) * (0.9 ** i)
log_info(f"✓ Range ID: {id_min:,} - {id_max:,}")
else:
for i in range(10):
feature_data[f'id_{i}'] = np.zeros(n_samples)
log_warning("⚠ Colonna ID non trovata")
# Step 6: Finalizzazione
update_progress(1, "Finalizzazione matrice feature...")
# Verifica che abbiamo esattamente 50 feature
total_features = len(feature_data)
if total_features != 50:
log_warning(f"Feature count: {total_features}, aggiustando a 50")
if total_features < 50:
for i in range(total_features, 50):
feature_data[f'extra_{i}'] = np.zeros(n_samples)
else:
# Rimuovi feature in eccesso
keys_to_remove = list(feature_data.keys())[50:]
for key in keys_to_remove:
del feature_data[key]
# Crea array numpy
feature_names = sorted(feature_data.keys())
X = np.column_stack([feature_data[name] for name in feature_names])
# Preprocessor semplice
preprocessor = {
'feature_columns': feature_names,
'n_features': len(feature_names)
}
# Salva preprocessor
try:
dump(preprocessor, PREPROCESSOR_PATH)
log_info(f"✓ Preprocessor salvato: {X.shape[1]} feature")
except Exception as e:
log_warning(f"Errore salvataggio preprocessor: {e}")
end_progress(True)
log_result(f"Matrice preparata: {X.shape[0]:,} esempi × {X.shape[1]} feature")
return X, preprocessor
except Exception as e:
end_progress(False)
log_error(f"Errore preparazione dati: {e}")
return None, None
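# Condensed sketch of the IP-hashing trick used in prepare_data_simple: each IP
# string is hashed once, then folded into several bounded [0, 1) features by taking
# the hash modulo growing powers of two. The toy data below is hypothetical and the
# helper is never called by this script.
def _ip_hash_features_example():
    """Sketch: turn a small Series of IP strings into 10 numeric features."""
    ips = pd.Series(['10.0.0.1', '10.0.0.2', '10.0.0.1']).astype(str)
    hashed = pd.util.hash_array(ips.values)
    features = {f'ip_{i}': (hashed % (2 ** (i + 3))) / (2 ** (i + 3)) for i in range(10)}
    return np.column_stack([features[name] for name in sorted(features)])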
def detect_and_filter_anomalies(df, contamination=0.01, flood_threshold=10.0):
"""
Pre-filtering: rileva e rimuove anomalie evidenti dai dati di training
per evitare data poisoning accidentale
Args:
df: DataFrame con i dati
contamination: Percentuale stimata di contaminazione (non usata)
flood_threshold: Soglia record/sec oltre la quale si considera flooding
"""
try:
log_phase("Pre-filtering anomalie dai dati di training")
if df.empty or len(df) < 100:
log_warning("Dataset troppo piccolo per pre-filtering")
return df, {}
original_count = len(df)
log_info(f"Record originali: {original_count:,}")
start_progress(4, "Pre-filtering Anomalie")
# Step 1: Analisi pattern evidenti di flooding
update_progress(1, "Rilevamento pattern flooding...")
flooding_ips = set()
if 'Messaggio2' in df.columns:
# Conta connessioni per IP in finestre temporali strette
df['IP'] = df['Messaggio2'].str.split(':').str[0]
# Analizza IP con troppi record in poco tempo
if 'Data' in df.columns and 'Ora' in df.columns:
try:
df['DateTime'] = pd.to_datetime(df['Data'].astype(str) + ' ' + df['Ora'].astype(str), errors='coerce')
df['DateTime'] = df['DateTime'].fillna(pd.Timestamp.now())
# Raggruppa per IP e analizza densità temporale
ip_stats = df.groupby('IP').agg({
'DateTime': ['count', 'min', 'max'],
'ID': 'count'
}).reset_index()
ip_stats.columns = ['IP', 'count', 'min_time', 'max_time', 'total_records']
ip_stats['time_span_seconds'] = (ip_stats['max_time'] - ip_stats['min_time']).dt.total_seconds()
ip_stats['records_per_second'] = ip_stats['total_records'] / (ip_stats['time_span_seconds'] + 1)
# Soglie per flooding detection
high_volume_threshold = df['IP'].value_counts().quantile(0.95) # Top 5% IP per volume
high_rate_threshold = flood_threshold # >X record/sec indica flooding
# Identifica IP sospetti
suspicious_ips = ip_stats[
(ip_stats['total_records'] > high_volume_threshold) |
(ip_stats['records_per_second'] > high_rate_threshold)
]
if not suspicious_ips.empty:
flooding_ips = set(suspicious_ips['IP'].tolist())
log_warning(f"IP sospetti identificati: {len(flooding_ips)}")
# Mostra esempi
for ip in list(flooding_ips)[:3]:
ip_data = suspicious_ips[suspicious_ips['IP'] == ip].iloc[0]
log_info(f" - {ip}: {ip_data['total_records']} record, {ip_data['records_per_second']:.1f} rec/sec")
except Exception as e:
log_warning(f"Errore analisi temporale: {e}")
# Step 2: Rilevamento pattern protocollo anomali
update_progress(1, "Analisi pattern protocollo...")
protocol_anomalies = set()
if 'Messaggio1' in df.columns:
# Analizza distribuzione protocolli per IP
proto_analysis = df.groupby('IP')['Messaggio1'].apply(
lambda x: x.str.contains('TCP', case=False).sum() / len(x) if len(x) > 10 else 0
)
# IP con percentuale TCP anomala (>99% indica flooding)
high_tcp_ips = proto_analysis[proto_analysis > 0.99].index.tolist()
protocol_anomalies = set(high_tcp_ips)
if protocol_anomalies:
log_warning(f"IP con pattern protocollo anomalo: {len(protocol_anomalies)}")
# Step 3: Combinazione anomalie
update_progress(1, "Consolidamento anomalie...")
all_anomalous_ips = flooding_ips.union(protocol_anomalies)
# Step 4: Filtering dei record
update_progress(1, "Rimozione record anomali...")
if all_anomalous_ips:
# Rimuovi record dagli IP anomali
clean_df = df[~df['IP'].isin(all_anomalous_ips)].copy()
# Pulizia colonne temporanee
clean_df = clean_df.drop(columns=['IP', 'DateTime'], errors='ignore')
filtered_count = original_count - len(clean_df)
filtered_percentage = (filtered_count / original_count) * 100
log_warning(f"Record filtrati: {filtered_count:,} ({filtered_percentage:.1f}%)")
log_result(f"Dataset pulito: {len(clean_df):,} record")
# Statistiche di filtering
filtering_stats = {
'original_records': original_count,
'filtered_records': filtered_count,
'clean_records': len(clean_df),
'filtering_percentage': filtered_percentage,
'anomalous_ips': len(all_anomalous_ips),
'flooding_ips': len(flooding_ips),
'protocol_anomalies': len(protocol_anomalies)
}
end_progress(True)
return clean_df, filtering_stats
else:
# Nessuna anomalia rilevata
            clean_df = df.drop(columns=['IP', 'DateTime'], errors='ignore')
log_info("✓ Nessuna anomalia rilevata - dataset già pulito")
end_progress(True)
return clean_df, {'clean_records': len(clean_df), 'filtered_records': 0}
except Exception as e:
end_progress(False)
log_error(f"Errore pre-filtering: {e}")
# Ritorna dataset originale in caso di errore
return df, {'error': str(e)}
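# Condensed sketch of the flooding heuristic inside detect_and_filter_anomalies:
# records-per-second per source IP, compared against flood_threshold. The toy frame
# and its values are made up for illustration, and this helper is never called here.
def _flood_rate_example(flood_threshold=10.0):
    """Sketch: flag IPs whose record rate exceeds flood_threshold."""
    toy = pd.DataFrame({
        'IP': ['1.2.3.4'] * 100 + ['5.6.7.8'] * 3,
        'DateTime': pd.date_range('2024-01-01 12:00:00', periods=103, freq='50ms'),
        'ID': range(103),
    })
    stats = toy.groupby('IP').agg(
        total_records=('ID', 'count'),
        min_time=('DateTime', 'min'),
        max_time=('DateTime', 'max'),
    ).reset_index()
    span_seconds = (stats['max_time'] - stats['min_time']).dt.total_seconds()
    stats['records_per_second'] = stats['total_records'] / (span_seconds + 1)
    return set(stats.loc[stats['records_per_second'] > flood_threshold, 'IP'])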
def train_models_simple(X):
"""Addestramento con feedback dettagliato"""
try:
log_phase("Addestramento modelli di machine learning")
if X.shape[0] < MIN_TRAINING_SAMPLES:
log_error(f"Troppo pochi campioni: {X.shape[0]:,} < {MIN_TRAINING_SAMPLES:,}")
return None, None, None, None
start_progress(7, "Addestramento Modelli")
# Step 1: Preparazione dataset
update_progress(1, "Preparazione dataset di addestramento...")
# Campionamento se necessario
if X.shape[0] > MAX_TRAINING_SAMPLES:
log_warning(f"Campionamento da {X.shape[0]:,} a {MAX_TRAINING_SAMPLES:,}")
indices = np.random.choice(X.shape[0], MAX_TRAINING_SAMPLES, replace=False)
X_train = X[indices]
else:
X_train = X
log_info(f"Dataset finale: {X_train.shape[0]:,} esempi × {X_train.shape[1]} feature")
# Step 2: Isolation Forest
update_progress(1, "Addestramento Isolation Forest (anomaly detection)...")
log_info("🌲 Configurazione: 30 alberi, 500 campioni max")
if_start = time.time()
if_model = IsolationForest(
n_estimators=30,
contamination=0.05,
random_state=42,
n_jobs=2,
max_samples=min(500, X_train.shape[0]),
max_features=0.7
)
if_model.fit(X_train)
if_elapsed = time.time() - if_start
log_result(f"✓ Isolation Forest completato in {if_elapsed:.1f}s")
# Step 3: LOF
update_progress(1, "Addestramento Local Outlier Factor...")
lof_sample_size = min(5000, X_train.shape[0])
if X_train.shape[0] > lof_sample_size:
lof_indices = np.random.choice(X_train.shape[0], lof_sample_size, replace=False)
lof_sample = X_train[lof_indices]
log_info(f"🔍 LOF su campione di {lof_sample_size:,} esempi")
else:
lof_sample = X_train
log_info(f"🔍 LOF su tutti i {X_train.shape[0]:,} esempi")
lof_start = time.time()
lof_model = LocalOutlierFactor(
n_neighbors=min(5, lof_sample.shape[0] // 50),
contamination=0.05,
novelty=True,
n_jobs=2
)
lof_model.fit(lof_sample)
lof_elapsed = time.time() - lof_start
log_result(f"✓ LOF completato in {lof_elapsed:.1f}s")
# Step 4: SVM
update_progress(1, "Addestramento One-Class SVM...")
svm_sample_size = min(2000, X_train.shape[0])
if X_train.shape[0] > svm_sample_size:
svm_indices = np.random.choice(X_train.shape[0], svm_sample_size, replace=False)
svm_sample = X_train[svm_indices]
log_info(f"⚙️ SVM su campione di {svm_sample_size:,} esempi")
else:
svm_sample = X_train
log_info(f"⚙️ SVM su tutti i {X_train.shape[0]:,} esempi")
svm_start = time.time()
svm_model = OneClassSVM(
kernel='rbf',
gamma='scale',
nu=0.05
)
svm_model.fit(svm_sample)
svm_elapsed = time.time() - svm_start
log_result(f"✓ SVM completato in {svm_elapsed:.1f}s")
# Step 5: Configurazione ensemble
update_progress(1, "Configurazione ensemble weights...")
ensemble_weights = {
'isolation_forest': 0.70, # Peso maggiore per IF
'lof': 0.20,
'svm': 0.10
}
log_info(f"🎯 Pesi ensemble: IF={ensemble_weights['isolation_forest']}, LOF={ensemble_weights['lof']}, SVM={ensemble_weights['svm']}")
# Step 6: Salvataggio modelli
update_progress(1, "Salvataggio modelli su disco...")
try:
save_start = time.time()
dump(if_model, IF_MODEL_PATH)
log_info(f"💾 Isolation Forest salvato: {os.path.getsize(IF_MODEL_PATH)/1024:.1f} KB")
dump(lof_model, LOF_MODEL_PATH)
log_info(f"💾 LOF salvato: {os.path.getsize(LOF_MODEL_PATH)/1024:.1f} KB")
dump(svm_model, SVM_MODEL_PATH)
log_info(f"💾 SVM salvato: {os.path.getsize(SVM_MODEL_PATH)/1024:.1f} KB")
dump(ensemble_weights, ENSEMBLE_MODEL_PATH)
log_info(f"💾 Ensemble weights salvato: {os.path.getsize(ENSEMBLE_MODEL_PATH)/1024:.1f} KB")
save_elapsed = time.time() - save_start
log_result(f"✓ Tutti i modelli salvati in {save_elapsed:.1f}s")
except Exception as e:
log_error(f"Errore salvataggio: {e}")
return None, None, None, None
# Step 7: Finalizzazione
update_progress(1, "Finalizzazione addestramento...")
total_time = if_elapsed + lof_elapsed + svm_elapsed + save_elapsed
log_info(f"⏱️ Tempo totale addestramento: {total_time:.1f}s")
end_progress(True)
return if_model, lof_model, svm_model, ensemble_weights
except Exception as e:
end_progress(False)
log_error(f"Errore addestramento: {e}")
return None, None, None, None
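# Sketch of how the saved ensemble weights could be combined at detection time:
# normalise each model's decision_function output to [0, 1] and blend with the
# weights trained above. This combiner is an assumption -- the actual consumer
# (detect_multi_03.py) is not part of this file, and this helper is never called.
def _ensemble_score_example(if_model, lof_model, svm_model, weights, X):
    """Sketch: weighted anomaly score in [0, 1]; higher means more anomalous."""
    def _norm(scores):
        scores = np.asarray(scores, dtype=float)
        span = scores.max() - scores.min()
        return (scores - scores.min()) / span if span > 0 else np.zeros_like(scores)
    blended = (weights['isolation_forest'] * _norm(if_model.decision_function(X))
               + weights['lof'] * _norm(lof_model.decision_function(X))
               + weights['svm'] * _norm(svm_model.decision_function(X)))
    # decision_function is higher for inliers, so invert to get an anomaly score
    return 1.0 - blended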
def save_model_timestamp():
"""Salva timestamp con feedback"""
try:
timestamp = datetime.now().isoformat()
with open(os.path.join(MODEL_DIR, 'last_training.txt'), 'w') as f:
f.write(timestamp)
log_result(f"Timestamp salvato: {timestamp}")
return True
except Exception as e:
log_warning(f"Errore salvataggio timestamp: {e}")
return False
def needs_training(force_training=False, training_frequency_hours=12.0):
"""Verifica necessità addestramento con feedback"""
if force_training:
log_result("🔄 Riaddestramento forzato richiesto")
return True
try:
timestamp_file = os.path.join(MODEL_DIR, 'last_training.txt')
if not os.path.exists(timestamp_file):
log_result("🆕 Nessun addestramento precedente trovato")
return True
with open(timestamp_file, 'r') as f:
last_trained_str = f.read().strip()
last_trained = datetime.fromisoformat(last_trained_str)
now = datetime.now()
hours_diff = (now - last_trained).total_seconds() / 3600
log_info(f"📅 Ultimo addestramento: {last_trained.strftime('%Y-%m-%d %H:%M:%S')}")
log_info(f"⏰ Ore trascorse: {hours_diff:.1f}")
if hours_diff >= training_frequency_hours:
log_result(f"✅ Addestramento necessario (>{training_frequency_hours}h)")
return True
else:
log_result(f"⏭️ Addestramento non necessario (<{training_frequency_hours}h)")
return False
except Exception as e:
log_warning(f"Errore verifica timestamp: {e}")
return True
def test_database_connection():
"""Test connessione con feedback dettagliato e query ultra-leggere per server remoti"""
try:
log_phase("Test connessione database")
engine = connect_to_database()
if not engine:
return False
with engine.connect() as conn:
# Test base con timeout
show_spinner("Test query base...", 1)
try:
result = conn.execute(text("SELECT 1")).fetchone()
if result and result[0] == 1:
log_result("✓ Query base funzionante")
# Test tabella Esterna con query ULTRA-LEGGERA
show_spinner("Test esistenza tabella Esterna...", 1)
try:
# Query leggerissima - solo verifica che la tabella esista
exists_result = conn.execute(text("SHOW TABLES LIKE 'Esterna'")).fetchone()
if exists_result:
log_result("✓ Tabella Esterna trovata")
# Test velocissimo - solo 1 record
show_spinner("Test accesso record...", 1)
try:
# Query ultra-veloce - solo 1 record, nessun COUNT
sample_result = conn.execute(text("SELECT ID FROM Esterna LIMIT 1")).fetchone()
if sample_result:
log_info(f"✓ Esempio ID record: {sample_result[0]}")
# Test colonne base senza COUNT pesante
show_spinner("Verifica colonne base...", 1)
try:
# Query veloce per vedere le colonne
columns_result = conn.execute(text("SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'Esterna' AND TABLE_SCHEMA = DATABASE() LIMIT 5")).fetchall()
if columns_result:
column_names = [col[0] for col in columns_result]
log_info(f"✓ Prime colonne: {', '.join(column_names)}")
# Test ID massimo con LIMIT per sicurezza
show_spinner("Test ID massimo...", 1)
try:
max_id_result = conn.execute(text("SELECT MAX(ID) FROM Esterna")).fetchone()
if max_id_result and max_id_result[0]:
log_info(f"✓ ID massimo: {max_id_result[0]:,}")
else:
log_warning("⚠ Nessun ID massimo trovato")
except Exception as e:
log_warning(f"⚠ Test ID massimo saltato: {e}")
except Exception as e:
log_warning(f"⚠ Test colonne saltato: {e}")
else:
log_warning("⚠ Tabella Esterna vuota o inaccessibile")
except Exception as e:
log_warning(f"⚠ Test accesso record fallito: {e}")
else:
log_error("✗ Tabella 'Esterna' non trovata")
return False
except Exception as e:
log_error(f"✗ Test tabella Esterna fallito: {e}")
return False
log_result("🎉 Tutti i test database superati!")
return True
else:
log_error("✗ Query base fallita")
return False
except Exception as e:
log_error(f"✗ Errore test query base: {e}")
return False
return False
except Exception as e:
log_error(f"✗ Errore generale test database: {e}")
return False
def main():
"""Funzione principale con feedback dettagliato"""
parser = argparse.ArgumentParser(description='Addestramento semplificato v02 - Con feedback dettagliato')
parser.add_argument('--force-training', action='store_true', help='Forza riaddestramento')
parser.add_argument('--test', action='store_true', help='Test connessione')
parser.add_argument('--max-records', type=int, default=50000, help='Max record (default: 50k)')
parser.add_argument('--debug', action='store_true', help='Debug logging')
parser.add_argument('--demo', action='store_true', help='Modalità demo con dati simulati (senza database)')
parser.add_argument('--training-hours', type=float, default=12.0, help='Ore minime tra addestramenti (default: 12)')
parser.add_argument('--no-filtering', action='store_true', help='Disabilita pre-filtering anomalie (non raccomandato)')
parser.add_argument('--filtering-sensitivity', type=float, default=10.0, help='Soglia filtering: >X record/sec = flooding (default: 10)')
args = parser.parse_args()
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
# Header con informazioni
print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*60}{Colors.END}")
print(f"{Colors.BOLD}{Colors.CYAN}🤖 SISTEMA ADDESTRAMENTO DDoS v02 - FEEDBACK DETTAGLIATO{Colors.END}")
print(f"{Colors.BOLD}{Colors.CYAN}{'='*60}{Colors.END}")
log_info(f"📊 Configurazione: max {args.max_records:,} record")
log_info(f"🔧 Debug mode: {'ON' if args.debug else 'OFF'}")
log_info(f"🔄 Force training: {'ON' if args.force_training else 'OFF'}")
log_info(f"⏰ Frequenza addestramento: {args.training_hours} ore")
log_info(f"🛡️ Pre-filtering: {'OFF' if args.no_filtering else 'ON'}")
if not args.no_filtering:
log_info(f"⚡ Soglia flooding: >{args.filtering_sensitivity} record/sec")
if args.demo:
log_info("🎭 Modalità DEMO attivata (dati simulati)")
start_time = time.time()
# Test veloce
if args.test:
if args.demo:
log_result("🎭 Test in modalità demo - tutti i test simulati superati!")
print(f"\n{Colors.BOLD}{Colors.GREEN}🎉 TUTTI I TEST DEMO SUPERATI!{Colors.END}")
sys.exit(0)
else:
if test_database_connection():
print(f"\n{Colors.BOLD}{Colors.GREEN}🎉 TUTTI I TEST SUPERATI!{Colors.END}")
sys.exit(0)
else:
print(f"\n{Colors.BOLD}{Colors.RED}❌ TEST FALLITI!{Colors.END}")
sys.exit(1)
# Modalità demo senza database
if args.demo:
log_warning("🎭 Modalità DEMO: Utilizzando dati simulati invece del database")
# Genera dati simulati
log_phase("Generazione dati simulati")
start_progress(3, "Simulazione Dataset")
update_progress(1, f"Creazione {args.max_records:,} record simulati...")
time.sleep(1) # Simula tempo di caricamento
# Crea DataFrame simulato
np.random.seed(42)
df = pd.DataFrame({
'ID': range(1, args.max_records + 1),
'Data': pd.date_range('2024-01-01', periods=args.max_records, freq='1min'),
'Ora': ['12:00:00'] * args.max_records,
'Host': np.random.choice(['FIBRA-HOST-001', 'FIBRA-HOST-002', 'SERVER-001'], args.max_records),
'IndirizzoIP': [f"192.168.{np.random.randint(1,255)}.{np.random.randint(1,255)}" for _ in range(args.max_records)],
'Messaggio1': np.random.choice(['TCP', 'UDP', 'HTTP', 'SSH'], args.max_records),
'Messaggio2': [f"10.0.{np.random.randint(1,255)}.{np.random.randint(1,255)}:{np.random.randint(1000,9999)}" for _ in range(args.max_records)],
'Messaggio3': [f"192.168.1.{np.random.randint(1,255)}:{np.random.randint(80,443)}" for _ in range(args.max_records)]
})
update_progress(1, "Validazione dati simulati...")
time.sleep(0.5)
log_info(f"✓ Dataset simulato creato: {len(df):,} record")
log_info(f"✓ Colonne: {list(df.columns)}")
log_info(f"✓ Range ID: {df['ID'].min()} - {df['ID'].max()}")
update_progress(1, "Finalizzazione dataset simulato...")
time.sleep(0.5)
end_progress(True)
else:
# Test connessione normale
if not test_database_connection():
log_error("Database non raggiungibile - Impossibile continuare")
log_info("💡 Prova con --demo per testare senza database")
sys.exit(1)
# Verifica necessità
if not needs_training(args.force_training, args.training_hours):
print(f"\n{Colors.BOLD}{Colors.YELLOW}⏭️ ADDESTRAMENTO NON NECESSARIO{Colors.END}")
sys.exit(0)
# Connessione ed estrazione normali
engine = connect_to_database()
if not engine:
log_error("Connessione fallita")
sys.exit(1)
df = extract_data_simple(engine, args.max_records)
if df.empty:
log_error("Nessun dato estratto - Impossibile continuare")
sys.exit(1)
try:
# PRE-FILTERING: Rimuovi anomalie evidenti prima dell'addestramento
if not args.demo and not args.no_filtering:
# Solo su dati reali, non su simulati, e se filtering è abilitato
log_phase("🛡️ PREVENZIONE DATA POISONING")
df_clean, filtering_stats = detect_and_filter_anomalies(df, flood_threshold=args.filtering_sensitivity)
if 'error' in filtering_stats:
log_warning(f"Pre-filtering fallito: {filtering_stats['error']}")
log_info("Continuo con dataset originale")
df_clean = df
else:
# Mostra statistiche di filtering
if filtering_stats.get('filtered_records', 0) > 0:
log_result(f"🔄 Dataset ripulito:")
log_info(f" • Record originali: {filtering_stats['original_records']:,}")
log_info(f" • Record filtrati: {filtering_stats['filtered_records']:,}")
log_info(f" • Record puliti: {filtering_stats['clean_records']:,}")
log_info(f" • Percentuale filtrata: {filtering_stats.get('filtering_percentage', 0):.1f}%")
log_info(f" • IP anomali rimossi: {filtering_stats.get('anomalous_ips', 0)}")
else:
log_result("✅ Dataset già pulito - nessuna anomalia rilevata")
else:
# In modalità demo o con filtering disabilitato, usa dati originali
df_clean = df
if args.no_filtering:
log_warning("⚠️ Pre-filtering DISABILITATO - rischio data poisoning!")
else:
log_info("🎭 Demo mode: pre-filtering saltato")
# Preparazione con dataset pulito
X, preprocessor = prepare_data_simple(df_clean)
if X is None:
log_error("Preparazione dati fallita")
sys.exit(1)
# Addestramento
models = train_models_simple(X)
if all(m is not None for m in models):
# Salva timestamp
save_model_timestamp()
# Statistiche finali
elapsed = time.time() - start_time
print(f"\n{Colors.BOLD}{Colors.GREEN}{'='*60}{Colors.END}")
if args.demo:
print(f"{Colors.BOLD}{Colors.GREEN}🎭 ADDESTRAMENTO DEMO COMPLETATO CON SUCCESSO!{Colors.END}")
else:
print(f"{Colors.BOLD}{Colors.GREEN}🎉 ADDESTRAMENTO COMPLETATO CON SUCCESSO!{Colors.END}")
print(f"{Colors.BOLD}{Colors.GREEN}{'='*60}{Colors.END}")
log_result(f"⏱️ Tempo totale: {elapsed:.1f} secondi")
log_result(f"📊 Campioni processati: {X.shape[0]:,}")
log_result(f"🔢 Feature generate: {X.shape[1]}")
log_result(f"💾 Modelli salvati in: {MODEL_DIR}")
if args.demo:
print(f"\n{Colors.CYAN}🎭 Modalità demo completata! I modelli funzionano con dati simulati.{Colors.END}")
print(f"{Colors.CYAN} Per usare dati reali, assicurati che MySQL sia in esecuzione.{Colors.END}\n")
else:
print(f"\n{Colors.CYAN}🚀 Ora puoi eseguire il rilevamento con:{Colors.END}")
print(f"{Colors.CYAN} python detect_multi_03.py --batch-size 1000 --debug{Colors.END}\n")
else:
log_error("Addestramento fallito")
sys.exit(1)
except Exception as e:
log_error(f"Errore generale: {e}")
logging.error(traceback.format_exc())
sys.exit(1)
if __name__ == "__main__":
main()
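# Typical invocations (illustrative; the flags are the ones defined in main() above):
#
#   python analisys_02.py --test
#   python analisys_02.py --demo --max-records 5000
#   python analisys_02.py --force-training --training-hours 6 --filtering-sensitivity 20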