import pandas as pd from sqlalchemy import create_engine from joblib import load import logging import gc import os # Configurazione del logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) # 1. Caricamento del modello e degli oggetti di preprocessing logging.info("Caricamento del modello e degli oggetti di preprocessing...") model = load('isolation_forest_model.joblib') he_host = load('hashing_encoder_host.joblib') he_ip = load('hashing_encoder_ip.joblib') vectorizer = load('tfidf_vectorizer.joblib') logging.info("Caricamento completato.") # 2. Connessione al database logging.info("Connessione al database...") engine = create_engine('mysql+mysqlconnector://root:Hdgtejskjjc0-@localhost/LOG_MIKROTIK') # Sostituisci 'password' con la tua password effettiva logging.info("Connessione stabilita.") # 3. Lettura di last_analyzed_id try: with open('last_analyzed_id.txt', 'r') as f: last_analyzed_id = int(f.read().strip()) except FileNotFoundError: last_analyzed_id = 0 # 4. Processamento in blocchi max_records = 10000 # Numero massimo di record per blocco while True: logging.info(f"Estrazione dei nuovi dati a partire da ID > {last_analyzed_id}...") query = f""" SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3, Messaggio4 FROM Esterna WHERE ID > {last_analyzed_id} ORDER BY ID ASC LIMIT {max_records} """ new_data = pd.read_sql(query, engine) logging.info(f"Dati estratti: {len(new_data)} record.") if new_data.empty: logging.info("Nessun nuovo dato da analizzare.") break # Esci dal ciclo # Aggiorna last_analyzed_id last_analyzed_id = new_data['ID'].max() with open('last_analyzed_id.txt', 'w') as f: f.write(str(last_analyzed_id)) # 5. Preprocessing dei nuovi dati new_data['Data'] = pd.to_datetime(new_data['Data'], errors='coerce') new_data['Ora'] = pd.to_timedelta(new_data['Ora'].astype(str), errors='coerce') new_data.dropna(subset=['Data', 'Ora'], inplace=True) new_data['Timestamp'] = new_data['Data'] + new_data['Ora'] # Salva le colonne per output dettagliato anomalies_details = new_data[['ID', 'Timestamp', 'Host', 'IndirizzoIP', 'Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4']].copy() # Verifica che 'Timestamp' sia presente e non abbia valori mancanti if 'Timestamp' not in anomalies_details.columns or anomalies_details['Timestamp'].isnull().any(): logging.error("'Timestamp' non è presente o contiene valori mancanti.") print("Controlla il DataFrame anomalies_details:") print(anomalies_details.head()) continue # Salta al prossimo blocco # Unione dei messaggi new_data['Messaggio'] = new_data[['Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4']].fillna('').agg(' '.join, axis=1) new_data.drop(columns=['Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4', 'Data', 'Ora'], inplace=True) gc.collect() # 6. Codifica delle variabili categoriali logging.info("Codifica delle variabili categoriali...") if 'Host' in new_data.columns: X_host = he_host.transform(new_data['Host'].astype(str)) else: logging.error("'Host' non è presente nel DataFrame.") X_host = pd.DataFrame() if 'IndirizzoIP' in new_data.columns: X_ip = he_ip.transform(new_data['IndirizzoIP'].astype(str)) else: logging.error("'IndirizzoIP' non è presente nel DataFrame.") X_ip = pd.DataFrame() new_data.drop(columns=['Host', 'IndirizzoIP'], inplace=True) gc.collect() # 7. Trasformazione TF-IDF logging.info("Trasformazione dei messaggi con TF-IDF...") X_messages = vectorizer.transform(new_data['Messaggio']) new_data.drop(columns=['Messaggio'], inplace=True) gc.collect() # 8. Creazione del DataFrame delle caratteristiche logging.info("Creazione del DataFrame delle caratteristiche...") from scipy.sparse import hstack from scipy import sparse # Converti X_host e X_ip in matrici sparse e assicurati che i tipi siano compatibili X_host_sparse = sparse.csr_matrix(X_host).astype('float64') X_ip_sparse = sparse.csr_matrix(X_ip).astype('float64') X_messages = X_messages.astype('float64') X_new = hstack([X_host_sparse, X_ip_sparse, X_messages]).tocsr() del X_host, X_ip, X_host_sparse, X_ip_sparse, X_messages gc.collect() # 9. Predizione delle anomalie logging.info("Inizio predizione delle anomalie...") new_data['anomaly_score'] = model.decision_function(X_new) new_data['anomaly'] = model.predict(X_new) # 10. Gestione delle anomalie anomalies = new_data[new_data['anomaly'] == -1].copy() # Copia per evitare SettingWithCopyWarning if not anomalies.empty: logging.info(f"Anomalie rilevate nel blocco corrente: {len(anomalies)}") # Assicurati che 'ID' sia dello stesso tipo in entrambi i DataFrame anomalies['ID'] = anomalies['ID'].astype(int) anomalies_details['ID'] = anomalies_details['ID'].astype(int) # Esegui il merge specificando i suffissi anomalies = anomalies.merge(anomalies_details, on='ID', how='left', suffixes=('', '_details')) # Scegli quale 'Timestamp' utilizzare (da anomalies_details) anomalies['Timestamp'] = anomalies['Timestamp_details'] anomalies.drop(columns=['Timestamp_details'], inplace=True) # Unione dei messaggi per output anomalies['Messaggio'] = anomalies[['Messaggio1','Messaggio2','Messaggio3','Messaggio4']].fillna('').agg(' '.join, axis=1) anomalies.drop(columns=['Messaggio1','Messaggio2','Messaggio3','Messaggio4'], inplace=True) # Seleziona le colonne da visualizzare output_columns = ['ID', 'Timestamp', 'Host', 'IndirizzoIP', 'anomaly_score', 'Messaggio'] # Verifica che tutte le colonne siano presenti missing_columns = [col for col in output_columns if col not in anomalies.columns] if missing_columns: logging.error(f"Le seguenti colonne mancano nel DataFrame anomalies: {missing_columns}") print("Colonne disponibili in anomalies:", anomalies.columns) continue # Salta al prossimo blocco # Ordina le anomalie per punteggio anomalies = anomalies.sort_values(by='anomaly_score') # Stampa le anomalie print(anomalies[output_columns].to_string(index=False)) else: logging.info("Nessuna anomalia rilevata nel blocco corrente.") # Rilascia memoria del new_data, X_new, anomalies_details, anomalies gc.collect()