Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
163 lines
6.5 KiB
Python
163 lines
6.5 KiB
Python
import pandas as pd
|
|
from sqlalchemy import create_engine
|
|
from joblib import load
|
|
import logging
|
|
import gc
|
|
import os
|
|
|
|
# Configurazione del logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
|
)
|
|
|
|
# 1. Caricamento del modello e degli oggetti di preprocessing
|
|
logging.info("Caricamento del modello e degli oggetti di preprocessing...")
|
|
model = load('isolation_forest_model.joblib')
|
|
he_host = load('hashing_encoder_host.joblib')
|
|
he_ip = load('hashing_encoder_ip.joblib')
|
|
vectorizer = load('tfidf_vectorizer.joblib')
|
|
logging.info("Caricamento completato.")
|
|
|
|
# 2. Connessione al database
|
|
logging.info("Connessione al database...")
|
|
engine = create_engine('mysql+mysqlconnector://root:Hdgtejskjjc0-@localhost/LOG_MIKROTIK') # Sostituisci 'password' con la tua password effettiva
|
|
logging.info("Connessione stabilita.")
|
|
|
|
# 3. Lettura di last_analyzed_id
|
|
try:
|
|
with open('last_analyzed_id.txt', 'r') as f:
|
|
last_analyzed_id = int(f.read().strip())
|
|
except FileNotFoundError:
|
|
last_analyzed_id = 0
|
|
|
|
# 4. Processamento in blocchi
|
|
max_records = 10000 # Numero massimo di record per blocco
|
|
|
|
while True:
|
|
logging.info(f"Estrazione dei nuovi dati a partire da ID > {last_analyzed_id}...")
|
|
query = f"""
|
|
SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3, Messaggio4
|
|
FROM Esterna
|
|
WHERE ID > {last_analyzed_id}
|
|
ORDER BY ID ASC
|
|
LIMIT {max_records}
|
|
"""
|
|
new_data = pd.read_sql(query, engine)
|
|
logging.info(f"Dati estratti: {len(new_data)} record.")
|
|
|
|
if new_data.empty:
|
|
logging.info("Nessun nuovo dato da analizzare.")
|
|
break # Esci dal ciclo
|
|
|
|
# Aggiorna last_analyzed_id
|
|
last_analyzed_id = new_data['ID'].max()
|
|
with open('last_analyzed_id.txt', 'w') as f:
|
|
f.write(str(last_analyzed_id))
|
|
|
|
# 5. Preprocessing dei nuovi dati
|
|
new_data['Data'] = pd.to_datetime(new_data['Data'], errors='coerce')
|
|
new_data['Ora'] = pd.to_timedelta(new_data['Ora'].astype(str), errors='coerce')
|
|
new_data.dropna(subset=['Data', 'Ora'], inplace=True)
|
|
new_data['Timestamp'] = new_data['Data'] + new_data['Ora']
|
|
|
|
# Salva le colonne per output dettagliato
|
|
anomalies_details = new_data[['ID', 'Timestamp', 'Host', 'IndirizzoIP', 'Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4']].copy()
|
|
|
|
# Verifica che 'Timestamp' sia presente e non abbia valori mancanti
|
|
if 'Timestamp' not in anomalies_details.columns or anomalies_details['Timestamp'].isnull().any():
|
|
logging.error("'Timestamp' non è presente o contiene valori mancanti.")
|
|
print("Controlla il DataFrame anomalies_details:")
|
|
print(anomalies_details.head())
|
|
continue # Salta al prossimo blocco
|
|
|
|
# Unione dei messaggi
|
|
new_data['Messaggio'] = new_data[['Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4']].fillna('').agg(' '.join, axis=1)
|
|
new_data.drop(columns=['Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4', 'Data', 'Ora'], inplace=True)
|
|
gc.collect()
|
|
|
|
# 6. Codifica delle variabili categoriali
|
|
logging.info("Codifica delle variabili categoriali...")
|
|
if 'Host' in new_data.columns:
|
|
X_host = he_host.transform(new_data['Host'].astype(str))
|
|
else:
|
|
logging.error("'Host' non è presente nel DataFrame.")
|
|
X_host = pd.DataFrame()
|
|
if 'IndirizzoIP' in new_data.columns:
|
|
X_ip = he_ip.transform(new_data['IndirizzoIP'].astype(str))
|
|
else:
|
|
logging.error("'IndirizzoIP' non è presente nel DataFrame.")
|
|
X_ip = pd.DataFrame()
|
|
|
|
new_data.drop(columns=['Host', 'IndirizzoIP'], inplace=True)
|
|
gc.collect()
|
|
|
|
# 7. Trasformazione TF-IDF
|
|
logging.info("Trasformazione dei messaggi con TF-IDF...")
|
|
X_messages = vectorizer.transform(new_data['Messaggio'])
|
|
new_data.drop(columns=['Messaggio'], inplace=True)
|
|
gc.collect()
|
|
|
|
# 8. Creazione del DataFrame delle caratteristiche
|
|
logging.info("Creazione del DataFrame delle caratteristiche...")
|
|
from scipy.sparse import hstack
|
|
from scipy import sparse
|
|
|
|
# Converti X_host e X_ip in matrici sparse e assicurati che i tipi siano compatibili
|
|
X_host_sparse = sparse.csr_matrix(X_host).astype('float64')
|
|
X_ip_sparse = sparse.csr_matrix(X_ip).astype('float64')
|
|
X_messages = X_messages.astype('float64')
|
|
|
|
X_new = hstack([X_host_sparse, X_ip_sparse, X_messages]).tocsr()
|
|
del X_host, X_ip, X_host_sparse, X_ip_sparse, X_messages
|
|
gc.collect()
|
|
|
|
# 9. Predizione delle anomalie
|
|
logging.info("Inizio predizione delle anomalie...")
|
|
new_data['anomaly_score'] = model.decision_function(X_new)
|
|
new_data['anomaly'] = model.predict(X_new)
|
|
|
|
# 10. Gestione delle anomalie
|
|
anomalies = new_data[new_data['anomaly'] == -1].copy() # Copia per evitare SettingWithCopyWarning
|
|
|
|
if not anomalies.empty:
|
|
logging.info(f"Anomalie rilevate nel blocco corrente: {len(anomalies)}")
|
|
|
|
# Assicurati che 'ID' sia dello stesso tipo in entrambi i DataFrame
|
|
anomalies['ID'] = anomalies['ID'].astype(int)
|
|
anomalies_details['ID'] = anomalies_details['ID'].astype(int)
|
|
|
|
# Esegui il merge specificando i suffissi
|
|
anomalies = anomalies.merge(anomalies_details, on='ID', how='left', suffixes=('', '_details'))
|
|
|
|
# Scegli quale 'Timestamp' utilizzare (da anomalies_details)
|
|
anomalies['Timestamp'] = anomalies['Timestamp_details']
|
|
anomalies.drop(columns=['Timestamp_details'], inplace=True)
|
|
|
|
# Unione dei messaggi per output
|
|
anomalies['Messaggio'] = anomalies[['Messaggio1','Messaggio2','Messaggio3','Messaggio4']].fillna('').agg(' '.join, axis=1)
|
|
anomalies.drop(columns=['Messaggio1','Messaggio2','Messaggio3','Messaggio4'], inplace=True)
|
|
|
|
# Seleziona le colonne da visualizzare
|
|
output_columns = ['ID', 'Timestamp', 'Host', 'IndirizzoIP', 'anomaly_score', 'Messaggio']
|
|
|
|
# Verifica che tutte le colonne siano presenti
|
|
missing_columns = [col for col in output_columns if col not in anomalies.columns]
|
|
if missing_columns:
|
|
logging.error(f"Le seguenti colonne mancano nel DataFrame anomalies: {missing_columns}")
|
|
print("Colonne disponibili in anomalies:", anomalies.columns)
|
|
continue # Salta al prossimo blocco
|
|
|
|
# Ordina le anomalie per punteggio
|
|
anomalies = anomalies.sort_values(by='anomaly_score')
|
|
|
|
# Stampa le anomalie
|
|
print(anomalies[output_columns].to_string(index=False))
|
|
|
|
else:
|
|
logging.info("Nessuna anomalia rilevata nel blocco corrente.")
|
|
|
|
# Rilascia memoria
|
|
del new_data, X_new, anomalies_details, anomalies
|
|
gc.collect()
|