ids.alfacom.it/extracted_idf/detect_multi_fixed.py
marco370 0bfe3258b5 Saved progress at the end of the loop
Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
2025-11-11 09:15:10 +00:00

1706 lines
72 KiB
Python

#!/usr/bin/env python3
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from joblib import load
import logging
import gc
import os
import time
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
import ipaddress
import numpy as np
from sklearn.ensemble import IsolationForest
import threading
import argparse
import signal
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, as_completed
from category_encoders import HashingEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
# Configurazione del logging avanzata per il debug
logging.basicConfig(
level=logging.WARNING, # Modificato da INFO a WARNING per default
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('ddetect_debug.log')
]
)
# Aggiungi un altro handler per il file con livello più dettagliato
file_handler = logging.FileHandler('ddetect_full.log')
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(file_handler)
# Configurazione del database
DB_USER = os.environ.get('DB_USER', 'root')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}'
# Percorsi dei file
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
os.makedirs(MODEL_DIR, exist_ok=True)
MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt')
LAST_ID_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'last_analyzed_id.txt')
# Definizione dei livelli di rischio e soglie
RISK_LEVELS = {
'NORMALE': 0.1,
'BASSO': 0.3,
'MEDIO': 0.6,
'ALTO': 0.8,
'CRITICO': 0.95
}
logging.debug(f"Percorsi: MODEL_DIR={MODEL_DIR}, WHITELIST_PATH={WHITELIST_PATH}")
logging.debug(f"Livelli di rischio configurati: {RISK_LEVELS}")
# Colori ANSI per i messaggi
class Colors:
HEADER = '\033[95m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
END = '\033[0m'
def log_phase(message):
"""Evidenzia una nuova fase principale dell'esecuzione"""
print(f"\n{Colors.BOLD}{Colors.GREEN}▶ FASE: {message}{Colors.END}\n")
logging.info(f"FASE: {message}")
def log_result(message):
"""Evidenzia un risultato importante"""
print(f"{Colors.BLUE}{message}{Colors.END}")
logging.info(f"RISULTATO: {message}")
def log_warning(message):
"""Evidenzia un avviso importante"""
print(f"{Colors.YELLOW}{message}{Colors.END}")
logging.warning(message)
def log_error(message):
"""Evidenzia un errore importante"""
print(f"{Colors.RED}{message}{Colors.END}")
logging.error(message)
# Variabili globali per il tracciamento dell'avanzamento
progress_counters = {
'ip_whitelisted': 0,
'ip_analyzed': 0,
'ip_normal': 0,
'ip_low': 0,
'ip_medium': 0,
'ip_high': 0,
'ip_critical': 0,
'metrics_processed': 0,
'last_update': 0,
'in_progress': False,
'operation': '',
'start_time': None
}
def reset_counters():
"""
Resetta i contatori per una nuova esecuzione
"""
global progress_counters
progress_counters['ip_whitelisted'] = 0
progress_counters['ip_analyzed'] = 0
progress_counters['ip_normal'] = 0
progress_counters['ip_low'] = 0
progress_counters['ip_medium'] = 0
progress_counters['ip_high'] = 0
progress_counters['ip_critical'] = 0
progress_counters['metrics_processed'] = 0
progress_counters['last_update'] = 0
progress_counters['in_progress'] = False
progress_counters['operation'] = ''
progress_counters['start_time'] = None
def start_progress_tracking(operation):
"""
Inizia il tracciamento dell'operazione
"""
global progress_counters
reset_counters()
progress_counters['in_progress'] = True
progress_counters['operation'] = operation
progress_counters['start_time'] = time.time()
# Avvia un thread per il reporting
threading.Thread(target=progress_reporter, daemon=True).start()
logging.info(f"Avvio monitoraggio operazione: {operation}")
def update_counter(counter_name, increment=1):
"""
Aggiorna un contatore specifico
"""
global progress_counters
if counter_name in progress_counters:
progress_counters[counter_name] += increment
def end_progress_tracking():
"""
Termina il tracciamento e mostra il report finale
"""
global progress_counters
if not progress_counters['in_progress']:
return
progress_counters['in_progress'] = False
report_progress(force=True)
logging.info(f"Monitoraggio completato per: {progress_counters['operation']}")
def report_progress(force=False):
"""
Riporta lo stato attuale dei contatori
"""
global progress_counters
if not progress_counters['in_progress'] and not force:
return
current_time = time.time()
if not force and (current_time - progress_counters['last_update']) < 10: # Aggiorna ogni 10 secondi
return
elapsed = current_time - progress_counters['start_time'] if progress_counters['start_time'] else 0
report = f"""
{Colors.BOLD}======== REPORT DI PROGRESSO - {progress_counters['operation']} ========{Colors.END}
Tempo trascorso: {elapsed:.1f} secondi
IP Whitelistati esclusi: {progress_counters['ip_whitelisted']}
Metriche elaborate: {progress_counters['metrics_processed']}
IP Analizzati: {progress_counters['ip_analyzed']}
Classificazione rischio:
- IP NORMALI: {progress_counters['ip_normal']}
- IP BASSI: {progress_counters['ip_low']}
- IP MEDI: {progress_counters['ip_medium']}
- IP ALTI: {progress_counters['ip_high']}
- IP CRITICI: {progress_counters['ip_critical']}
{Colors.BOLD}================================================================{Colors.END}
"""
print(report)
logging.info(report.replace(Colors.BOLD, '').replace(Colors.END, ''))
progress_counters['last_update'] = current_time
def progress_reporter():
"""
Thread che riporta periodicamente i progressi
"""
while progress_counters['in_progress']:
report_progress()
time.sleep(2) # Controlla ogni 2 secondi, ma riporta solo ogni 10
def test_database_connection():
"""
Test di connessione al database
"""
try:
logging.debug("Tentativo di connessione al database...")
engine = create_engine(CONN_STRING)
with engine.connect() as conn:
result = conn.execute(text("SELECT 1")).fetchone()
if result[0] == 1:
logging.debug("Test connessione al database riuscito!")
tables = conn.execute(text("SHOW TABLES")).fetchall()
logging.debug(f"Tabelle disponibili: {[t[0] for t in tables]}")
return True
return False
except Exception as e:
logging.error(f"Errore nel test di connessione al database: {e}")
return False
def create_engine_with_retry(conn_string, max_retries=3, retry_delay=2):
"""
Crea una connessione al database con tentativi multipli
"""
for attempt in range(max_retries):
try:
# Configurazione ottimizzata per SQLAlchemy
engine = create_engine(
conn_string,
pool_size=5,
max_overflow=10,
pool_recycle=3600,
pool_pre_ping=True,
pool_timeout=30,
echo=False,
isolation_level="READ COMMITTED"
)
# Test di connessione
with engine.connect() as conn:
conn.execute(text("SELECT 1")).fetchone()
logging.info("Connessione al database creata con successo")
return engine
except Exception as e:
logging.error(f"Tentativo {attempt+1} fallito: {e}")
if attempt < max_retries - 1:
logging.info(f"Nuovo tentativo tra {retry_delay} secondi...")
time.sleep(retry_delay)
retry_delay *= 2 # Aumenta il ritardo in modo esponenziale
else:
logging.error("Impossibile connettersi al database dopo tutti i tentativi")
raise
def load_models():
"""
Carica i modelli di rilevamento delle anomalie addestrati
"""
try:
# Carica il modello
logging.info(f"Caricamento modello da {MODEL_PATH}...")
if os.path.exists(MODEL_PATH):
model = load(MODEL_PATH)
logging.debug("Modello caricato con successo!")
else:
logging.error(f"File modello non trovato: {MODEL_PATH}")
return None, None
# Carica il preprocessor
try:
logging.info(f"Caricamento preprocessor da {PREPROCESSOR_PATH}...")
if os.path.exists(PREPROCESSOR_PATH):
preprocessor = load(PREPROCESSOR_PATH)
# Verifica che il preprocessor abbia la struttura attesa
if isinstance(preprocessor, dict) and 'feature_columns' in preprocessor:
feature_count = len(preprocessor['feature_columns'])
if feature_count < 125:
logging.warning(f"Il modello si aspetta 125 feature, "
f"ma il preprocessor ne ha {feature_count}")
return model, preprocessor
else:
logging.error("Preprocessor non ha la struttura attesa. Utilizzo fallback.")
else:
logging.error(f"File preprocessor non trovato: {PREPROCESSOR_PATH}")
except Exception as e:
logging.error(f"Errore nel caricamento del preprocessor: {e}")
# Se arriviamo qui, il preprocessor non è disponibile o non è valido
# Crea un preprocessor di fallback
preprocessor = {'feature_columns': [f'feature_{i}' for i in range(125)]}
return model, preprocessor
except Exception as e:
logging.error(f"Errore nel caricamento dei modelli: {e}")
return None, None
def load_whitelist(whitelist_path=None):
"""
Carica la whitelist da file
"""
if whitelist_path is None:
whitelist_path = '/root/whitelist.txt'
try:
if not os.path.exists(whitelist_path):
logging.warning(f"File whitelist non trovato: {whitelist_path}")
return {'exact_ips': set(), 'networks': []}
with open(whitelist_path, 'r') as f:
whitelist_entries = [line.strip() for line in f if line.strip() and not line.startswith('#')]
exact_ips = set()
networks = []
for entry in whitelist_entries:
try:
if '/' in entry:
# È una rete
network = ipaddress.ip_network(entry, strict=False)
networks.append(network)
else:
# È un IP singolo
exact_ips.add(entry)
except Exception as e:
logging.warning(f"Ignorata entry nella whitelist non valida: {entry} - {e}")
whitelist = {
'exact_ips': exact_ips,
'networks': networks
}
logging.info(f"Whitelisted {len(exact_ips)} IP esatti e {len(networks)} network.")
return whitelist
except Exception as e:
logging.error(f"Errore nel caricamento della whitelist: {e}")
return {'exact_ips': set(), 'networks': []}
def is_ip_whitelisted(ip, whitelist):
"""
Verifica se un IP è nella whitelist
"""
if pd.isna(ip) or not ip:
return False
try:
# Ottimizzazione: utilizziamo un set per la verifica di appartenenza diretta
if ip in whitelist.get('exact_ips', set()):
update_counter('ip_whitelisted')
return True
# Per le reti dobbiamo convertire l'IP in un oggetto
try:
ip_obj = ipaddress.ip_address(ip)
except ValueError:
logging.debug(f"IP non valido: {ip}")
return False
# Limitiamo la verifica a massimo 5000 reti per evitare blocchi
# Verifichiamo le reti solo se l'IP non è già stato trovato come esatto
for i, network in enumerate(whitelist.get('networks', [])):
if i >= 5000: # Limitiamo per evitare blocchi
# Usiamo una variabile globale per loggare questo warning solo una volta per IP
if not hasattr(is_ip_whitelisted, 'warned_ips'):
is_ip_whitelisted.warned_ips = set()
if ip not in is_ip_whitelisted.warned_ips:
logging.debug(f"Limite di 5000 reti raggiunto nella verifica whitelist per IP {ip}")
is_ip_whitelisted.warned_ips.add(ip)
# Limita la dimensione del set per evitare crescita incontrollata
if len(is_ip_whitelisted.warned_ips) > 100:
is_ip_whitelisted.warned_ips.clear()
break
if ip_obj in network:
update_counter('ip_whitelisted')
return True
return False
except Exception as e:
logging.error(f"Errore nel controllo whitelist per IP {ip}: {e}")
return False
def load_last_analyzed_id():
"""
Carica l'ultimo ID analizzato dal file
"""
try:
if os.path.exists(LAST_ID_PATH):
with open(LAST_ID_PATH, 'r') as f:
last_id = int(f.read().strip())
return last_id
else:
logging.info(f"File {LAST_ID_PATH} non trovato. Inizializzo last_analyzed_id a 0.")
return 0
except Exception as e:
logging.error(f"Errore nel caricamento dell'ultimo ID analizzato: {e}")
return 0
def save_last_analyzed_id(last_id):
"""
Salva l'ultimo ID analizzato nel file
"""
try:
with open(LAST_ID_PATH, 'w') as f:
f.write(str(last_id))
logging.info(f"Ultimo ID analizzato salvato: {last_id}")
except Exception as e:
logging.error(f"Errore nel salvataggio dell'ultimo ID analizzato: {e}")
def extract_data(engine, last_id=0, batch_size=1000, max_id=None):
"""
Estrae i dati dalla tabella Esterna per l'analisi
Ora supporta estrazione ottimizzata e soglia massima
"""
try:
# Se viene specificato max_id, limitiamo l'estrazione fino a quell'ID
# Utile per evitare di analizzare log molto vecchi e concentrarsi sugli ultimi
if max_id:
logging.info(f"Limitazione estrazione fino a ID {max_id}")
# Otteniamo il numero totale di record inserendo i parametri direttamente
# Questa è una soluzione che evita problemi di compatibilità con i segnaposti
if max_id:
query_count = f"SELECT COUNT(*) FROM Esterna WHERE ID > {last_id} AND ID <= {max_id}"
else:
query_count = f"SELECT COUNT(*) FROM Esterna WHERE ID > {last_id}"
with engine.connect() as conn:
count_result = conn.execute(text(query_count)).scalar()
if count_result == 0:
logging.info("Nessun nuovo record da estrarre")
return pd.DataFrame()
logging.info(f"Trovati {count_result} nuovi record da estrarre")
# Se ci sono troppi record (>50k), otteniamo solo i più recenti
effective_batch_size = batch_size
if count_result > 50000:
logging.warning(f"Troppi record in coda ({count_result}). Limitando l'analisi ai più recenti.")
# Troviamo l'ID limite per i record più recenti
with engine.connect() as conn:
latest_id_query = "SELECT MAX(ID) FROM Esterna"
latest_id = conn.execute(text(latest_id_query)).scalar()
if latest_id:
# Estrarre gli ultimi N record più recenti
max_id = latest_id
first_id_query = f"SELECT ID FROM Esterna WHERE ID <= {latest_id} ORDER BY ID DESC LIMIT {batch_size*5}"
result = conn.execute(text(first_id_query)).fetchall()
if result and len(result) > 0:
# Prendiamo l'ID più basso dei record recenti
first_recent_id = result[-1][0]
logging.info(f"Limitando l'analisi ai record con ID da {first_recent_id} a {latest_id}")
last_id = first_recent_id - 1
# Aggiorniamo il conteggio effettivo
count_result = latest_id - last_id
# Limitiamo il batch_size in base al conteggio effettivo
if count_result < effective_batch_size:
effective_batch_size = count_result
# Calcolo del numero di batch
num_batches = (count_result + effective_batch_size - 1) // effective_batch_size
logging.info(f"Estrazione suddivisa in {num_batches} batch di massimo {effective_batch_size} record ciascuno")
# Evitare estrazione su ID troppo elevati
max_extraction_limit = 100000 # Limita a 100k record per esecuzione
if count_result > max_extraction_limit:
logging.warning(f"Troppi record da analizzare ({count_result}). Limitando a {max_extraction_limit}.")
count_result = max_extraction_limit
# Prima verifichiamo quali colonne sono disponibili nella tabella Esterna
try:
with engine.connect() as conn:
# Ottieni informazioni sulle colonne
col_query = f"SHOW COLUMNS FROM Esterna"
columns_info = conn.execute(text(col_query)).fetchall()
available_columns = [col[0] for col in columns_info]
logging.info(f"Colonne disponibili nella tabella Esterna: {available_columns}")
except Exception as e:
logging.error(f"Errore nel verificare le colonne disponibili: {e}")
# Se non riusciamo a ottenere le colonne, usiamo un set predefinito di colonne base
available_columns = ['ID', 'Data', 'Ora', 'Host', 'IndirizzoIP', 'Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4']
logging.info(f"Utilizzo colonne predefinite: {available_columns}")
# Per ogni batch, costruiamo ed eseguiamo la query inserendo
# i parametri direttamente nella stringa SQL e utilizzando solo le colonne disponibili
current_id = last_id
frames = []
# Costruiamo la lista di colonne per la query
# Assicuriamoci che ID sia sempre incluso
select_columns = ['ID']
# Aggiungiamo altre colonne solo se disponibili
for col in ['Data', 'Ora', 'Host', 'IndirizzoIP', 'Messaggio1', 'Messaggio2', 'Messaggio3', 'Messaggio4',
'Topic', 'RouterOS', 'RouterIP', 'RouterName']:
if col in available_columns:
select_columns.append(col)
# Costruiamo la stringa delle colonne
columns_str = ', '.join(select_columns)
for i in range(num_batches):
with engine.connect() as conn:
if max_id:
query = f"""
SELECT {columns_str}
FROM Esterna
WHERE ID > {current_id}
AND ID <= {max_id}
ORDER BY ID ASC
LIMIT {effective_batch_size}
"""
else:
query = f"""
SELECT {columns_str}
FROM Esterna
WHERE ID > {current_id}
ORDER BY ID ASC
LIMIT {effective_batch_size}
"""
logging.info(f"Estraendo batch {i+1}/{num_batches}: ID > {current_id}")
# Esecuzione della query senza parametri (sono già inseriti nella stringa)
result = conn.execute(text(query))
# Convertiamo il risultato in DataFrame
chunk = pd.DataFrame(result.fetchall(), columns=result.keys())
if chunk.empty:
break
# Aggiorna l'ID corrente per il prossimo batch
current_id = chunk['ID'].max()
# Aggiungiamo una colonna Timestamp calcolata se abbiamo Data e Ora
if 'Data' in chunk.columns and 'Ora' in chunk.columns:
try:
chunk['Data'] = pd.to_datetime(chunk['Data'], errors='coerce')
chunk['Ora'] = pd.to_timedelta(chunk['Ora'].astype(str), errors='coerce')
chunk['Timestamp'] = chunk['Data'] + chunk['Ora']
except Exception as e:
logging.warning(f"Impossibile creare colonna Timestamp: {e}")
# Accumula il chunk
frames.append(chunk)
# Feedback sull'avanzamento
logging.info(f"Estratti {len(chunk)} record, fino all'ID {current_id}")
# Combina tutti i frame
if not frames:
return pd.DataFrame()
result = pd.concat(frames, ignore_index=True)
logging.info(f"Estrazione completata: {len(result)} record totali")
return result
except Exception as e:
logging.error(f"Errore nell'estrazione dei dati: {e}")
import traceback
logging.error(traceback.format_exc())
return pd.DataFrame()
def prepare_data(df, preprocessor):
"""
Prepara i dati per il modello, generando tutte le feature necessarie
per garantire la compatibilità con ddetect_fixed.py e ridurre i placeholder
"""
try:
# Crea una copia esplicita del dataframe per evitare SettingWithCopyWarning
df = df.copy()
import numpy as np
# Numero atteso di feature (dal modello)
expected_features = 125
# Prepara un dizionario per tutte le feature
feature_data = {}
feature_count = 0
# 1. Aggiungi le caratteristiche temporali essenziali (9 feature)
time_features = [
'time_since_last', 'events_last_hour', 'events_last_day',
'time_since_last_mean', 'time_since_last_std', 'time_since_last_min',
'time_since_last_max', 'events_last_hour_max', 'events_last_day_max'
]
for feat in time_features:
if feat in df.columns:
feature_data[feat] = df[feat].fillna(0).values
else:
feature_data[feat] = np.zeros(len(df))
feature_count += 1
# 2. Estrai caratteristiche TF-IDF dal protocollo (21 feature)
if 'Messaggio1' in df.columns:
try:
from sklearn.feature_extraction.text import TfidfVectorizer
vectorizer = TfidfVectorizer(max_features=21)
proto_data = df['Messaggio1'].fillna('').astype(str)
# Fit e transform
tfidf_matrix = vectorizer.fit_transform(proto_data)
tfidf_features = tfidf_matrix.toarray()
# Aggiungi al feature data
for i in range(min(21, tfidf_features.shape[1])):
feature_data[f'protocol_tfidf_{i}'] = tfidf_features[:, i]
feature_count += 1
# Se abbiamo meno di 21 feature, riempi con zeri
for i in range(tfidf_features.shape[1], 21):
feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df))
feature_count += 1
except Exception as e:
logging.error(f"Errore nell'estrazione TF-IDF: {e}")
# Fallback: aggiungi 21 feature vuote
for i in range(21):
feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df))
feature_count += 1
else:
# Se non c'è Messaggio1, aggiungi 21 feature vuote
for i in range(21):
feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df))
feature_count += 1
# 3. Aggiungi feature di Host (2 feature)
if 'Host' in df.columns:
feature_data['host_FIBRA'] = df['Host'].fillna('').str.contains('FIBRA').astype(int).values
feature_data['host_nan'] = df['Host'].isna().astype(int).values
else:
feature_data['host_FIBRA'] = np.zeros(len(df))
feature_data['host_nan'] = np.zeros(len(df))
feature_count += 2
# 4. Estrai e codifica IP e Host (15 feature)
# Estrai IP attaccante se non presente
if 'IP_Attaccante' not in df.columns and 'Messaggio2' in df.columns:
df['IP_Attaccante'] = df['Messaggio2'].apply(
lambda x: x.split(':')[0] if pd.notna(x) and isinstance(x, str) and ':' in x else None
)
# Usa HashingEncoder per IP e Host se disponibili
try:
hash_encoder = HashingEncoder(n_components=15)
# Prepara i dati per l'encoding
encode_cols = []
encode_data = []
if 'IP_Attaccante' in df.columns:
encode_cols.append('IP_Attaccante')
encode_data.append(df['IP_Attaccante'].fillna('unknown').astype(str))
if 'Host' in df.columns:
encode_cols.append('Host')
encode_data.append(df['Host'].fillna('unknown').astype(str))
if encode_cols:
# Combina i dati per l'encoding
encode_df = pd.DataFrame({
col: data for col, data in zip(encode_cols, encode_data)
}, index=df.index)
# Fai l'encoding
encoded = hash_encoder.fit_transform(encode_df)
# Aggiungi le feature codificate
for i in range(min(15, encoded.shape[1])):
feature_data[f'col_{i}'] = encoded.iloc[:, i].values
feature_count += 1
# Aggiungi colonne mancanti se necessario
for i in range(encoded.shape[1], 15):
feature_data[f'col_{i}'] = np.zeros(len(df))
feature_count += 1
else:
# Nessuna colonna da codificare, aggiungi feature vuote
for i in range(15):
feature_data[f'col_{i}'] = np.zeros(len(df))
feature_count += 1
except Exception as e:
logging.error(f"Errore nell'encoding delle colonne: {e}")
# Fallback: aggiungi 15 feature vuote
for i in range(15):
feature_data[f'col_{i}'] = np.zeros(len(df))
feature_count += 1
# 5. Caratteristiche aggiuntive da ddetect_fixed (36 feature = 15 + 21)
# Aggiungi 15 colonne additional_col
for i in range(15):
feature_data[f'additional_col_{i}'] = np.zeros(len(df))
feature_count += 1
# Aggiungi 21 colonne additional_tfidf
for i in range(21):
feature_data[f'additional_tfidf_{i}'] = np.zeros(len(df))
feature_count += 1
# 6. Genera colonne rimanenti per arrivare a 125
remaining = expected_features - feature_count
if remaining > 0:
placeholder_data = {}
for i in range(remaining):
feature_data[f'extra_col_{i}'] = np.zeros(len(df))
feature_count += 1
logging.info(f"Aggiunte {remaining} colonne extra per raggiungere {expected_features} feature")
# Controlla se abbiamo generato il numero corretto di feature
assert feature_count == expected_features, f"Numero di feature generate ({feature_count}) != attese ({expected_features})"
# Crea il DataFrame in un unico passaggio e converti in numpy array
X = pd.DataFrame(feature_data, index=df.index)
X_array = X.to_numpy()
# Verifica finale
logging.debug(f"Generate {feature_count} feature senza placeholder")
return X_array
except Exception as e:
logging.error(f"Errore nella preparazione dei dati: {e}")
import traceback
logging.error(f"Traceback: {traceback.format_exc()}")
return None
def predict_anomalies(model, features, sensitivity=5):
"""
Predice le anomalie utilizzando il modello caricato
Il parametro sensitivity (1-10) regola la sensibilità di rilevamento:
- 1: massima sensibilità (più falsi positivi)
- 10: minima sensibilità (più falsi negativi)
"""
try:
logging.debug(f"Predizione su {features.shape[0]} esempi con {features.shape[1]} feature (sensibilità: {sensitivity})")
# Verifica che il numero di feature corrisponda
if features.shape[1] != 125:
logging.error(f"Dimensione feature errata: trovate {features.shape[1]}, attese 125")
return np.zeros(features.shape[0]) # Fallback sicuro
# Aggiorna il contatore PRIMA di fare la predizione
update_counter('metrics_processed', features.shape[0])
if hasattr(model, 'predict'):
# Esegui la predizione con timeout per evitare blocchi
start_time = time.time()
max_time = 60 # Massimo 60 secondi
try:
# Sopprimiamo i warning sui nomi delle feature
import warnings
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning)
# Crea un DataFrame con i nomi di feature corretti dal modello se disponibili
try:
if hasattr(model, 'feature_names_in_'):
# Se il modello ha feature_names_in_, usa quelli
feature_names = model.feature_names_in_
raw_predictions = model.predict(features)
else:
# Altrimenti, fai la predizione direttamente sull'array numpy
raw_predictions = model.predict(features)
except Exception as e:
logging.warning(f"Errore con feature_names_in_: {e}, tentativo alternativo")
# Fallback: predizione diretta su array numpy
raw_predictions = model.predict(features)
# Se il modello supporta il decision_function, usiamo quello per applicare la sensibilità
if hasattr(model, 'decision_function'):
try:
# Ottieni gli score di decisione
decision_scores = model.decision_function(features)
# Normalizza gli score per la sensibilità
# Un valore inferiore del threshold rende il modello più sensibile
# La sensibilità è inversa al threshold: sensitivity 1 = threshold basso, quindi più anomalie
threshold_multiplier = sensitivity / 5.0 # 5 è il valore neutro (1.0)
custom_threshold = -0.2 * threshold_multiplier # Valore base regolabile
# Applica il threshold personalizzato
predictions = np.where(decision_scores < custom_threshold, -1, 1)
num_anomalies = np.sum(predictions == -1)
logging.debug(f"Trovate {num_anomalies} anomalie con sensibilità {sensitivity} (threshold: {custom_threshold:.3f})")
except Exception as e:
logging.warning(f"Errore nell'utilizzo di decision_function: {e}, usando predict standard")
predictions = raw_predictions
else:
# Usa le predizioni standard
predictions = raw_predictions
logging.debug(f"Predizione completata: {len(predictions)} risultati")
except Exception as e:
# Se c'è un errore, registriamolo per debug
logging.error(f"Errore durante la predizione: {e}")
import traceback
logging.error(traceback.format_exc())
# Aggiorniamo anche il timeout
elapsed = time.time() - start_time
if elapsed >= max_time:
logging.error(f"Timeout durante la predizione ({elapsed:.1f} sec)")
# Fallback: array di zeri (non anomali)
predictions = np.zeros(features.shape[0])
return predictions
else:
logging.error("Modello non ha il metodo predict")
return np.zeros(features.shape[0])
except Exception as e:
logging.error(f"Errore generale nella predizione: {e}")
import traceback
logging.error(traceback.format_exc())
return np.zeros(features.shape[0])
def get_details(engine, ids):
"""
Ottieni i dettagli completi per gli ID specificati dalla tabella Esterna
"""
try:
if not ids:
return pd.DataFrame()
id_list = ','.join(map(str, ids))
query = text("""
SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3, Messaggio4
FROM Esterna
WHERE ID IN ({})
""".format(id_list))
details = pd.read_sql(query, engine)
# Converti timestamp
if 'Data' in details.columns and 'Ora' in details.columns:
details['Data'] = pd.to_datetime(details['Data'], errors='coerce')
details['Ora'] = pd.to_timedelta(details['Ora'].astype(str), errors='coerce')
details['Timestamp'] = details['Data'] + details['Ora']
return details
except Exception as e:
logging.error(f"Errore nell'ottenere i dettagli dalla tabella Esterna: {e}")
return pd.DataFrame()
def classify_risk(anomaly_score):
"""
Classifica il livello di rischio in base allo score di anomalia
"""
# Normalizza lo score (potrebbe essere già normalizzato)
score = abs(anomaly_score)
if score < RISK_LEVELS['NORMALE']:
update_counter('ip_normal')
return 'NORMALE'
elif score < RISK_LEVELS['BASSO']:
update_counter('ip_low')
return 'BASSO'
elif score < RISK_LEVELS['MEDIO']:
update_counter('ip_medium')
return 'MEDIO'
elif score < RISK_LEVELS['ALTO']:
update_counter('ip_high')
return 'ALTO'
else:
update_counter('ip_critical')
return 'CRITICO'
def is_known_attacker(engine, ip_address):
"""
Verifica se un IP è un attaccante noto
Crea la tabella se non esiste
"""
try:
with engine.connect() as conn:
# Crea la tabella se non esiste
logging.debug(f"Verifica tabella known_attackers per IP {ip_address}")
create_table_query = text("""
CREATE TABLE IF NOT EXISTS known_attackers (
id INT AUTO_INCREMENT PRIMARY KEY,
ip_address VARCHAR(45) NOT NULL,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
attack_count INT DEFAULT 1,
risk_level VARCHAR(20) DEFAULT 'NORMALE',
ports_used TEXT DEFAULT NULL,
attack_patterns TEXT DEFAULT NULL,
is_blocked TINYINT(1) DEFAULT 0,
UNIQUE KEY unique_ip (ip_address)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# Assicurati che la transazione sia valida
conn.execute(create_table_query)
# Verifica se l'IP esiste
check_query = text("SELECT * FROM known_attackers WHERE ip_address = :ip")
logging.debug(f"Esecuzione query: {check_query} con parametri: {{'ip': {ip_address}}}")
result = conn.execute(check_query, {"ip": ip_address}).fetchone()
exists = result is not None
logging.debug(f"IP {ip_address} è un attaccante noto: {exists}")
return exists
except Exception as e:
logging.error(f"Errore nel verificare se l'IP {ip_address} è un attaccante noto: {e}")
return False
def update_known_attacker(engine, ip_address, risk_level, port=None, message=None):
"""
Aggiorna o inserisce informazioni su un attaccante noto
Ritorna il livello di rischio aggiornato
"""
try:
# Inizializza porta e messaggio a valori accettabili per il DB
port_str = str(port) if port is not None else None
message_str = str(message) if message is not None else None
with engine.connect() as conn:
# Crea una transazione
trans = conn.begin()
# Verifica se l'IP esiste già
check_query = text("SELECT * FROM known_attackers WHERE ip_address = :ip")
result = conn.execute(check_query, {"ip": ip_address}).fetchone()
if result:
# Aggiorna esistente
logging.debug(f"Aggiornamento attaccante esistente: {ip_address}, nuovo rischio={risk_level}")
# Estrai le porte utilizzate e aggiungi quella nuova se non presente
ports = result.ports_used or ""
if port_str and port_str not in ports.split(','):
if ports:
ports += f",{port_str}"
else:
ports = port_str
# Determina il nuovo livello di rischio (il massimo tra quello esistente e il nuovo)
existing_risk = result.risk_level
if existing_risk:
risk_levels = ['BASSO', 'MEDIO', 'ALTO', 'CRITICO']
existing_idx = risk_levels.index(existing_risk) if existing_risk in risk_levels else 0
new_idx = risk_levels.index(risk_level) if risk_level in risk_levels else 1
final_risk = risk_levels[max(existing_idx, new_idx)]
else:
final_risk = risk_level
# Aggiorna l'entry
update_query = text("""
UPDATE known_attackers
SET last_seen = NOW(),
attack_count = attack_count + 1,
risk_level = :risk,
ports_used = :ports
WHERE ip_address = :ip
""")
conn.execute(update_query, {"ip": ip_address, "risk": final_risk, "ports": ports})
trans.commit()
return final_risk
else:
# Inserisci nuovo
try:
logging.debug(f"Inserimento nuovo attaccante: {ip_address}, rischio={risk_level}, porta={port}")
insert_query = text("""
INSERT INTO known_attackers
(ip_address, risk_level, ports_used, attack_patterns, is_blocked)
VALUES (:ip, :risk, :port, :message, 0)
""")
conn.execute(insert_query, {"ip": ip_address, "risk": risk_level, "port": port_str, "message": message_str})
trans.commit()
# Inizializza il contatore al nuovo livello di rischio
if risk_level == 'BASSO':
update_counter('ip_low')
elif risk_level == 'MEDIO':
update_counter('ip_medium')
elif risk_level == 'ALTO':
update_counter('ip_high')
elif risk_level == 'CRITICO':
update_counter('ip_critical')
return risk_level
except Exception as e:
logging.error(f"Errore nell'inserimento del nuovo attaccante: {e}")
trans.rollback()
return risk_level
except Exception as e:
logging.error(f"Errore nell'aggiornamento dell'attaccante {ip_address}: {e}")
return risk_level
def should_block_ip(risk_level):
"""
Determina se un IP dovrebbe essere bloccato in base al suo livello di rischio
"""
should_block = risk_level in ['ALTO', 'CRITICO']
logging.debug(f"IP con rischio {risk_level} dovrebbe essere bloccato: {should_block}")
return should_block
def insert_anomaly_to_db(engine, ip_address, risk_level, list_name):
"""
Inserisce un'anomalia nel database
Ritorna True se l'inserimento è avvenuto con successo, False altrimenti
"""
try:
# Verifica la connessione al database
if not engine:
logging.error("Nessuna connessione al database disponibile")
return False
logging.debug(f"Tentativo di inserimento IP {ip_address} nella lista {list_name}")
# Converti il livello di rischio in un valore numerico per il database
risk_value = {
'BASSO': 1,
'MEDIO': 2,
'ALTO': 3,
'CRITICO': 4
}.get(risk_level, 2) # Default a MEDIO se non riconosciuto
# Prepara la query di inserimento
insert_query = text("""
INSERT INTO Fibra (IndirizzoIP, Data, Ora, Host, Attivo, Lista, NumeroAttacchi, LivelloDiRischio)
VALUES (:ip, CURDATE(), CURTIME(), :host, 1, :lista, 1, :rischio)
ON DUPLICATE KEY UPDATE
Attivo = 1,
NumeroAttacchi = NumeroAttacchi + 1,
Data = CURDATE(),
Ora = CURTIME(),
LivelloDiRischio = GREATEST(LivelloDiRischio, :rischio)
""")
# Hostname dal database se disponibile
hostname = ""
# Tenta di ottenere l'hostname dal database se possibile
try:
hostname_query = text("""
SELECT Host FROM Esterna
WHERE Messaggio2 LIKE :pattern
AND Host IS NOT NULL AND Host != ''
ORDER BY ID DESC LIMIT 1
""")
with engine.connect() as conn:
result = conn.execute(hostname_query, {"pattern": f"{ip_address}:%"}).fetchone()
if result and result[0]:
hostname = result[0]
logging.debug(f"Hostname trovato per {ip_address}: {hostname}")
except Exception as e:
logging.warning(f"Impossibile recuperare l'hostname per {ip_address}: {e}")
# Esegui la query di inserimento
with engine.connect() as conn:
result = conn.execute(
insert_query,
{
"ip": ip_address,
"host": hostname,
"lista": list_name,
"rischio": risk_value
}
)
conn.commit() # Esplicito commit per assicurarsi che la transazione venga completata
logging.info(f"IP {ip_address} inserito/aggiornato nella lista {list_name} con rischio {risk_level}")
# Verifica l'inserimento
verify_query = text("SELECT COUNT(*) FROM Fibra WHERE IndirizzoIP = :ip AND Lista = :lista")
count = conn.execute(verify_query, {"ip": ip_address, "lista": list_name}).scalar()
if count > 0:
logging.debug(f"Verifica inserimento: trovate {count} righe per IP {ip_address}")
return True
logging.error(f"Verifica inserimento fallita: nessuna riga trovata per IP {ip_address}")
return False
except Exception as e:
logging.error(f"Errore nell'inserimento dell'anomalia nel database per IP {ip_address}: {e}")
import traceback
logging.error(traceback.format_exc())
return False
def handle_anomaly(engine, ip_address, risk_level, port=None, message=None, list_name='ddos_ia'):
"""
Gestisce un'anomalia rilevata, aggiornando il database e generando avvisi
"""
try:
# Verifico che l'indirizzo IP sia valido
if not ip_address or pd.isna(ip_address):
logging.warning(f"Indirizzo IP non valido: {ip_address}")
return
# Controllo se l'IP è già noto come attaccante
is_known = is_known_attacker(engine, ip_address)
# Se è già noto, aggiorno l'attacco, altrimenti lo creo
if is_known:
logging.debug(f"IP {ip_address} già noto, aggiornamento entry...")
update_known_attacker(engine, ip_address, risk_level, port, message)
else:
logging.debug(f"IP {ip_address} nuovo attaccante, creazione entry...")
# Inserisci l'anomalia nel database
try:
logging.debug(f"Inserimento anomalia nel DB per {ip_address} con rischio {risk_level}")
insert_success = insert_anomaly_to_db(engine, ip_address, risk_level, list_name)
if insert_success:
# Aggiorna i contatori appropriati
if risk_level == 'BASSO':
update_counter('ip_low')
elif risk_level == 'MEDIO':
update_counter('ip_medium')
elif risk_level == 'ALTO':
update_counter('ip_high')
elif risk_level == 'CRITICO':
update_counter('ip_critical')
# Conta per ogni tipo di lista
update_counter(f'list_{list_name}')
logging.info(f"Anomalia inserita con successo per IP {ip_address} (Rischio: {risk_level})")
else:
logging.error(f"Errore nell'inserimento dell'anomalia per IP {ip_address}")
except Exception as insert_e:
logging.error(f"Eccezione durante l'inserimento dell'anomalia: {insert_e}")
import traceback
logging.error(traceback.format_exc())
# Determina se l'IP dovrebbe essere bloccato
if should_block_ip(risk_level):
logging.warning(f"IP {ip_address} dovrebbe essere bloccato (Rischio: {risk_level})")
# Qui potremmo implementare il blocco automatico dell'IP
return True
except Exception as e:
logging.error(f"Errore nella gestione dell'anomalia per IP {ip_address}: {e}")
import traceback
logging.error(traceback.format_exc())
return False
def process_batch(batch_data, engine, model, preprocessor, whitelist, sensitivity=5):
"""
Processa un batch di dati per rilevare anomalie
Ritorna gli IP processati e il numero di anomalie rilevate
"""
try:
# Prepara i dati
X = prepare_data(batch_data, preprocessor)
if X is None or X.shape[0] == 0:
logging.error("Nessuna feature generata per la predizione nel batch")
return set(), 0
# Fai la predizione
predictions = predict_anomalies(model, X, sensitivity)
# Aggiusta dimensioni se necessario
if len(predictions) != len(batch_data):
logging.warning(f"Dimensioni predizioni ({len(predictions)}) != dimensioni batch ({len(batch_data)})")
if len(predictions) < len(batch_data):
# Estendi l'array
predictions = np.append(predictions, np.zeros(len(batch_data) - len(predictions)))
else:
# Tronca l'array
predictions = predictions[:len(batch_data)]
# Aggiungi il risultato al batch
batch_data.loc[:, 'anomaly'] = predictions
# Estrai gli IP e trova le anomalie
processed_ips = set()
anomaly_count = 0
# Gestisci le predizioni
anomalies = batch_data[batch_data['anomaly'] == -1]
anomaly_count = len(anomalies)
if not anomalies.empty:
# Estrai e prepara gli IP attaccanti
if 'IP_Attaccante' not in batch_data.columns and 'Messaggio2' in batch_data.columns:
batch_data['IP_Attaccante'] = batch_data['Messaggio2'].apply(
lambda x: x.split(':')[0] if pd.notna(x) and isinstance(x, str) and ':' in x else None
)
if 'Porta_Attaccante' not in batch_data.columns and 'Messaggio2' in batch_data.columns:
batch_data['Porta_Attaccante'] = batch_data['Messaggio2'].apply(
lambda x: x.split(':')[1] if pd.notna(x) and isinstance(x, str) and ':' in x else None
)
# Processa ciascun IP anomalo
for idx, row in anomalies.iterrows():
ip = row.get('IP_Attaccante')
if pd.notna(ip) and not is_ip_whitelisted(ip, whitelist):
processed_ips.add(ip)
# Assegna un livello di rischio base
risk_level = 'MEDIO' # Default
port = None
if 'Porta_Attaccante' in row and pd.notna(row['Porta_Attaccante']):
port = row['Porta_Attaccante']
# Crea un messaggio informativo
msg = f"Anomalia rilevata da {row.get('Messaggio2', 'N/A')}"
try:
# Gestisci anomalia con cattura delle eccezioni specifiche
handle_anomaly(engine, ip, risk_level, port, msg, 'ddos_ia')
except Exception as e:
# Log dell'errore ma continua con altre IP
logging.warning(f"Errore nel gestire l'anomalia per IP {ip}: {e}")
# Restituisci IP elaborati e conteggio anomalie
return processed_ips, len(anomalies)
except Exception as e:
logging.error(f"Errore nell'elaborazione del batch: {e}")
return set(), 0
def esegui_analisi(args, ciclo_count=0):
# Visualizza informazioni di avvio
if args.ciclo:
ciclo_txt = f" (ciclo {ciclo_count})"
else:
ciclo_txt = ""
log_phase(f"Avvio rilevamento DDoS{ciclo_txt}")
# Mostra informazioni sulla modalità di esecuzione
if args.parallel:
log_result(f"Modalità parallela attiva con {args.workers} worker")
log_result(f"Dimensione batch: {args.batch_size} record")
start_progress_tracking(f"rilevamento DDoS{ciclo_txt}")
# Statistiche per questo ciclo
stats = {
'records': 0,
'anomalies': 0,
'unique_ips': 0
}
# Verifica percorsi e autorizzazioni dei file di modello
logging.debug("Verifica dei percorsi dei modelli...")
model_files = {
"Modello principale": MODEL_PATH,
"Preprocessor": PREPROCESSOR_PATH
}
all_models_ok = True
for name, path in model_files.items():
if os.path.exists(path):
try:
# Verifica che il file sia leggibile
with open(path, 'rb') as f:
f.read(1)
logging.debug(f"{name} trovato e leggibile: {path}")
except Exception as e:
log_error(f"Il file {name} esiste ma non è leggibile: {e}")
all_models_ok = False
else:
log_error(f"File {name} non trovato: {path}")
all_models_ok = False
if all_models_ok:
log_result("Tutti i file modello sono presenti e leggibili")
# Test connessione database
if not test_database_connection():
log_error("Impossibile connettersi al database. Verificare le credenziali e la disponibilità del server.")
end_progress_tracking()
return False, stats
log_result("Connessione al database stabilita")
try:
# Connessione al database
logging.debug("Connessione al database...")
engine = create_engine_with_retry(CONN_STRING)
# Caricamento del modello
log_phase("Caricamento dei modelli")
model, preprocessor = load_models()
if model is None:
log_error("Impossibile caricare il modello. Arresto del programma.")
end_progress_tracking()
return False, stats
# Verifica che il modello sia valido
if not hasattr(model, 'predict'):
log_error("Il modello caricato non è valido (manca il metodo predict). Arresto del programma.")
end_progress_tracking()
return False, stats
log_result(f"Modello caricato correttamente con {len(preprocessor.get('feature_columns', []))} feature")
# Carica la whitelist e processa i dati
whitelist = load_whitelist(args.whitelist)
# Verifica se ci sono dati nuovi
last_id = load_last_analyzed_id()
logging.debug(f"Last analyzed ID: {last_id}")
# Se richiesto di saltare i vecchi record
if args.skip_old:
# Trova il record più recente
try:
with engine.connect() as conn:
latest_id_query = "SELECT MAX(ID) FROM Esterna"
latest_id = conn.execute(text(latest_id_query)).scalar()
if latest_id:
# Calcola un offset (es. ultimi 10k record)
offset = min(latest_id - last_id, 10000)
new_last_id = max(latest_id - offset, last_id)
if new_last_id > last_id:
log_warning(f"Saltando {new_last_id - last_id} record vecchi")
last_id = new_last_id
except Exception as e:
log_error(f"Errore nel determinare l'ID più recente: {e}")
# Usa max_id se specificato
max_id = args.max_id
# FLUSSO OTTIMIZZATO
try:
# 1. Estrai dati (ottimizzato)
log_phase("Estrazione dati dal database")
new_data = extract_data(engine, last_id, args.batch_size, max_id)
if new_data.empty:
log_result("Nessun nuovo dato da analizzare")
end_progress_tracking()
return True, stats
total_records = len(new_data)
stats['records'] = total_records
last_analyzed_id = new_data['ID'].max()
log_result(f"Estratti {total_records} record (ID da {last_id+1} a {last_analyzed_id})")
# Elaborazione in parallelo o sequenziale
if args.parallel and total_records > 1000:
log_phase(f"Elaborazione parallela con {args.workers} worker")
# Dividi i dati in batch più piccoli
batch_size = min(1000, max(100, total_records // args.workers))
num_batches = (total_records + batch_size - 1) // batch_size
batches = []
# Usa un metodo più sicuro per dividere i dati in batch
# Evita warning di pandas utilizzando loc in modo esplicito
for i in range(num_batches):
start_idx = i * batch_size
end_idx = min(start_idx + batch_size, total_records)
# Crea una copia dei dati per ogni batch invece di una vista
batch_indices = new_data.index[start_idx:end_idx]
batch = new_data.loc[batch_indices].copy()
batches.append(batch)
logging.debug(f"Dati suddivisi in {len(batches)} batch per elaborazione parallela")
# Processa batch in parallelo con gestione degli errori migliorata
all_processed_ips = set()
total_anomalies = 0
# Usa lock per aggiornamenti concorrenti più sicuri
processed_lock = threading.Lock()
with ThreadPoolExecutor(max_workers=args.workers) as executor:
# Crea un dizionario di future
future_to_batch = {
executor.submit(process_batch, batch, engine, model, preprocessor, whitelist, args.sensibility): i
for i, batch in enumerate(batches)
}
for future in as_completed(future_to_batch):
batch_idx = future_to_batch[future]
try:
processed_ips, anomaly_count = future.result()
# Usa lock per aggiornamenti thread-safe
with processed_lock:
if processed_ips:
all_processed_ips.update(processed_ips)
total_anomalies += anomaly_count
logging.debug(f"Batch {batch_idx+1}/{len(batches)} completato: {anomaly_count} anomalie")
except Exception as e:
log_error(f"Errore nell'elaborazione del batch {batch_idx}: {e}")
# Aggiorna le statistiche
stats['anomalies'] = total_anomalies
stats['unique_ips'] = len(all_processed_ips)
log_result(f"Elaborazione completata: {len(all_processed_ips)} IP unici con anomalie, {total_anomalies} anomalie totali")
else:
# Elaborazione sequenziale (come prima)
log_phase("Analisi dati e rilevamento anomalie")
# Crea una copia esplicita per evitare SettingWithCopyWarning
df = new_data.copy()
# Estrai e prepara gli IP attaccanti
logging.debug("Preparazione IP attaccanti...")
# Converti in pochi passaggi per evitare blocchi
df.loc[:, 'IP_Attaccante'] = df['Messaggio2'].apply(
lambda x: x.split(':')[0] if pd.notna(x) and isinstance(x, str) and ':' in x else None
)
# Conta solo un campione di IP whitelistati per evitare blocchi
whitelisted_ips = set()
if len(df) > 100:
# Verifichiamo tutti gli IP per la whitelist
all_ips = df['IP_Attaccante'].dropna().unique()
logging.debug(f"Verifica di {len(all_ips)} IP unici contro la whitelist...")
for ip in all_ips:
if is_ip_whitelisted(ip, whitelist):
whitelisted_ips.add(ip)
logging.debug(f"Trovati {len(whitelisted_ips)} IP in whitelist su {len(all_ips)} IP unici")
# Prepara i dati con metodo minimalista
logging.debug("Preparazione dati per predizione...")
features = prepare_data(df, preprocessor)
if features is None or features.shape[0] == 0:
log_error("Nessuna feature generata per la predizione.")
end_progress_tracking()
return True, stats
# Fai la predizione con la sensibilità specificata
logging.debug("Predizione anomalie...")
predictions = predict_anomalies(model, features, args.sensibility)
# Aggiusta le dimensioni se necessario
if len(predictions) != len(df):
log_warning(f"Dimensioni differenti: predizioni {len(predictions)}, dati {len(df)}")
if len(predictions) < len(df):
# Estendi l'array delle predizioni
predictions = np.append(predictions, np.zeros(len(df) - len(predictions)))
else:
# Tronca l'array delle predizioni
predictions = predictions[:len(df)]
# Applica risultati usando .loc per evitare warning
df.loc[:, 'anomaly'] = predictions
# Aggiorna i contatori per ogni IP (senza duplicati)
ip_counter = {}
for idx, row in df.iterrows():
ip = row.get('IP_Attaccante')
if pd.notna(ip) and ip not in ip_counter:
ip_counter[ip] = True
if ip not in whitelisted_ips:
# Un nuovo IP analizzato
update_counter('ip_analyzed')
# Classifica in base al risultato della predizione
if row['anomaly'] == -1:
# Anomalo - non incrementiamo qui, lo farà update_known_attacker
pass
else:
# Normale
update_counter('ip_normal')
# Aggiorna le statistiche
stats['unique_ips'] = len(ip_counter) - len(whitelisted_ips)
log_result(f"Analizzati {len(ip_counter)} IP unici, {len(whitelisted_ips)} in whitelist")
# Gestisci anomalie
anomalies = df[df['anomaly'] == -1]
stats['anomalies'] = len(anomalies)
if not anomalies.empty:
log_phase(f"Gestione di {len(anomalies)} anomalie")
# Mostra un esempio delle anomalie nei log
sample_size = min(5, len(anomalies))
if sample_size > 0:
logging.debug(f"Esempio di {sample_size} anomalie rilevate:")
for idx, row in anomalies.head(sample_size).iterrows():
ip = row.get('IP_Attaccante')
msg = row.get('Messaggio2', 'N/A')
logging.debug(f" - [{idx}] IP: {ip}, Messaggio: {msg}")
# Conteggia gli IP anomali per IP
anomaly_ips = {}
for idx, row in anomalies.iterrows():
ip = row.get('IP_Attaccante')
if pd.notna(ip):
if ip not in anomaly_ips:
anomaly_ips[ip] = 0
anomaly_ips[ip] += 1
log_result(f"Trovati {len(anomaly_ips)} IP unici con anomalie")
# Mostra i top 10 IP con più anomalie
top_anomalies = sorted(anomaly_ips.items(), key=lambda x: x[1], reverse=True)[:10]
if top_anomalies:
logging.debug("Top 10 IP con più anomalie:")
for ip, count in top_anomalies:
logging.debug(f" - IP: {ip}, Anomalie: {count}")
# Processa ciascun IP una sola volta
processed_ips = set()
for idx, row in anomalies.iterrows():
ip = row.get('IP_Attaccante')
if pd.notna(ip) and ip not in processed_ips and not is_ip_whitelisted(ip, whitelist):
processed_ips.add(ip)
# Assegna un livello di rischio base
risk_level = 'MEDIO' # Default
port = None
if 'Porta_Attaccante' in row and pd.notna(row['Porta_Attaccante']):
port = row['Porta_Attaccante']
# Crea un messaggio informativo
msg = f"Anomalia rilevata da {row.get('Messaggio2', 'N/A')}"
try:
# Gestisci l'anomalia con cattura dell'eccezione
handle_anomaly(engine, ip, risk_level, port, msg, 'ddos_ia')
except Exception as e:
logging.error(f"Errore nella gestione dell'anomalia per IP {ip}: {e}")
log_result(f"Elaborate {len(processed_ips)} anomalie uniche")
else:
log_result("Nessuna anomalia rilevata")
# Salva l'ultimo ID analizzato
try:
save_last_analyzed_id(last_analyzed_id)
except Exception as e:
logging.error(f"Errore nel salvare l'ultimo ID analizzato: {e}")
# Segnala il completamento
log_phase(f"Analisi completata")
log_result(f"Processati {len(new_data)} eventi, ID fino a {last_analyzed_id}")
end_progress_tracking()
# Forza la liberazione della memoria
del new_data
gc.collect()
return True, stats
except Exception as e:
log_error(f"Errore durante l'analisi: {e}")
import traceback
logging.error(f"Traceback completo: {traceback.format_exc()}")
end_progress_tracking()
return False, stats
except Exception as e:
log_error(f"Errore generale: {e}")
import traceback
logging.error(f"Traceback completo: {traceback.format_exc()}")
end_progress_tracking()
return False, stats
def main():
"""
Funzione principale per il rilevamento DDoS
"""
# Parsing degli argomenti
parser = argparse.ArgumentParser(description='Rilevamento DDoS in tempo reale')
parser.add_argument('--debug', action='store_true', help='Abilita logging di debug dettagliato')
parser.add_argument('--batch-size', type=int, default=10000, help='Dimensione del batch di dati da analizzare')
parser.add_argument('--whitelist', type=str, default=WHITELIST_PATH, help='Percorso del file whitelist')
parser.add_argument('--ciclo', action='store_true', help='Esegui in un ciclo infinito fino all\'interruzione')
parser.add_argument('--pausa', type=int, default=60, help='Secondi di pausa tra un ciclo e l\'altro (con --ciclo)')
parser.add_argument('--parallel', action='store_true', help='Abilita elaborazione parallela per migliorare performance')
parser.add_argument('--workers', type=int, default=4, help='Numero di worker per elaborazione parallela')
parser.add_argument('--max-id', type=int, default=None, help='ID massimo da elaborare, utile per limitare arretrati')
parser.add_argument('--skip-old', action='store_true', help='Salta i record vecchi e analizza solo i più recenti')
parser.add_argument('--quiet', action='store_true', help='Modalità silenziosa, mostra solo errori e risultati fondamentali')
parser.add_argument('--max-records', type=int, default=5000, help='Numero massimo di record da analizzare per ciclo')
parser.add_argument('--sensibility', type=int, default=5, choices=range(1, 11),
help='Sensibilità di rilevamento anomalie (1-10): 1=massima sensibilità, 10=minima sensibilità')
args = parser.parse_args()
# Imposta il livello di logging in base agli argomenti
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
logging.debug("Modalità debug attivata")
elif args.quiet:
logging.getLogger().setLevel(logging.ERROR)
print("Modalità silenziosa attivata: verranno mostrati solo errori e risultati fondamentali")
# Visualizza info sulla sensibilità
if args.sensibility != 5:
log_result(f"Sensibilità di rilevamento: {args.sensibility}/10 " +
("(più sensibile)" if args.sensibility < 5 else "(meno sensibile)" if args.sensibility > 5 else "(standard)"))
# Statistiche globali per i cicli
cicli_stats = {
'cicli_completati': 0,
'total_records': 0,
'total_anomalies': 0,
'total_unique_ips': 0,
'start_time': time.time()
}
# Gestisce l'interruzione con CTRL+C
def handle_interrupt(signum, frame):
elapsed = time.time() - cicli_stats['start_time']
print(f"\n{Colors.BOLD}{Colors.RED}Interruzione ricevuta. Chiusura del programma...{Colors.END}")
print(f"\n{Colors.BOLD}{Colors.BLUE}STATISTICHE TOTALI{Colors.END}")
print(f"Tempo totale di esecuzione: {elapsed:.1f} secondi")
print(f"Cicli completati: {cicli_stats['cicli_completati']}")
print(f"Record analizzati: {cicli_stats['total_records']}")
print(f"Anomalie rilevate: {cicli_stats['total_anomalies']}")
print(f"IP unici analizzati: {cicli_stats['total_unique_ips']}")
end_progress_tracking()
exit(0)
# Registra il gestore per SIGINT (CTRL+C)
signal.signal(signal.SIGINT, handle_interrupt)
# Ciclo infinito quando --ciclo è specificato
ciclo_count = 0
# Esegui una singola analisi o in ciclo
if args.ciclo:
log_phase(f"Esecuzione in modalità ciclo infinito")
while True:
ciclo_count += 1
# Limita il massimo di record da elaborare per ciclo
args.batch_size = min(args.batch_size, args.max_records)
# Esegui l'analisi e cattura i risultati
success, stats = esegui_analisi(args, ciclo_count)
# Aggiorna le statistiche complessive
if success and stats:
cicli_stats['cicli_completati'] += 1
cicli_stats['total_records'] += stats.get('records', 0)
cicli_stats['total_anomalies'] += stats.get('anomalies', 0)
cicli_stats['total_unique_ips'] += stats.get('unique_ips', 0)
if success:
log_result(f"Ciclo {ciclo_count} completato. Pausa di {args.pausa} secondi...")
time.sleep(args.pausa)
else:
log_error(f"Errore nel ciclo {ciclo_count}. Pausa di {args.pausa*2} secondi prima di riprovare...")
time.sleep(args.pausa * 2) # Pausa più lunga in caso di errore
else:
# Modalità singola
esegui_analisi(args, 0)
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--test":
logging.info("MODALITÀ TEST: verifica delle funzioni principali")
try:
engine = create_engine_with_retry(CONN_STRING)
test_ip = "192.168.1.1"
logging.info(f"Test 1: Verifica tabella known_attackers")
is_known = is_known_attacker(engine, test_ip)
logging.info(f"IP {test_ip} è un attaccante noto: {is_known}")
logging.info(f"Test 2: Aggiornamento known_attacker")
new_risk = update_known_attacker(engine, test_ip, "NORMALE", "80", "Test message")
logging.info(f"Nuovo livello di rischio: {new_risk}")
logging.info(f"Test 3: Verifica se ora è un attaccante noto")
is_known = is_known_attacker(engine, test_ip)
logging.info(f"IP {test_ip} è ora un attaccante noto: {is_known}")
logging.info("MODALITÀ TEST completata")
sys.exit(0)
except Exception as e:
logging.error(f"Errore nei test: {e}")
sys.exit(1)
else:
main()