Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
910 lines
35 KiB
Python
910 lines
35 KiB
Python
#!/usr/bin/env python3
|
|
import pandas as pd
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.sql import text
|
|
from sklearn.ensemble import IsolationForest
|
|
from sklearn.neighbors import LocalOutlierFactor
|
|
from sklearn.svm import OneClassSVM
|
|
from sklearn.feature_extraction.text import TfidfVectorizer
|
|
from joblib import dump, load
|
|
import logging
|
|
import gc
|
|
import os
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
import numpy as np
|
|
import argparse
|
|
import sys
|
|
import traceback
|
|
import threading
|
|
import psutil
|
|
import warnings
|
|
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
|
|
from sklearn.model_selection import train_test_split
|
|
from sklearn.preprocessing import StandardScaler
|
|
from sklearn.metrics import classification_report
|
|
import multiprocessing
|
|
warnings.filterwarnings('ignore')
|
|
|
|
# Configurazione del logging ottimizzata
|
|
logging.basicConfig(
|
|
level=logging.INFO, # Cambiato da DEBUG a INFO per performance
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.StreamHandler(sys.stdout),
|
|
logging.FileHandler('analisys_debug.log')
|
|
]
|
|
)
|
|
|
|
# Configurazione del database
|
|
DB_USER = os.environ.get('DB_USER', 'root')
|
|
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
|
|
DB_HOST = os.environ.get('DB_HOST', 'localhost')
|
|
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
|
|
|
|
# Cartella per i modelli con gestione errori ottimizzata
|
|
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
|
|
try:
|
|
os.makedirs(MODEL_DIR, exist_ok=True)
|
|
logging.debug(f"Directory models creata/verificata: {MODEL_DIR}")
|
|
except Exception as e:
|
|
logging.error(f"Errore nella creazione della directory models: {e}")
|
|
MODEL_DIR = os.path.join(os.getcwd(), 'models')
|
|
try:
|
|
os.makedirs(MODEL_DIR, exist_ok=True)
|
|
logging.debug(f"Directory models creata come fallback in: {MODEL_DIR}")
|
|
except Exception as e2:
|
|
logging.error(f"Impossibile creare la directory models: {e2}")
|
|
MODEL_DIR = '.'
|
|
|
|
# Percorsi dei modelli
|
|
IF_MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
|
|
LOF_MODEL_PATH = os.path.join(MODEL_DIR, 'lof.joblib')
|
|
SVM_MODEL_PATH = os.path.join(MODEL_DIR, 'svm.joblib')
|
|
ENSEMBLE_MODEL_PATH = os.path.join(MODEL_DIR, 'ensemble_weights.joblib')
|
|
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
|
|
ACCUMULATED_DATA_PATH = os.path.join(MODEL_DIR, 'accumulated_data.pkl')
|
|
LAST_TRAINING_PATH = os.path.join(MODEL_DIR, 'last_training.txt')
|
|
|
|
# Parametri di configurazione ottimizzati
|
|
TRAINING_FREQUENCY_HOURS = 12
|
|
CONTINUOUS_LEARNING = True
|
|
MAX_MEMORY_USAGE = 85 # Percentuale massima di memoria utilizzabile
|
|
CHUNK_SIZE = 10000 # Dimensione chunk per elaborazione
|
|
MAX_TRAINING_SAMPLES = 500000 # Limite massimo campioni per addestramento
|
|
MIN_TRAINING_SAMPLES = 1000 # Minimo campioni necessari
|
|
|
|
# Colori per output
|
|
class Colors:
|
|
HEADER = '\033[95m'
|
|
BLUE = '\033[94m'
|
|
GREEN = '\033[92m'
|
|
YELLOW = '\033[93m'
|
|
RED = '\033[91m'
|
|
BOLD = '\033[1m'
|
|
UNDERLINE = '\033[4m'
|
|
END = '\033[0m'
|
|
|
|
def log_phase(message):
|
|
"""Evidenzia una nuova fase principale dell'esecuzione"""
|
|
print(f"\n{Colors.BOLD}{Colors.GREEN}▶ FASE: {message}{Colors.END}\n")
|
|
logging.info(f"FASE: {message}")
|
|
|
|
def log_result(message):
|
|
"""Evidenzia un risultato importante"""
|
|
print(f"{Colors.BLUE}✓ {message}{Colors.END}")
|
|
logging.info(f"RISULTATO: {message}")
|
|
|
|
def log_warning(message):
|
|
"""Evidenzia un avviso importante"""
|
|
print(f"{Colors.YELLOW}⚠ {message}{Colors.END}")
|
|
logging.warning(message)
|
|
|
|
def log_error(message):
|
|
"""Evidenzia un errore importante"""
|
|
print(f"{Colors.RED}✗ {message}{Colors.END}")
|
|
logging.error(message)
|
|
|
|
# Variabili globali per il tracciamento dell'avanzamento
|
|
progress_status = {
|
|
'in_progress': False,
|
|
'operation': '',
|
|
'start_time': None,
|
|
'current_step': 0,
|
|
'total_steps': 0,
|
|
'details': '',
|
|
'last_update': 0
|
|
}
|
|
|
|
def check_memory_usage():
|
|
"""Controlla l'utilizzo della memoria e forza garbage collection se necessario"""
|
|
memory_percent = psutil.virtual_memory().percent
|
|
if memory_percent > MAX_MEMORY_USAGE:
|
|
logging.warning(f"Utilizzo memoria alto: {memory_percent}%. Forzando garbage collection...")
|
|
gc.collect()
|
|
return True
|
|
return False
|
|
|
|
def start_progress_tracking(operation, total_steps=100):
|
|
"""Inizia il tracciamento di un'operazione lunga"""
|
|
global progress_status
|
|
progress_status['in_progress'] = True
|
|
progress_status['operation'] = operation
|
|
progress_status['start_time'] = time.time()
|
|
progress_status['current_step'] = 0
|
|
progress_status['total_steps'] = total_steps
|
|
progress_status['details'] = 'Inizializzazione...'
|
|
progress_status['last_update'] = 0
|
|
|
|
threading.Thread(target=progress_reporter, daemon=True).start()
|
|
logging.info(f"Avvio: {operation} (totale passi previsti: {total_steps})")
|
|
|
|
def update_progress(step=None, increment=1, details=''):
|
|
"""Aggiorna lo stato di avanzamento di un'operazione lunga"""
|
|
global progress_status
|
|
if not progress_status['in_progress']:
|
|
return
|
|
|
|
if step is not None:
|
|
progress_status['current_step'] = step
|
|
else:
|
|
progress_status['current_step'] += increment
|
|
|
|
if details:
|
|
progress_status['details'] = details
|
|
|
|
if details and (time.time() - progress_status['last_update']) > 5:
|
|
report_progress()
|
|
|
|
def end_progress_tracking(success=True):
|
|
"""Termina il tracciamento dell'avanzamento"""
|
|
global progress_status
|
|
if not progress_status['in_progress']:
|
|
return
|
|
|
|
elapsed = time.time() - progress_status['start_time']
|
|
if success:
|
|
logging.info(f"Completato: {progress_status['operation']} in {elapsed:.1f} secondi")
|
|
else:
|
|
logging.error(f"Fallito: {progress_status['operation']} dopo {elapsed:.1f} secondi")
|
|
|
|
progress_status['in_progress'] = False
|
|
progress_status['current_step'] = progress_status['total_steps']
|
|
report_progress(force=True)
|
|
|
|
def report_progress(force=False):
|
|
"""Riporta lo stato di avanzamento attuale"""
|
|
global progress_status
|
|
if not progress_status['in_progress'] and not force:
|
|
return
|
|
|
|
current_time = time.time()
|
|
if not force and (current_time - progress_status['last_update']) < 30:
|
|
return
|
|
|
|
elapsed = current_time - progress_status['start_time']
|
|
percent = (progress_status['current_step'] / progress_status['total_steps']) * 100 if progress_status['total_steps'] > 0 else 0
|
|
|
|
remaining = "N/A"
|
|
if percent > 0:
|
|
remaining_seconds = (elapsed / percent) * (100 - percent)
|
|
if remaining_seconds < 60:
|
|
remaining = f"{remaining_seconds:.0f} secondi"
|
|
elif remaining_seconds < 3600:
|
|
remaining = f"{remaining_seconds/60:.1f} minuti"
|
|
else:
|
|
remaining = f"{remaining_seconds/3600:.1f} ore"
|
|
|
|
memory_usage = psutil.virtual_memory().percent
|
|
message = f"""
|
|
{Colors.BOLD}======== PROGRESSO ADDESTRAMENTO ========{Colors.END}
|
|
Operazione: {progress_status['operation']}
|
|
Completamento: {percent:.1f}%
|
|
Tempo trascorso: {elapsed:.1f} secondi
|
|
Tempo rimanente: {remaining}
|
|
Memoria utilizzata: {memory_usage:.1f}%
|
|
Dettagli: {progress_status['details']}
|
|
{Colors.BOLD}=========================================={Colors.END}
|
|
"""
|
|
print(message)
|
|
logging.info(f"Progresso {progress_status['operation']}: {percent:.1f}% - {progress_status['details']}")
|
|
progress_status['last_update'] = current_time
|
|
|
|
def progress_reporter():
|
|
"""Thread che riporta periodicamente i progressi"""
|
|
while progress_status['in_progress']:
|
|
report_progress()
|
|
time.sleep(5)
|
|
|
|
def extract_time_features_optimized(df):
|
|
"""Estrae caratteristiche temporali dai dati in modo ottimizzato"""
|
|
try:
|
|
logging.info("Estrazione caratteristiche temporali ottimizzata...")
|
|
|
|
# Converti timestamp in modo vettorizzato
|
|
if 'Timestamp' not in df.columns and 'Data' in df.columns and 'Ora' in df.columns:
|
|
df['Data'] = pd.to_datetime(df['Data'], errors='coerce')
|
|
df['Ora'] = pd.to_timedelta(df['Ora'].astype(str), errors='coerce')
|
|
df['Timestamp'] = df['Data'] + df['Ora']
|
|
|
|
if 'Timestamp' in df.columns:
|
|
df['hour_of_day'] = df['Timestamp'].dt.hour
|
|
df['day_of_week'] = df['Timestamp'].dt.dayofweek
|
|
else:
|
|
df['hour_of_day'] = 0
|
|
df['day_of_week'] = 0
|
|
|
|
# Estrazione IP ottimizzata
|
|
update_progress(details="Estrazione IP e porte (ottimizzata)")
|
|
if 'Messaggio2' in df.columns:
|
|
# Usa operazioni vettorizzate per performance
|
|
df['IP_Attaccante'] = df['Messaggio2'].str.split(':').str[0]
|
|
df['Porta_Attaccante'] = df['Messaggio2'].str.split(':').str[1]
|
|
|
|
if 'Messaggio3' in df.columns:
|
|
df['IP_Attaccato'] = df['Messaggio3'].str.split(':').str[0]
|
|
df['Porta_Attaccato'] = df['Messaggio3'].str.split(':').str[1]
|
|
|
|
# Feature temporali semplificate per performance
|
|
df['time_since_last'] = 0
|
|
df['events_last_hour'] = 0
|
|
df['events_last_day'] = 0
|
|
df['time_since_last_mean'] = 0
|
|
df['time_since_last_std'] = 0
|
|
df['time_since_last_min'] = 0
|
|
df['time_since_last_max'] = 0
|
|
df['events_last_hour_max'] = 0
|
|
df['events_last_day_max'] = 0
|
|
|
|
# Calcolo statistiche solo per campione rappresentativo per velocità
|
|
if len(df) > 10000:
|
|
sample_df = df.sample(n=10000, random_state=42)
|
|
logging.info("Usando campione di 10k record per statistiche temporali")
|
|
else:
|
|
sample_df = df
|
|
|
|
# Calcolo statistiche aggregate per IP (versione ottimizzata)
|
|
if 'IP_Attaccante' in sample_df.columns:
|
|
ip_stats = sample_df.groupby('IP_Attaccante').agg({
|
|
'Timestamp': ['count', 'min', 'max']
|
|
}).fillna(0)
|
|
|
|
# Applica statistiche al dataset completo (approssimazione)
|
|
ip_counts = df['IP_Attaccante'].value_counts()
|
|
df['events_last_hour'] = df['IP_Attaccante'].map(ip_counts).fillna(0)
|
|
df['events_last_day'] = df['events_last_hour'] # Semplificazione
|
|
|
|
logging.info("Caratteristiche temporali estratte con successo (ottimizzate)")
|
|
return df
|
|
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'estrazione delle caratteristiche temporali: {e}")
|
|
return df
|
|
|
|
def connect_to_database():
|
|
"""Connette al database MySQL con configurazione ottimizzata"""
|
|
try:
|
|
logging.info("Connessione al database...")
|
|
|
|
connection_string = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"
|
|
|
|
# Configurazione ottimizzata per addestramento
|
|
engine = create_engine(
|
|
connection_string,
|
|
pool_size=5,
|
|
max_overflow=10,
|
|
pool_recycle=3600,
|
|
pool_pre_ping=True,
|
|
pool_timeout=30,
|
|
echo=False,
|
|
connect_args={
|
|
'charset': 'utf8mb4',
|
|
'use_unicode': True,
|
|
'autocommit': False, # False per transazioni di addestramento
|
|
'sql_mode': 'TRADITIONAL'
|
|
}
|
|
)
|
|
|
|
# Test connessione
|
|
with engine.connect() as conn:
|
|
conn.execute(text("SELECT 1")).fetchone()
|
|
|
|
return engine
|
|
except Exception as e:
|
|
logging.error(f"Errore nella connessione al database: {e}")
|
|
return None
|
|
|
|
def extract_data_for_training_optimized(engine, window_hours=12, max_records=500000, batch_size=20000):
|
|
"""Estrae dati per l'addestramento in modo ottimizzato per grandi volumi"""
|
|
try:
|
|
start_progress_tracking(f"estrazione dati addestramento ({window_hours} ore)", 100)
|
|
|
|
log_phase(f"Estrazione dati per addestramento - ultimi {window_hours} ore")
|
|
|
|
# Query di conteggio ottimizzata
|
|
count_query = text("""
|
|
SELECT COUNT(*) AS total
|
|
FROM Esterna
|
|
WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR)
|
|
""")
|
|
|
|
total_count = 0
|
|
with engine.connect() as conn:
|
|
result = conn.execute(count_query, {"window": window_hours}).fetchone()
|
|
total_count = result[0] if result else 0
|
|
|
|
if total_count == 0:
|
|
log_warning("Nessun dato trovato per l'addestramento")
|
|
end_progress_tracking(success=False)
|
|
return pd.DataFrame()
|
|
|
|
# Limita al massimo specificato
|
|
total_count = min(total_count, max_records)
|
|
log_result(f"Trovati {total_count} record, estraendo max {max_records}")
|
|
|
|
# Estrazione ottimizzata con campionamento se necessario
|
|
if total_count > max_records:
|
|
# Usa campionamento casuale per dataset molto grandi
|
|
sample_rate = max_records / total_count
|
|
log_warning(f"Dataset molto grande, usando campionamento {sample_rate:.2%}")
|
|
|
|
query = text("""
|
|
SELECT *
|
|
FROM Esterna
|
|
WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR)
|
|
AND RAND() < :sample_rate
|
|
ORDER BY RAND()
|
|
LIMIT :max_records
|
|
""")
|
|
|
|
update_progress(50, details="Estrazione con campionamento casuale")
|
|
df = pd.read_sql(query, engine, params={
|
|
"window": window_hours,
|
|
"sample_rate": sample_rate,
|
|
"max_records": max_records
|
|
})
|
|
else:
|
|
# Estrazione normale con batching
|
|
offset = 0
|
|
all_data = []
|
|
|
|
while offset < total_count:
|
|
curr_batch_size = min(batch_size, total_count - offset)
|
|
percent_complete = (offset / total_count) * 100
|
|
|
|
update_progress(int(percent_complete),
|
|
details=f"Estrazione batch {offset//batch_size + 1} ({offset}/{total_count} record)")
|
|
|
|
# Controllo memoria
|
|
if check_memory_usage():
|
|
batch_size = max(5000, batch_size // 2)
|
|
log_warning(f"Memoria alta, riducendo batch size a {batch_size}")
|
|
|
|
query = text("""
|
|
SELECT *
|
|
FROM Esterna
|
|
WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR)
|
|
ORDER BY ID DESC
|
|
LIMIT :batch_size OFFSET :offset
|
|
""")
|
|
|
|
batch_data = pd.read_sql(query, engine, params={
|
|
"window": window_hours,
|
|
"batch_size": curr_batch_size,
|
|
"offset": offset
|
|
})
|
|
|
|
if batch_data.empty:
|
|
break
|
|
|
|
all_data.append(batch_data)
|
|
offset += len(batch_data)
|
|
|
|
# Garbage collection periodico
|
|
if len(all_data) % 5 == 0:
|
|
gc.collect()
|
|
|
|
# Combina tutti i batch
|
|
if all_data:
|
|
df = pd.concat(all_data, ignore_index=True)
|
|
del all_data # Libera memoria
|
|
gc.collect()
|
|
else:
|
|
df = pd.DataFrame()
|
|
|
|
update_progress(100, details=f"Estrazione completata: {len(df)} record")
|
|
log_result(f"Estratti {len(df)} record per addestramento")
|
|
end_progress_tracking()
|
|
return df
|
|
|
|
except Exception as e:
|
|
end_progress_tracking(success=False)
|
|
log_error(f"Errore nell'estrazione dei dati: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def process_text_features_optimized(df):
|
|
"""Processa le feature testuali in modo ottimizzato"""
|
|
try:
|
|
logging.info("Elaborazione caratteristiche testuali ottimizzata...")
|
|
|
|
# Preparazione protocollo ottimizzata
|
|
if 'Messaggio1' in df.columns:
|
|
df['Protocollo'] = df['Messaggio1'].fillna('')
|
|
else:
|
|
df['Protocollo'] = ''
|
|
|
|
# TF-IDF ottimizzato con meno feature per velocità
|
|
tfidf_vectorizer = TfidfVectorizer(
|
|
max_features=21, # Ridotto per performance
|
|
stop_words='english',
|
|
ngram_range=(1, 1), # Solo unigrams per velocità
|
|
min_df=2, # Ignora termini troppo rari
|
|
max_df=0.95 # Ignora termini troppo comuni
|
|
)
|
|
|
|
# Campionamento per TF-IDF se dataset troppo grande
|
|
if len(df) > 50000:
|
|
sample_df = df.sample(n=50000, random_state=42)
|
|
logging.info("Usando campione di 50k record per TF-IDF")
|
|
X_protocol = tfidf_vectorizer.fit_transform(sample_df['Protocollo'])
|
|
|
|
# Applica al dataset completo
|
|
X_protocol_full = tfidf_vectorizer.transform(df['Protocollo'])
|
|
else:
|
|
X_protocol_full = tfidf_vectorizer.fit_transform(df['Protocollo'])
|
|
|
|
# Converti in DataFrame
|
|
tfidf_df = pd.DataFrame(
|
|
X_protocol_full.toarray(),
|
|
columns=[f'protocol_tfidf_{i}' for i in range(X_protocol_full.shape[1])],
|
|
index=df.index
|
|
)
|
|
|
|
# Concatena con il dataframe originale
|
|
result_df = pd.concat([df, tfidf_df], axis=1)
|
|
|
|
logging.info(f"Feature testuali estratte: {tfidf_df.shape[1]} colonne")
|
|
return result_df, tfidf_vectorizer
|
|
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'elaborazione delle caratteristiche testuali: {e}")
|
|
return df, None
|
|
|
|
def process_categorical_features_optimized(df):
|
|
"""Processa le feature categoriche in modo ottimizzato"""
|
|
try:
|
|
logging.info("Elaborazione caratteristiche categoriche ottimizzata...")
|
|
categorical_features = {}
|
|
|
|
# One-hot encoding ottimizzato per 'Host'
|
|
if 'Host' in df.columns:
|
|
# Limita il numero di categorie per performance
|
|
top_hosts = df['Host'].value_counts().head(10).index
|
|
df['Host_simplified'] = df['Host'].where(df['Host'].isin(top_hosts), 'Other')
|
|
|
|
host_dummies = pd.get_dummies(df['Host_simplified'], prefix='host', dummy_na=True)
|
|
categorical_features['host'] = host_dummies.columns.tolist()
|
|
df = pd.concat([df, host_dummies], axis=1)
|
|
|
|
# Encoding ottimizzato per IP_Attaccante
|
|
if 'IP_Attaccante' in df.columns and df['IP_Attaccante'].notna().any():
|
|
# Usa hash encoding semplificato per performance
|
|
ip_data = df['IP_Attaccante'].fillna('unknown').astype(str)
|
|
|
|
# Hash encoding manuale per velocità
|
|
for i in range(15):
|
|
df[f'ip_hash_{i}'] = pd.util.hash_array(ip_data.values) % (2**(i+8)) / (2**(i+8))
|
|
|
|
categorical_features['IP_Attaccante'] = [f'ip_hash_{i}' for i in range(15)]
|
|
|
|
logging.info("Caratteristiche categoriche elaborate con successo")
|
|
return df, categorical_features
|
|
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'elaborazione delle caratteristiche categoriche: {e}")
|
|
return df, {}
|
|
|
|
def prepare_data_for_model_optimized(df):
|
|
"""Prepara i dati per il modello in modo ottimizzato"""
|
|
try:
|
|
total_steps = 5
|
|
start_progress_tracking("preparazione dati ottimizzata", total_steps)
|
|
|
|
log_phase("Preparazione dati per modello (ottimizzata)")
|
|
|
|
if df.empty:
|
|
log_error("DataFrame vuoto")
|
|
end_progress_tracking(success=False)
|
|
return None, None
|
|
|
|
# 1. Feature temporali ottimizzate
|
|
update_progress(1, details=f"Caratteristiche temporali per {len(df)} record")
|
|
df = extract_time_features_optimized(df)
|
|
|
|
# 2. Feature testuali ottimizzate
|
|
update_progress(2, details="Caratteristiche testuali")
|
|
df, text_vectorizer = process_text_features_optimized(df)
|
|
|
|
# 3. Feature categoriche ottimizzate
|
|
update_progress(3, details="Caratteristiche categoriche")
|
|
df, categorical_features = process_categorical_features_optimized(df)
|
|
|
|
# 4. Selezione feature ottimizzata
|
|
update_progress(4, details="Selezione e pulizia feature")
|
|
|
|
# Seleziona colonne numeriche
|
|
numeric_cols = [col for col in df.columns if df[col].dtype in ['int64', 'float64']]
|
|
|
|
# Seleziona colonne dummy
|
|
dummy_cols = []
|
|
for cols in categorical_features.values():
|
|
dummy_cols.extend(cols)
|
|
|
|
# Seleziona colonne tfidf
|
|
tfidf_cols = [col for col in df.columns if col.startswith('protocol_tfidf_')]
|
|
|
|
# Combina tutte le colonne
|
|
feature_cols = numeric_cols + dummy_cols + tfidf_cols
|
|
|
|
# Rimuovi colonne non necessarie
|
|
excluded_cols = [
|
|
'ID', 'Data', 'Ora', 'Timestamp', 'Host', 'IndirizzoIP',
|
|
'Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4',
|
|
'IP_Attaccante', 'IP_Attaccato', 'Porta_Attaccante', 'Porta_Attaccato',
|
|
'Protocollo', 'Host_simplified'
|
|
]
|
|
|
|
feature_cols = [col for col in feature_cols if col not in excluded_cols]
|
|
|
|
# Limita numero di feature per performance
|
|
if len(feature_cols) > 125:
|
|
feature_cols = feature_cols[:125]
|
|
log_warning(f"Limitando a 125 feature per performance")
|
|
|
|
# 5. Finalizzazione
|
|
update_progress(5, details="Finalizzazione dataset")
|
|
|
|
# Crea DataFrame delle feature
|
|
X = df[feature_cols].copy()
|
|
|
|
# Gestisci valori mancanti
|
|
X.fillna(0, inplace=True)
|
|
|
|
# Rimuovi feature con varianza zero
|
|
variance_threshold = 0.001
|
|
feature_variances = X.var()
|
|
valid_features = feature_variances[feature_variances > variance_threshold].index
|
|
X = X[valid_features]
|
|
|
|
# Aggiorna feature_cols
|
|
feature_cols = valid_features.tolist()
|
|
|
|
# Crea preprocessor
|
|
preprocessor = {
|
|
'feature_columns': feature_cols,
|
|
'text_vectorizer': text_vectorizer,
|
|
'categorical_features': categorical_features
|
|
}
|
|
|
|
# Salva preprocessor
|
|
try:
|
|
dump(preprocessor, PREPROCESSOR_PATH)
|
|
log_result(f"Preprocessor salvato: {len(feature_cols)} feature")
|
|
except Exception as e:
|
|
log_error(f"Errore nel salvataggio preprocessor: {e}")
|
|
|
|
log_result(f"Dati preparati: {X.shape[0]} esempi, {X.shape[1]} feature")
|
|
end_progress_tracking()
|
|
return X, preprocessor
|
|
|
|
except Exception as e:
|
|
end_progress_tracking(success=False)
|
|
log_error(f"Errore nella preparazione dei dati: {e}")
|
|
return None, None
|
|
|
|
def train_models_optimized(X):
|
|
"""Addestra i modelli in modo ottimizzato per grandi dataset"""
|
|
try:
|
|
start_progress_tracking("addestramento modelli ottimizzato", 4)
|
|
log_phase("Addestramento modelli ottimizzato")
|
|
|
|
if X.shape[0] < MIN_TRAINING_SAMPLES:
|
|
log_error(f"Troppo pochi campioni per addestramento: {X.shape[0]} < {MIN_TRAINING_SAMPLES}")
|
|
end_progress_tracking(success=False)
|
|
return None, None, None, None
|
|
|
|
# Campionamento se dataset troppo grande
|
|
if X.shape[0] > MAX_TRAINING_SAMPLES:
|
|
log_warning(f"Dataset molto grande ({X.shape[0]}), campionando {MAX_TRAINING_SAMPLES} esempi")
|
|
indices = np.random.choice(X.shape[0], MAX_TRAINING_SAMPLES, replace=False)
|
|
X_train = X.iloc[indices]
|
|
else:
|
|
X_train = X
|
|
|
|
log_result(f"Addestramento su {X_train.shape[0]} esempi con {X_train.shape[1]} feature")
|
|
|
|
# 1. Isolation Forest ottimizzato
|
|
update_progress(1, details=f"Isolation Forest su {X_train.shape[0]} campioni")
|
|
if_model = IsolationForest(
|
|
n_estimators=100, # Ridotto per velocità
|
|
contamination=0.05,
|
|
random_state=42,
|
|
n_jobs=-1,
|
|
max_samples='auto',
|
|
max_features=1.0
|
|
)
|
|
if_model.fit(X_train)
|
|
log_result("Isolation Forest addestrato")
|
|
|
|
# 2. Local Outlier Factor ottimizzato
|
|
update_progress(2, details="Local Outlier Factor")
|
|
# Limita campioni per LOF se troppi (LOF è O(n²))
|
|
if X_train.shape[0] > 50000:
|
|
lof_sample = X_train.sample(n=50000, random_state=42)
|
|
log_warning("LOF: usando campione di 50k per performance")
|
|
else:
|
|
lof_sample = X_train
|
|
|
|
lof_model = LocalOutlierFactor(
|
|
n_neighbors=min(20, lof_sample.shape[0] // 10),
|
|
contamination=0.05,
|
|
novelty=True,
|
|
n_jobs=-1
|
|
)
|
|
lof_model.fit(lof_sample)
|
|
log_result("Local Outlier Factor addestrato")
|
|
|
|
# 3. One-Class SVM ottimizzato
|
|
update_progress(3, details="One-Class SVM")
|
|
# Limita campioni per SVM se troppi (SVM è lento su grandi dataset)
|
|
if X_train.shape[0] > 20000:
|
|
svm_sample = X_train.sample(n=20000, random_state=42)
|
|
log_warning("SVM: usando campione di 20k per performance")
|
|
else:
|
|
svm_sample = X_train
|
|
|
|
svm_model = OneClassSVM(
|
|
kernel='rbf',
|
|
gamma='scale', # Cambiato da 'auto' per sklearn recenti
|
|
nu=0.05
|
|
)
|
|
svm_model.fit(svm_sample)
|
|
log_result("One-Class SVM addestrato")
|
|
|
|
# 4. Salvataggio modelli
|
|
update_progress(4, details="Salvataggio modelli")
|
|
|
|
try:
|
|
dump(if_model, IF_MODEL_PATH)
|
|
dump(lof_model, LOF_MODEL_PATH)
|
|
dump(svm_model, SVM_MODEL_PATH)
|
|
|
|
# Pesi ensemble ottimizzati
|
|
ensemble_weights = {
|
|
'isolation_forest': 0.50, # Peso maggiore per IF (più veloce)
|
|
'lof': 0.30,
|
|
'svm': 0.20
|
|
}
|
|
dump(ensemble_weights, ENSEMBLE_MODEL_PATH)
|
|
|
|
log_result("Tutti i modelli salvati con successo")
|
|
|
|
except Exception as e:
|
|
log_error(f"Errore nel salvataggio: {e}")
|
|
end_progress_tracking(success=False)
|
|
return None, None, None, None
|
|
|
|
end_progress_tracking()
|
|
return if_model, lof_model, svm_model, ensemble_weights
|
|
|
|
except Exception as e:
|
|
end_progress_tracking(success=False)
|
|
log_error(f"Errore nell'addestramento: {e}")
|
|
return None, None, None, None
|
|
|
|
def save_model_timestamp():
|
|
"""Salva il timestamp dell'ultimo addestramento"""
|
|
try:
|
|
engine = connect_to_database()
|
|
if not engine:
|
|
return False
|
|
|
|
with engine.connect() as conn:
|
|
create_table_query = text("""
|
|
CREATE TABLE IF NOT EXISTS model_metadata (
|
|
id INT AUTO_INCREMENT PRIMARY KEY,
|
|
model_name VARCHAR(50) NOT NULL,
|
|
last_trained TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
model_path VARCHAR(255),
|
|
training_samples INT DEFAULT 0,
|
|
feature_count INT DEFAULT 0,
|
|
UNIQUE KEY unique_model (model_name)
|
|
)
|
|
""")
|
|
conn.execute(create_table_query)
|
|
|
|
upsert_query = text("""
|
|
INSERT INTO model_metadata (model_name, last_trained, model_path, training_samples, feature_count)
|
|
VALUES ('ensemble', NOW(), :model_path, :samples, :features)
|
|
ON DUPLICATE KEY UPDATE
|
|
last_trained = NOW(),
|
|
model_path = :model_path,
|
|
training_samples = :samples,
|
|
feature_count = :features
|
|
""")
|
|
|
|
conn.execute(upsert_query, {
|
|
"model_path": ENSEMBLE_MODEL_PATH,
|
|
"samples": 0, # Sarà aggiornato dal chiamante
|
|
"features": 0 # Sarà aggiornato dal chiamante
|
|
})
|
|
conn.commit()
|
|
|
|
log_result("Timestamp addestramento salvato")
|
|
return True
|
|
except Exception as e:
|
|
log_error(f"Errore nel salvare timestamp: {e}")
|
|
return False
|
|
|
|
def needs_training(force_training=False):
|
|
"""Verifica se il modello deve essere riaddestrato"""
|
|
if force_training:
|
|
log_result("Riaddestramento forzato richiesto")
|
|
return True
|
|
|
|
try:
|
|
engine = connect_to_database()
|
|
if not engine:
|
|
return True
|
|
|
|
with engine.connect() as conn:
|
|
try:
|
|
query = text("""
|
|
SELECT last_trained, training_samples, feature_count
|
|
FROM model_metadata
|
|
WHERE model_name = 'ensemble'
|
|
""")
|
|
|
|
result = conn.execute(query).fetchone()
|
|
|
|
if not result:
|
|
log_result("Nessun addestramento precedente, riaddestramento necessario")
|
|
return True
|
|
|
|
last_trained, samples, features = result
|
|
now = datetime.now()
|
|
hours_diff = (now - last_trained).total_seconds() / 3600
|
|
|
|
if hours_diff >= TRAINING_FREQUENCY_HOURS:
|
|
log_result(f"Ultimo addestramento: {hours_diff:.1f} ore fa, riaddestramento necessario")
|
|
return True
|
|
else:
|
|
log_result(f"Ultimo addestramento: {hours_diff:.1f} ore fa, non necessario")
|
|
return False
|
|
|
|
except Exception as e:
|
|
log_warning(f"Errore nel controllo metadata: {e}")
|
|
return True
|
|
except Exception as e:
|
|
log_error(f"Errore nel verificare necessità addestramento: {e}")
|
|
return True
|
|
|
|
def test_database_connection():
|
|
"""Testa la connessione al database"""
|
|
try:
|
|
engine = connect_to_database()
|
|
if not engine:
|
|
return False
|
|
|
|
with engine.connect() as conn:
|
|
result = conn.execute(text("SELECT 1")).fetchone()
|
|
if result and result[0] == 1:
|
|
log_result("Test connessione database superato")
|
|
|
|
# Verifica tabelle
|
|
tables = conn.execute(text("SHOW TABLES")).fetchall()
|
|
table_names = [t[0] for t in tables]
|
|
|
|
if 'Esterna' in table_names:
|
|
count = conn.execute(text("SELECT COUNT(*) FROM Esterna")).fetchone()[0]
|
|
log_result(f"Tabella Esterna: {count} record")
|
|
else:
|
|
log_error("Tabella Esterna non trovata")
|
|
return False
|
|
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
log_error(f"Errore test connessione: {e}")
|
|
return False
|
|
|
|
def main():
|
|
"""Funzione principale ottimizzata"""
|
|
parser = argparse.ArgumentParser(description='Analisi comportamentale ottimizzata per grandi volumi')
|
|
parser.add_argument('--force-training', action='store_true', help='Forza riaddestramento')
|
|
parser.add_argument('--test', action='store_true', help='Modalità test')
|
|
parser.add_argument('--time-window', type=int, default=12, help='Finestra temporale in ore (default: 12)')
|
|
parser.add_argument('--max-records', type=int, default=500000, help='Max record per addestramento (default: 500k)')
|
|
parser.add_argument('--batch-size', type=int, default=20000, help='Dimensione batch (default: 20k)')
|
|
parser.add_argument('--debug', action='store_true', help='Abilita debug logging')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.debug:
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
|
|
log_phase("Avvio sistema di addestramento ottimizzato")
|
|
log_result(f"Configurazione: {args.time_window}h, max {args.max_records} record, batch {args.batch_size}")
|
|
|
|
# Modalità test
|
|
if args.test:
|
|
log_phase("Modalità test")
|
|
|
|
if not test_database_connection():
|
|
log_error("Test database fallito")
|
|
sys.exit(1)
|
|
|
|
log_result("Test completato con successo")
|
|
sys.exit(0)
|
|
|
|
# Test connessione
|
|
if not test_database_connection():
|
|
log_error("Impossibile connettersi al database")
|
|
sys.exit(1)
|
|
|
|
# Controllo necessità addestramento
|
|
if not needs_training(args.force_training):
|
|
log_result("Addestramento non necessario")
|
|
sys.exit(0)
|
|
|
|
try:
|
|
engine = connect_to_database()
|
|
if not engine:
|
|
log_error("Connessione database fallita")
|
|
sys.exit(1)
|
|
|
|
# Estrazione dati
|
|
df = extract_data_for_training_optimized(
|
|
engine,
|
|
args.time_window,
|
|
args.max_records,
|
|
args.batch_size
|
|
)
|
|
|
|
if df.empty:
|
|
log_error("Nessun dato estratto")
|
|
sys.exit(1)
|
|
|
|
# Preparazione dati
|
|
X, preprocessor = prepare_data_for_model_optimized(df)
|
|
|
|
if X is None:
|
|
log_error("Errore nella preparazione dati")
|
|
sys.exit(1)
|
|
|
|
# Addestramento
|
|
models = train_models_optimized(X)
|
|
|
|
if all(m is not None for m in models):
|
|
log_phase("Addestramento completato con successo")
|
|
|
|
# Salva timestamp
|
|
save_model_timestamp()
|
|
|
|
# Statistiche finali
|
|
memory_usage = psutil.virtual_memory().percent
|
|
log_result(f"Memoria finale utilizzata: {memory_usage:.1f}%")
|
|
log_result(f"Modelli addestrati su {X.shape[0]} campioni con {X.shape[1]} feature")
|
|
|
|
else:
|
|
log_error("Errore nell'addestramento dei modelli")
|
|
sys.exit(1)
|
|
|
|
except Exception as e:
|
|
log_error(f"Errore generale: {e}")
|
|
import traceback
|
|
logging.error(traceback.format_exc())
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |