ids.alfacom.it/extracted_idf/analisys_optimized.py
marco370 0bfe3258b5 Saved progress at the end of the loop
Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
2025-11-11 09:15:10 +00:00

910 lines
35 KiB
Python

#!/usr/bin/env python3
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.feature_extraction.text import TfidfVectorizer
from joblib import dump, load
import logging
import gc
import os
import time
from datetime import datetime, timedelta
import numpy as np
import argparse
import sys
import traceback
import threading
import psutil
import warnings
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report
import multiprocessing
warnings.filterwarnings('ignore')
# Configurazione del logging ottimizzata
logging.basicConfig(
level=logging.INFO, # Cambiato da DEBUG a INFO per performance
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('analisys_debug.log')
]
)
# Configurazione del database
DB_USER = os.environ.get('DB_USER', 'root')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
# Cartella per i modelli con gestione errori ottimizzata
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
try:
os.makedirs(MODEL_DIR, exist_ok=True)
logging.debug(f"Directory models creata/verificata: {MODEL_DIR}")
except Exception as e:
logging.error(f"Errore nella creazione della directory models: {e}")
MODEL_DIR = os.path.join(os.getcwd(), 'models')
try:
os.makedirs(MODEL_DIR, exist_ok=True)
logging.debug(f"Directory models creata come fallback in: {MODEL_DIR}")
except Exception as e2:
logging.error(f"Impossibile creare la directory models: {e2}")
MODEL_DIR = '.'
# Percorsi dei modelli
IF_MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
LOF_MODEL_PATH = os.path.join(MODEL_DIR, 'lof.joblib')
SVM_MODEL_PATH = os.path.join(MODEL_DIR, 'svm.joblib')
ENSEMBLE_MODEL_PATH = os.path.join(MODEL_DIR, 'ensemble_weights.joblib')
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
ACCUMULATED_DATA_PATH = os.path.join(MODEL_DIR, 'accumulated_data.pkl')
LAST_TRAINING_PATH = os.path.join(MODEL_DIR, 'last_training.txt')
# Parametri di configurazione ottimizzati
TRAINING_FREQUENCY_HOURS = 12
CONTINUOUS_LEARNING = True
MAX_MEMORY_USAGE = 85 # Percentuale massima di memoria utilizzabile
CHUNK_SIZE = 10000 # Dimensione chunk per elaborazione
MAX_TRAINING_SAMPLES = 500000 # Limite massimo campioni per addestramento
MIN_TRAINING_SAMPLES = 1000 # Minimo campioni necessari
# Colori per output
class Colors:
HEADER = '\033[95m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
END = '\033[0m'
def log_phase(message):
"""Evidenzia una nuova fase principale dell'esecuzione"""
print(f"\n{Colors.BOLD}{Colors.GREEN}▶ FASE: {message}{Colors.END}\n")
logging.info(f"FASE: {message}")
def log_result(message):
"""Evidenzia un risultato importante"""
print(f"{Colors.BLUE}{message}{Colors.END}")
logging.info(f"RISULTATO: {message}")
def log_warning(message):
"""Evidenzia un avviso importante"""
print(f"{Colors.YELLOW}{message}{Colors.END}")
logging.warning(message)
def log_error(message):
"""Evidenzia un errore importante"""
print(f"{Colors.RED}{message}{Colors.END}")
logging.error(message)
# Variabili globali per il tracciamento dell'avanzamento
progress_status = {
'in_progress': False,
'operation': '',
'start_time': None,
'current_step': 0,
'total_steps': 0,
'details': '',
'last_update': 0
}
def check_memory_usage():
"""Controlla l'utilizzo della memoria e forza garbage collection se necessario"""
memory_percent = psutil.virtual_memory().percent
if memory_percent > MAX_MEMORY_USAGE:
logging.warning(f"Utilizzo memoria alto: {memory_percent}%. Forzando garbage collection...")
gc.collect()
return True
return False
def start_progress_tracking(operation, total_steps=100):
"""Inizia il tracciamento di un'operazione lunga"""
global progress_status
progress_status['in_progress'] = True
progress_status['operation'] = operation
progress_status['start_time'] = time.time()
progress_status['current_step'] = 0
progress_status['total_steps'] = total_steps
progress_status['details'] = 'Inizializzazione...'
progress_status['last_update'] = 0
threading.Thread(target=progress_reporter, daemon=True).start()
logging.info(f"Avvio: {operation} (totale passi previsti: {total_steps})")
def update_progress(step=None, increment=1, details=''):
"""Aggiorna lo stato di avanzamento di un'operazione lunga"""
global progress_status
if not progress_status['in_progress']:
return
if step is not None:
progress_status['current_step'] = step
else:
progress_status['current_step'] += increment
if details:
progress_status['details'] = details
if details and (time.time() - progress_status['last_update']) > 5:
report_progress()
def end_progress_tracking(success=True):
"""Termina il tracciamento dell'avanzamento"""
global progress_status
if not progress_status['in_progress']:
return
elapsed = time.time() - progress_status['start_time']
if success:
logging.info(f"Completato: {progress_status['operation']} in {elapsed:.1f} secondi")
else:
logging.error(f"Fallito: {progress_status['operation']} dopo {elapsed:.1f} secondi")
progress_status['in_progress'] = False
progress_status['current_step'] = progress_status['total_steps']
report_progress(force=True)
def report_progress(force=False):
"""Riporta lo stato di avanzamento attuale"""
global progress_status
if not progress_status['in_progress'] and not force:
return
current_time = time.time()
if not force and (current_time - progress_status['last_update']) < 30:
return
elapsed = current_time - progress_status['start_time']
percent = (progress_status['current_step'] / progress_status['total_steps']) * 100 if progress_status['total_steps'] > 0 else 0
remaining = "N/A"
if percent > 0:
remaining_seconds = (elapsed / percent) * (100 - percent)
if remaining_seconds < 60:
remaining = f"{remaining_seconds:.0f} secondi"
elif remaining_seconds < 3600:
remaining = f"{remaining_seconds/60:.1f} minuti"
else:
remaining = f"{remaining_seconds/3600:.1f} ore"
memory_usage = psutil.virtual_memory().percent
message = f"""
{Colors.BOLD}======== PROGRESSO ADDESTRAMENTO ========{Colors.END}
Operazione: {progress_status['operation']}
Completamento: {percent:.1f}%
Tempo trascorso: {elapsed:.1f} secondi
Tempo rimanente: {remaining}
Memoria utilizzata: {memory_usage:.1f}%
Dettagli: {progress_status['details']}
{Colors.BOLD}=========================================={Colors.END}
"""
print(message)
logging.info(f"Progresso {progress_status['operation']}: {percent:.1f}% - {progress_status['details']}")
progress_status['last_update'] = current_time
def progress_reporter():
"""Thread che riporta periodicamente i progressi"""
while progress_status['in_progress']:
report_progress()
time.sleep(5)
def extract_time_features_optimized(df):
"""Estrae caratteristiche temporali dai dati in modo ottimizzato"""
try:
logging.info("Estrazione caratteristiche temporali ottimizzata...")
# Converti timestamp in modo vettorizzato
if 'Timestamp' not in df.columns and 'Data' in df.columns and 'Ora' in df.columns:
df['Data'] = pd.to_datetime(df['Data'], errors='coerce')
df['Ora'] = pd.to_timedelta(df['Ora'].astype(str), errors='coerce')
df['Timestamp'] = df['Data'] + df['Ora']
if 'Timestamp' in df.columns:
df['hour_of_day'] = df['Timestamp'].dt.hour
df['day_of_week'] = df['Timestamp'].dt.dayofweek
else:
df['hour_of_day'] = 0
df['day_of_week'] = 0
# Estrazione IP ottimizzata
update_progress(details="Estrazione IP e porte (ottimizzata)")
if 'Messaggio2' in df.columns:
# Usa operazioni vettorizzate per performance
df['IP_Attaccante'] = df['Messaggio2'].str.split(':').str[0]
df['Porta_Attaccante'] = df['Messaggio2'].str.split(':').str[1]
if 'Messaggio3' in df.columns:
df['IP_Attaccato'] = df['Messaggio3'].str.split(':').str[0]
df['Porta_Attaccato'] = df['Messaggio3'].str.split(':').str[1]
# Feature temporali semplificate per performance
df['time_since_last'] = 0
df['events_last_hour'] = 0
df['events_last_day'] = 0
df['time_since_last_mean'] = 0
df['time_since_last_std'] = 0
df['time_since_last_min'] = 0
df['time_since_last_max'] = 0
df['events_last_hour_max'] = 0
df['events_last_day_max'] = 0
# Calcolo statistiche solo per campione rappresentativo per velocità
if len(df) > 10000:
sample_df = df.sample(n=10000, random_state=42)
logging.info("Usando campione di 10k record per statistiche temporali")
else:
sample_df = df
# Calcolo statistiche aggregate per IP (versione ottimizzata)
if 'IP_Attaccante' in sample_df.columns:
ip_stats = sample_df.groupby('IP_Attaccante').agg({
'Timestamp': ['count', 'min', 'max']
}).fillna(0)
# Applica statistiche al dataset completo (approssimazione)
ip_counts = df['IP_Attaccante'].value_counts()
df['events_last_hour'] = df['IP_Attaccante'].map(ip_counts).fillna(0)
df['events_last_day'] = df['events_last_hour'] # Semplificazione
logging.info("Caratteristiche temporali estratte con successo (ottimizzate)")
return df
except Exception as e:
logging.error(f"Errore nell'estrazione delle caratteristiche temporali: {e}")
return df
def connect_to_database():
"""Connette al database MySQL con configurazione ottimizzata"""
try:
logging.info("Connessione al database...")
connection_string = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"
# Configurazione ottimizzata per addestramento
engine = create_engine(
connection_string,
pool_size=5,
max_overflow=10,
pool_recycle=3600,
pool_pre_ping=True,
pool_timeout=30,
echo=False,
connect_args={
'charset': 'utf8mb4',
'use_unicode': True,
'autocommit': False, # False per transazioni di addestramento
'sql_mode': 'TRADITIONAL'
}
)
# Test connessione
with engine.connect() as conn:
conn.execute(text("SELECT 1")).fetchone()
return engine
except Exception as e:
logging.error(f"Errore nella connessione al database: {e}")
return None
def extract_data_for_training_optimized(engine, window_hours=12, max_records=500000, batch_size=20000):
"""Estrae dati per l'addestramento in modo ottimizzato per grandi volumi"""
try:
start_progress_tracking(f"estrazione dati addestramento ({window_hours} ore)", 100)
log_phase(f"Estrazione dati per addestramento - ultimi {window_hours} ore")
# Query di conteggio ottimizzata
count_query = text("""
SELECT COUNT(*) AS total
FROM Esterna
WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR)
""")
total_count = 0
with engine.connect() as conn:
result = conn.execute(count_query, {"window": window_hours}).fetchone()
total_count = result[0] if result else 0
if total_count == 0:
log_warning("Nessun dato trovato per l'addestramento")
end_progress_tracking(success=False)
return pd.DataFrame()
# Limita al massimo specificato
total_count = min(total_count, max_records)
log_result(f"Trovati {total_count} record, estraendo max {max_records}")
# Estrazione ottimizzata con campionamento se necessario
if total_count > max_records:
# Usa campionamento casuale per dataset molto grandi
sample_rate = max_records / total_count
log_warning(f"Dataset molto grande, usando campionamento {sample_rate:.2%}")
query = text("""
SELECT *
FROM Esterna
WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR)
AND RAND() < :sample_rate
ORDER BY RAND()
LIMIT :max_records
""")
update_progress(50, details="Estrazione con campionamento casuale")
df = pd.read_sql(query, engine, params={
"window": window_hours,
"sample_rate": sample_rate,
"max_records": max_records
})
else:
# Estrazione normale con batching
offset = 0
all_data = []
while offset < total_count:
curr_batch_size = min(batch_size, total_count - offset)
percent_complete = (offset / total_count) * 100
update_progress(int(percent_complete),
details=f"Estrazione batch {offset//batch_size + 1} ({offset}/{total_count} record)")
# Controllo memoria
if check_memory_usage():
batch_size = max(5000, batch_size // 2)
log_warning(f"Memoria alta, riducendo batch size a {batch_size}")
query = text("""
SELECT *
FROM Esterna
WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR)
ORDER BY ID DESC
LIMIT :batch_size OFFSET :offset
""")
batch_data = pd.read_sql(query, engine, params={
"window": window_hours,
"batch_size": curr_batch_size,
"offset": offset
})
if batch_data.empty:
break
all_data.append(batch_data)
offset += len(batch_data)
# Garbage collection periodico
if len(all_data) % 5 == 0:
gc.collect()
# Combina tutti i batch
if all_data:
df = pd.concat(all_data, ignore_index=True)
del all_data # Libera memoria
gc.collect()
else:
df = pd.DataFrame()
update_progress(100, details=f"Estrazione completata: {len(df)} record")
log_result(f"Estratti {len(df)} record per addestramento")
end_progress_tracking()
return df
except Exception as e:
end_progress_tracking(success=False)
log_error(f"Errore nell'estrazione dei dati: {e}")
return pd.DataFrame()
def process_text_features_optimized(df):
"""Processa le feature testuali in modo ottimizzato"""
try:
logging.info("Elaborazione caratteristiche testuali ottimizzata...")
# Preparazione protocollo ottimizzata
if 'Messaggio1' in df.columns:
df['Protocollo'] = df['Messaggio1'].fillna('')
else:
df['Protocollo'] = ''
# TF-IDF ottimizzato con meno feature per velocità
tfidf_vectorizer = TfidfVectorizer(
max_features=21, # Ridotto per performance
stop_words='english',
ngram_range=(1, 1), # Solo unigrams per velocità
min_df=2, # Ignora termini troppo rari
max_df=0.95 # Ignora termini troppo comuni
)
# Campionamento per TF-IDF se dataset troppo grande
if len(df) > 50000:
sample_df = df.sample(n=50000, random_state=42)
logging.info("Usando campione di 50k record per TF-IDF")
X_protocol = tfidf_vectorizer.fit_transform(sample_df['Protocollo'])
# Applica al dataset completo
X_protocol_full = tfidf_vectorizer.transform(df['Protocollo'])
else:
X_protocol_full = tfidf_vectorizer.fit_transform(df['Protocollo'])
# Converti in DataFrame
tfidf_df = pd.DataFrame(
X_protocol_full.toarray(),
columns=[f'protocol_tfidf_{i}' for i in range(X_protocol_full.shape[1])],
index=df.index
)
# Concatena con il dataframe originale
result_df = pd.concat([df, tfidf_df], axis=1)
logging.info(f"Feature testuali estratte: {tfidf_df.shape[1]} colonne")
return result_df, tfidf_vectorizer
except Exception as e:
logging.error(f"Errore nell'elaborazione delle caratteristiche testuali: {e}")
return df, None
def process_categorical_features_optimized(df):
"""Processa le feature categoriche in modo ottimizzato"""
try:
logging.info("Elaborazione caratteristiche categoriche ottimizzata...")
categorical_features = {}
# One-hot encoding ottimizzato per 'Host'
if 'Host' in df.columns:
# Limita il numero di categorie per performance
top_hosts = df['Host'].value_counts().head(10).index
df['Host_simplified'] = df['Host'].where(df['Host'].isin(top_hosts), 'Other')
host_dummies = pd.get_dummies(df['Host_simplified'], prefix='host', dummy_na=True)
categorical_features['host'] = host_dummies.columns.tolist()
df = pd.concat([df, host_dummies], axis=1)
# Encoding ottimizzato per IP_Attaccante
if 'IP_Attaccante' in df.columns and df['IP_Attaccante'].notna().any():
# Usa hash encoding semplificato per performance
ip_data = df['IP_Attaccante'].fillna('unknown').astype(str)
# Hash encoding manuale per velocità
for i in range(15):
df[f'ip_hash_{i}'] = pd.util.hash_array(ip_data.values) % (2**(i+8)) / (2**(i+8))
categorical_features['IP_Attaccante'] = [f'ip_hash_{i}' for i in range(15)]
logging.info("Caratteristiche categoriche elaborate con successo")
return df, categorical_features
except Exception as e:
logging.error(f"Errore nell'elaborazione delle caratteristiche categoriche: {e}")
return df, {}
def prepare_data_for_model_optimized(df):
"""Prepara i dati per il modello in modo ottimizzato"""
try:
total_steps = 5
start_progress_tracking("preparazione dati ottimizzata", total_steps)
log_phase("Preparazione dati per modello (ottimizzata)")
if df.empty:
log_error("DataFrame vuoto")
end_progress_tracking(success=False)
return None, None
# 1. Feature temporali ottimizzate
update_progress(1, details=f"Caratteristiche temporali per {len(df)} record")
df = extract_time_features_optimized(df)
# 2. Feature testuali ottimizzate
update_progress(2, details="Caratteristiche testuali")
df, text_vectorizer = process_text_features_optimized(df)
# 3. Feature categoriche ottimizzate
update_progress(3, details="Caratteristiche categoriche")
df, categorical_features = process_categorical_features_optimized(df)
# 4. Selezione feature ottimizzata
update_progress(4, details="Selezione e pulizia feature")
# Seleziona colonne numeriche
numeric_cols = [col for col in df.columns if df[col].dtype in ['int64', 'float64']]
# Seleziona colonne dummy
dummy_cols = []
for cols in categorical_features.values():
dummy_cols.extend(cols)
# Seleziona colonne tfidf
tfidf_cols = [col for col in df.columns if col.startswith('protocol_tfidf_')]
# Combina tutte le colonne
feature_cols = numeric_cols + dummy_cols + tfidf_cols
# Rimuovi colonne non necessarie
excluded_cols = [
'ID', 'Data', 'Ora', 'Timestamp', 'Host', 'IndirizzoIP',
'Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4',
'IP_Attaccante', 'IP_Attaccato', 'Porta_Attaccante', 'Porta_Attaccato',
'Protocollo', 'Host_simplified'
]
feature_cols = [col for col in feature_cols if col not in excluded_cols]
# Limita numero di feature per performance
if len(feature_cols) > 125:
feature_cols = feature_cols[:125]
log_warning(f"Limitando a 125 feature per performance")
# 5. Finalizzazione
update_progress(5, details="Finalizzazione dataset")
# Crea DataFrame delle feature
X = df[feature_cols].copy()
# Gestisci valori mancanti
X.fillna(0, inplace=True)
# Rimuovi feature con varianza zero
variance_threshold = 0.001
feature_variances = X.var()
valid_features = feature_variances[feature_variances > variance_threshold].index
X = X[valid_features]
# Aggiorna feature_cols
feature_cols = valid_features.tolist()
# Crea preprocessor
preprocessor = {
'feature_columns': feature_cols,
'text_vectorizer': text_vectorizer,
'categorical_features': categorical_features
}
# Salva preprocessor
try:
dump(preprocessor, PREPROCESSOR_PATH)
log_result(f"Preprocessor salvato: {len(feature_cols)} feature")
except Exception as e:
log_error(f"Errore nel salvataggio preprocessor: {e}")
log_result(f"Dati preparati: {X.shape[0]} esempi, {X.shape[1]} feature")
end_progress_tracking()
return X, preprocessor
except Exception as e:
end_progress_tracking(success=False)
log_error(f"Errore nella preparazione dei dati: {e}")
return None, None
def train_models_optimized(X):
"""Addestra i modelli in modo ottimizzato per grandi dataset"""
try:
start_progress_tracking("addestramento modelli ottimizzato", 4)
log_phase("Addestramento modelli ottimizzato")
if X.shape[0] < MIN_TRAINING_SAMPLES:
log_error(f"Troppo pochi campioni per addestramento: {X.shape[0]} < {MIN_TRAINING_SAMPLES}")
end_progress_tracking(success=False)
return None, None, None, None
# Campionamento se dataset troppo grande
if X.shape[0] > MAX_TRAINING_SAMPLES:
log_warning(f"Dataset molto grande ({X.shape[0]}), campionando {MAX_TRAINING_SAMPLES} esempi")
indices = np.random.choice(X.shape[0], MAX_TRAINING_SAMPLES, replace=False)
X_train = X.iloc[indices]
else:
X_train = X
log_result(f"Addestramento su {X_train.shape[0]} esempi con {X_train.shape[1]} feature")
# 1. Isolation Forest ottimizzato
update_progress(1, details=f"Isolation Forest su {X_train.shape[0]} campioni")
if_model = IsolationForest(
n_estimators=100, # Ridotto per velocità
contamination=0.05,
random_state=42,
n_jobs=-1,
max_samples='auto',
max_features=1.0
)
if_model.fit(X_train)
log_result("Isolation Forest addestrato")
# 2. Local Outlier Factor ottimizzato
update_progress(2, details="Local Outlier Factor")
# Limita campioni per LOF se troppi (LOF è O(n²))
if X_train.shape[0] > 50000:
lof_sample = X_train.sample(n=50000, random_state=42)
log_warning("LOF: usando campione di 50k per performance")
else:
lof_sample = X_train
lof_model = LocalOutlierFactor(
n_neighbors=min(20, lof_sample.shape[0] // 10),
contamination=0.05,
novelty=True,
n_jobs=-1
)
lof_model.fit(lof_sample)
log_result("Local Outlier Factor addestrato")
# 3. One-Class SVM ottimizzato
update_progress(3, details="One-Class SVM")
# Limita campioni per SVM se troppi (SVM è lento su grandi dataset)
if X_train.shape[0] > 20000:
svm_sample = X_train.sample(n=20000, random_state=42)
log_warning("SVM: usando campione di 20k per performance")
else:
svm_sample = X_train
svm_model = OneClassSVM(
kernel='rbf',
gamma='scale', # Cambiato da 'auto' per sklearn recenti
nu=0.05
)
svm_model.fit(svm_sample)
log_result("One-Class SVM addestrato")
# 4. Salvataggio modelli
update_progress(4, details="Salvataggio modelli")
try:
dump(if_model, IF_MODEL_PATH)
dump(lof_model, LOF_MODEL_PATH)
dump(svm_model, SVM_MODEL_PATH)
# Pesi ensemble ottimizzati
ensemble_weights = {
'isolation_forest': 0.50, # Peso maggiore per IF (più veloce)
'lof': 0.30,
'svm': 0.20
}
dump(ensemble_weights, ENSEMBLE_MODEL_PATH)
log_result("Tutti i modelli salvati con successo")
except Exception as e:
log_error(f"Errore nel salvataggio: {e}")
end_progress_tracking(success=False)
return None, None, None, None
end_progress_tracking()
return if_model, lof_model, svm_model, ensemble_weights
except Exception as e:
end_progress_tracking(success=False)
log_error(f"Errore nell'addestramento: {e}")
return None, None, None, None
def save_model_timestamp():
"""Salva il timestamp dell'ultimo addestramento"""
try:
engine = connect_to_database()
if not engine:
return False
with engine.connect() as conn:
create_table_query = text("""
CREATE TABLE IF NOT EXISTS model_metadata (
id INT AUTO_INCREMENT PRIMARY KEY,
model_name VARCHAR(50) NOT NULL,
last_trained TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
model_path VARCHAR(255),
training_samples INT DEFAULT 0,
feature_count INT DEFAULT 0,
UNIQUE KEY unique_model (model_name)
)
""")
conn.execute(create_table_query)
upsert_query = text("""
INSERT INTO model_metadata (model_name, last_trained, model_path, training_samples, feature_count)
VALUES ('ensemble', NOW(), :model_path, :samples, :features)
ON DUPLICATE KEY UPDATE
last_trained = NOW(),
model_path = :model_path,
training_samples = :samples,
feature_count = :features
""")
conn.execute(upsert_query, {
"model_path": ENSEMBLE_MODEL_PATH,
"samples": 0, # Sarà aggiornato dal chiamante
"features": 0 # Sarà aggiornato dal chiamante
})
conn.commit()
log_result("Timestamp addestramento salvato")
return True
except Exception as e:
log_error(f"Errore nel salvare timestamp: {e}")
return False
def needs_training(force_training=False):
"""Verifica se il modello deve essere riaddestrato"""
if force_training:
log_result("Riaddestramento forzato richiesto")
return True
try:
engine = connect_to_database()
if not engine:
return True
with engine.connect() as conn:
try:
query = text("""
SELECT last_trained, training_samples, feature_count
FROM model_metadata
WHERE model_name = 'ensemble'
""")
result = conn.execute(query).fetchone()
if not result:
log_result("Nessun addestramento precedente, riaddestramento necessario")
return True
last_trained, samples, features = result
now = datetime.now()
hours_diff = (now - last_trained).total_seconds() / 3600
if hours_diff >= TRAINING_FREQUENCY_HOURS:
log_result(f"Ultimo addestramento: {hours_diff:.1f} ore fa, riaddestramento necessario")
return True
else:
log_result(f"Ultimo addestramento: {hours_diff:.1f} ore fa, non necessario")
return False
except Exception as e:
log_warning(f"Errore nel controllo metadata: {e}")
return True
except Exception as e:
log_error(f"Errore nel verificare necessità addestramento: {e}")
return True
def test_database_connection():
"""Testa la connessione al database"""
try:
engine = connect_to_database()
if not engine:
return False
with engine.connect() as conn:
result = conn.execute(text("SELECT 1")).fetchone()
if result and result[0] == 1:
log_result("Test connessione database superato")
# Verifica tabelle
tables = conn.execute(text("SHOW TABLES")).fetchall()
table_names = [t[0] for t in tables]
if 'Esterna' in table_names:
count = conn.execute(text("SELECT COUNT(*) FROM Esterna")).fetchone()[0]
log_result(f"Tabella Esterna: {count} record")
else:
log_error("Tabella Esterna non trovata")
return False
return True
return False
except Exception as e:
log_error(f"Errore test connessione: {e}")
return False
def main():
"""Funzione principale ottimizzata"""
parser = argparse.ArgumentParser(description='Analisi comportamentale ottimizzata per grandi volumi')
parser.add_argument('--force-training', action='store_true', help='Forza riaddestramento')
parser.add_argument('--test', action='store_true', help='Modalità test')
parser.add_argument('--time-window', type=int, default=12, help='Finestra temporale in ore (default: 12)')
parser.add_argument('--max-records', type=int, default=500000, help='Max record per addestramento (default: 500k)')
parser.add_argument('--batch-size', type=int, default=20000, help='Dimensione batch (default: 20k)')
parser.add_argument('--debug', action='store_true', help='Abilita debug logging')
args = parser.parse_args()
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
log_phase("Avvio sistema di addestramento ottimizzato")
log_result(f"Configurazione: {args.time_window}h, max {args.max_records} record, batch {args.batch_size}")
# Modalità test
if args.test:
log_phase("Modalità test")
if not test_database_connection():
log_error("Test database fallito")
sys.exit(1)
log_result("Test completato con successo")
sys.exit(0)
# Test connessione
if not test_database_connection():
log_error("Impossibile connettersi al database")
sys.exit(1)
# Controllo necessità addestramento
if not needs_training(args.force_training):
log_result("Addestramento non necessario")
sys.exit(0)
try:
engine = connect_to_database()
if not engine:
log_error("Connessione database fallita")
sys.exit(1)
# Estrazione dati
df = extract_data_for_training_optimized(
engine,
args.time_window,
args.max_records,
args.batch_size
)
if df.empty:
log_error("Nessun dato estratto")
sys.exit(1)
# Preparazione dati
X, preprocessor = prepare_data_for_model_optimized(df)
if X is None:
log_error("Errore nella preparazione dati")
sys.exit(1)
# Addestramento
models = train_models_optimized(X)
if all(m is not None for m in models):
log_phase("Addestramento completato con successo")
# Salva timestamp
save_model_timestamp()
# Statistiche finali
memory_usage = psutil.virtual_memory().percent
log_result(f"Memoria finale utilizzata: {memory_usage:.1f}%")
log_result(f"Modelli addestrati su {X.shape[0]} campioni con {X.shape[1]} feature")
else:
log_error("Errore nell'addestramento dei modelli")
sys.exit(1)
except Exception as e:
log_error(f"Errore generale: {e}")
import traceback
logging.error(traceback.format_exc())
sys.exit(1)
if __name__ == "__main__":
main()