ids.alfacom.it/extracted_idf/ddetect_fixed.py
marco370 0bfe3258b5 Saved progress at the end of the loop
Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
2025-11-11 09:15:10 +00:00

1140 lines
48 KiB
Python

#!/usr/bin/env python3
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from joblib import load
import logging
import gc
import os
import time
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
import ipaddress
import numpy as np
from sklearn.ensemble import IsolationForest
import threading
import argparse
import signal
# Configurazione del logging avanzata per il debug
logging.basicConfig(
level=logging.INFO, # Modificato da DEBUG a INFO per default
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('ddetect_debug.log')
]
)
# Configurazione del database
DB_USER = os.environ.get('DB_USER', 'root')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}'
# Percorsi dei file
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
os.makedirs(MODEL_DIR, exist_ok=True)
MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt')
LAST_ID_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'last_analyzed_id.txt')
# Definizione dei livelli di rischio e soglie
RISK_LEVELS = {
'NORMALE': 0.1,
'BASSO': 0.3,
'MEDIO': 0.6,
'ALTO': 0.8,
'CRITICO': 0.95
}
logging.debug(f"Percorsi: MODEL_DIR={MODEL_DIR}, WHITELIST_PATH={WHITELIST_PATH}")
logging.debug(f"Livelli di rischio configurati: {RISK_LEVELS}")
# Variabili globali per il tracciamento dell'avanzamento
progress_counters = {
'ip_whitelisted': 0,
'ip_analyzed': 0,
'ip_normal': 0,
'ip_low': 0,
'ip_medium': 0,
'ip_high': 0,
'ip_critical': 0,
'metrics_processed': 0,
'last_update': 0,
'in_progress': False,
'operation': '',
'start_time': None
}
def reset_counters():
"""
Resetta i contatori per una nuova esecuzione
"""
global progress_counters
progress_counters['ip_whitelisted'] = 0
progress_counters['ip_analyzed'] = 0
progress_counters['ip_normal'] = 0
progress_counters['ip_low'] = 0
progress_counters['ip_medium'] = 0
progress_counters['ip_high'] = 0
progress_counters['ip_critical'] = 0
progress_counters['metrics_processed'] = 0
progress_counters['last_update'] = 0
progress_counters['in_progress'] = False
progress_counters['operation'] = ''
progress_counters['start_time'] = None
def start_progress_tracking(operation):
"""
Inizia il tracciamento dell'operazione
"""
global progress_counters
reset_counters()
progress_counters['in_progress'] = True
progress_counters['operation'] = operation
progress_counters['start_time'] = time.time()
# Avvia un thread per il reporting
threading.Thread(target=progress_reporter, daemon=True).start()
logging.info(f"Avvio monitoraggio operazione: {operation}")
def update_counter(counter_name, increment=1):
"""
Aggiorna un contatore specifico
"""
global progress_counters
if counter_name in progress_counters:
progress_counters[counter_name] += increment
def end_progress_tracking():
"""
Termina il tracciamento e mostra il report finale
"""
global progress_counters
if not progress_counters['in_progress']:
return
progress_counters['in_progress'] = False
report_progress(force=True)
logging.info(f"Monitoraggio completato per: {progress_counters['operation']}")
def report_progress(force=False):
"""
Riporta lo stato attuale dei contatori
"""
global progress_counters
if not progress_counters['in_progress'] and not force:
return
current_time = time.time()
if not force and (current_time - progress_counters['last_update']) < 10: # Aggiorna ogni 10 secondi
return
elapsed = current_time - progress_counters['start_time'] if progress_counters['start_time'] else 0
report = f"""
======== REPORT DI PROGRESSO - {progress_counters['operation']} ========
Tempo trascorso: {elapsed:.1f} secondi
IP Whitelistati esclusi: {progress_counters['ip_whitelisted']}
Metriche elaborate: {progress_counters['metrics_processed']}
IP Analizzati: {progress_counters['ip_analyzed']}
Classificazione rischio:
- IP NORMALI: {progress_counters['ip_normal']}
- IP BASSI: {progress_counters['ip_low']}
- IP MEDI: {progress_counters['ip_medium']}
- IP ALTI: {progress_counters['ip_high']}
- IP CRITICI: {progress_counters['ip_critical']}
================================================================
"""
logging.info(report)
progress_counters['last_update'] = current_time
def progress_reporter():
"""
Thread che riporta periodicamente i progressi
"""
while progress_counters['in_progress']:
report_progress()
time.sleep(2) # Controlla ogni 2 secondi, ma riporta solo ogni 10
def test_database_connection():
"""
Test di connessione al database
"""
try:
logging.debug("Tentativo di connessione al database...")
engine = create_engine(CONN_STRING)
with engine.connect() as conn:
result = conn.execute(text("SELECT 1")).fetchone()
if result[0] == 1:
logging.debug("Test connessione al database riuscito!")
tables = conn.execute(text("SHOW TABLES")).fetchall()
logging.debug(f"Tabelle disponibili: {[t[0] for t in tables]}")
return True
return False
except Exception as e:
logging.error(f"Errore nel test di connessione al database: {e}")
return False
def create_engine_with_retry(conn_string, max_retries=3, retry_delay=2):
"""
Crea una connessione al database con tentativi multipli
"""
for attempt in range(max_retries):
try:
# Configurazione ottimizzata per SQLAlchemy
engine = create_engine(
conn_string,
pool_size=5,
max_overflow=10,
pool_recycle=3600,
pool_pre_ping=True,
pool_timeout=30,
echo=False,
isolation_level="READ COMMITTED"
)
# Test di connessione
with engine.connect() as conn:
conn.execute(text("SELECT 1")).fetchone()
logging.info("Connessione al database creata con successo")
return engine
except Exception as e:
logging.error(f"Tentativo {attempt+1} fallito: {e}")
if attempt < max_retries - 1:
logging.info(f"Nuovo tentativo tra {retry_delay} secondi...")
time.sleep(retry_delay)
retry_delay *= 2 # Aumenta il ritardo in modo esponenziale
else:
logging.error("Impossibile connettersi al database dopo tutti i tentativi")
raise
def load_models():
"""
Carica i modelli salvati e gli oggetti per il preprocessing
"""
try:
# Carica il modello Isolation Forest
model = load(MODEL_PATH)
logging.info("Modello Isolation Forest caricato con successo")
# Tenta di caricare il preprocessor
try:
preprocessor = load(PREPROCESSOR_PATH)
# Verifica che il preprocessor abbia la struttura attesa
if isinstance(preprocessor, dict) and 'feature_columns' in preprocessor:
logging.info(f"Preprocessor caricato con successo: {len(preprocessor['feature_columns'])} feature")
return model, preprocessor
else:
logging.error("Preprocessor non ha la struttura attesa. Utilizzo fallback.")
except Exception as e:
logging.error(f"Errore nel caricamento del preprocessor: {e}")
# Se siamo qui, il preprocessor non è valido o ha dato errore
logging.error("Preprocessor non valido o non contiene le informazioni sulle feature. Utilizzo metodo fallback.")
# Crea un preprocessor fallback con 53 colonne (numero atteso dal modello)
fallback_preprocessor = {
'feature_columns': [f'col_{i}' for i in range(53)],
'categorical_features': {},
'text_vectorizer': None
}
logging.info("Creato preprocessor fallback con 53 feature")
return model, fallback_preprocessor
except Exception as e:
logging.error(f"Errore nel caricamento dei modelli: {e}")
return None, None
def load_whitelist(whitelist_path=None):
"""
Carica la whitelist da file
"""
if whitelist_path is None:
whitelist_path = '/root/whitelist.txt'
try:
if not os.path.exists(whitelist_path):
logging.warning(f"File whitelist non trovato: {whitelist_path}")
return {'exact_ips': set(), 'networks': []}
with open(whitelist_path, 'r') as f:
whitelist_entries = [line.strip() for line in f if line.strip() and not line.startswith('#')]
exact_ips = set()
networks = []
for entry in whitelist_entries:
try:
if '/' in entry:
# È una rete
network = ipaddress.ip_network(entry, strict=False)
networks.append(network)
else:
# È un IP singolo
exact_ips.add(entry)
except Exception as e:
logging.warning(f"Ignorata entry nella whitelist non valida: {entry} - {e}")
whitelist = {
'exact_ips': exact_ips,
'networks': networks
}
logging.info(f"Whitelisted {len(exact_ips)} IP esatti e {len(networks)} network.")
return whitelist
except Exception as e:
logging.error(f"Errore nel caricamento della whitelist: {e}")
return {'exact_ips': set(), 'networks': []}
def is_ip_whitelisted(ip, whitelist):
"""
Verifica se un IP è nella whitelist
"""
if pd.isna(ip) or not ip:
return False
try:
# Ottimizzazione: utilizziamo un set per la verifica di appartenenza diretta
if ip in whitelist.get('exact_ips', set()):
update_counter('ip_whitelisted')
return True
# Per le reti dobbiamo convertire l'IP in un oggetto
try:
ip_obj = ipaddress.ip_address(ip)
except ValueError:
logging.debug(f"IP non valido: {ip}")
return False
# Limitiamo la verifica a massimo 5000 reti per evitare blocchi
# Verifichiamo le reti solo se l'IP non è già stato trovato come esatto
for i, network in enumerate(whitelist.get('networks', [])):
if i >= 5000: # Limitiamo per evitare blocchi
# Usiamo una variabile globale per loggare questo warning solo una volta per IP
if not hasattr(is_ip_whitelisted, 'warned_ips'):
is_ip_whitelisted.warned_ips = set()
if ip not in is_ip_whitelisted.warned_ips:
logging.debug(f"Limite di 5000 reti raggiunto nella verifica whitelist per IP {ip}")
is_ip_whitelisted.warned_ips.add(ip)
# Limita la dimensione del set per evitare crescita incontrollata
if len(is_ip_whitelisted.warned_ips) > 100:
is_ip_whitelisted.warned_ips.clear()
break
if ip_obj in network:
update_counter('ip_whitelisted')
return True
return False
except Exception as e:
logging.error(f"Errore nel controllo whitelist per IP {ip}: {e}")
return False
def load_last_analyzed_id():
"""
Carica l'ultimo ID analizzato dal file
"""
try:
if os.path.exists(LAST_ID_PATH):
with open(LAST_ID_PATH, 'r') as f:
last_id = int(f.read().strip())
return last_id
else:
logging.info(f"File {LAST_ID_PATH} non trovato. Inizializzo last_analyzed_id a 0.")
return 0
except Exception as e:
logging.error(f"Errore nel caricamento dell'ultimo ID analizzato: {e}")
return 0
def save_last_analyzed_id(last_id):
"""
Salva l'ultimo ID analizzato nel file
"""
try:
with open(LAST_ID_PATH, 'w') as f:
f.write(str(last_id))
logging.info(f"Ultimo ID analizzato salvato: {last_id}")
except Exception as e:
logging.error(f"Errore nel salvataggio dell'ultimo ID analizzato: {e}")
def extract_data(engine, last_analyzed_id, batch_size=10000):
"""
Estrae i dati dal database a partire dall'ultimo ID analizzato
Utilizzando la tabella Esterna
"""
try:
query = text("""
SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3, Messaggio4
FROM Esterna
WHERE ID > :last_id
ORDER BY ID ASC
LIMIT :batch_size
""")
new_data = pd.read_sql(query, engine, params={"last_id": last_analyzed_id, "batch_size": batch_size})
logging.info(f"Dati estratti: {len(new_data)} record dalla tabella Esterna.")
return new_data
except Exception as e:
logging.error(f"Errore durante l'esecuzione della query SQL: {e}")
return pd.DataFrame()
def prepare_data(new_data, preprocessor=None):
"""
Prepara i dati per la predizione con un approccio compatibile con il modello
addestrato in analisys_fixed.py, che si aspetta 83 feature
"""
logging.info("--DEBUG-- Inizio preparazione dati compatibile...")
try:
# Ottieni i nomi delle feature dal preprocessor se disponibile
feature_names = []
if isinstance(preprocessor, dict) and 'feature_columns' in preprocessor:
feature_names = preprocessor['feature_columns']
logging.info(f"Trovate {len(feature_names)} feature nel preprocessor")
else:
# Feature predefinite osservate nei log
feature_names = [
'time_since_last', 'events_last_hour', 'events_last_day', 'time_since_last_mean',
'time_since_last_std', 'time_since_last_min', 'time_since_last_max',
'events_last_hour_max', 'events_last_day_max'
]
# Aggiungi feature TF-IDF del protocollo
for i in range(21):
feature_names.append(f'protocol_tfidf_{i}')
# Aggiungi colonne numeriche
for i in range(15):
feature_names.append(f'col_{i}')
# Aggiungi feature categoriche per Host
feature_names.extend(['host_FIBRA', 'host_nan'])
# Aggiungi altre colonne come osservato nei log
for i in range(15):
feature_names.append(f'col_{i}')
# Aggiungi di nuovo le TF-IDF
for i in range(21):
feature_names.append(f'protocol_tfidf_{i}')
logging.info(f"Usando {len(feature_names)} feature predefinite")
# Crea un dataframe con tutte le feature richieste, inizializzate a zero
import numpy as np
X = pd.DataFrame(np.zeros((len(new_data), len(feature_names))), index=new_data.index)
# Imposta i nomi delle colonne
X.columns = feature_names
# Estrai IP attaccante da Messaggio2 se disponibile
if 'Messaggio2' in new_data.columns:
new_data['IP_Attaccante'] = new_data['Messaggio2'].apply(lambda x: x.split(':')[0] if pd.notna(x) and ':' in str(x) else None)
logging.info(f"Estratti {len(new_data[new_data['IP_Attaccante'].notna()])} IP attaccanti")
# Se possibile, aggiungiamo alcune informazioni
if 'IP_Attaccante' in new_data.columns and 'Host' in new_data.columns:
try:
from category_encoders import HashingEncoder
# Converti i valori essenziali (IP e Host)
logging.info("--DEBUG-- Encoding valori essenziali...")
he = HashingEncoder(n_components=10, hash_method='md5')
# Attento a come gestiamo i valori nulli
encoded_values = he.fit_transform(
new_data[['Host', 'IP_Attaccante']].fillna('unknown')
)
# Popola le prime colonne disponibili con i valori codificati
for i in range(min(10, encoded_values.shape[1])):
col_name = f'col_{i}'
if col_name in X.columns:
X[col_name] = encoded_values.iloc[:, i].values
# Imposta il valore della colonna host_FIBRA se presente
if 'host_FIBRA' in X.columns:
X['host_FIBRA'] = new_data['Host'].fillna('').str.contains('FIBRA').astype(int)
# Imposta il valore della colonna host_nan se presente
if 'host_nan' in X.columns:
X['host_nan'] = new_data['Host'].isna().astype(int)
except Exception as e:
logging.error(f"Errore nell'encoding dei valori: {e}")
logging.info(f"--DEBUG-- Preparazione dati completata: {X.shape} (dovrebbe essere {len(new_data)} righe, 83 colonne)")
return X
except Exception as e:
logging.error(f"Errore nella preparazione dei dati: {e}")
import traceback
logging.error(f"Traceback: {traceback.format_exc()}")
return None
def predict_anomalies(model, features):
"""
Predice le anomalie utilizzando il modello caricato
"""
try:
logging.info(f"--DEBUG-- Predizione su {features.shape[0]} esempi con {features.shape[1]} feature")
# Verifica se i nomi delle feature sono corretti
expected_names = [f'col_{i}' for i in range(53)]
if not all(col in features.columns for col in expected_names):
logging.error("Nomi delle colonne non compatibili con il modello")
# Rinomina le colonne se necessario
features.columns = [f'col_{i}' for i in range(features.shape[1])]
logging.info("Colonne rinominate per compatibilità")
# Aggiorna il contatore PRIMA di fare la predizione
update_counter('metrics_processed', features.shape[0])
if hasattr(model, 'predict'):
# Esegui la predizione con timeout
start_time = time.time()
max_time = 60 # Massimo 60 secondi
# Imposta X_sparse per compatibilità con il modello
try:
# Sopprimiamo il warning sui nomi delle feature
import warnings
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning, message="X does not have valid feature names")
from scipy import sparse
# Salva i nomi delle colonne prima di convertire in matrice sparse
feature_names = features.columns.tolist()
X_sparse = sparse.csr_matrix(features.values)
logging.info(f"--DEBUG-- Matrice sparse creata: {X_sparse.shape}")
# Prova a fare la predizione con la matrice sparse
predictions = model.predict(X_sparse)
logging.info(f"--DEBUG-- Predizione su matrice sparse completata: {len(predictions)} risultati")
except Exception as e:
logging.warning(f"Errore sulla matrice sparse: {e}. Provo con DataFrame normale.")
# Fallback: usa il DataFrame originale
predictions = model.predict(features)
logging.info(f"--DEBUG-- Predizione su DataFrame completata: {len(predictions)} risultati")
# Conta le anomalie
anomaly_count = sum(1 for p in predictions if p == -1)
logging.info(f"Trovate {anomaly_count} anomalie su {len(predictions)} predizioni")
return predictions
else:
logging.error("Il modello non ha il metodo predict")
# Fallback - considera tutto normale
return np.zeros(features.shape[0])
except Exception as e:
logging.error(f"Errore durante la predizione: {e}")
import traceback
logging.error(traceback.format_exc())
# Fallback - considera tutto normale
return np.zeros(features.shape[0])
def get_details(engine, ids):
"""
Ottieni i dettagli completi per gli ID specificati dalla tabella Esterna
"""
try:
if not ids:
return pd.DataFrame()
id_list = ','.join(map(str, ids))
query = text("""
SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3, Messaggio4
FROM Esterna
WHERE ID IN ({})
""".format(id_list))
details = pd.read_sql(query, engine)
# Converti timestamp
if 'Data' in details.columns and 'Ora' in details.columns:
details['Data'] = pd.to_datetime(details['Data'], errors='coerce')
details['Ora'] = pd.to_timedelta(details['Ora'].astype(str), errors='coerce')
details['Timestamp'] = details['Data'] + details['Ora']
return details
except Exception as e:
logging.error(f"Errore nell'ottenere i dettagli dalla tabella Esterna: {e}")
return pd.DataFrame()
def classify_risk(anomaly_score):
"""
Classifica il livello di rischio in base allo score di anomalia
"""
# Normalizza lo score (potrebbe essere già normalizzato)
score = abs(anomaly_score)
if score < RISK_LEVELS['NORMALE']:
update_counter('ip_normal')
return 'NORMALE'
elif score < RISK_LEVELS['BASSO']:
update_counter('ip_low')
return 'BASSO'
elif score < RISK_LEVELS['MEDIO']:
update_counter('ip_medium')
return 'MEDIO'
elif score < RISK_LEVELS['ALTO']:
update_counter('ip_high')
return 'ALTO'
else:
update_counter('ip_critical')
return 'CRITICO'
def is_known_attacker(engine, ip_address):
"""
Verifica se un IP è un attaccante noto
Crea la tabella se non esiste
"""
try:
with engine.connect() as conn:
# Crea la tabella se non esiste
logging.debug(f"Verifica tabella known_attackers per IP {ip_address}")
create_table_query = text("""
CREATE TABLE IF NOT EXISTS known_attackers (
id INT AUTO_INCREMENT PRIMARY KEY,
ip_address VARCHAR(45) NOT NULL,
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
attack_count INT DEFAULT 1,
risk_level VARCHAR(20) DEFAULT 'NORMALE',
ports_used TEXT DEFAULT NULL,
attack_patterns TEXT DEFAULT NULL,
is_blocked TINYINT(1) DEFAULT 0,
UNIQUE KEY unique_ip (ip_address)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# Assicurati che la transazione sia valida
conn.execute(create_table_query)
# Verifica se l'IP esiste
check_query = text("SELECT * FROM known_attackers WHERE ip_address = :ip")
logging.debug(f"Esecuzione query: {check_query} con parametri: {{'ip': {ip_address}}}")
result = conn.execute(check_query, {"ip": ip_address}).fetchone()
exists = result is not None
logging.debug(f"IP {ip_address} è un attaccante noto: {exists}")
return exists
except Exception as e:
logging.error(f"Errore nel verificare se l'IP {ip_address} è un attaccante noto: {e}")
return False
def update_known_attacker(engine, ip_address, risk_level, port=None, message=None):
"""
Aggiorna o inserisce un attaccante noto
Incrementa il livello di rischio se l'IP viene visto più volte
"""
try:
conn = engine.connect()
trans = conn.begin() # Inizia una transazione esplicita
try:
# Verifica se l'IP esiste già
logging.debug(f"Aggiornamento attaccante noto: {ip_address}, rischio iniziale: {risk_level}")
query = text("SELECT * FROM known_attackers WHERE ip_address = :ip")
result = conn.execute(query, {"ip": ip_address}).fetchone()
if result:
# Ottieni il conteggio attacchi e il rischio attuale
attack_count = result[4] + 1 # Indice 4 = attack_count
current_risk = result[5] # Indice 5 = risk_level
current_ports = result[6] # Indice 6 = ports_used
logging.debug(f"IP {ip_address}: conteggio attuale={attack_count-1}, rischio attuale={current_risk}")
# Aggiorna la lista delle porte
new_ports = current_ports or ""
if port and port not in new_ports:
new_ports = f"{new_ports},{port}" if new_ports else port
# Incrementa il livello di rischio basato sul numero di rilevamenti
new_risk = risk_level
old_risk = current_risk # Teniamo traccia del rischio precedente
# Escalation del rischio in base al numero di rilevamenti
if risk_level == 'NORMALE' and attack_count >= 10:
new_risk = 'BASSO'
logging.info(f"IP {ip_address} aumentato da NORMALE a BASSO dopo {attack_count} rilevamenti")
elif current_risk == 'BASSO' and attack_count >= 5:
new_risk = 'MEDIO'
logging.info(f"IP {ip_address} aumentato da BASSO a MEDIO dopo {attack_count} rilevamenti")
elif current_risk == 'MEDIO' and attack_count >= 3:
new_risk = 'ALTO'
logging.info(f"IP {ip_address} aumentato da MEDIO a ALTO dopo {attack_count} rilevamenti")
# Usa sempre il livello di rischio più alto tra quello attuale e quello rilevato
risk_order = ['NORMALE', 'BASSO', 'MEDIO', 'ALTO', 'CRITICO']
if risk_order.index(current_risk) > risk_order.index(new_risk):
new_risk = current_risk # Mantiene il rischio più alto
# Se il rischio è cambiato, aggiorna i contatori
if new_risk != old_risk:
# Decrementa il contatore del vecchio livello se possibile
if old_risk == 'NORMALE':
update_counter('ip_normal', -1)
elif old_risk == 'BASSO':
update_counter('ip_low', -1)
elif old_risk == 'MEDIO':
update_counter('ip_medium', -1)
elif old_risk == 'ALTO':
update_counter('ip_high', -1)
elif old_risk == 'CRITICO':
update_counter('ip_critical', -1)
# Incrementa il contatore del nuovo livello
if new_risk == 'NORMALE':
update_counter('ip_normal')
elif new_risk == 'BASSO':
update_counter('ip_low')
elif new_risk == 'MEDIO':
update_counter('ip_medium')
elif new_risk == 'ALTO':
update_counter('ip_high')
elif new_risk == 'CRITICO':
update_counter('ip_critical')
logging.debug(f"IP {ip_address}: nuovo conteggio={attack_count}, nuovo rischio={new_risk}, porte={new_ports}")
# Aggiorna l'esistente
update_query = text("""
UPDATE known_attackers
SET last_seen = NOW(),
attack_count = attack_count + 1,
risk_level = :risk,
ports_used = :ports
WHERE ip_address = :ip
""")
conn.execute(update_query, {"ip": ip_address, "risk": new_risk, "ports": new_ports})
# Commit della transazione
trans.commit()
# Restituisci il nuovo livello di rischio
return new_risk
else:
# Inserisci nuovo
logging.debug(f"Inserimento nuovo attaccante: {ip_address}, rischio={risk_level}, porta={port}")
insert_query = text("""
INSERT INTO known_attackers
(ip_address, risk_level, ports_used, attack_patterns, is_blocked)
VALUES (:ip, :risk, :port, :message, 0)
""")
result = conn.execute(insert_query, {"ip": ip_address, "risk": risk_level, "port": port, "message": message})
logging.debug(f"Risultato inserimento: {result.rowcount} righe inserite")
# Commit della transazione
trans.commit()
# Inizializza il contatore al nuovo livello di rischio
if risk_level == 'NORMALE':
update_counter('ip_normal')
elif risk_level == 'BASSO':
update_counter('ip_low')
elif risk_level == 'MEDIO':
update_counter('ip_medium')
elif risk_level == 'ALTO':
update_counter('ip_high')
elif risk_level == 'CRITICO':
update_counter('ip_critical')
return risk_level
except Exception as e:
# Rollback in caso di errore
trans.rollback()
logging.error(f"Errore nell'aggiornare l'attaccante noto {ip_address}: {e}")
logging.error(f"Dettagli errore: {str(e)}")
return risk_level
finally:
# Chiudi la connessione
conn.close()
except Exception as e:
logging.error(f"Errore nel creare la connessione per {ip_address}: {e}")
return risk_level
def should_block_ip(risk_level):
"""
Determina se un IP dovrebbe essere bloccato in base al suo livello di rischio
"""
should_block = risk_level in ['ALTO', 'CRITICO']
logging.debug(f"IP con rischio {risk_level} dovrebbe essere bloccato: {should_block}")
return should_block
def insert_anomaly_to_db(engine, ip_address, risk_level, list_name):
"""
Inserisce o aggiorna un'anomalia nel database
"""
try:
with engine.connect() as conn:
# Verifica se la colonna risk_level esiste nella tabella ip_list
try:
# Verifica la struttura della tabella
check_column_query = text("""
SELECT COUNT(*) AS column_exists
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = :db_name
AND TABLE_NAME = 'ip_list'
AND COLUMN_NAME = 'risk_level'
""")
db_name = os.environ.get('MYSQL_DATABASE', 'LOG_MIKROTIK')
result = conn.execute(check_column_query, {"db_name": db_name}).fetchone()
if result[0] == 0:
# La colonna non esiste, aggiungiamola
logging.info("La colonna risk_level non esiste nella tabella ip_list. Aggiunta in corso...")
alter_query = text("""
ALTER TABLE ip_list
ADD COLUMN risk_level VARCHAR(20) DEFAULT 'MEDIO'
""")
conn.execute(alter_query)
logging.info("Colonna risk_level aggiunta con successo.")
else:
logging.debug("La colonna risk_level esiste già nella tabella ip_list.")
except Exception as e:
logging.error(f"Errore nella verifica/aggiunta della colonna risk_level: {e}")
# Continua comunque, potrebbe fallire nell'insert ma almeno ci abbiamo provato
# Ora procediamo con l'inserimento
upsert_query = text("""
INSERT INTO ip_list (list_name, ip_address, retrieved_at, risk_level)
VALUES (:list_name, :ip_address, NOW(), :risk_level)
ON DUPLICATE KEY UPDATE
retrieved_at = NOW(),
risk_level = :risk_level
""")
conn.execute(upsert_query, {
"list_name": list_name,
"ip_address": ip_address,
"risk_level": risk_level
})
logging.info(f"IP {ip_address} inserito/aggiornato nella lista {list_name} con rischio {risk_level}")
return True
except Exception as e:
logging.error(f"Errore nell'inserire l'IP {ip_address} nel database: {e}")
return False
def handle_anomaly(engine, ip_address, risk_level, port=None, message=None, list_name='ddos_ia'):
"""
Gestisce un'anomalia rilevata aggiornando le informazioni nel database
"""
logging.debug(f"Gestione anomalia per IP: {ip_address}, rischio: {risk_level}")
try:
# Aggiorna le informazioni sull'attaccante nel database
final_risk = update_known_attacker(engine, ip_address, risk_level, port, message)
# Se il rischio è ALTO o CRITICO, aggiungi IP alla lista di blocco
if should_block_ip(final_risk):
insert_anomaly_to_db(engine, ip_address, final_risk, list_name)
except Exception as e:
logging.error(f"Errore nella gestione dell'anomalia per {ip_address}: {e}")
def main():
"""
Funzione principale per il rilevamento DDoS
"""
# Parsing degli argomenti
parser = argparse.ArgumentParser(description='Rilevamento DDoS in tempo reale')
parser.add_argument('--debug', action='store_true', help='Abilita logging di debug dettagliato')
parser.add_argument('--batch-size', type=int, default=10000, help='Dimensione del batch di dati da analizzare')
parser.add_argument('--whitelist', type=str, default=WHITELIST_PATH, help='Percorso del file whitelist')
parser.add_argument('--ciclo', action='store_true', help='Esegui in un ciclo infinito fino all\'interruzione')
parser.add_argument('--pausa', type=int, default=60, help='Secondi di pausa tra un ciclo e l\'altro (con --ciclo)')
args = parser.parse_args()
# Imposta il livello di logging in base agli argomenti
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
logging.debug("Modalità debug attivata")
# Gestisce l'interruzione con CTRL+C
def handle_interrupt(signum, frame):
logging.info("Ricevuto segnale di interruzione. Chiusura del programma...")
end_progress_tracking()
exit(0)
# Registra il gestore per SIGINT (CTRL+C)
signal.signal(signal.SIGINT, handle_interrupt)
# Ciclo infinito quando --ciclo è specificato
ciclo_count = 0
def esegui_analisi():
# Visualizza informazioni di avvio
if args.ciclo:
ciclo_txt = f" (ciclo {ciclo_count})"
else:
ciclo_txt = ""
logging.info(f"Avvio rilevamento DDoS{ciclo_txt}...")
start_progress_tracking(f"rilevamento DDoS{ciclo_txt}")
# Verifica percorsi e autorizzazioni dei file di modello
logging.info("Verifica dei percorsi dei modelli...")
model_files = {
"Modello principale": MODEL_PATH,
"Preprocessor": PREPROCESSOR_PATH
}
for name, path in model_files.items():
if os.path.exists(path):
logging.info(f"{name} trovato: {path}")
try:
# Verifica che il file sia leggibile
with open(path, 'rb') as f:
f.read(1)
logging.info(f"{name} è leggibile")
except Exception as e:
logging.error(f"{name} esiste ma non è leggibile: {e}")
else:
logging.warning(f"{name} non trovato: {path}")
# Test connessione database
if not test_database_connection():
logging.error("Impossibile connettersi al database. Verificare le credenziali e la disponibilità del server.")
end_progress_tracking()
return False
try:
# Connessione al database
logging.info("Connessione al database...")
engine = create_engine_with_retry(CONN_STRING)
# Caricamento del modello
model, preprocessor = load_models()
if model is None:
logging.error("Impossibile caricare il modello. Arresto del programma.")
end_progress_tracking()
return False
# Verifica che il modello sia valido
if not hasattr(model, 'predict'):
logging.error("Il modello caricato non è valido (manca il metodo predict). Arresto del programma.")
end_progress_tracking()
return False
# Carica la whitelist e processa i dati
whitelist = load_whitelist(args.whitelist)
# Verifica se ci sono dati nuovi
last_id = load_last_analyzed_id()
logging.info(f"Last analyzed ID: {last_id}")
# SEMPLIFICA IL FLUSSO PRINCIPALE
try:
# 1. Estrai dati
logging.info("--DEBUG-- Estrazione dati dal database...")
new_data = extract_data(engine, last_id, args.batch_size)
if new_data.empty:
logging.info("Nessun nuovo dato da analizzare.")
end_progress_tracking()
return True
# 2. Estrai e prepara gli IP attaccanti
logging.info("--DEBUG-- Preparazione IP attaccanti...")
# Converti in pochi passaggi per evitare blocchi
new_data['IP_Attaccante'] = new_data['Messaggio2'].apply(
lambda x: x.split(':')[0] if pd.notna(x) and isinstance(x, str) and ':' in x else None
)
# 3. Conta solo un campione di IP whitelistati per evitare blocchi
whitelisted_ips = set()
if len(new_data) > 100:
# Verifichiamo tutti gli IP per la whitelist
all_ips = new_data['IP_Attaccante'].dropna().unique()
logging.info(f"--DEBUG-- Verifica di {len(all_ips)} IP unici contro la whitelist...")
for ip in all_ips:
if is_ip_whitelisted(ip, whitelist):
whitelisted_ips.add(ip)
logging.info(f"Trovati {len(whitelisted_ips)} IP in whitelist su {len(all_ips)} IP unici")
# 4. Prepara i dati con metodo minimalista
logging.info("--DEBUG-- Preparazione dati per predizione...")
features = prepare_data(new_data, preprocessor)
if features is None or features.shape[0] == 0:
logging.error("Nessuna feature generata per la predizione.")
end_progress_tracking()
return True
# 5. Fai la predizione
logging.info("--DEBUG-- Predizione anomalie...")
predictions = predict_anomalies(model, features)
if len(predictions) == 0:
logging.warning("Nessuna predizione generata, uso array di zeri")
predictions = np.zeros(new_data.shape[0])
# 6. Aggiusta le dimensioni se necessario
if len(predictions) != len(new_data):
logging.warning(f"Dimensioni differenti: predizioni {len(predictions)}, dati {len(new_data)}")
if len(predictions) < len(new_data):
# Estendi l'array delle predizioni
predictions = np.append(predictions, np.zeros(len(new_data) - len(predictions)))
else:
# Tronca l'array delle predizioni
predictions = predictions[:len(new_data)]
# 7. Applica risultati e conta
new_data['anomaly'] = predictions
# 8. Aggiorna i contatori per ogni IP (senza duplicati)
ip_counter = {}
for idx, row in new_data.iterrows():
ip = row.get('IP_Attaccante')
if pd.notna(ip) and ip not in ip_counter:
ip_counter[ip] = True
if ip not in whitelisted_ips:
# Un nuovo IP analizzato
update_counter('ip_analyzed')
# Classifica in base al risultato della predizione
if row['anomaly'] == -1:
# Anomalo - non incrementiamo qui, lo farà update_known_attacker
pass
else:
# Normale
update_counter('ip_normal')
logging.info(f"Analizzati {len(ip_counter)} IP unici, {len(whitelisted_ips)} in whitelist")
# 9. Gestisci anomalie
anomalies = new_data[new_data['anomaly'] == -1]
logging.info(f"Rilevate {len(anomalies)} anomalie su {len(new_data)} eventi")
if not anomalies.empty:
# Mostra un esempio delle anomalie nei log
sample_size = min(5, len(anomalies))
logging.info(f"Esempio di {sample_size} anomalie rilevate:")
for idx, row in anomalies.head(sample_size).iterrows():
ip = row.get('IP_Attaccante')
msg = row.get('Messaggio2', 'N/A')
logging.info(f" - [{idx}] IP: {ip}, Messaggio: {msg}")
# Conteggia gli IP anomali per IP
anomaly_ips = {}
for idx, row in anomalies.iterrows():
ip = row.get('IP_Attaccante')
if pd.notna(ip):
if ip not in anomaly_ips:
anomaly_ips[ip] = 0
anomaly_ips[ip] += 1
logging.info(f"Trovati {len(anomaly_ips)} IP unici con anomalie")
# Mostra i top 10 IP con più anomalie
top_anomalies = sorted(anomaly_ips.items(), key=lambda x: x[1], reverse=True)[:10]
if top_anomalies:
logging.info("Top 10 IP con più anomalie:")
for ip, count in top_anomalies:
logging.info(f" - IP: {ip}, Anomalie: {count}")
# Processa ciascun IP una sola volta
processed_ips = set()
for idx, row in anomalies.iterrows():
ip = row.get('IP_Attaccante')
if pd.notna(ip) and ip not in processed_ips and not is_ip_whitelisted(ip, whitelist):
processed_ips.add(ip)
# Assegna un livello di rischio base
risk_level = 'MEDIO' # Default
port = None
if 'Porta_Attaccante' in row and pd.notna(row['Porta_Attaccante']):
port = row['Porta_Attaccante']
# Crea un messaggio informativo
msg = f"Anomalia rilevata da {row.get('Messaggio2', 'N/A')}"
# Gestisci l'anomalia
handle_anomaly(engine, ip, risk_level, port, msg, 'ddos_ia')
logging.info(f"Gestite anomalie per {len(processed_ips)} IP unici")
# 10. Salva l'ultimo ID analizzato
if not new_data.empty:
last_analyzed_id = new_data['ID'].max()
save_last_analyzed_id(last_analyzed_id)
# 11. Segnala il completamento
logging.info(f"Analisi completata: processati {len(new_data)} eventi, trovate {len(anomalies)} anomalie")
end_progress_tracking()
return True
except Exception as e:
logging.error(f"Errore durante l'analisi: {e}")
import traceback
logging.error(f"Traceback completo: {traceback.format_exc()}")
end_progress_tracking()
return False
except Exception as e:
logging.error(f"Errore generale: {e}")
import traceback
logging.error(f"Traceback completo: {traceback.format_exc()}")
end_progress_tracking()
return False
# Esegui una singola analisi o in ciclo
if args.ciclo:
logging.info(f"Esecuzione in modalità ciclo. Per interrompere premere CTRL+C")
while True:
ciclo_count += 1
success = esegui_analisi()
if success:
logging.info(f"Ciclo {ciclo_count} completato. Pausa di {args.pausa} secondi...")
time.sleep(args.pausa)
else:
logging.error(f"Errore nel ciclo {ciclo_count}. Pausa di {args.pausa*2} secondi prima di riprovare...")
time.sleep(args.pausa * 2) # Pausa più lunga in caso di errore
else:
# Modalità singola
esegui_analisi()
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--test":
logging.info("MODALITÀ TEST: verifica delle funzioni principali")
try:
engine = create_engine_with_retry(CONN_STRING)
test_ip = "192.168.1.1"
logging.info(f"Test 1: Verifica tabella known_attackers")
is_known = is_known_attacker(engine, test_ip)
logging.info(f"IP {test_ip} è un attaccante noto: {is_known}")
logging.info(f"Test 2: Aggiornamento known_attacker")
new_risk = update_known_attacker(engine, test_ip, "NORMALE", "80", "Test message")
logging.info(f"Nuovo livello di rischio: {new_risk}")
logging.info(f"Test 3: Verifica se ora è un attaccante noto")
is_known = is_known_attacker(engine, test_ip)
logging.info(f"IP {test_ip} è ora un attaccante noto: {is_known}")
logging.info("MODALITÀ TEST completata")
sys.exit(0)
except Exception as e:
logging.error(f"Errore nei test: {e}")
sys.exit(1)
else:
main()