ids.alfacom.it/extracted_idf/analisys_ultra_optimized.py

#!/usr/bin/env python3
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from joblib import dump
import logging
import gc
import os
import time
from datetime import datetime
import numpy as np
import argparse
import sys
import traceback
import psutil
import warnings
import signal

warnings.filterwarnings('ignore')
# Logging configuration: mirror messages to stdout and to a debug log file
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('analisys_debug.log')
    ]
)
# Database configuration (read from the environment, with fallbacks)
DB_USER = os.environ.get('DB_USER', 'root')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
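# Example (assumed) environment configuration for the settings above; these
# values are placeholders for illustration, not the real deployment values:
#   export DB_USER=loguser
#   export DB_PASSWORD='...'
#   export DB_HOST=127.0.0.1
#   export DB_DATABASE=LOG_MIKROTIK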
# Directory for persisted models
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
try:
    os.makedirs(MODEL_DIR, exist_ok=True)
except Exception as e:
    logging.error(f"Error creating the models directory: {e}")
    MODEL_DIR = '.'

# Model file paths
IF_MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
LOF_MODEL_PATH = os.path.join(MODEL_DIR, 'lof.joblib')
SVM_MODEL_PATH = os.path.join(MODEL_DIR, 'svm.joblib')
ENSEMBLE_MODEL_PATH = os.path.join(MODEL_DIR, 'ensemble_weights.joblib')
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
# Tuning parameters
TRAINING_FREQUENCY_HOURS = 12
MAX_MEMORY_USAGE = 80           # percent of RAM above which a GC pass is forced
CHUNK_SIZE = 5000
MAX_TRAINING_SAMPLES = 100000   # reduced for speed
MIN_TRAINING_SAMPLES = 500
QUERY_TIMEOUT = 300             # 5-minute query timeout
CONNECTION_TIMEOUT = 30         # 30-second connection timeout
# ANSI colors for console output
class Colors:
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    END = '\033[0m'

def log_phase(message):
    print(f"\n{Colors.BOLD}{Colors.GREEN}▶ PHASE: {message}{Colors.END}\n")
    logging.info(f"PHASE: {message}")

def log_result(message):
    print(f"{Colors.BLUE}{message}{Colors.END}")
    logging.info(f"RESULT: {message}")

def log_warning(message):
    print(f"{Colors.YELLOW}{message}{Colors.END}")
    logging.warning(message)

def log_error(message):
    print(f"{Colors.RED}{message}{Colors.END}")
    logging.error(message)

# Global interrupt flag, set by the SIGINT handler
interrupted = False

def signal_handler(signum, frame):
    global interrupted
    interrupted = True
    log_warning("Interrupt received, terminating...")
    sys.exit(1)

signal.signal(signal.SIGINT, signal_handler)
def check_memory_usage():
    """Check memory usage and force a garbage-collection pass when it is high."""
    memory_percent = psutil.virtual_memory().percent
    if memory_percent > MAX_MEMORY_USAGE:
        log_warning(f"High memory usage: {memory_percent}%. Forcing garbage collection...")
        gc.collect()
        return True
    return False
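
# Note: check_memory_usage() is defined above but never invoked in this script.
# A plausible (assumed) usage would be to call it between heavy processing
# steps, for example:
#   for chunk in np.array_split(X, 10):
#       process(chunk)          # hypothetical per-chunk work
#       check_memory_usage()    # force GC if RAM usage exceeds MAX_MEMORY_USAGE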
def connect_to_database_ultra():
    """Optimized database connection with timeouts."""
    try:
        log_phase("Optimized database connection")
        connection_string = f"mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"
        engine = create_engine(
            connection_string,
            pool_size=3,            # small pool for speed
            max_overflow=5,
            pool_recycle=1800,
            pool_pre_ping=True,
            pool_timeout=CONNECTION_TIMEOUT,
            echo=False,
            connect_args={
                'charset': 'utf8mb4',
                'use_unicode': True,
                'autocommit': True,  # autocommit for speed
                'connection_timeout': CONNECTION_TIMEOUT,
                'sql_mode': 'TRADITIONAL'
            }
        )
        # Connectivity test
        with engine.connect() as conn:
            result = conn.execute(text("SELECT 1")).fetchone()
            if result[0] == 1:
                log_result("Database connection established")
                return engine
        return None
    except Exception as e:
        log_error(f"Database connection error: {e}")
        return None

def extract_data_ultra_fast(engine, window_hours=12, max_records=100000, batch_size=5000):
    """Fast data extraction with timeouts and optimized queries."""
    try:
        log_phase(f"Fast extraction - last {window_hours} hours")
        # Fast count query, bounded with LIMIT to avoid a full table scan
        count_query = text("""
            SELECT COUNT(*) FROM (
                SELECT 1 FROM Esterna
                WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR)
                LIMIT :max_limit
            ) as subq
        """)
        total_count = 0
        try:
            with engine.connect() as conn:
                result = conn.execute(count_query, {
                    "window": window_hours,
                    "max_limit": max_records * 2
                }).fetchone()
                total_count = result[0] if result else 0
        except Exception as e:
            log_error(f"Count query error: {e}")
            return pd.DataFrame()
        if total_count == 0:
            log_warning("No data found")
            return pd.DataFrame()
        # NOTE: keep the raw count here; capping it with min(total_count,
        # max_records) before the comparison below would make the sampling
        # branch unreachable
        log_result(f"Found {total_count} records, extracting at most {max_records}")
        if total_count <= max_records:
            # Direct extraction with a single LIMIT query
            log_result("Direct extraction with a single query")
            # Only the essential columns
            query = text("""
                SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3
                FROM Esterna
                WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR)
                ORDER BY ID DESC
                LIMIT :max_records
            """)
            try:
                df = pd.read_sql(query, engine, params={
                    "window": window_hours,
                    "max_records": max_records
                })
                log_result(f"Extracted {len(df)} records in direct mode")
                return df
            except Exception as e:
                log_error(f"Direct extraction error: {e}")
                return pd.DataFrame()
        else:
            # Random sampling for large datasets
            log_warning("Large dataset, using random sampling")
            query = text("""
                SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3
                FROM Esterna
                WHERE TIMESTAMP(Data, Ora) >= DATE_SUB(NOW(), INTERVAL :window HOUR)
                AND RAND() < 0.5
                ORDER BY RAND()
                LIMIT :max_records
            """)
            try:
                df = pd.read_sql(query, engine, params={
                    "window": window_hours,
                    "max_records": max_records
                })
                log_result(f"Extracted {len(df)} records via sampling")
                return df
            except Exception as e:
                log_error(f"Sampling error: {e}")
                return pd.DataFrame()
    except Exception as e:
        log_error(f"General extraction error: {e}")
        return pd.DataFrame()

def prepare_data_ultra_fast(df):
    """Fast data preparation."""
    try:
        log_phase("Fast data preparation")
        if df.empty:
            log_error("Empty DataFrame")
            return None, None
        # Minimal feature engineering for maximum speed
        feature_data = {}
        # 1. Basic temporal features (9 features)
        if 'Data' in df.columns and 'Ora' in df.columns:
            try:
                df['Data'] = pd.to_datetime(df['Data'], errors='coerce')
                df['Ora'] = pd.to_timedelta(df['Ora'].astype(str), errors='coerce')
                df['Timestamp'] = df['Data'] + df['Ora']
                feature_data['hour_of_day'] = df['Timestamp'].dt.hour.fillna(0)
                feature_data['day_of_week'] = df['Timestamp'].dt.dayofweek.fillna(0)
            except Exception:
                feature_data['hour_of_day'] = np.zeros(len(df))
                feature_data['day_of_week'] = np.zeros(len(df))
        else:
            feature_data['hour_of_day'] = np.zeros(len(df))
            feature_data['day_of_week'] = np.zeros(len(df))
        # Add 7 placeholder temporal features
        for i in range(7):
            feature_data[f'time_feature_{i}'] = np.zeros(len(df))
        # 2. Simple protocol features (21 features)
        if 'Messaggio1' in df.columns:
            proto_data = df['Messaggio1'].fillna('').astype(str)
            # Common protocols
            protocols = ['TCP', 'UDP', 'ICMP', 'HTTP', 'HTTPS', 'SSH', 'FTP', 'DNS', 'SMTP', 'POP3']
            for i, protocol in enumerate(protocols[:10]):
                feature_data[f'protocol_{i}'] = proto_data.str.contains(protocol, case=False).astype(int)
            # Fill the remaining 11 features with zeros
            for i in range(10, 21):
                feature_data[f'protocol_{i}'] = np.zeros(len(df))
        else:
            for i in range(21):
                feature_data[f'protocol_{i}'] = np.zeros(len(df))
        # 3. Simple Host features (2 features)
        if 'Host' in df.columns:
            feature_data['host_fibra'] = df['Host'].fillna('').str.contains('FIBRA', case=False).astype(int)
            feature_data['host_empty'] = df['Host'].isna().astype(int)
        else:
            feature_data['host_fibra'] = np.zeros(len(df))
            feature_data['host_empty'] = np.zeros(len(df))
        # 4. Simple IP features (15 features)
        if 'Messaggio2' in df.columns:
            ip_data = df['Messaggio2'].str.split(':').str[0].fillna('unknown')
            # Simple IP hashing, projected onto 15 scales in [0, 1)
            ip_hashes = pd.util.hash_array(ip_data.values)
            for i in range(15):
                feature_data[f'ip_hash_{i}'] = ip_hashes % (2**(i+5)) / (2**(i+5))
        else:
            for i in range(15):
                feature_data[f'ip_hash_{i}'] = np.zeros(len(df))
        # 5. Pad up to 125 features
        current_features = len(feature_data)
        remaining = 125 - current_features
        for i in range(remaining):
            feature_data[f'extra_{i}'] = np.zeros(len(df))
        # Build the numpy matrix directly: shape (n_samples, 125)
        X = np.column_stack([feature_data[col] for col in sorted(feature_data.keys())])
        # Minimal preprocessor (only records the feature layout)
        preprocessor = {
            'feature_columns': sorted(feature_data.keys()),
            'text_vectorizer': None,
            'categorical_features': {}
        }
        # Persist the preprocessor
        try:
            dump(preprocessor, PREPROCESSOR_PATH)
            log_result(f"Preprocessor saved: {X.shape[1]} features")
        except Exception as e:
            log_warning(f"Error saving preprocessor: {e}")
        log_result(f"Data prepared: {X.shape[0]} samples, {X.shape[1]} features")
        return X, preprocessor
    except Exception as e:
        log_error(f"Error preparing data: {e}")
        return None, None

def train_models_ultra_fast(X):
    """Fast model training."""
    try:
        log_phase("Fast model training")
        if X.shape[0] < MIN_TRAINING_SAMPLES:
            log_error(f"Too few samples: {X.shape[0]} < {MIN_TRAINING_SAMPLES}")
            return None, None, None, None
        # Aggressive subsampling for speed
        if X.shape[0] > MAX_TRAINING_SAMPLES:
            log_warning(f"Subsampling from {X.shape[0]} to {MAX_TRAINING_SAMPLES} samples")
            indices = np.random.choice(X.shape[0], MAX_TRAINING_SAMPLES, replace=False)
            X_train = X[indices]
        else:
            X_train = X
        log_result(f"Training on {X_train.shape[0]} samples")
        # 1. Isolation Forest
        log_result("Training Isolation Forest...")
        if_model = IsolationForest(
            n_estimators=50,        # drastically reduced
            contamination=0.05,
            random_state=42,
            n_jobs=-1,
            max_samples=min(1000, X_train.shape[0]),  # cap per-tree samples
            max_features=0.8
        )
        if_model.fit(X_train)
        log_result("✓ Isolation Forest done")
        # 2. LOF (only on a small subsample)
        log_result("Training LOF...")
        lof_sample_size = min(10000, X_train.shape[0])
        if X_train.shape[0] > lof_sample_size:
            lof_indices = np.random.choice(X_train.shape[0], lof_sample_size, replace=False)
            lof_sample = X_train[lof_indices]
        else:
            lof_sample = X_train
        lof_model = LocalOutlierFactor(
            n_neighbors=min(10, lof_sample.shape[0] // 20),
            contamination=0.05,
            novelty=True,
            n_jobs=-1
        )
        lof_model.fit(lof_sample)
        log_result("✓ LOF done")
        # 3. One-class SVM (only on a very small subsample)
        log_result("Training SVM...")
        svm_sample_size = min(5000, X_train.shape[0])
        if X_train.shape[0] > svm_sample_size:
            svm_indices = np.random.choice(X_train.shape[0], svm_sample_size, replace=False)
            svm_sample = X_train[svm_indices]
        else:
            svm_sample = X_train
        svm_model = OneClassSVM(
            kernel='rbf',
            gamma='scale',
            nu=0.05
        )
        svm_model.fit(svm_sample)
        log_result("✓ SVM done")
        # 4. Persist everything
        log_result("Saving models...")
        try:
            dump(if_model, IF_MODEL_PATH)
            dump(lof_model, LOF_MODEL_PATH)
            dump(svm_model, SVM_MODEL_PATH)
            ensemble_weights = {
                'isolation_forest': 0.60,  # highest weight for IF
                'lof': 0.25,
                'svm': 0.15
            }
            dump(ensemble_weights, ENSEMBLE_MODEL_PATH)
            log_result("✓ All models saved")
        except Exception as e:
            log_error(f"Error saving models: {e}")
            return None, None, None, None
        return if_model, lof_model, svm_model, ensemble_weights
    except Exception as e:
        log_error(f"Training error: {e}")
        return None, None, None, None

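# --- Illustration only (not part of the original pipeline): a minimal sketch
# of how the saved weights could combine the three detectors at scoring time.
# `score_ensemble` is a hypothetical helper; it assumes each model exposes
# decision_function() (true for IsolationForest, OneClassSVM, and LOF fitted
# with novelty=True) and that min-max normalisation makes the scores
# comparable before mixing. Lower output means "more anomalous".
def score_ensemble(X_new, if_model, lof_model, svm_model, weights):
    def normalize(scores):
        span = scores.max() - scores.min()
        return (scores - scores.min()) / span if span > 0 else np.zeros_like(scores)
    s_if = normalize(if_model.decision_function(X_new))
    s_lof = normalize(lof_model.decision_function(X_new))
    s_svm = normalize(svm_model.decision_function(X_new))
    return (weights['isolation_forest'] * s_if
            + weights['lof'] * s_lof
            + weights['svm'] * s_svm)
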
def save_model_timestamp():
    """Record the training timestamp in the database."""
    try:
        engine = connect_to_database_ultra()
        if not engine:
            return False
        # engine.begin() commits the transaction on exit
        with engine.begin() as conn:
            create_table_query = text("""
                CREATE TABLE IF NOT EXISTS model_metadata (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    model_name VARCHAR(50) NOT NULL,
                    last_trained TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    model_path VARCHAR(255),
                    UNIQUE KEY unique_model (model_name)
                )
            """)
            conn.execute(create_table_query)
            upsert_query = text("""
                INSERT INTO model_metadata (model_name, last_trained, model_path)
                VALUES ('ensemble', NOW(), :model_path)
                ON DUPLICATE KEY UPDATE last_trained = NOW(), model_path = :model_path
            """)
            conn.execute(upsert_query, {"model_path": ENSEMBLE_MODEL_PATH})
        log_result("Timestamp saved")
        return True
    except Exception as e:
        log_warning(f"Error saving timestamp: {e}")
        return False

def needs_training(force_training=False):
    """Decide whether retraining is needed."""
    if force_training:
        log_result("Forced retraining")
        return True
    try:
        engine = connect_to_database_ultra()
        if not engine:
            return True
        with engine.connect() as conn:
            query = text("""
                SELECT last_trained FROM model_metadata WHERE model_name = 'ensemble'
            """)
            result = conn.execute(query).fetchone()
            if not result:
                log_result("No previous training found")
                return True
            last_trained = result[0]
            now = datetime.now()
            hours_diff = (now - last_trained).total_seconds() / 3600
            if hours_diff >= TRAINING_FREQUENCY_HOURS:
                log_result(f"Last training: {hours_diff:.1f} hours ago - retraining needed")
                return True
            else:
                log_result(f"Last training: {hours_diff:.1f} hours ago - not needed")
                return False
    except Exception as e:
        log_warning(f"Check failed: {e}")
        return True

def test_database_connection():
    """Quick connectivity test."""
    try:
        engine = connect_to_database_ultra()
        if not engine:
            return False
        with engine.connect() as conn:
            result = conn.execute(text("SELECT 1")).fetchone()
            if result and result[0] == 1:
                log_result("Database test passed")
                # Check that the Esterna table is accessible
                # (SELECT 1 ... LIMIT 1 instead of COUNT(*), which would scan the table)
                try:
                    conn.execute(text("SELECT 1 FROM Esterna LIMIT 1")).fetchone()
                    log_result("Esterna table accessible")
                except Exception:
                    log_error("Esterna table not accessible")
                    return False
                return True
            return False
    except Exception as e:
        log_error(f"Test error: {e}")
        return False

def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description='Fast training for large data volumes')
    parser.add_argument('--force-training', action='store_true', help='Force retraining')
    parser.add_argument('--test', action='store_true', help='Test mode')
    parser.add_argument('--time-window', type=int, default=12, help='Time window in hours (default: 12)')
    parser.add_argument('--max-records', type=int, default=100000, help='Max records (default: 100k)')
    parser.add_argument('--batch-size', type=int, default=5000, help='Batch size (default: 5k)')
    parser.add_argument('--debug', action='store_true', help='Debug logging')
    args = parser.parse_args()
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
    log_phase("ULTRA-FAST training system")
    log_result(f"Config: {args.time_window}h, max {args.max_records}, batch {args.batch_size}")
    start_time = time.time()
    # Quick test mode
    if args.test:
        log_phase("Quick test")
        if test_database_connection():
            log_result("Test OK")
            sys.exit(0)
        else:
            log_error("Test FAILED")
            sys.exit(1)
    # Connectivity check
    if not test_database_connection():
        log_error("Database unreachable")
        sys.exit(1)
    # Check whether retraining is needed
    if not needs_training(args.force_training):
        log_result("Training not needed")
        sys.exit(0)
    try:
        # Connect
        engine = connect_to_database_ultra()
        if not engine:
            log_error("Connection failed")
            sys.exit(1)
        # Extract
        df = extract_data_ultra_fast(engine, args.time_window, args.max_records, args.batch_size)
        if df.empty:
            log_error("No data extracted")
            sys.exit(1)
        # Prepare
        X, preprocessor = prepare_data_ultra_fast(df)
        if X is None:
            log_error("Data preparation failed")
            sys.exit(1)
        # Train
        models = train_models_ultra_fast(X)
        if all(m is not None for m in models):
            log_phase("SUCCESS!")
            # Record the training timestamp
            save_model_timestamp()
            # Final statistics
            elapsed = time.time() - start_time
            memory_usage = psutil.virtual_memory().percent
            log_result(f"Total time: {elapsed:.1f} seconds")
            log_result(f"Final memory: {memory_usage:.1f}%")
            log_result(f"Training samples: {X.shape[0]}")
            log_result(f"Features used: {X.shape[1]}")
        else:
            log_error("Training failed")
            sys.exit(1)
    except Exception as e:
        log_error(f"General error: {e}")
        logging.error(traceback.format_exc())
        sys.exit(1)

if __name__ == "__main__":
    main()
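
# Example invocations (assuming the DB_* environment variables are set):
#   python3 analisys_ultra_optimized.py --test
#   python3 analisys_ultra_optimized.py --force-training --time-window 24 --max-records 50000
#   python3 analisys_ultra_optimized.py --debug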