Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
879 lines
34 KiB
Python
879 lines
34 KiB
Python
#!/usr/bin/env python3
|
|
import pandas as pd
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.sql import text
|
|
from joblib import load
|
|
import logging
|
|
import gc
|
|
import os
|
|
import time
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta, timezone
|
|
import ipaddress
|
|
import numpy as np
|
|
from sklearn.ensemble import IsolationForest
|
|
import threading
|
|
import argparse
|
|
import signal
|
|
import multiprocessing
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor
|
|
from category_encoders import HashingEncoder
|
|
from sklearn.feature_extraction.text import TfidfVectorizer
|
|
import psutil
|
|
import warnings
|
|
warnings.filterwarnings('ignore')
|
|
|
|
# Configurazione del logging ottimizzata
|
|
logging.basicConfig(
|
|
level=logging.WARNING,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.StreamHandler(sys.stdout),
|
|
logging.FileHandler('ddetect_debug.log')
|
|
]
|
|
)
|
|
|
|
# Configurazione del database
|
|
DB_USER = os.environ.get('DB_USER', 'root')
|
|
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
|
|
DB_HOST = os.environ.get('DB_HOST', 'localhost')
|
|
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
|
|
CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}'
|
|
|
|
# Percorsi dei file
|
|
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
|
|
os.makedirs(MODEL_DIR, exist_ok=True)
|
|
MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
|
|
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
|
|
WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt')
|
|
LAST_ID_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'last_analyzed_id.txt')
|
|
|
|
# Definizione dei livelli di rischio e soglie
|
|
RISK_LEVELS = {
|
|
'NORMALE': 0.1,
|
|
'BASSO': 0.3,
|
|
'MEDIO': 0.6,
|
|
'ALTO': 0.8,
|
|
'CRITICO': 0.95
|
|
}
|
|
|
|
# Ottimizzazioni per grandi volumi di dati
|
|
CHUNK_SIZE = 5000 # Dimensione ottimale per chunk di dati
|
|
MAX_MEMORY_USAGE = 80 # Percentuale massima di memoria utilizzabile
|
|
CACHE_SIZE = 10000 # Dimensione cache per IP whitelistati
|
|
|
|
# Cache globale per ottimizzazioni
|
|
ip_whitelist_cache = {}
|
|
model_cache = None
|
|
preprocessor_cache = None
|
|
|
|
class Colors:
|
|
HEADER = '\033[95m'
|
|
BLUE = '\033[94m'
|
|
GREEN = '\033[92m'
|
|
YELLOW = '\033[93m'
|
|
RED = '\033[91m'
|
|
BOLD = '\033[1m'
|
|
UNDERLINE = '\033[4m'
|
|
END = '\033[0m'
|
|
|
|
def log_phase(message):
|
|
"""Evidenzia una nuova fase principale dell'esecuzione"""
|
|
print(f"\n{Colors.BOLD}{Colors.GREEN}▶ FASE: {message}{Colors.END}\n")
|
|
logging.info(f"FASE: {message}")
|
|
|
|
def log_result(message):
|
|
"""Evidenzia un risultato importante"""
|
|
print(f"{Colors.BLUE}✓ {message}{Colors.END}")
|
|
logging.info(f"RISULTATO: {message}")
|
|
|
|
def log_warning(message):
|
|
"""Evidenzia un avviso importante"""
|
|
print(f"{Colors.YELLOW}⚠ {message}{Colors.END}")
|
|
logging.warning(message)
|
|
|
|
def log_error(message):
|
|
"""Evidenzia un errore importante"""
|
|
print(f"{Colors.RED}✗ {message}{Colors.END}")
|
|
logging.error(message)
|
|
|
|
# Variabili globali per il tracciamento dell'avanzamento
|
|
progress_counters = {
|
|
'ip_whitelisted': 0,
|
|
'ip_analyzed': 0,
|
|
'ip_normal': 0,
|
|
'ip_low': 0,
|
|
'ip_medium': 0,
|
|
'ip_high': 0,
|
|
'ip_critical': 0,
|
|
'metrics_processed': 0,
|
|
'last_update': 0,
|
|
'in_progress': False,
|
|
'operation': '',
|
|
'start_time': None
|
|
}
|
|
|
|
def check_memory_usage():
|
|
"""Controlla l'utilizzo della memoria e forza garbage collection se necessario"""
|
|
memory_percent = psutil.virtual_memory().percent
|
|
if memory_percent > MAX_MEMORY_USAGE:
|
|
logging.warning(f"Utilizzo memoria alto: {memory_percent}%. Forzando garbage collection...")
|
|
gc.collect()
|
|
return True
|
|
return False
|
|
|
|
def reset_counters():
|
|
"""Resetta i contatori per una nuova esecuzione"""
|
|
global progress_counters
|
|
progress_counters.update({
|
|
'ip_whitelisted': 0,
|
|
'ip_analyzed': 0,
|
|
'ip_normal': 0,
|
|
'ip_low': 0,
|
|
'ip_medium': 0,
|
|
'ip_high': 0,
|
|
'ip_critical': 0,
|
|
'metrics_processed': 0,
|
|
'last_update': 0,
|
|
'in_progress': False,
|
|
'operation': '',
|
|
'start_time': None
|
|
})
|
|
|
|
def start_progress_tracking(operation):
|
|
"""Inizia il tracciamento dell'operazione"""
|
|
global progress_counters
|
|
reset_counters()
|
|
progress_counters['in_progress'] = True
|
|
progress_counters['operation'] = operation
|
|
progress_counters['start_time'] = time.time()
|
|
|
|
threading.Thread(target=progress_reporter, daemon=True).start()
|
|
logging.info(f"Avvio monitoraggio operazione: {operation}")
|
|
|
|
def update_counter(counter_name, increment=1):
|
|
"""Aggiorna un contatore specifico"""
|
|
global progress_counters
|
|
if counter_name in progress_counters:
|
|
progress_counters[counter_name] += increment
|
|
|
|
def end_progress_tracking():
|
|
"""Termina il tracciamento e mostra il report finale"""
|
|
global progress_counters
|
|
if not progress_counters['in_progress']:
|
|
return
|
|
|
|
progress_counters['in_progress'] = False
|
|
report_progress(force=True)
|
|
logging.info(f"Monitoraggio completato per: {progress_counters['operation']}")
|
|
|
|
def report_progress(force=False):
|
|
"""Riporta lo stato attuale dei contatori"""
|
|
global progress_counters
|
|
if not progress_counters['in_progress'] and not force:
|
|
return
|
|
|
|
current_time = time.time()
|
|
if not force and (current_time - progress_counters['last_update']) < 10:
|
|
return
|
|
|
|
elapsed = current_time - progress_counters['start_time'] if progress_counters['start_time'] else 0
|
|
|
|
report = f"""
|
|
{Colors.BOLD}======== REPORT DI PROGRESSO - {progress_counters['operation']} ========{Colors.END}
|
|
Tempo trascorso: {elapsed:.1f} secondi
|
|
IP Whitelistati esclusi: {progress_counters['ip_whitelisted']}
|
|
Metriche elaborate: {progress_counters['metrics_processed']}
|
|
IP Analizzati: {progress_counters['ip_analyzed']}
|
|
Classificazione rischio:
|
|
- IP NORMALI: {progress_counters['ip_normal']}
|
|
- IP BASSI: {progress_counters['ip_low']}
|
|
- IP MEDI: {progress_counters['ip_medium']}
|
|
- IP ALTI: {progress_counters['ip_high']}
|
|
- IP CRITICI: {progress_counters['ip_critical']}
|
|
Memoria utilizzata: {psutil.virtual_memory().percent:.1f}%
|
|
{Colors.BOLD}================================================================{Colors.END}
|
|
"""
|
|
print(report)
|
|
logging.info(report.replace(Colors.BOLD, '').replace(Colors.END, ''))
|
|
progress_counters['last_update'] = current_time
|
|
|
|
def progress_reporter():
|
|
"""Thread che riporta periodicamente i progressi"""
|
|
while progress_counters['in_progress']:
|
|
report_progress()
|
|
time.sleep(2)
|
|
|
|
def test_database_connection():
|
|
"""Test di connessione al database"""
|
|
try:
|
|
logging.debug("Tentativo di connessione al database...")
|
|
engine = create_engine(CONN_STRING)
|
|
with engine.connect() as conn:
|
|
result = conn.execute(text("SELECT 1")).fetchone()
|
|
if result[0] == 1:
|
|
logging.debug("Test connessione al database riuscito!")
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
logging.error(f"Errore nel test di connessione al database: {e}")
|
|
return False
|
|
|
|
def create_engine_with_retry(conn_string, max_retries=3, retry_delay=2):
|
|
"""Crea una connessione al database con tentativi multipli e ottimizzazioni"""
|
|
for attempt in range(max_retries):
|
|
try:
|
|
# Configurazione ottimizzata per grandi volumi di dati
|
|
engine = create_engine(
|
|
conn_string,
|
|
pool_size=10, # Aumentato per parallelismo
|
|
max_overflow=20, # Aumentato per picchi di carico
|
|
pool_recycle=1800, # Ridotto per evitare timeout
|
|
pool_pre_ping=True,
|
|
pool_timeout=60, # Aumentato per operazioni lunghe
|
|
echo=False,
|
|
isolation_level="READ COMMITTED",
|
|
connect_args={
|
|
'charset': 'utf8mb4',
|
|
'use_unicode': True,
|
|
'autocommit': True,
|
|
'sql_mode': 'TRADITIONAL'
|
|
}
|
|
)
|
|
|
|
with engine.connect() as conn:
|
|
conn.execute(text("SELECT 1")).fetchone()
|
|
|
|
logging.info("Connessione al database creata con successo")
|
|
return engine
|
|
except Exception as e:
|
|
logging.error(f"Tentativo {attempt+1} fallito: {e}")
|
|
if attempt < max_retries - 1:
|
|
logging.info(f"Nuovo tentativo tra {retry_delay} secondi...")
|
|
time.sleep(retry_delay)
|
|
retry_delay *= 2
|
|
else:
|
|
logging.error("Impossibile connettersi al database dopo tutti i tentativi")
|
|
raise
|
|
|
|
def load_models():
|
|
"""Carica i modelli di rilevamento delle anomalie addestrati con cache"""
|
|
global model_cache, preprocessor_cache
|
|
|
|
# Usa cache se disponibile
|
|
if model_cache is not None and preprocessor_cache is not None:
|
|
return model_cache, preprocessor_cache
|
|
|
|
try:
|
|
# Carica il modello
|
|
logging.info(f"Caricamento modello da {MODEL_PATH}...")
|
|
if os.path.exists(MODEL_PATH):
|
|
model_cache = load(MODEL_PATH)
|
|
logging.debug("Modello caricato con successo!")
|
|
else:
|
|
logging.error(f"File modello non trovato: {MODEL_PATH}")
|
|
return None, None
|
|
|
|
# Carica il preprocessor
|
|
try:
|
|
logging.info(f"Caricamento preprocessor da {PREPROCESSOR_PATH}...")
|
|
if os.path.exists(PREPROCESSOR_PATH):
|
|
preprocessor_cache = load(PREPROCESSOR_PATH)
|
|
|
|
if isinstance(preprocessor_cache, dict) and 'feature_columns' in preprocessor_cache:
|
|
feature_count = len(preprocessor_cache['feature_columns'])
|
|
if feature_count < 125:
|
|
logging.warning(f"Il modello si aspetta 125 feature, "
|
|
f"ma il preprocessor ne ha {feature_count}")
|
|
|
|
return model_cache, preprocessor_cache
|
|
else:
|
|
logging.error("Preprocessor non ha la struttura attesa. Utilizzo fallback.")
|
|
else:
|
|
logging.error(f"File preprocessor non trovato: {PREPROCESSOR_PATH}")
|
|
except Exception as e:
|
|
logging.error(f"Errore nel caricamento del preprocessor: {e}")
|
|
|
|
# Crea un preprocessor di fallback
|
|
preprocessor_cache = {'feature_columns': [f'feature_{i}' for i in range(125)]}
|
|
return model_cache, preprocessor_cache
|
|
|
|
except Exception as e:
|
|
logging.error(f"Errore nel caricamento dei modelli: {e}")
|
|
return None, None
|
|
|
|
def load_whitelist(whitelist_path=None):
|
|
"""Carica la whitelist da file con ottimizzazioni"""
|
|
if whitelist_path is None:
|
|
whitelist_path = '/root/whitelist.txt'
|
|
|
|
try:
|
|
if not os.path.exists(whitelist_path):
|
|
logging.warning(f"File whitelist non trovato: {whitelist_path}")
|
|
return {'exact_ips': set(), 'networks': []}
|
|
|
|
with open(whitelist_path, 'r') as f:
|
|
whitelist_entries = [line.strip() for line in f if line.strip() and not line.startswith('#')]
|
|
|
|
exact_ips = set()
|
|
networks = []
|
|
|
|
for entry in whitelist_entries:
|
|
try:
|
|
if '/' in entry:
|
|
network = ipaddress.ip_network(entry, strict=False)
|
|
networks.append(network)
|
|
else:
|
|
exact_ips.add(entry)
|
|
except Exception as e:
|
|
logging.warning(f"Ignorata entry nella whitelist non valida: {entry} - {e}")
|
|
|
|
whitelist = {
|
|
'exact_ips': exact_ips,
|
|
'networks': networks
|
|
}
|
|
|
|
logging.info(f"Whitelisted {len(exact_ips)} IP esatti e {len(networks)} network.")
|
|
return whitelist
|
|
except Exception as e:
|
|
logging.error(f"Errore nel caricamento della whitelist: {e}")
|
|
return {'exact_ips': set(), 'networks': []}
|
|
|
|
def is_ip_whitelisted(ip, whitelist):
|
|
"""Verifica se un IP è nella whitelist con cache ottimizzata"""
|
|
global ip_whitelist_cache
|
|
|
|
if pd.isna(ip) or not ip:
|
|
return False
|
|
|
|
# Controlla cache
|
|
if ip in ip_whitelist_cache:
|
|
if ip_whitelist_cache[ip]:
|
|
update_counter('ip_whitelisted')
|
|
return ip_whitelist_cache[ip]
|
|
|
|
try:
|
|
# Verifica diretta negli IP esatti
|
|
if ip in whitelist.get('exact_ips', set()):
|
|
ip_whitelist_cache[ip] = True
|
|
# Limita dimensione cache
|
|
if len(ip_whitelist_cache) > CACHE_SIZE:
|
|
# Rimuovi il 20% più vecchio
|
|
items_to_remove = list(ip_whitelist_cache.keys())[:CACHE_SIZE//5]
|
|
for key in items_to_remove:
|
|
del ip_whitelist_cache[key]
|
|
update_counter('ip_whitelisted')
|
|
return True
|
|
|
|
# Verifica nelle reti (limitata per performance)
|
|
try:
|
|
ip_obj = ipaddress.ip_address(ip)
|
|
except ValueError:
|
|
ip_whitelist_cache[ip] = False
|
|
return False
|
|
|
|
# Limita verifica reti per performance
|
|
for i, network in enumerate(whitelist.get('networks', [])):
|
|
if i >= 1000: # Limite per evitare blocchi
|
|
break
|
|
|
|
if ip_obj in network:
|
|
ip_whitelist_cache[ip] = True
|
|
update_counter('ip_whitelisted')
|
|
return True
|
|
|
|
ip_whitelist_cache[ip] = False
|
|
return False
|
|
except Exception as e:
|
|
logging.error(f"Errore nel controllo whitelist per IP {ip}: {e}")
|
|
ip_whitelist_cache[ip] = False
|
|
return False
|
|
|
|
def load_last_analyzed_id():
|
|
"""Carica l'ultimo ID analizzato dal file"""
|
|
try:
|
|
if os.path.exists(LAST_ID_PATH):
|
|
with open(LAST_ID_PATH, 'r') as f:
|
|
last_id = int(f.read().strip())
|
|
return last_id
|
|
else:
|
|
logging.info(f"File {LAST_ID_PATH} non trovato. Inizializzo last_analyzed_id a 0.")
|
|
return 0
|
|
except Exception as e:
|
|
logging.error(f"Errore nel caricamento dell'ultimo ID analizzato: {e}")
|
|
return 0
|
|
|
|
def save_last_analyzed_id(last_id):
|
|
"""Salva l'ultimo ID analizzato nel file"""
|
|
try:
|
|
with open(LAST_ID_PATH, 'w') as f:
|
|
f.write(str(last_id))
|
|
logging.info(f"Ultimo ID analizzato salvato: {last_id}")
|
|
except Exception as e:
|
|
logging.error(f"Errore nel salvataggio dell'ultimo ID analizzato: {e}")
|
|
|
|
def extract_data_optimized(engine, last_id=0, batch_size=1000, max_id=None):
|
|
"""Estrazione dati ottimizzata per grandi volumi"""
|
|
try:
|
|
if max_id:
|
|
logging.info(f"Limitazione estrazione fino a ID {max_id}")
|
|
|
|
# Query di conteggio ottimizzata
|
|
if max_id:
|
|
query_count = f"SELECT COUNT(*) FROM Esterna WHERE ID > {last_id} AND ID <= {max_id}"
|
|
else:
|
|
query_count = f"SELECT COUNT(*) FROM Esterna WHERE ID > {last_id}"
|
|
|
|
with engine.connect() as conn:
|
|
count_result = conn.execute(text(query_count)).scalar()
|
|
|
|
if count_result == 0:
|
|
logging.info("Nessun nuovo record da estrarre")
|
|
return pd.DataFrame()
|
|
|
|
logging.info(f"Trovati {count_result} nuovi record da estrarre")
|
|
|
|
# Ottimizzazione per grandi dataset
|
|
effective_batch_size = min(batch_size, CHUNK_SIZE)
|
|
if count_result > 100000:
|
|
logging.warning(f"Dataset molto grande ({count_result}). Limitando a ultimi 50k record.")
|
|
|
|
with engine.connect() as conn:
|
|
latest_id_query = "SELECT MAX(ID) FROM Esterna"
|
|
latest_id = conn.execute(text(latest_id_query)).scalar()
|
|
|
|
if latest_id:
|
|
max_id = latest_id
|
|
last_id = max(last_id, latest_id - 50000)
|
|
count_result = min(count_result, 50000)
|
|
|
|
# Estrazione ottimizzata con chunking
|
|
frames = []
|
|
current_id = last_id
|
|
|
|
# Colonne essenziali per ridurre memoria
|
|
essential_columns = ['ID', 'Data', 'Ora', 'Host', 'IndirizzoIP', 'Messaggio1', 'Messaggio2', 'Messaggio3']
|
|
columns_str = ', '.join(essential_columns)
|
|
|
|
num_batches = (count_result + effective_batch_size - 1) // effective_batch_size
|
|
|
|
for i in range(num_batches):
|
|
# Controllo memoria
|
|
if check_memory_usage():
|
|
logging.warning("Memoria alta, riducendo batch size")
|
|
effective_batch_size = max(100, effective_batch_size // 2)
|
|
|
|
with engine.connect() as conn:
|
|
if max_id:
|
|
query = f"""
|
|
SELECT {columns_str}
|
|
FROM Esterna
|
|
WHERE ID > {current_id} AND ID <= {max_id}
|
|
ORDER BY ID ASC
|
|
LIMIT {effective_batch_size}
|
|
"""
|
|
else:
|
|
query = f"""
|
|
SELECT {columns_str}
|
|
FROM Esterna
|
|
WHERE ID > {current_id}
|
|
ORDER BY ID ASC
|
|
LIMIT {effective_batch_size}
|
|
"""
|
|
|
|
result = conn.execute(text(query))
|
|
chunk = pd.DataFrame(result.fetchall(), columns=result.keys())
|
|
|
|
if chunk.empty:
|
|
break
|
|
|
|
current_id = chunk['ID'].max()
|
|
|
|
# Ottimizzazione timestamp
|
|
if 'Data' in chunk.columns and 'Ora' in chunk.columns:
|
|
try:
|
|
chunk['Data'] = pd.to_datetime(chunk['Data'], errors='coerce')
|
|
chunk['Ora'] = pd.to_timedelta(chunk['Ora'].astype(str), errors='coerce')
|
|
chunk['Timestamp'] = chunk['Data'] + chunk['Ora']
|
|
except Exception as e:
|
|
logging.warning(f"Impossibile creare colonna Timestamp: {e}")
|
|
|
|
frames.append(chunk)
|
|
logging.info(f"Estratti {len(chunk)} record, batch {i+1}/{num_batches}")
|
|
|
|
if not frames:
|
|
return pd.DataFrame()
|
|
|
|
result = pd.concat(frames, ignore_index=True)
|
|
logging.info(f"Estrazione completata: {len(result)} record totali")
|
|
return result
|
|
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'estrazione dei dati: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def prepare_data_optimized(df, preprocessor):
|
|
"""Preparazione dati ottimizzata per grandi volumi"""
|
|
try:
|
|
if df.empty:
|
|
return None
|
|
|
|
# Crea copia ottimizzata
|
|
df = df.copy()
|
|
|
|
# Numero atteso di feature
|
|
expected_features = 125
|
|
feature_data = {}
|
|
feature_count = 0
|
|
|
|
# 1. Feature temporali essenziali (9 feature) - versione semplificata
|
|
time_features = [
|
|
'time_since_last', 'events_last_hour', 'events_last_day',
|
|
'time_since_last_mean', 'time_since_last_std', 'time_since_last_min',
|
|
'time_since_last_max', 'events_last_hour_max', 'events_last_day_max'
|
|
]
|
|
|
|
for feat in time_features:
|
|
feature_data[feat] = np.zeros(len(df))
|
|
feature_count += 1
|
|
|
|
# 2. Feature TF-IDF semplificate (21 feature)
|
|
if 'Messaggio1' in df.columns:
|
|
try:
|
|
# Versione semplificata per performance
|
|
proto_data = df['Messaggio1'].fillna('').astype(str)
|
|
# Usa solo i protocolli più comuni per velocità
|
|
common_protocols = ['TCP', 'UDP', 'ICMP', 'HTTP', 'HTTPS', 'SSH', 'FTP']
|
|
|
|
for i, protocol in enumerate(common_protocols[:21]):
|
|
feature_data[f'protocol_tfidf_{i}'] = proto_data.str.contains(protocol, case=False).astype(int)
|
|
feature_count += 1
|
|
|
|
# Riempi rimanenti
|
|
for i in range(len(common_protocols), 21):
|
|
feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df))
|
|
feature_count += 1
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'estrazione TF-IDF: {e}")
|
|
for i in range(21):
|
|
feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df))
|
|
feature_count += 1
|
|
else:
|
|
for i in range(21):
|
|
feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df))
|
|
feature_count += 1
|
|
|
|
# 3. Feature Host semplificate (2 feature)
|
|
if 'Host' in df.columns:
|
|
feature_data['host_FIBRA'] = df['Host'].fillna('').str.contains('FIBRA', case=False).astype(int)
|
|
feature_data['host_nan'] = df['Host'].isna().astype(int)
|
|
else:
|
|
feature_data['host_FIBRA'] = np.zeros(len(df))
|
|
feature_data['host_nan'] = np.zeros(len(df))
|
|
feature_count += 2
|
|
|
|
# 4. Encoding IP ottimizzato (15 feature)
|
|
if 'IP_Attaccante' not in df.columns and 'Messaggio2' in df.columns:
|
|
df['IP_Attaccante'] = df['Messaggio2'].str.split(':').str[0]
|
|
|
|
try:
|
|
# Versione semplificata dell'encoding per performance
|
|
if 'IP_Attaccante' in df.columns:
|
|
ip_data = df['IP_Attaccante'].fillna('unknown').astype(str)
|
|
# Hash semplice per IP
|
|
for i in range(15):
|
|
feature_data[f'col_{i}'] = pd.util.hash_array(ip_data.values) % (2**16) / (2**16)
|
|
feature_count += 1
|
|
else:
|
|
for i in range(15):
|
|
feature_data[f'col_{i}'] = np.zeros(len(df))
|
|
feature_count += 1
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'encoding delle colonne: {e}")
|
|
for i in range(15):
|
|
feature_data[f'col_{i}'] = np.zeros(len(df))
|
|
feature_count += 1
|
|
|
|
# 5. Feature aggiuntive (36 feature)
|
|
for i in range(15):
|
|
feature_data[f'additional_col_{i}'] = np.zeros(len(df))
|
|
feature_count += 1
|
|
|
|
for i in range(21):
|
|
feature_data[f'additional_tfidf_{i}'] = np.zeros(len(df))
|
|
feature_count += 1
|
|
|
|
# 6. Riempi fino a 125
|
|
remaining = expected_features - feature_count
|
|
if remaining > 0:
|
|
for i in range(remaining):
|
|
feature_data[f'extra_col_{i}'] = np.zeros(len(df))
|
|
feature_count += 1
|
|
|
|
# Crea array numpy direttamente per efficienza
|
|
X = np.column_stack([feature_data[col] for col in sorted(feature_data.keys())])
|
|
|
|
logging.debug(f"Generate {feature_count} feature ottimizzate")
|
|
return X
|
|
|
|
except Exception as e:
|
|
logging.error(f"Errore nella preparazione dei dati: {e}")
|
|
return None
|
|
|
|
def predict_anomalies_optimized(model, features, sensitivity=5):
|
|
"""Predizione anomalie ottimizzata"""
|
|
try:
|
|
if features.shape[1] != 125:
|
|
logging.error(f"Dimensione feature errata: trovate {features.shape[1]}, attese 125")
|
|
return np.zeros(features.shape[0])
|
|
|
|
update_counter('metrics_processed', features.shape[0])
|
|
|
|
if hasattr(model, 'predict'):
|
|
with warnings.catch_warnings():
|
|
warnings.filterwarnings("ignore")
|
|
|
|
# Predizione ottimizzata
|
|
predictions = model.predict(features)
|
|
|
|
# Applica sensibilità se supportata
|
|
if hasattr(model, 'decision_function'):
|
|
try:
|
|
decision_scores = model.decision_function(features)
|
|
threshold_multiplier = sensitivity / 5.0
|
|
custom_threshold = -0.2 * threshold_multiplier
|
|
predictions = np.where(decision_scores < custom_threshold, -1, 1)
|
|
|
|
num_anomalies = np.sum(predictions == -1)
|
|
logging.debug(f"Trovate {num_anomalies} anomalie con sensibilità {sensitivity}")
|
|
except Exception as e:
|
|
logging.warning(f"Errore con decision_function: {e}")
|
|
|
|
return predictions
|
|
else:
|
|
logging.error("Modello non ha il metodo predict")
|
|
return np.zeros(features.shape[0])
|
|
|
|
except Exception as e:
|
|
logging.error(f"Errore nella predizione: {e}")
|
|
return np.zeros(features.shape[0])
|
|
|
|
def process_batch_optimized(batch_data, engine, model, preprocessor, whitelist, sensitivity=5):
|
|
"""Processamento batch ottimizzato"""
|
|
try:
|
|
# Prepara i dati
|
|
X = prepare_data_optimized(batch_data, preprocessor)
|
|
|
|
if X is None or X.shape[0] == 0:
|
|
return set(), 0
|
|
|
|
# Predizione
|
|
predictions = predict_anomalies_optimized(model, X, sensitivity)
|
|
|
|
# Trova anomalie
|
|
anomaly_indices = np.where(predictions == -1)[0]
|
|
anomaly_count = len(anomaly_indices)
|
|
|
|
if anomaly_count == 0:
|
|
return set(), 0
|
|
|
|
# Estrai IP anomali
|
|
processed_ips = set()
|
|
|
|
# Estrai IP attaccanti se necessario
|
|
if 'IP_Attaccante' not in batch_data.columns and 'Messaggio2' in batch_data.columns:
|
|
batch_data['IP_Attaccante'] = batch_data['Messaggio2'].str.split(':').str[0]
|
|
|
|
for idx in anomaly_indices:
|
|
if idx < len(batch_data):
|
|
row = batch_data.iloc[idx]
|
|
ip = row.get('IP_Attaccante')
|
|
|
|
if pd.notna(ip) and not is_ip_whitelisted(ip, whitelist):
|
|
processed_ips.add(ip)
|
|
|
|
# Gestione anomalia semplificata
|
|
try:
|
|
handle_anomaly_optimized(engine, ip, 'MEDIO', None, 'Anomalia rilevata', 'ddos_ia')
|
|
except Exception as e:
|
|
logging.warning(f"Errore gestione anomalia per IP {ip}: {e}")
|
|
|
|
return processed_ips, anomaly_count
|
|
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'elaborazione del batch: {e}")
|
|
return set(), 0
|
|
|
|
def handle_anomaly_optimized(engine, ip_address, risk_level, port=None, message=None, list_name='ddos_ia'):
|
|
"""Gestione anomalia ottimizzata"""
|
|
try:
|
|
if not ip_address or pd.isna(ip_address):
|
|
return False
|
|
|
|
# Inserimento semplificato per performance
|
|
with engine.connect() as conn:
|
|
insert_query = text("""
|
|
INSERT INTO Fibra (IndirizzoIP, Data, Ora, Attivo, Lista, NumeroAttacchi, LivelloDiRischio)
|
|
VALUES (:ip, CURDATE(), CURTIME(), 1, :lista, 1, 2)
|
|
ON DUPLICATE KEY UPDATE
|
|
NumeroAttacchi = NumeroAttacchi + 1,
|
|
Data = CURDATE(),
|
|
Ora = CURTIME()
|
|
""")
|
|
|
|
conn.execute(insert_query, {"ip": ip_address, "lista": list_name})
|
|
|
|
# Aggiorna contatori
|
|
update_counter('ip_medium')
|
|
return True
|
|
|
|
except Exception as e:
|
|
logging.error(f"Errore nella gestione dell'anomalia per IP {ip_address}: {e}")
|
|
return False
|
|
|
|
def main():
|
|
"""Funzione principale ottimizzata"""
|
|
parser = argparse.ArgumentParser(description='Rilevamento DDoS ottimizzato per grandi volumi')
|
|
parser.add_argument('--debug', action='store_true', help='Abilita logging di debug')
|
|
parser.add_argument('--batch-size', type=int, default=5000, help='Dimensione batch ottimizzata')
|
|
parser.add_argument('--whitelist', type=str, default=WHITELIST_PATH, help='Percorso whitelist')
|
|
parser.add_argument('--ciclo', action='store_true', help='Esecuzione in ciclo')
|
|
parser.add_argument('--pausa', type=int, default=60, help='Pausa tra cicli')
|
|
parser.add_argument('--parallel', action='store_true', help='Elaborazione parallela')
|
|
parser.add_argument('--workers', type=int, default=min(4, multiprocessing.cpu_count()), help='Numero worker')
|
|
parser.add_argument('--max-id', type=int, default=None, help='ID massimo')
|
|
parser.add_argument('--skip-old', action='store_true', help='Salta record vecchi')
|
|
parser.add_argument('--sensibility', type=int, default=5, choices=range(1, 11), help='Sensibilità 1-10')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.debug:
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
|
|
log_phase("Avvio sistema di rilevamento DDoS ottimizzato")
|
|
|
|
# Test connessione
|
|
if not test_database_connection():
|
|
log_error("Impossibile connettersi al database")
|
|
sys.exit(1)
|
|
|
|
log_result("Connessione database stabilita")
|
|
|
|
try:
|
|
engine = create_engine_with_retry(CONN_STRING)
|
|
|
|
# Carica modelli
|
|
log_phase("Caricamento modelli")
|
|
model, preprocessor = load_models()
|
|
if model is None:
|
|
log_error("Impossibile caricare il modello")
|
|
sys.exit(1)
|
|
|
|
log_result("Modelli caricati con successo")
|
|
|
|
# Carica whitelist
|
|
whitelist = load_whitelist(args.whitelist)
|
|
|
|
def esegui_analisi_ottimizzata():
|
|
start_progress_tracking("rilevamento DDoS ottimizzato")
|
|
|
|
try:
|
|
last_id = load_last_analyzed_id()
|
|
|
|
# Estrazione dati ottimizzata
|
|
log_phase("Estrazione dati ottimizzata")
|
|
new_data = extract_data_optimized(engine, last_id, args.batch_size, args.max_id)
|
|
|
|
if new_data.empty:
|
|
log_result("Nessun nuovo dato da analizzare")
|
|
end_progress_tracking()
|
|
return True
|
|
|
|
total_records = len(new_data)
|
|
last_analyzed_id = new_data['ID'].max()
|
|
|
|
log_result(f"Estratti {total_records} record")
|
|
|
|
# Elaborazione ottimizzata
|
|
if args.parallel and total_records > 1000:
|
|
log_phase(f"Elaborazione parallela con {args.workers} worker")
|
|
|
|
# Dividi in batch ottimizzati
|
|
batch_size = min(CHUNK_SIZE, max(500, total_records // args.workers))
|
|
batches = [new_data[i:i+batch_size].copy() for i in range(0, total_records, batch_size)]
|
|
|
|
all_processed_ips = set()
|
|
total_anomalies = 0
|
|
|
|
with ThreadPoolExecutor(max_workers=args.workers) as executor:
|
|
futures = [
|
|
executor.submit(process_batch_optimized, batch, engine, model, preprocessor, whitelist, args.sensibility)
|
|
for batch in batches
|
|
]
|
|
|
|
for future in as_completed(futures):
|
|
try:
|
|
processed_ips, anomaly_count = future.result()
|
|
all_processed_ips.update(processed_ips)
|
|
total_anomalies += anomaly_count
|
|
except Exception as e:
|
|
log_error(f"Errore nell'elaborazione batch: {e}")
|
|
|
|
log_result(f"Elaborazione completata: {len(all_processed_ips)} IP anomali, {total_anomalies} anomalie totali")
|
|
|
|
else:
|
|
# Elaborazione sequenziale ottimizzata
|
|
log_phase("Elaborazione sequenziale ottimizzata")
|
|
processed_ips, anomaly_count = process_batch_optimized(new_data, engine, model, preprocessor, whitelist, args.sensibility)
|
|
log_result(f"Elaborate {len(processed_ips)} IP anomali, {anomaly_count} anomalie")
|
|
|
|
# Salva ultimo ID
|
|
save_last_analyzed_id(last_analyzed_id)
|
|
|
|
log_phase("Analisi completata")
|
|
end_progress_tracking()
|
|
|
|
# Garbage collection
|
|
del new_data
|
|
gc.collect()
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
log_error(f"Errore durante l'analisi: {e}")
|
|
end_progress_tracking()
|
|
return False
|
|
|
|
# Esecuzione
|
|
if args.ciclo:
|
|
log_phase("Modalità ciclo infinito")
|
|
ciclo_count = 0
|
|
|
|
def handle_interrupt(signum, frame):
|
|
log_warning("Interruzione ricevuta. Chiusura...")
|
|
sys.exit(0)
|
|
|
|
signal.signal(signal.SIGINT, handle_interrupt)
|
|
|
|
while True:
|
|
ciclo_count += 1
|
|
log_result(f"Inizio ciclo {ciclo_count}")
|
|
|
|
success = esegui_analisi_ottimizzata()
|
|
|
|
if success:
|
|
log_result(f"Ciclo {ciclo_count} completato. Pausa di {args.pausa} secondi...")
|
|
time.sleep(args.pausa)
|
|
else:
|
|
log_error(f"Errore nel ciclo {ciclo_count}. Pausa di {args.pausa*2} secondi...")
|
|
time.sleep(args.pausa * 2)
|
|
else:
|
|
esegui_analisi_ottimizzata()
|
|
|
|
except Exception as e:
|
|
log_error(f"Errore generale: {e}")
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |