ids.alfacom.it/extracted_idf/analisys_fixed.py
marco370 0bfe3258b5 Saved progress at the end of the loop
Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
2025-11-11 09:15:10 +00:00

906 lines
36 KiB
Python

#!/usr/bin/env python3
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.feature_extraction.text import TfidfVectorizer
from joblib import dump, load
import logging
import gc
import os
import time
from datetime import datetime, timedelta
import numpy as np
import argparse # Aggiunto per gestire gli argomenti da linea di comando
import sys
import traceback
import threading
# Configurazione del logging migliorata
logging.basicConfig(
level=logging.DEBUG, # Cambiato da INFO a DEBUG
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('analisys_debug.log') # File di log separato
]
)
# Cartella per i modelli
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
try:
os.makedirs(MODEL_DIR, exist_ok=True)
logging.debug(f"Directory models creata/verificata: {MODEL_DIR}")
except Exception as e:
logging.error(f"Errore nella creazione della directory models: {e}")
# Fallback alla directory corrente
MODEL_DIR = os.path.join(os.getcwd(), 'models')
try:
os.makedirs(MODEL_DIR, exist_ok=True)
logging.debug(f"Directory models creata come fallback in: {MODEL_DIR}")
except Exception as e2:
logging.error(f"Impossibile creare la directory models anche come fallback: {e2}")
MODEL_DIR = '.' # Usa la directory corrente come ultima risorsa
# Percorsi dei modelli
IF_MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
LOF_MODEL_PATH = os.path.join(MODEL_DIR, 'lof.joblib')
SVM_MODEL_PATH = os.path.join(MODEL_DIR, 'svm.joblib')
ENSEMBLE_MODEL_PATH = os.path.join(MODEL_DIR, 'ensemble_weights.joblib')
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
ACCUMULATED_DATA_PATH = os.path.join(MODEL_DIR, 'accumulated_data.pkl')
LAST_TRAINING_PATH = os.path.join(MODEL_DIR, 'last_training.txt')
# Parametri di configurazione
TRAINING_FREQUENCY_HOURS = 12 # Riaddestra ogni 12 ore
CONTINUOUS_LEARNING = True
# Variabili globali per il tracciamento dell'avanzamento
progress_status = {
'in_progress': False,
'operation': '',
'start_time': None,
'current_step': 0,
'total_steps': 0,
'details': '',
'last_update': 0
}
def start_progress_tracking(operation, total_steps=100):
"""
Inizia il tracciamento di un'operazione lunga
"""
global progress_status
progress_status['in_progress'] = True
progress_status['operation'] = operation
progress_status['start_time'] = time.time()
progress_status['current_step'] = 0
progress_status['total_steps'] = total_steps
progress_status['details'] = 'Inizializzazione...'
progress_status['last_update'] = 0
# Avvia un thread separato per il monitoraggio dell'avanzamento
threading.Thread(target=progress_reporter, daemon=True).start()
logging.info(f"Avvio: {operation} (totale passi previsti: {total_steps})")
def update_progress(step=None, increment=1, details=''):
"""
Aggiorna lo stato di avanzamento di un'operazione lunga
"""
global progress_status
if not progress_status['in_progress']:
return
if step is not None:
progress_status['current_step'] = step
else:
progress_status['current_step'] += increment
if details:
progress_status['details'] = details
# Forza un aggiornamento immediato se details è stato fornito
if details and (time.time() - progress_status['last_update']) > 5:
report_progress()
def end_progress_tracking(success=True):
"""
Termina il tracciamento dell'avanzamento
"""
global progress_status
if not progress_status['in_progress']:
return
elapsed = time.time() - progress_status['start_time']
if success:
logging.info(f"Completato: {progress_status['operation']} in {elapsed:.1f} secondi")
else:
logging.error(f"Fallito: {progress_status['operation']} dopo {elapsed:.1f} secondi")
progress_status['in_progress'] = False
progress_status['current_step'] = progress_status['total_steps']
report_progress(force=True)
def report_progress(force=False):
"""
Riporta lo stato di avanzamento attuale
"""
global progress_status
if not progress_status['in_progress'] and not force:
return
current_time = time.time()
if not force and (current_time - progress_status['last_update']) < 30:
return # Non mostrare aggiornamenti più frequenti di 30 secondi
elapsed = current_time - progress_status['start_time']
percent = (progress_status['current_step'] / progress_status['total_steps']) * 100 if progress_status['total_steps'] > 0 else 0
# Calcola il tempo rimanente (se possibile)
remaining = "N/A"
if percent > 0:
remaining_seconds = (elapsed / percent) * (100 - percent)
if remaining_seconds < 60:
remaining = f"{remaining_seconds:.0f} secondi"
elif remaining_seconds < 3600:
remaining = f"{remaining_seconds/60:.1f} minuti"
else:
remaining = f"{remaining_seconds/3600:.1f} ore"
message = f"Progresso {progress_status['operation']}: {percent:.1f}% completato, {elapsed:.1f} secondi trascorsi, tempo rimasto: {remaining}"
if progress_status['details']:
message += f" - {progress_status['details']}"
logging.info(message)
progress_status['last_update'] = current_time
def progress_reporter():
"""
Thread che riporta periodicamente i progressi
"""
while progress_status['in_progress']:
report_progress()
time.sleep(5) # Controlla ogni 5 secondi, ma riporta solo ogni 30
def extract_time_features(df):
"""
Estrae caratteristiche temporali dai dati
"""
total_rows = len(df)
batch_size = max(1, total_rows // 10) # Dividi in circa 10 batch per i report
logging.info("Estrazione delle caratteristiche temporali...")
# Converti timestamp in ora del giorno e giorno della settimana
if 'Timestamp' not in df.columns and 'Data' in df.columns and 'Ora' in df.columns:
# Crea timestamp dalle colonne Data e Ora
df['Data'] = pd.to_datetime(df['Data'], errors='coerce')
df['Ora'] = pd.to_timedelta(df['Ora'].astype(str), errors='coerce')
df['Timestamp'] = df['Data'] + df['Ora']
if 'Timestamp' in df.columns:
df['hour_of_day'] = df['Timestamp'].dt.hour
df['day_of_week'] = df['Timestamp'].dt.dayofweek
else:
logging.warning("Colonna Timestamp non presente nei dati. Impossibile estrarre caratteristiche temporali.")
df['hour_of_day'] = 0
df['day_of_week'] = 0
# Inizializza colonne per le nuove caratteristiche
df['time_since_last'] = np.nan
df['events_last_hour'] = 0
df['events_last_day'] = 0
# Estrai IP attaccante da Messaggio2 se disponibile
try:
update_progress(details="Estrazione IP e porte dagli attaccanti")
# Messaggio2 contiene IP e porta dell'attaccante
df['IP_Attaccante'] = df['Messaggio2'].apply(lambda x: x.split(':')[0] if pd.notna(x) and ':' in str(x) else None)
df['Porta_Attaccante'] = df['Messaggio2'].apply(lambda x: x.split(':')[1] if pd.notna(x) and ':' in str(x) else None)
# Messaggio3 contiene IP e porta dell'attaccato
df['IP_Attaccato'] = df['Messaggio3'].apply(lambda x: x.split(':')[0] if pd.notna(x) and ':' in str(x) else None)
df['Porta_Attaccato'] = df['Messaggio3'].apply(lambda x: x.split(':')[1] if pd.notna(x) and ':' in str(x) else None)
logging.debug(f"Estratto IP attaccante per {len(df[df['IP_Attaccante'].notna()])} record")
except Exception as e:
logging.error(f"Errore nell'estrazione degli IP dai messaggi: {e}")
# Calcola caratteristiche solo per IP dell'attaccante
ip_column = 'IP_Attaccante'
if ip_column in df.columns and not df[ip_column].isna().all():
logging.debug(f"Calcolo statistiche per l'IP dell'attaccante")
update_progress(details="Calcolo statistiche per IP attaccanti")
# Ordina per IP e timestamp
df_sorted = df.sort_values([ip_column, 'Timestamp']).dropna(subset=[ip_column])
# Per ogni IP, calcola il tempo tra eventi consecutivi
df['time_since_last'] = np.nan
df['events_last_hour'] = 0
df['events_last_day'] = 0
# Conta quanti IP unici ci sono per stimare il lavoro
unique_ips = df_sorted[ip_column].nunique()
update_progress(details=f"Analisi di {unique_ips} IP univoci")
ip_counter = 0
for ip, group in df_sorted.groupby(ip_column):
ip_counter += 1
if ip_counter % 100 == 0: # Aggiorna ogni 100 IP
update_progress(details=f"Analizzati {ip_counter}/{unique_ips} IP")
if len(group) > 1:
# Calcola il tempo tra eventi consecutivi
group = group.copy()
group['time_since_last'] = group['Timestamp'].diff().dt.total_seconds()
# Aggiorna il DataFrame originale
df.loc[group.index, 'time_since_last'] = group['time_since_last']
# Conta eventi nell'ultima ora e giorno per ogni IP
for idx, row in group.iterrows():
current_time = row['Timestamp']
one_hour_ago = current_time - timedelta(hours=1)
one_day_ago = current_time - timedelta(days=1)
# Conta eventi nell'ultima ora
events_last_hour = len(group[(group['Timestamp'] > one_hour_ago) &
(group['Timestamp'] <= current_time)])
# Conta eventi nell'ultimo giorno
events_last_day = len(group[(group['Timestamp'] > one_day_ago) &
(group['Timestamp'] <= current_time)])
df.loc[idx, 'events_last_hour'] = events_last_hour
df.loc[idx, 'events_last_day'] = events_last_day
update_progress(details="Calcolo statistiche aggregate per IP")
# Calcola statistiche per IP manualmente
ip_stats = pd.DataFrame()
# Raggruppa i dati per IP per calcolare le statistiche
ip_groups = df_sorted.groupby(ip_column)
# Calcola statistiche
ip_stats['time_since_last_mean'] = ip_groups['time_since_last'].mean()
ip_stats['time_since_last_std'] = ip_groups['time_since_last'].std().fillna(0)
ip_stats['time_since_last_min'] = ip_groups['time_since_last'].min()
ip_stats['time_since_last_max'] = ip_groups['time_since_last'].max()
ip_stats['events_last_hour_max'] = ip_groups['events_last_hour'].max()
ip_stats['events_last_day_max'] = ip_groups['events_last_day'].max()
# Resetta l'indice per avere IP come colonna
ip_stats = ip_stats.reset_index()
# Merge con il DataFrame originale
update_progress(details="Merge delle statistiche con il dataset principale")
df = df.merge(ip_stats, on=ip_column, how='left')
# Gestisci valori NaN in tutte le feature
update_progress(details="Pulizia valori mancanti")
for col in df.columns:
if col.startswith('time_since_last') or col.startswith('events_last'):
df[col] = df[col].fillna(0)
logging.info("Caratteristiche temporali estratte con successo.")
return df
def connect_to_database():
"""
Connette al database MySQL usando le credenziali da variabili d'ambiente
"""
try:
logging.info("Connessione al database...")
db_user = os.environ.get('MYSQL_USER', 'root')
db_password = os.environ.get('MYSQL_PASSWORD', 'Hdgtejskjjc0-')
db_host = os.environ.get('MYSQL_HOST', 'localhost')
db_name = os.environ.get('MYSQL_DATABASE', 'LOG_MIKROTIK')
connection_string = f"mysql+mysqlconnector://{db_user}:{db_password}@{db_host}/{db_name}"
engine = create_engine(connection_string)
return engine
except Exception as e:
logging.error(f"Errore nella connessione al database: {e}")
return None
def extract_new_data(engine, window_minutes=90):
"""
Estrae nuovi dati dal database per l'addestramento del modello
"""
try:
logging.info(f"Estrazione dei dati degli ultimi {window_minutes} minuti...")
query = text("""
SELECT *
FROM Esterna
WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window MINUTE)
""")
df = pd.read_sql(query, engine, params={"window": window_minutes})
logging.info(f"Estratti {len(df)} record dalla tabella Esterna")
return df
except Exception as e:
logging.error(f"Errore nell'estrazione dei dati: {e}")
return pd.DataFrame()
def save_model_timestamp():
"""
Salva il timestamp dell'ultimo addestramento del modello
"""
try:
engine = connect_to_database()
if not engine:
return False
with engine.connect() as conn:
# Crea la tabella se non esiste
create_table_query = text("""
CREATE TABLE IF NOT EXISTS model_metadata (
id INT AUTO_INCREMENT PRIMARY KEY,
model_name VARCHAR(50) NOT NULL,
last_trained TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
model_path VARCHAR(255),
UNIQUE KEY unique_model (model_name)
)
""")
conn.execute(create_table_query)
# Aggiorna o inserisci il timestamp
upsert_query = text("""
INSERT INTO model_metadata (model_name, last_trained, model_path)
VALUES ('ensemble', NOW(), :model_path)
ON DUPLICATE KEY UPDATE last_trained = NOW(), model_path = :model_path
""")
conn.execute(upsert_query, {"model_path": ENSEMBLE_MODEL_PATH})
logging.info("Timestamp di addestramento del modello salvato con successo")
return True
except Exception as e:
logging.error(f"Errore nel salvare il timestamp di addestramento: {e}")
return False
def needs_training(force_training=False):
"""
Verifica se il modello deve essere riaddestrato (ogni 12 ore)
"""
if force_training:
logging.info("Riaddestramento forzato richiesto.")
return True
try:
engine = connect_to_database()
if not engine:
return True
with engine.connect() as conn:
# Verifica se la tabella esiste
try:
query = text("""
SELECT last_trained
FROM model_metadata
WHERE model_name = 'ensemble'
""")
result = conn.execute(query).fetchone()
if not result:
logging.info("Nessun dato di addestramento precedente trovato, riaddestramento necessario")
return True
last_trained = result[0]
now = datetime.now()
# Se l'ultimo addestramento è più vecchio di 12 ore, riaddestra
hours_diff = (now - last_trained).total_seconds() / 3600
if hours_diff >= 12:
logging.info(f"Ultimo addestramento: {last_trained}, {hours_diff:.1f} ore fa. Riaddestramento necessario")
return True
else:
logging.info(f"Ultimo addestramento: {last_trained}, {hours_diff:.1f} ore fa. Riaddestramento non necessario")
return False
except Exception as e:
logging.warning(f"Errore nel controllo della tabella model_metadata: {e}")
return True
except Exception as e:
logging.error(f"Errore nel verificare se il modello deve essere riaddestrato: {e}")
return True
def update_last_training_time():
"""
Aggiorna il timestamp dell'ultimo addestramento
"""
with open(LAST_TRAINING_PATH, 'w') as f:
f.write(datetime.now().isoformat())
def train_models(X):
"""
Addestra i modelli di anomalia sui dati forniti
"""
start_progress_tracking("addestramento modelli", 4)
logging.info("Addestramento modelli in corso...")
try:
# Addestra Isolation Forest
update_progress(1, details=f"Addestramento Isolation Forest su {X.shape[0]} record con {X.shape[1]} feature")
if_model = IsolationForest(
n_estimators=200,
contamination=0.05, # Aumentato da 0.02 a 0.05 (5% di anomalie attese)
random_state=42,
n_jobs=-1 # Usa tutti i core disponibili
)
if_model.fit(X)
logging.info("Isolation Forest addestrato con successo")
# Addestra Local Outlier Factor
update_progress(2, details="Addestramento Local Outlier Factor")
lof_model = LocalOutlierFactor(
n_neighbors=20,
contamination=0.02,
novelty=True, # Deve essere True per predire nuovi dati
n_jobs=-1
)
lof_model.fit(X)
logging.info("Local Outlier Factor addestrato con successo")
# Addestra One-Class SVM
update_progress(3, details="Addestramento One-Class SVM")
svm_model = OneClassSVM(
kernel='rbf',
gamma='auto',
nu=0.02 # Simile a contamination
)
svm_model.fit(X)
logging.info("One-Class SVM addestrato con successo")
# Salva i modelli
update_progress(4, details="Salvataggio modelli su disco")
dump(if_model, IF_MODEL_PATH)
dump(lof_model, LOF_MODEL_PATH)
dump(svm_model, SVM_MODEL_PATH)
# Definisci pesi per l'ensemble (da ottimizzare con validazione)
ensemble_weights = {
'isolation_forest': 0.40,
'lof': 0.35,
'svm': 0.25
}
dump(ensemble_weights, ENSEMBLE_MODEL_PATH)
logging.info("Tutti i modelli salvati con successo")
end_progress_tracking()
return if_model, lof_model, svm_model, ensemble_weights
except Exception as e:
end_progress_tracking(success=False)
logging.error(f"Errore nell'addestramento dei modelli: {e}")
return None, None, None, None
def test_database_connection(conn_string=None):
"""
Testa la connessione al database e verifica la struttura della tabella Fibra
"""
try:
if not conn_string:
# Usa le stesse credenziali di connect_to_database
db_user = os.environ.get('MYSQL_USER', 'root')
db_password = os.environ.get('MYSQL_PASSWORD', 'Hdgtejskjjc0-')
db_host = os.environ.get('MYSQL_HOST', 'localhost')
db_name = os.environ.get('MYSQL_DATABASE', 'LOG_MIKROTIK')
conn_string = f"mysql+mysqlconnector://{db_user}:{db_password}@{db_host}/{db_name}"
logging.debug(f"Test di connessione al database con: {conn_string}")
# Test connessione di base
engine = create_engine(conn_string)
with engine.connect() as conn:
# Verifica connessione di base
result = conn.execute(text("SELECT 1")).fetchone()
if result and result[0] == 1:
logging.debug("Test connessione di base superato!")
# Elenca tabelle disponibili
tables = conn.execute(text("SHOW TABLES")).fetchall()
logging.debug(f"Tabelle disponibili: {[t[0] for t in tables]}")
# Verifica struttura tabella Fibra
columns = conn.execute(text("SHOW COLUMNS FROM Fibra")).fetchall()
column_names = [c[0] for c in columns]
logging.debug(f"Struttura tabella Fibra: {column_names}")
# Conta record nella tabella
count = conn.execute(text("SELECT COUNT(*) FROM Fibra")).fetchone()[0]
logging.debug(f"Tabella Fibra contiene {count} record")
# Ottieni un esempio
sample = conn.execute(text("SELECT * FROM Fibra LIMIT 1")).fetchone()
logging.debug(f"Esempio di dati da Fibra: {sample}")
return engine
else:
logging.error("Test di connessione al database fallito")
return None
except Exception as e:
logging.error(f"Errore nel test di connessione al database: {e}")
traceback.print_exc()
return None
def extract_data_sample(engine, table='Esterna', limit=5):
"""
Estrae un campione di dati dalla tabella specificata
"""
try:
query = text(f"SELECT * FROM {table} LIMIT :limit")
sample = pd.read_sql(query, engine, params={"limit": limit})
return sample
except Exception as e:
logging.error(f"Errore nell'estrazione del campione di dati: {e}")
return pd.DataFrame()
def extract_data_for_training(engine, window_hours=12, max_records=100000, batch_size=10000):
"""
Estrae dati per l'addestramento del modello con una finestra temporale
Implementa un sistema di paginazione per evitare blocchi con grandi dataset
Args:
engine: Connessione al database
window_hours: Finestra temporale in ore per estrarre i dati
max_records: Numero massimo di record da estrarre in totale
batch_size: Dimensione di ogni batch
"""
try:
start_progress_tracking(f"estrazione dati ({window_hours} ore)", 100)
update_progress(1, details=f"Preparazione query con limite {max_records} record")
logging.info(f"Estrazione dei dati delle ultime {window_hours} ore (max {max_records} record)...")
# Query di conteggio per stimare il totale
count_query = text("""
SELECT COUNT(*) AS total
FROM Esterna
WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR)
""")
total_count = 0
with engine.connect() as conn:
result = conn.execute(count_query, {"window": window_hours}).fetchone()
total_count = result[0] if result else 0
# Limita al massimo specificato
total_count = min(total_count, max_records)
logging.info(f"Trovati {total_count} record totali, ne estraggo al massimo {max_records}")
# Estrae dati in batch
offset = 0
all_data = []
batches_extracted = 0
while offset < total_count:
curr_batch_size = min(batch_size, total_count - offset)
percent_complete = (offset / total_count) * 100 if total_count > 0 else 0
update_progress(int(percent_complete),
details=f"Estrazione batch {batches_extracted+1} ({offset}/{total_count} record, {percent_complete:.1f}%)")
query = text("""
SELECT *
FROM Esterna
WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR)
ORDER BY ID ASC
LIMIT :batch_size OFFSET :offset
""")
batch_data = pd.read_sql(query, engine, params={
"window": window_hours,
"batch_size": curr_batch_size,
"offset": offset
})
if batch_data.empty:
logging.warning(f"Batch vuoto ricevuto al offset {offset}, interrompo estrazione")
break
batches_extracted += 1
logging.info(f"Estratto batch {batches_extracted}: {len(batch_data)} record")
all_data.append(batch_data)
offset += len(batch_data)
# Aggiorna lo stato di avanzamento
update_progress(int((offset / total_count) * 100),
details=f"Completato batch {batches_extracted} ({offset}/{total_count} record)")
# Controlla se abbiamo raggiunto il massimo
if offset >= max_records:
logging.info(f"Raggiunto il limite massimo di {max_records} record")
break
# Ferma dopo 10 batch per sicurezza se non abbiamo ancora raggiunto il totale
if batches_extracted >= 10 and offset < total_count:
logging.warning(f"Fermata estrazione dopo {batches_extracted} batch per evitare sovraccarico")
break
# Combina tutti i batch in un unico DataFrame
if all_data:
df = pd.concat(all_data, ignore_index=True)
logging.info(f"Estratti {len(df)} record in totale tramite {batches_extracted} batch")
update_progress(100, details=f"Estrazione completata: {len(df)} record in {batches_extracted} batch")
else:
df = pd.DataFrame()
logging.warning("Nessun dato estratto!")
update_progress(100, details="Estrazione completata: nessun dato trovato")
end_progress_tracking()
return df
except Exception as e:
end_progress_tracking(success=False)
logging.error(f"Errore nell'estrazione dei dati per l'addestramento: {e}")
import traceback
logging.error(traceback.format_exc())
return pd.DataFrame()
def process_text_features(df):
"""
Processa le feature testuali utilizzando TF-IDF
"""
try:
logging.info("Elaborazione delle caratteristiche testuali...")
# Messaggio1 contiene il protocollo
if 'Messaggio1' in df.columns:
df['Protocollo'] = df['Messaggio1'].fillna('')
else:
df['Protocollo'] = ''
# Preparazione del vettorizzatore TF-IDF per il protocollo
tfidf_vectorizer = TfidfVectorizer(max_features=50, stop_words='english')
X_protocol = tfidf_vectorizer.fit_transform(df['Protocollo'])
# Converti in DataFrame
tfidf_df = pd.DataFrame(
X_protocol.toarray(),
columns=[f'protocol_tfidf_{i}' for i in range(X_protocol.shape[1])],
index=df.index
)
# Concatena con il dataframe originale
result_df = pd.concat([df, tfidf_df], axis=1)
logging.info(f"Feature testuali estratte con successo: {tfidf_df.shape[1]} nuove colonne")
return result_df, tfidf_vectorizer
except Exception as e:
logging.error(f"Errore nell'elaborazione delle caratteristiche testuali: {e}")
return df, None
def process_categorical_features(df):
"""
Processa le feature categoriche
"""
try:
logging.info("Elaborazione delle caratteristiche categoriche...")
categorical_features = {}
# One-hot encoding per 'Host' se presente
if 'Host' in df.columns:
host_dummies = pd.get_dummies(df['Host'], prefix='host', dummy_na=True)
categorical_features['host'] = host_dummies.columns.tolist()
df = pd.concat([df, host_dummies], axis=1)
# Encoding per IP_Attaccante se presente
if 'IP_Attaccante' in df.columns and df['IP_Attaccante'].notna().any():
from category_encoders import HashingEncoder
he = HashingEncoder(cols=['IP_Attaccante'], n_components=15)
ip_att_encoded = he.fit_transform(df[['IP_Attaccante']].fillna('unknown'))
categorical_features['IP_Attaccante'] = ip_att_encoded.columns.tolist()
df = pd.concat([df, ip_att_encoded], axis=1)
logging.info("Caratteristiche categoriche elaborate con successo.")
return df, categorical_features
except Exception as e:
logging.error(f"Errore nell'elaborazione delle caratteristiche categoriche: {e}")
return df, {}
def prepare_data_for_model(df):
"""
Prepara i dati per il modello
"""
try:
total_steps = 4 # Numero di passi principali
start_progress_tracking("preparazione dati", total_steps)
logging.info("Preparazione dei dati per il modello...")
# Assicurati che il DataFrame non sia vuoto
if df.empty:
logging.error("DataFrame vuoto. Impossibile preparare i dati per il modello.")
end_progress_tracking(success=False)
return None, None
# Prepara le feature temporali
update_progress(1, details=f"Estrazione caratteristiche temporali per {len(df)} record")
df = extract_time_features(df)
# Processa le feature testuali
update_progress(2, details="Elaborazione caratteristiche testuali")
df, text_vectorizer = process_text_features(df)
# Processa le feature categoriche
update_progress(3, details="Elaborazione caratteristiche categoriche")
df, categorical_features = process_categorical_features(df)
# Seleziona le colonne e finalizza
update_progress(4, details="Selezione feature e finalizzazione")
# Seleziona le colonne numeriche per il modello
numeric_cols = [col for col in df.columns if df[col].dtype in ['int64', 'float64']]
# Seleziona le colonne dummy (one-hot)
dummy_cols = []
for cols in categorical_features.values():
dummy_cols.extend(cols)
# Seleziona le colonne tfidf
tfidf_cols = [col for col in df.columns if col.startswith('protocol_tfidf_')]
# Combina tutte le colonne
feature_cols = numeric_cols + dummy_cols + tfidf_cols
# Rimuovi colonne non necessarie per il modello
excluded_cols = ['ID', 'Data', 'Ora', 'Timestamp', 'Host', 'IndirizzoIP',
'Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4',
'IP_Attaccante', 'IP_Attaccato', 'Porta_Attaccante', 'Porta_Attaccato',
'Protocollo']
feature_cols = [col for col in feature_cols if col not in excluded_cols]
logging.debug(f"Feature utilizzate per il modello: {feature_cols}")
# Crea DataFrame delle feature
X = df[feature_cols].copy()
# Gestisci eventuali valori mancanti
X.fillna(0, inplace=True)
# Crea un oggetto preprocessore che contiene tutte le informazioni necessarie
preprocessor = {
'feature_columns': feature_cols,
'text_vectorizer': text_vectorizer,
'categorical_features': categorical_features
}
# Salva il preprocessor su disco
try:
logging.info(f"Salvataggio preprocessor in {PREPROCESSOR_PATH}")
dump(preprocessor, PREPROCESSOR_PATH)
logging.info("Preprocessor salvato con successo")
except Exception as e:
logging.error(f"Errore nel salvataggio del preprocessor: {e}")
logging.info(f"Dati preparati con successo: {X.shape[0]} esempi, {X.shape[1]} feature")
end_progress_tracking()
return X, preprocessor
except Exception as e:
end_progress_tracking(success=False)
logging.error(f"Errore nella preparazione dei dati: {e}")
import traceback
logging.error(traceback.format_exc())
return None, None
def main():
# Parsing degli argomenti da linea di comando
parser = argparse.ArgumentParser(description='Analisi comportamentale del traffico di rete')
parser.add_argument('--force-training', action='store_true', help='Forza il riaddestramento del modello indipendentemente da quando è stato addestrato l\'ultima volta')
parser.add_argument('--test', action='store_true', help='Esegue un test delle funzionalità senza addestramento o predizione')
parser.add_argument('--time-window', type=str, default='12 HOUR', help='Finestra temporale per l\'estrazione dei dati (es. "12 HOUR", "30 MINUTE")')
parser.add_argument('--max-records', type=int, default=100000, help='Numero massimo di record da estrarre (default: 100000)')
parser.add_argument('--batch-size', type=int, default=10000, help='Dimensione di ciascun batch per l\'estrazione (default: 10000)')
args = parser.parse_args()
# Stampa le opzioni
logging.info(f"Opzioni: force_training={args.force_training}, test={args.test}, time_window={args.time_window}, max_records={args.max_records}")
# Se in modalità test, esegui solo le funzioni di diagnostica
if args.test:
logging.info("MODALITÀ TEST - esecuzione diagnostica")
# Test di connessione al database
engine = test_database_connection()
if not engine:
logging.error("Test database fallito - impossibile continuare")
sys.exit(1)
# Test di estrazione dati
logging.info("Test: estrazione di un campione di dati...")
sample_df = extract_data_sample(engine)
logging.info(f"Test di estrazione dati riuscito. Colonne: {sample_df.columns.tolist()}")
if not sample_df.empty:
logging.info(f"Esempio record: {sample_df.iloc[0].to_dict()}")
# Test percorsi modelli
logging.info("Test: controllo percorsi per i modelli...")
logging.info(f"MODEL_DIR esiste: {os.path.exists(MODEL_DIR)}")
# Test sistema di tracciamento progressi
logging.info("Test: sistema di tracciamento progressi")
start_progress_tracking("test tracciamento", 5)
for i in range(5):
update_progress(i+1, details=f"Passo di test {i+1}")
time.sleep(2) # Simula un'operazione che richiede tempo
end_progress_tracking()
logging.info("Modalità test completata")
sys.exit(0)
# Ottieni una connessione al database
engine = connect_to_database()
if not engine:
logging.error("Impossibile connettersi al database. Verificare le configurazioni e riprovare.")
sys.exit(1)
# Controlla se è necessario riaddestramento
if needs_training(args.force_training):
start_progress_tracking("processo di addestramento completo", 3)
update_progress(1, details="Verifica necessità di addestramento")
logging.info("Inizio addestramento dei modelli...")
# Estrai dati per addestramento
update_progress(details="Configurazione parametri di estrazione dati")
time_window_value = int(args.time_window.split(' ')[0])
time_window_unit = args.time_window.split(' ')[1]
if time_window_unit.upper() == 'HOUR':
df = extract_data_for_training(engine, time_window_value, args.max_records, args.batch_size)
elif time_window_unit.upper() == 'MINUTE':
df = extract_data_for_training(engine, time_window_value / 60, args.max_records, args.batch_size)
else:
logging.error(f"Unità di tempo non supportata: {time_window_unit}")
end_progress_tracking(success=False)
sys.exit(1)
if df.empty:
logging.error("Nessun dato estratto per l'addestramento. Verifica la connessione al database.")
end_progress_tracking(success=False)
sys.exit(1)
update_progress(2, details="Preparazione dati per il modello")
# Prepara i dati
X, preprocessor = prepare_data_for_model(df)
if X is None:
logging.error("Errore nella preparazione dei dati. Impossibile addestrare i modelli.")
end_progress_tracking(success=False)
sys.exit(1)
update_progress(3, details="Addestramento modelli")
# Addestra i modelli
models = train_models(X)
if all(m is not None for m in models):
logging.info("Addestramento completato con successo!")
# Aggiorna timestamp dell'ultimo addestramento
save_model_timestamp()
update_last_training_time()
end_progress_tracking()
else:
logging.error("Errore nell'addestramento dei modelli.")
end_progress_tracking(success=False)
sys.exit(1)
else:
logging.info("Addestramento non necessario, utilizzo dei modelli esistenti.")
logging.info("Analisi completata con successo.")
if __name__ == "__main__":
main()