Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
906 lines
36 KiB
Python
906 lines
36 KiB
Python
#!/usr/bin/env python3
|
|
import pandas as pd
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.sql import text
|
|
from sklearn.ensemble import IsolationForest
|
|
from sklearn.neighbors import LocalOutlierFactor
|
|
from sklearn.svm import OneClassSVM
|
|
from sklearn.feature_extraction.text import TfidfVectorizer
|
|
from joblib import dump, load
|
|
import logging
|
|
import gc
|
|
import os
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
import numpy as np
|
|
import argparse # Aggiunto per gestire gli argomenti da linea di comando
|
|
import sys
|
|
import traceback
|
|
import threading
|
|
|
|
# Configurazione del logging migliorata
|
|
logging.basicConfig(
|
|
level=logging.DEBUG, # Cambiato da INFO a DEBUG
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.StreamHandler(sys.stdout),
|
|
logging.FileHandler('analisys_debug.log') # File di log separato
|
|
]
|
|
)
|
|
|
|
# Cartella per i modelli
|
|
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
|
|
try:
|
|
os.makedirs(MODEL_DIR, exist_ok=True)
|
|
logging.debug(f"Directory models creata/verificata: {MODEL_DIR}")
|
|
except Exception as e:
|
|
logging.error(f"Errore nella creazione della directory models: {e}")
|
|
# Fallback alla directory corrente
|
|
MODEL_DIR = os.path.join(os.getcwd(), 'models')
|
|
try:
|
|
os.makedirs(MODEL_DIR, exist_ok=True)
|
|
logging.debug(f"Directory models creata come fallback in: {MODEL_DIR}")
|
|
except Exception as e2:
|
|
logging.error(f"Impossibile creare la directory models anche come fallback: {e2}")
|
|
MODEL_DIR = '.' # Usa la directory corrente come ultima risorsa
|
|
|
|
# Percorsi dei modelli
|
|
IF_MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
|
|
LOF_MODEL_PATH = os.path.join(MODEL_DIR, 'lof.joblib')
|
|
SVM_MODEL_PATH = os.path.join(MODEL_DIR, 'svm.joblib')
|
|
ENSEMBLE_MODEL_PATH = os.path.join(MODEL_DIR, 'ensemble_weights.joblib')
|
|
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
|
|
ACCUMULATED_DATA_PATH = os.path.join(MODEL_DIR, 'accumulated_data.pkl')
|
|
LAST_TRAINING_PATH = os.path.join(MODEL_DIR, 'last_training.txt')
|
|
|
|
# Parametri di configurazione
|
|
TRAINING_FREQUENCY_HOURS = 12 # Riaddestra ogni 12 ore
|
|
CONTINUOUS_LEARNING = True
|
|
|
|
# Variabili globali per il tracciamento dell'avanzamento
|
|
progress_status = {
|
|
'in_progress': False,
|
|
'operation': '',
|
|
'start_time': None,
|
|
'current_step': 0,
|
|
'total_steps': 0,
|
|
'details': '',
|
|
'last_update': 0
|
|
}
|
|
|
|
def start_progress_tracking(operation, total_steps=100):
|
|
"""
|
|
Inizia il tracciamento di un'operazione lunga
|
|
"""
|
|
global progress_status
|
|
progress_status['in_progress'] = True
|
|
progress_status['operation'] = operation
|
|
progress_status['start_time'] = time.time()
|
|
progress_status['current_step'] = 0
|
|
progress_status['total_steps'] = total_steps
|
|
progress_status['details'] = 'Inizializzazione...'
|
|
progress_status['last_update'] = 0
|
|
|
|
# Avvia un thread separato per il monitoraggio dell'avanzamento
|
|
threading.Thread(target=progress_reporter, daemon=True).start()
|
|
logging.info(f"Avvio: {operation} (totale passi previsti: {total_steps})")
|
|
|
|
def update_progress(step=None, increment=1, details=''):
|
|
"""
|
|
Aggiorna lo stato di avanzamento di un'operazione lunga
|
|
"""
|
|
global progress_status
|
|
if not progress_status['in_progress']:
|
|
return
|
|
|
|
if step is not None:
|
|
progress_status['current_step'] = step
|
|
else:
|
|
progress_status['current_step'] += increment
|
|
|
|
if details:
|
|
progress_status['details'] = details
|
|
|
|
# Forza un aggiornamento immediato se details è stato fornito
|
|
if details and (time.time() - progress_status['last_update']) > 5:
|
|
report_progress()
|
|
|
|
def end_progress_tracking(success=True):
|
|
"""
|
|
Termina il tracciamento dell'avanzamento
|
|
"""
|
|
global progress_status
|
|
if not progress_status['in_progress']:
|
|
return
|
|
|
|
elapsed = time.time() - progress_status['start_time']
|
|
if success:
|
|
logging.info(f"Completato: {progress_status['operation']} in {elapsed:.1f} secondi")
|
|
else:
|
|
logging.error(f"Fallito: {progress_status['operation']} dopo {elapsed:.1f} secondi")
|
|
|
|
progress_status['in_progress'] = False
|
|
progress_status['current_step'] = progress_status['total_steps']
|
|
report_progress(force=True)
|
|
|
|
def report_progress(force=False):
|
|
"""
|
|
Riporta lo stato di avanzamento attuale
|
|
"""
|
|
global progress_status
|
|
if not progress_status['in_progress'] and not force:
|
|
return
|
|
|
|
current_time = time.time()
|
|
if not force and (current_time - progress_status['last_update']) < 30:
|
|
return # Non mostrare aggiornamenti più frequenti di 30 secondi
|
|
|
|
elapsed = current_time - progress_status['start_time']
|
|
percent = (progress_status['current_step'] / progress_status['total_steps']) * 100 if progress_status['total_steps'] > 0 else 0
|
|
|
|
# Calcola il tempo rimanente (se possibile)
|
|
remaining = "N/A"
|
|
if percent > 0:
|
|
remaining_seconds = (elapsed / percent) * (100 - percent)
|
|
if remaining_seconds < 60:
|
|
remaining = f"{remaining_seconds:.0f} secondi"
|
|
elif remaining_seconds < 3600:
|
|
remaining = f"{remaining_seconds/60:.1f} minuti"
|
|
else:
|
|
remaining = f"{remaining_seconds/3600:.1f} ore"
|
|
|
|
message = f"Progresso {progress_status['operation']}: {percent:.1f}% completato, {elapsed:.1f} secondi trascorsi, tempo rimasto: {remaining}"
|
|
if progress_status['details']:
|
|
message += f" - {progress_status['details']}"
|
|
|
|
logging.info(message)
|
|
progress_status['last_update'] = current_time
|
|
|
|
def progress_reporter():
|
|
"""
|
|
Thread che riporta periodicamente i progressi
|
|
"""
|
|
while progress_status['in_progress']:
|
|
report_progress()
|
|
time.sleep(5) # Controlla ogni 5 secondi, ma riporta solo ogni 30
|
|
|
|
def extract_time_features(df):
|
|
"""
|
|
Estrae caratteristiche temporali dai dati
|
|
"""
|
|
total_rows = len(df)
|
|
batch_size = max(1, total_rows // 10) # Dividi in circa 10 batch per i report
|
|
|
|
logging.info("Estrazione delle caratteristiche temporali...")
|
|
|
|
# Converti timestamp in ora del giorno e giorno della settimana
|
|
if 'Timestamp' not in df.columns and 'Data' in df.columns and 'Ora' in df.columns:
|
|
# Crea timestamp dalle colonne Data e Ora
|
|
df['Data'] = pd.to_datetime(df['Data'], errors='coerce')
|
|
df['Ora'] = pd.to_timedelta(df['Ora'].astype(str), errors='coerce')
|
|
df['Timestamp'] = df['Data'] + df['Ora']
|
|
|
|
if 'Timestamp' in df.columns:
|
|
df['hour_of_day'] = df['Timestamp'].dt.hour
|
|
df['day_of_week'] = df['Timestamp'].dt.dayofweek
|
|
else:
|
|
logging.warning("Colonna Timestamp non presente nei dati. Impossibile estrarre caratteristiche temporali.")
|
|
df['hour_of_day'] = 0
|
|
df['day_of_week'] = 0
|
|
|
|
# Inizializza colonne per le nuove caratteristiche
|
|
df['time_since_last'] = np.nan
|
|
df['events_last_hour'] = 0
|
|
df['events_last_day'] = 0
|
|
|
|
# Estrai IP attaccante da Messaggio2 se disponibile
|
|
try:
|
|
update_progress(details="Estrazione IP e porte dagli attaccanti")
|
|
# Messaggio2 contiene IP e porta dell'attaccante
|
|
df['IP_Attaccante'] = df['Messaggio2'].apply(lambda x: x.split(':')[0] if pd.notna(x) and ':' in str(x) else None)
|
|
df['Porta_Attaccante'] = df['Messaggio2'].apply(lambda x: x.split(':')[1] if pd.notna(x) and ':' in str(x) else None)
|
|
|
|
# Messaggio3 contiene IP e porta dell'attaccato
|
|
df['IP_Attaccato'] = df['Messaggio3'].apply(lambda x: x.split(':')[0] if pd.notna(x) and ':' in str(x) else None)
|
|
df['Porta_Attaccato'] = df['Messaggio3'].apply(lambda x: x.split(':')[1] if pd.notna(x) and ':' in str(x) else None)
|
|
|
|
logging.debug(f"Estratto IP attaccante per {len(df[df['IP_Attaccante'].notna()])} record")
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'estrazione degli IP dai messaggi: {e}")
|
|
|
|
# Calcola caratteristiche solo per IP dell'attaccante
|
|
ip_column = 'IP_Attaccante'
|
|
if ip_column in df.columns and not df[ip_column].isna().all():
|
|
logging.debug(f"Calcolo statistiche per l'IP dell'attaccante")
|
|
update_progress(details="Calcolo statistiche per IP attaccanti")
|
|
|
|
# Ordina per IP e timestamp
|
|
df_sorted = df.sort_values([ip_column, 'Timestamp']).dropna(subset=[ip_column])
|
|
|
|
# Per ogni IP, calcola il tempo tra eventi consecutivi
|
|
df['time_since_last'] = np.nan
|
|
df['events_last_hour'] = 0
|
|
df['events_last_day'] = 0
|
|
|
|
# Conta quanti IP unici ci sono per stimare il lavoro
|
|
unique_ips = df_sorted[ip_column].nunique()
|
|
update_progress(details=f"Analisi di {unique_ips} IP univoci")
|
|
|
|
ip_counter = 0
|
|
for ip, group in df_sorted.groupby(ip_column):
|
|
ip_counter += 1
|
|
if ip_counter % 100 == 0: # Aggiorna ogni 100 IP
|
|
update_progress(details=f"Analizzati {ip_counter}/{unique_ips} IP")
|
|
|
|
if len(group) > 1:
|
|
# Calcola il tempo tra eventi consecutivi
|
|
group = group.copy()
|
|
group['time_since_last'] = group['Timestamp'].diff().dt.total_seconds()
|
|
|
|
# Aggiorna il DataFrame originale
|
|
df.loc[group.index, 'time_since_last'] = group['time_since_last']
|
|
|
|
# Conta eventi nell'ultima ora e giorno per ogni IP
|
|
for idx, row in group.iterrows():
|
|
current_time = row['Timestamp']
|
|
one_hour_ago = current_time - timedelta(hours=1)
|
|
one_day_ago = current_time - timedelta(days=1)
|
|
|
|
# Conta eventi nell'ultima ora
|
|
events_last_hour = len(group[(group['Timestamp'] > one_hour_ago) &
|
|
(group['Timestamp'] <= current_time)])
|
|
|
|
# Conta eventi nell'ultimo giorno
|
|
events_last_day = len(group[(group['Timestamp'] > one_day_ago) &
|
|
(group['Timestamp'] <= current_time)])
|
|
|
|
df.loc[idx, 'events_last_hour'] = events_last_hour
|
|
df.loc[idx, 'events_last_day'] = events_last_day
|
|
|
|
update_progress(details="Calcolo statistiche aggregate per IP")
|
|
# Calcola statistiche per IP manualmente
|
|
ip_stats = pd.DataFrame()
|
|
|
|
# Raggruppa i dati per IP per calcolare le statistiche
|
|
ip_groups = df_sorted.groupby(ip_column)
|
|
|
|
# Calcola statistiche
|
|
ip_stats['time_since_last_mean'] = ip_groups['time_since_last'].mean()
|
|
ip_stats['time_since_last_std'] = ip_groups['time_since_last'].std().fillna(0)
|
|
ip_stats['time_since_last_min'] = ip_groups['time_since_last'].min()
|
|
ip_stats['time_since_last_max'] = ip_groups['time_since_last'].max()
|
|
ip_stats['events_last_hour_max'] = ip_groups['events_last_hour'].max()
|
|
ip_stats['events_last_day_max'] = ip_groups['events_last_day'].max()
|
|
|
|
# Resetta l'indice per avere IP come colonna
|
|
ip_stats = ip_stats.reset_index()
|
|
|
|
# Merge con il DataFrame originale
|
|
update_progress(details="Merge delle statistiche con il dataset principale")
|
|
df = df.merge(ip_stats, on=ip_column, how='left')
|
|
|
|
# Gestisci valori NaN in tutte le feature
|
|
update_progress(details="Pulizia valori mancanti")
|
|
for col in df.columns:
|
|
if col.startswith('time_since_last') or col.startswith('events_last'):
|
|
df[col] = df[col].fillna(0)
|
|
|
|
logging.info("Caratteristiche temporali estratte con successo.")
|
|
return df
|
|
|
|
def connect_to_database():
|
|
"""
|
|
Connette al database MySQL usando le credenziali da variabili d'ambiente
|
|
"""
|
|
try:
|
|
logging.info("Connessione al database...")
|
|
|
|
db_user = os.environ.get('MYSQL_USER', 'root')
|
|
db_password = os.environ.get('MYSQL_PASSWORD', 'Hdgtejskjjc0-')
|
|
db_host = os.environ.get('MYSQL_HOST', 'localhost')
|
|
db_name = os.environ.get('MYSQL_DATABASE', 'LOG_MIKROTIK')
|
|
|
|
connection_string = f"mysql+mysqlconnector://{db_user}:{db_password}@{db_host}/{db_name}"
|
|
engine = create_engine(connection_string)
|
|
|
|
return engine
|
|
except Exception as e:
|
|
logging.error(f"Errore nella connessione al database: {e}")
|
|
return None
|
|
|
|
def extract_new_data(engine, window_minutes=90):
|
|
"""
|
|
Estrae nuovi dati dal database per l'addestramento del modello
|
|
"""
|
|
try:
|
|
logging.info(f"Estrazione dei dati degli ultimi {window_minutes} minuti...")
|
|
|
|
query = text("""
|
|
SELECT *
|
|
FROM Esterna
|
|
WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window MINUTE)
|
|
""")
|
|
|
|
df = pd.read_sql(query, engine, params={"window": window_minutes})
|
|
logging.info(f"Estratti {len(df)} record dalla tabella Esterna")
|
|
return df
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'estrazione dei dati: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def save_model_timestamp():
|
|
"""
|
|
Salva il timestamp dell'ultimo addestramento del modello
|
|
"""
|
|
try:
|
|
engine = connect_to_database()
|
|
if not engine:
|
|
return False
|
|
|
|
with engine.connect() as conn:
|
|
# Crea la tabella se non esiste
|
|
create_table_query = text("""
|
|
CREATE TABLE IF NOT EXISTS model_metadata (
|
|
id INT AUTO_INCREMENT PRIMARY KEY,
|
|
model_name VARCHAR(50) NOT NULL,
|
|
last_trained TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
model_path VARCHAR(255),
|
|
UNIQUE KEY unique_model (model_name)
|
|
)
|
|
""")
|
|
conn.execute(create_table_query)
|
|
|
|
# Aggiorna o inserisci il timestamp
|
|
upsert_query = text("""
|
|
INSERT INTO model_metadata (model_name, last_trained, model_path)
|
|
VALUES ('ensemble', NOW(), :model_path)
|
|
ON DUPLICATE KEY UPDATE last_trained = NOW(), model_path = :model_path
|
|
""")
|
|
|
|
conn.execute(upsert_query, {"model_path": ENSEMBLE_MODEL_PATH})
|
|
|
|
logging.info("Timestamp di addestramento del modello salvato con successo")
|
|
return True
|
|
except Exception as e:
|
|
logging.error(f"Errore nel salvare il timestamp di addestramento: {e}")
|
|
return False
|
|
|
|
def needs_training(force_training=False):
|
|
"""
|
|
Verifica se il modello deve essere riaddestrato (ogni 12 ore)
|
|
"""
|
|
if force_training:
|
|
logging.info("Riaddestramento forzato richiesto.")
|
|
return True
|
|
|
|
try:
|
|
engine = connect_to_database()
|
|
if not engine:
|
|
return True
|
|
|
|
with engine.connect() as conn:
|
|
# Verifica se la tabella esiste
|
|
try:
|
|
query = text("""
|
|
SELECT last_trained
|
|
FROM model_metadata
|
|
WHERE model_name = 'ensemble'
|
|
""")
|
|
|
|
result = conn.execute(query).fetchone()
|
|
|
|
if not result:
|
|
logging.info("Nessun dato di addestramento precedente trovato, riaddestramento necessario")
|
|
return True
|
|
|
|
last_trained = result[0]
|
|
now = datetime.now()
|
|
|
|
# Se l'ultimo addestramento è più vecchio di 12 ore, riaddestra
|
|
hours_diff = (now - last_trained).total_seconds() / 3600
|
|
|
|
if hours_diff >= 12:
|
|
logging.info(f"Ultimo addestramento: {last_trained}, {hours_diff:.1f} ore fa. Riaddestramento necessario")
|
|
return True
|
|
else:
|
|
logging.info(f"Ultimo addestramento: {last_trained}, {hours_diff:.1f} ore fa. Riaddestramento non necessario")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logging.warning(f"Errore nel controllo della tabella model_metadata: {e}")
|
|
return True
|
|
except Exception as e:
|
|
logging.error(f"Errore nel verificare se il modello deve essere riaddestrato: {e}")
|
|
return True
|
|
|
|
def update_last_training_time():
|
|
"""
|
|
Aggiorna il timestamp dell'ultimo addestramento
|
|
"""
|
|
with open(LAST_TRAINING_PATH, 'w') as f:
|
|
f.write(datetime.now().isoformat())
|
|
|
|
def train_models(X):
|
|
"""
|
|
Addestra i modelli di anomalia sui dati forniti
|
|
"""
|
|
start_progress_tracking("addestramento modelli", 4)
|
|
logging.info("Addestramento modelli in corso...")
|
|
|
|
try:
|
|
# Addestra Isolation Forest
|
|
update_progress(1, details=f"Addestramento Isolation Forest su {X.shape[0]} record con {X.shape[1]} feature")
|
|
if_model = IsolationForest(
|
|
n_estimators=200,
|
|
contamination=0.05, # Aumentato da 0.02 a 0.05 (5% di anomalie attese)
|
|
random_state=42,
|
|
n_jobs=-1 # Usa tutti i core disponibili
|
|
)
|
|
if_model.fit(X)
|
|
logging.info("Isolation Forest addestrato con successo")
|
|
|
|
# Addestra Local Outlier Factor
|
|
update_progress(2, details="Addestramento Local Outlier Factor")
|
|
lof_model = LocalOutlierFactor(
|
|
n_neighbors=20,
|
|
contamination=0.02,
|
|
novelty=True, # Deve essere True per predire nuovi dati
|
|
n_jobs=-1
|
|
)
|
|
lof_model.fit(X)
|
|
logging.info("Local Outlier Factor addestrato con successo")
|
|
|
|
# Addestra One-Class SVM
|
|
update_progress(3, details="Addestramento One-Class SVM")
|
|
svm_model = OneClassSVM(
|
|
kernel='rbf',
|
|
gamma='auto',
|
|
nu=0.02 # Simile a contamination
|
|
)
|
|
svm_model.fit(X)
|
|
logging.info("One-Class SVM addestrato con successo")
|
|
|
|
# Salva i modelli
|
|
update_progress(4, details="Salvataggio modelli su disco")
|
|
dump(if_model, IF_MODEL_PATH)
|
|
dump(lof_model, LOF_MODEL_PATH)
|
|
dump(svm_model, SVM_MODEL_PATH)
|
|
|
|
# Definisci pesi per l'ensemble (da ottimizzare con validazione)
|
|
ensemble_weights = {
|
|
'isolation_forest': 0.40,
|
|
'lof': 0.35,
|
|
'svm': 0.25
|
|
}
|
|
dump(ensemble_weights, ENSEMBLE_MODEL_PATH)
|
|
|
|
logging.info("Tutti i modelli salvati con successo")
|
|
end_progress_tracking()
|
|
|
|
return if_model, lof_model, svm_model, ensemble_weights
|
|
except Exception as e:
|
|
end_progress_tracking(success=False)
|
|
logging.error(f"Errore nell'addestramento dei modelli: {e}")
|
|
return None, None, None, None
|
|
|
|
def test_database_connection(conn_string=None):
|
|
"""
|
|
Testa la connessione al database e verifica la struttura della tabella Fibra
|
|
"""
|
|
try:
|
|
if not conn_string:
|
|
# Usa le stesse credenziali di connect_to_database
|
|
db_user = os.environ.get('MYSQL_USER', 'root')
|
|
db_password = os.environ.get('MYSQL_PASSWORD', 'Hdgtejskjjc0-')
|
|
db_host = os.environ.get('MYSQL_HOST', 'localhost')
|
|
db_name = os.environ.get('MYSQL_DATABASE', 'LOG_MIKROTIK')
|
|
|
|
conn_string = f"mysql+mysqlconnector://{db_user}:{db_password}@{db_host}/{db_name}"
|
|
|
|
logging.debug(f"Test di connessione al database con: {conn_string}")
|
|
|
|
# Test connessione di base
|
|
engine = create_engine(conn_string)
|
|
with engine.connect() as conn:
|
|
# Verifica connessione di base
|
|
result = conn.execute(text("SELECT 1")).fetchone()
|
|
if result and result[0] == 1:
|
|
logging.debug("Test connessione di base superato!")
|
|
|
|
# Elenca tabelle disponibili
|
|
tables = conn.execute(text("SHOW TABLES")).fetchall()
|
|
logging.debug(f"Tabelle disponibili: {[t[0] for t in tables]}")
|
|
|
|
# Verifica struttura tabella Fibra
|
|
columns = conn.execute(text("SHOW COLUMNS FROM Fibra")).fetchall()
|
|
column_names = [c[0] for c in columns]
|
|
logging.debug(f"Struttura tabella Fibra: {column_names}")
|
|
|
|
# Conta record nella tabella
|
|
count = conn.execute(text("SELECT COUNT(*) FROM Fibra")).fetchone()[0]
|
|
logging.debug(f"Tabella Fibra contiene {count} record")
|
|
|
|
# Ottieni un esempio
|
|
sample = conn.execute(text("SELECT * FROM Fibra LIMIT 1")).fetchone()
|
|
logging.debug(f"Esempio di dati da Fibra: {sample}")
|
|
|
|
return engine
|
|
else:
|
|
logging.error("Test di connessione al database fallito")
|
|
return None
|
|
except Exception as e:
|
|
logging.error(f"Errore nel test di connessione al database: {e}")
|
|
traceback.print_exc()
|
|
return None
|
|
|
|
def extract_data_sample(engine, table='Esterna', limit=5):
|
|
"""
|
|
Estrae un campione di dati dalla tabella specificata
|
|
"""
|
|
try:
|
|
query = text(f"SELECT * FROM {table} LIMIT :limit")
|
|
sample = pd.read_sql(query, engine, params={"limit": limit})
|
|
return sample
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'estrazione del campione di dati: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def extract_data_for_training(engine, window_hours=12, max_records=100000, batch_size=10000):
|
|
"""
|
|
Estrae dati per l'addestramento del modello con una finestra temporale
|
|
Implementa un sistema di paginazione per evitare blocchi con grandi dataset
|
|
|
|
Args:
|
|
engine: Connessione al database
|
|
window_hours: Finestra temporale in ore per estrarre i dati
|
|
max_records: Numero massimo di record da estrarre in totale
|
|
batch_size: Dimensione di ogni batch
|
|
"""
|
|
try:
|
|
start_progress_tracking(f"estrazione dati ({window_hours} ore)", 100)
|
|
update_progress(1, details=f"Preparazione query con limite {max_records} record")
|
|
|
|
logging.info(f"Estrazione dei dati delle ultime {window_hours} ore (max {max_records} record)...")
|
|
|
|
# Query di conteggio per stimare il totale
|
|
count_query = text("""
|
|
SELECT COUNT(*) AS total
|
|
FROM Esterna
|
|
WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR)
|
|
""")
|
|
|
|
total_count = 0
|
|
with engine.connect() as conn:
|
|
result = conn.execute(count_query, {"window": window_hours}).fetchone()
|
|
total_count = result[0] if result else 0
|
|
|
|
# Limita al massimo specificato
|
|
total_count = min(total_count, max_records)
|
|
logging.info(f"Trovati {total_count} record totali, ne estraggo al massimo {max_records}")
|
|
|
|
# Estrae dati in batch
|
|
offset = 0
|
|
all_data = []
|
|
batches_extracted = 0
|
|
|
|
while offset < total_count:
|
|
curr_batch_size = min(batch_size, total_count - offset)
|
|
percent_complete = (offset / total_count) * 100 if total_count > 0 else 0
|
|
|
|
update_progress(int(percent_complete),
|
|
details=f"Estrazione batch {batches_extracted+1} ({offset}/{total_count} record, {percent_complete:.1f}%)")
|
|
|
|
query = text("""
|
|
SELECT *
|
|
FROM Esterna
|
|
WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR)
|
|
ORDER BY ID ASC
|
|
LIMIT :batch_size OFFSET :offset
|
|
""")
|
|
|
|
batch_data = pd.read_sql(query, engine, params={
|
|
"window": window_hours,
|
|
"batch_size": curr_batch_size,
|
|
"offset": offset
|
|
})
|
|
|
|
if batch_data.empty:
|
|
logging.warning(f"Batch vuoto ricevuto al offset {offset}, interrompo estrazione")
|
|
break
|
|
|
|
batches_extracted += 1
|
|
logging.info(f"Estratto batch {batches_extracted}: {len(batch_data)} record")
|
|
all_data.append(batch_data)
|
|
|
|
offset += len(batch_data)
|
|
|
|
# Aggiorna lo stato di avanzamento
|
|
update_progress(int((offset / total_count) * 100),
|
|
details=f"Completato batch {batches_extracted} ({offset}/{total_count} record)")
|
|
|
|
# Controlla se abbiamo raggiunto il massimo
|
|
if offset >= max_records:
|
|
logging.info(f"Raggiunto il limite massimo di {max_records} record")
|
|
break
|
|
|
|
# Ferma dopo 10 batch per sicurezza se non abbiamo ancora raggiunto il totale
|
|
if batches_extracted >= 10 and offset < total_count:
|
|
logging.warning(f"Fermata estrazione dopo {batches_extracted} batch per evitare sovraccarico")
|
|
break
|
|
|
|
# Combina tutti i batch in un unico DataFrame
|
|
if all_data:
|
|
df = pd.concat(all_data, ignore_index=True)
|
|
logging.info(f"Estratti {len(df)} record in totale tramite {batches_extracted} batch")
|
|
update_progress(100, details=f"Estrazione completata: {len(df)} record in {batches_extracted} batch")
|
|
else:
|
|
df = pd.DataFrame()
|
|
logging.warning("Nessun dato estratto!")
|
|
update_progress(100, details="Estrazione completata: nessun dato trovato")
|
|
|
|
end_progress_tracking()
|
|
return df
|
|
except Exception as e:
|
|
end_progress_tracking(success=False)
|
|
logging.error(f"Errore nell'estrazione dei dati per l'addestramento: {e}")
|
|
import traceback
|
|
logging.error(traceback.format_exc())
|
|
return pd.DataFrame()
|
|
|
|
def process_text_features(df):
|
|
"""
|
|
Processa le feature testuali utilizzando TF-IDF
|
|
"""
|
|
try:
|
|
logging.info("Elaborazione delle caratteristiche testuali...")
|
|
|
|
# Messaggio1 contiene il protocollo
|
|
if 'Messaggio1' in df.columns:
|
|
df['Protocollo'] = df['Messaggio1'].fillna('')
|
|
else:
|
|
df['Protocollo'] = ''
|
|
|
|
# Preparazione del vettorizzatore TF-IDF per il protocollo
|
|
tfidf_vectorizer = TfidfVectorizer(max_features=50, stop_words='english')
|
|
X_protocol = tfidf_vectorizer.fit_transform(df['Protocollo'])
|
|
|
|
# Converti in DataFrame
|
|
tfidf_df = pd.DataFrame(
|
|
X_protocol.toarray(),
|
|
columns=[f'protocol_tfidf_{i}' for i in range(X_protocol.shape[1])],
|
|
index=df.index
|
|
)
|
|
|
|
# Concatena con il dataframe originale
|
|
result_df = pd.concat([df, tfidf_df], axis=1)
|
|
|
|
logging.info(f"Feature testuali estratte con successo: {tfidf_df.shape[1]} nuove colonne")
|
|
return result_df, tfidf_vectorizer
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'elaborazione delle caratteristiche testuali: {e}")
|
|
return df, None
|
|
|
|
def process_categorical_features(df):
|
|
"""
|
|
Processa le feature categoriche
|
|
"""
|
|
try:
|
|
logging.info("Elaborazione delle caratteristiche categoriche...")
|
|
categorical_features = {}
|
|
|
|
# One-hot encoding per 'Host' se presente
|
|
if 'Host' in df.columns:
|
|
host_dummies = pd.get_dummies(df['Host'], prefix='host', dummy_na=True)
|
|
categorical_features['host'] = host_dummies.columns.tolist()
|
|
df = pd.concat([df, host_dummies], axis=1)
|
|
|
|
# Encoding per IP_Attaccante se presente
|
|
if 'IP_Attaccante' in df.columns and df['IP_Attaccante'].notna().any():
|
|
from category_encoders import HashingEncoder
|
|
|
|
he = HashingEncoder(cols=['IP_Attaccante'], n_components=15)
|
|
ip_att_encoded = he.fit_transform(df[['IP_Attaccante']].fillna('unknown'))
|
|
categorical_features['IP_Attaccante'] = ip_att_encoded.columns.tolist()
|
|
df = pd.concat([df, ip_att_encoded], axis=1)
|
|
|
|
logging.info("Caratteristiche categoriche elaborate con successo.")
|
|
return df, categorical_features
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'elaborazione delle caratteristiche categoriche: {e}")
|
|
return df, {}
|
|
|
|
def prepare_data_for_model(df):
|
|
"""
|
|
Prepara i dati per il modello
|
|
"""
|
|
try:
|
|
total_steps = 4 # Numero di passi principali
|
|
start_progress_tracking("preparazione dati", total_steps)
|
|
|
|
logging.info("Preparazione dei dati per il modello...")
|
|
|
|
# Assicurati che il DataFrame non sia vuoto
|
|
if df.empty:
|
|
logging.error("DataFrame vuoto. Impossibile preparare i dati per il modello.")
|
|
end_progress_tracking(success=False)
|
|
return None, None
|
|
|
|
# Prepara le feature temporali
|
|
update_progress(1, details=f"Estrazione caratteristiche temporali per {len(df)} record")
|
|
df = extract_time_features(df)
|
|
|
|
# Processa le feature testuali
|
|
update_progress(2, details="Elaborazione caratteristiche testuali")
|
|
df, text_vectorizer = process_text_features(df)
|
|
|
|
# Processa le feature categoriche
|
|
update_progress(3, details="Elaborazione caratteristiche categoriche")
|
|
df, categorical_features = process_categorical_features(df)
|
|
|
|
# Seleziona le colonne e finalizza
|
|
update_progress(4, details="Selezione feature e finalizzazione")
|
|
|
|
# Seleziona le colonne numeriche per il modello
|
|
numeric_cols = [col for col in df.columns if df[col].dtype in ['int64', 'float64']]
|
|
|
|
# Seleziona le colonne dummy (one-hot)
|
|
dummy_cols = []
|
|
for cols in categorical_features.values():
|
|
dummy_cols.extend(cols)
|
|
|
|
# Seleziona le colonne tfidf
|
|
tfidf_cols = [col for col in df.columns if col.startswith('protocol_tfidf_')]
|
|
|
|
# Combina tutte le colonne
|
|
feature_cols = numeric_cols + dummy_cols + tfidf_cols
|
|
|
|
# Rimuovi colonne non necessarie per il modello
|
|
excluded_cols = ['ID', 'Data', 'Ora', 'Timestamp', 'Host', 'IndirizzoIP',
|
|
'Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4',
|
|
'IP_Attaccante', 'IP_Attaccato', 'Porta_Attaccante', 'Porta_Attaccato',
|
|
'Protocollo']
|
|
|
|
feature_cols = [col for col in feature_cols if col not in excluded_cols]
|
|
|
|
logging.debug(f"Feature utilizzate per il modello: {feature_cols}")
|
|
|
|
# Crea DataFrame delle feature
|
|
X = df[feature_cols].copy()
|
|
|
|
# Gestisci eventuali valori mancanti
|
|
X.fillna(0, inplace=True)
|
|
|
|
# Crea un oggetto preprocessore che contiene tutte le informazioni necessarie
|
|
preprocessor = {
|
|
'feature_columns': feature_cols,
|
|
'text_vectorizer': text_vectorizer,
|
|
'categorical_features': categorical_features
|
|
}
|
|
|
|
# Salva il preprocessor su disco
|
|
try:
|
|
logging.info(f"Salvataggio preprocessor in {PREPROCESSOR_PATH}")
|
|
dump(preprocessor, PREPROCESSOR_PATH)
|
|
logging.info("Preprocessor salvato con successo")
|
|
except Exception as e:
|
|
logging.error(f"Errore nel salvataggio del preprocessor: {e}")
|
|
|
|
logging.info(f"Dati preparati con successo: {X.shape[0]} esempi, {X.shape[1]} feature")
|
|
end_progress_tracking()
|
|
return X, preprocessor
|
|
except Exception as e:
|
|
end_progress_tracking(success=False)
|
|
logging.error(f"Errore nella preparazione dei dati: {e}")
|
|
import traceback
|
|
logging.error(traceback.format_exc())
|
|
return None, None
|
|
|
|
def main():
|
|
# Parsing degli argomenti da linea di comando
|
|
parser = argparse.ArgumentParser(description='Analisi comportamentale del traffico di rete')
|
|
parser.add_argument('--force-training', action='store_true', help='Forza il riaddestramento del modello indipendentemente da quando è stato addestrato l\'ultima volta')
|
|
parser.add_argument('--test', action='store_true', help='Esegue un test delle funzionalità senza addestramento o predizione')
|
|
parser.add_argument('--time-window', type=str, default='12 HOUR', help='Finestra temporale per l\'estrazione dei dati (es. "12 HOUR", "30 MINUTE")')
|
|
parser.add_argument('--max-records', type=int, default=100000, help='Numero massimo di record da estrarre (default: 100000)')
|
|
parser.add_argument('--batch-size', type=int, default=10000, help='Dimensione di ciascun batch per l\'estrazione (default: 10000)')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Stampa le opzioni
|
|
logging.info(f"Opzioni: force_training={args.force_training}, test={args.test}, time_window={args.time_window}, max_records={args.max_records}")
|
|
|
|
# Se in modalità test, esegui solo le funzioni di diagnostica
|
|
if args.test:
|
|
logging.info("MODALITÀ TEST - esecuzione diagnostica")
|
|
|
|
# Test di connessione al database
|
|
engine = test_database_connection()
|
|
if not engine:
|
|
logging.error("Test database fallito - impossibile continuare")
|
|
sys.exit(1)
|
|
|
|
# Test di estrazione dati
|
|
logging.info("Test: estrazione di un campione di dati...")
|
|
sample_df = extract_data_sample(engine)
|
|
logging.info(f"Test di estrazione dati riuscito. Colonne: {sample_df.columns.tolist()}")
|
|
if not sample_df.empty:
|
|
logging.info(f"Esempio record: {sample_df.iloc[0].to_dict()}")
|
|
|
|
# Test percorsi modelli
|
|
logging.info("Test: controllo percorsi per i modelli...")
|
|
logging.info(f"MODEL_DIR esiste: {os.path.exists(MODEL_DIR)}")
|
|
|
|
# Test sistema di tracciamento progressi
|
|
logging.info("Test: sistema di tracciamento progressi")
|
|
start_progress_tracking("test tracciamento", 5)
|
|
for i in range(5):
|
|
update_progress(i+1, details=f"Passo di test {i+1}")
|
|
time.sleep(2) # Simula un'operazione che richiede tempo
|
|
end_progress_tracking()
|
|
|
|
logging.info("Modalità test completata")
|
|
sys.exit(0)
|
|
|
|
# Ottieni una connessione al database
|
|
engine = connect_to_database()
|
|
if not engine:
|
|
logging.error("Impossibile connettersi al database. Verificare le configurazioni e riprovare.")
|
|
sys.exit(1)
|
|
|
|
# Controlla se è necessario riaddestramento
|
|
if needs_training(args.force_training):
|
|
start_progress_tracking("processo di addestramento completo", 3)
|
|
update_progress(1, details="Verifica necessità di addestramento")
|
|
|
|
logging.info("Inizio addestramento dei modelli...")
|
|
|
|
# Estrai dati per addestramento
|
|
update_progress(details="Configurazione parametri di estrazione dati")
|
|
time_window_value = int(args.time_window.split(' ')[0])
|
|
time_window_unit = args.time_window.split(' ')[1]
|
|
|
|
if time_window_unit.upper() == 'HOUR':
|
|
df = extract_data_for_training(engine, time_window_value, args.max_records, args.batch_size)
|
|
elif time_window_unit.upper() == 'MINUTE':
|
|
df = extract_data_for_training(engine, time_window_value / 60, args.max_records, args.batch_size)
|
|
else:
|
|
logging.error(f"Unità di tempo non supportata: {time_window_unit}")
|
|
end_progress_tracking(success=False)
|
|
sys.exit(1)
|
|
|
|
if df.empty:
|
|
logging.error("Nessun dato estratto per l'addestramento. Verifica la connessione al database.")
|
|
end_progress_tracking(success=False)
|
|
sys.exit(1)
|
|
|
|
update_progress(2, details="Preparazione dati per il modello")
|
|
# Prepara i dati
|
|
X, preprocessor = prepare_data_for_model(df)
|
|
|
|
if X is None:
|
|
logging.error("Errore nella preparazione dei dati. Impossibile addestrare i modelli.")
|
|
end_progress_tracking(success=False)
|
|
sys.exit(1)
|
|
|
|
update_progress(3, details="Addestramento modelli")
|
|
# Addestra i modelli
|
|
models = train_models(X)
|
|
|
|
if all(m is not None for m in models):
|
|
logging.info("Addestramento completato con successo!")
|
|
|
|
# Aggiorna timestamp dell'ultimo addestramento
|
|
save_model_timestamp()
|
|
update_last_training_time()
|
|
end_progress_tracking()
|
|
else:
|
|
logging.error("Errore nell'addestramento dei modelli.")
|
|
end_progress_tracking(success=False)
|
|
sys.exit(1)
|
|
else:
|
|
logging.info("Addestramento non necessario, utilizzo dei modelli esistenti.")
|
|
|
|
logging.info("Analisi completata con successo.")
|
|
|
|
if __name__ == "__main__":
|
|
main() |