ids.alfacom.it/extracted_idf/detect_multi_optimized.py
marco370 0bfe3258b5 Saved progress at the end of the loop
Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
2025-11-11 09:15:10 +00:00

879 lines
34 KiB
Python

#!/usr/bin/env python3
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from joblib import load
import logging
import gc
import os
import time
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
import ipaddress
import numpy as np
from sklearn.ensemble import IsolationForest
import threading
import argparse
import signal
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor
from category_encoders import HashingEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
import psutil
import warnings
warnings.filterwarnings('ignore')
# Configurazione del logging ottimizzata
logging.basicConfig(
level=logging.WARNING,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('ddetect_debug.log')
]
)
# Configurazione del database
DB_USER = os.environ.get('DB_USER', 'root')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}'
# Percorsi dei file
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
os.makedirs(MODEL_DIR, exist_ok=True)
MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt')
LAST_ID_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'last_analyzed_id.txt')
# Definizione dei livelli di rischio e soglie
RISK_LEVELS = {
'NORMALE': 0.1,
'BASSO': 0.3,
'MEDIO': 0.6,
'ALTO': 0.8,
'CRITICO': 0.95
}
# Ottimizzazioni per grandi volumi di dati
CHUNK_SIZE = 5000 # Dimensione ottimale per chunk di dati
MAX_MEMORY_USAGE = 80 # Percentuale massima di memoria utilizzabile
CACHE_SIZE = 10000 # Dimensione cache per IP whitelistati
# Cache globale per ottimizzazioni
ip_whitelist_cache = {}
model_cache = None
preprocessor_cache = None
class Colors:
HEADER = '\033[95m'
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
END = '\033[0m'
def log_phase(message):
"""Evidenzia una nuova fase principale dell'esecuzione"""
print(f"\n{Colors.BOLD}{Colors.GREEN}▶ FASE: {message}{Colors.END}\n")
logging.info(f"FASE: {message}")
def log_result(message):
"""Evidenzia un risultato importante"""
print(f"{Colors.BLUE}{message}{Colors.END}")
logging.info(f"RISULTATO: {message}")
def log_warning(message):
"""Evidenzia un avviso importante"""
print(f"{Colors.YELLOW}{message}{Colors.END}")
logging.warning(message)
def log_error(message):
"""Evidenzia un errore importante"""
print(f"{Colors.RED}{message}{Colors.END}")
logging.error(message)
# Variabili globali per il tracciamento dell'avanzamento
progress_counters = {
'ip_whitelisted': 0,
'ip_analyzed': 0,
'ip_normal': 0,
'ip_low': 0,
'ip_medium': 0,
'ip_high': 0,
'ip_critical': 0,
'metrics_processed': 0,
'last_update': 0,
'in_progress': False,
'operation': '',
'start_time': None
}
def check_memory_usage():
"""Controlla l'utilizzo della memoria e forza garbage collection se necessario"""
memory_percent = psutil.virtual_memory().percent
if memory_percent > MAX_MEMORY_USAGE:
logging.warning(f"Utilizzo memoria alto: {memory_percent}%. Forzando garbage collection...")
gc.collect()
return True
return False
def reset_counters():
"""Resetta i contatori per una nuova esecuzione"""
global progress_counters
progress_counters.update({
'ip_whitelisted': 0,
'ip_analyzed': 0,
'ip_normal': 0,
'ip_low': 0,
'ip_medium': 0,
'ip_high': 0,
'ip_critical': 0,
'metrics_processed': 0,
'last_update': 0,
'in_progress': False,
'operation': '',
'start_time': None
})
def start_progress_tracking(operation):
"""Inizia il tracciamento dell'operazione"""
global progress_counters
reset_counters()
progress_counters['in_progress'] = True
progress_counters['operation'] = operation
progress_counters['start_time'] = time.time()
threading.Thread(target=progress_reporter, daemon=True).start()
logging.info(f"Avvio monitoraggio operazione: {operation}")
def update_counter(counter_name, increment=1):
"""Aggiorna un contatore specifico"""
global progress_counters
if counter_name in progress_counters:
progress_counters[counter_name] += increment
def end_progress_tracking():
"""Termina il tracciamento e mostra il report finale"""
global progress_counters
if not progress_counters['in_progress']:
return
progress_counters['in_progress'] = False
report_progress(force=True)
logging.info(f"Monitoraggio completato per: {progress_counters['operation']}")
def report_progress(force=False):
"""Riporta lo stato attuale dei contatori"""
global progress_counters
if not progress_counters['in_progress'] and not force:
return
current_time = time.time()
if not force and (current_time - progress_counters['last_update']) < 10:
return
elapsed = current_time - progress_counters['start_time'] if progress_counters['start_time'] else 0
report = f"""
{Colors.BOLD}======== REPORT DI PROGRESSO - {progress_counters['operation']} ========{Colors.END}
Tempo trascorso: {elapsed:.1f} secondi
IP Whitelistati esclusi: {progress_counters['ip_whitelisted']}
Metriche elaborate: {progress_counters['metrics_processed']}
IP Analizzati: {progress_counters['ip_analyzed']}
Classificazione rischio:
- IP NORMALI: {progress_counters['ip_normal']}
- IP BASSI: {progress_counters['ip_low']}
- IP MEDI: {progress_counters['ip_medium']}
- IP ALTI: {progress_counters['ip_high']}
- IP CRITICI: {progress_counters['ip_critical']}
Memoria utilizzata: {psutil.virtual_memory().percent:.1f}%
{Colors.BOLD}================================================================{Colors.END}
"""
print(report)
logging.info(report.replace(Colors.BOLD, '').replace(Colors.END, ''))
progress_counters['last_update'] = current_time
def progress_reporter():
"""Thread che riporta periodicamente i progressi"""
while progress_counters['in_progress']:
report_progress()
time.sleep(2)
def test_database_connection():
"""Test di connessione al database"""
try:
logging.debug("Tentativo di connessione al database...")
engine = create_engine(CONN_STRING)
with engine.connect() as conn:
result = conn.execute(text("SELECT 1")).fetchone()
if result[0] == 1:
logging.debug("Test connessione al database riuscito!")
return True
return False
except Exception as e:
logging.error(f"Errore nel test di connessione al database: {e}")
return False
def create_engine_with_retry(conn_string, max_retries=3, retry_delay=2):
"""Crea una connessione al database con tentativi multipli e ottimizzazioni"""
for attempt in range(max_retries):
try:
# Configurazione ottimizzata per grandi volumi di dati
engine = create_engine(
conn_string,
pool_size=10, # Aumentato per parallelismo
max_overflow=20, # Aumentato per picchi di carico
pool_recycle=1800, # Ridotto per evitare timeout
pool_pre_ping=True,
pool_timeout=60, # Aumentato per operazioni lunghe
echo=False,
isolation_level="READ COMMITTED",
connect_args={
'charset': 'utf8mb4',
'use_unicode': True,
'autocommit': True,
'sql_mode': 'TRADITIONAL'
}
)
with engine.connect() as conn:
conn.execute(text("SELECT 1")).fetchone()
logging.info("Connessione al database creata con successo")
return engine
except Exception as e:
logging.error(f"Tentativo {attempt+1} fallito: {e}")
if attempt < max_retries - 1:
logging.info(f"Nuovo tentativo tra {retry_delay} secondi...")
time.sleep(retry_delay)
retry_delay *= 2
else:
logging.error("Impossibile connettersi al database dopo tutti i tentativi")
raise
def load_models():
"""Carica i modelli di rilevamento delle anomalie addestrati con cache"""
global model_cache, preprocessor_cache
# Usa cache se disponibile
if model_cache is not None and preprocessor_cache is not None:
return model_cache, preprocessor_cache
try:
# Carica il modello
logging.info(f"Caricamento modello da {MODEL_PATH}...")
if os.path.exists(MODEL_PATH):
model_cache = load(MODEL_PATH)
logging.debug("Modello caricato con successo!")
else:
logging.error(f"File modello non trovato: {MODEL_PATH}")
return None, None
# Carica il preprocessor
try:
logging.info(f"Caricamento preprocessor da {PREPROCESSOR_PATH}...")
if os.path.exists(PREPROCESSOR_PATH):
preprocessor_cache = load(PREPROCESSOR_PATH)
if isinstance(preprocessor_cache, dict) and 'feature_columns' in preprocessor_cache:
feature_count = len(preprocessor_cache['feature_columns'])
if feature_count < 125:
logging.warning(f"Il modello si aspetta 125 feature, "
f"ma il preprocessor ne ha {feature_count}")
return model_cache, preprocessor_cache
else:
logging.error("Preprocessor non ha la struttura attesa. Utilizzo fallback.")
else:
logging.error(f"File preprocessor non trovato: {PREPROCESSOR_PATH}")
except Exception as e:
logging.error(f"Errore nel caricamento del preprocessor: {e}")
# Crea un preprocessor di fallback
preprocessor_cache = {'feature_columns': [f'feature_{i}' for i in range(125)]}
return model_cache, preprocessor_cache
except Exception as e:
logging.error(f"Errore nel caricamento dei modelli: {e}")
return None, None
def load_whitelist(whitelist_path=None):
"""Carica la whitelist da file con ottimizzazioni"""
if whitelist_path is None:
whitelist_path = '/root/whitelist.txt'
try:
if not os.path.exists(whitelist_path):
logging.warning(f"File whitelist non trovato: {whitelist_path}")
return {'exact_ips': set(), 'networks': []}
with open(whitelist_path, 'r') as f:
whitelist_entries = [line.strip() for line in f if line.strip() and not line.startswith('#')]
exact_ips = set()
networks = []
for entry in whitelist_entries:
try:
if '/' in entry:
network = ipaddress.ip_network(entry, strict=False)
networks.append(network)
else:
exact_ips.add(entry)
except Exception as e:
logging.warning(f"Ignorata entry nella whitelist non valida: {entry} - {e}")
whitelist = {
'exact_ips': exact_ips,
'networks': networks
}
logging.info(f"Whitelisted {len(exact_ips)} IP esatti e {len(networks)} network.")
return whitelist
except Exception as e:
logging.error(f"Errore nel caricamento della whitelist: {e}")
return {'exact_ips': set(), 'networks': []}
def is_ip_whitelisted(ip, whitelist):
"""Verifica se un IP è nella whitelist con cache ottimizzata"""
global ip_whitelist_cache
if pd.isna(ip) or not ip:
return False
# Controlla cache
if ip in ip_whitelist_cache:
if ip_whitelist_cache[ip]:
update_counter('ip_whitelisted')
return ip_whitelist_cache[ip]
try:
# Verifica diretta negli IP esatti
if ip in whitelist.get('exact_ips', set()):
ip_whitelist_cache[ip] = True
# Limita dimensione cache
if len(ip_whitelist_cache) > CACHE_SIZE:
# Rimuovi il 20% più vecchio
items_to_remove = list(ip_whitelist_cache.keys())[:CACHE_SIZE//5]
for key in items_to_remove:
del ip_whitelist_cache[key]
update_counter('ip_whitelisted')
return True
# Verifica nelle reti (limitata per performance)
try:
ip_obj = ipaddress.ip_address(ip)
except ValueError:
ip_whitelist_cache[ip] = False
return False
# Limita verifica reti per performance
for i, network in enumerate(whitelist.get('networks', [])):
if i >= 1000: # Limite per evitare blocchi
break
if ip_obj in network:
ip_whitelist_cache[ip] = True
update_counter('ip_whitelisted')
return True
ip_whitelist_cache[ip] = False
return False
except Exception as e:
logging.error(f"Errore nel controllo whitelist per IP {ip}: {e}")
ip_whitelist_cache[ip] = False
return False
def load_last_analyzed_id():
"""Carica l'ultimo ID analizzato dal file"""
try:
if os.path.exists(LAST_ID_PATH):
with open(LAST_ID_PATH, 'r') as f:
last_id = int(f.read().strip())
return last_id
else:
logging.info(f"File {LAST_ID_PATH} non trovato. Inizializzo last_analyzed_id a 0.")
return 0
except Exception as e:
logging.error(f"Errore nel caricamento dell'ultimo ID analizzato: {e}")
return 0
def save_last_analyzed_id(last_id):
"""Salva l'ultimo ID analizzato nel file"""
try:
with open(LAST_ID_PATH, 'w') as f:
f.write(str(last_id))
logging.info(f"Ultimo ID analizzato salvato: {last_id}")
except Exception as e:
logging.error(f"Errore nel salvataggio dell'ultimo ID analizzato: {e}")
def extract_data_optimized(engine, last_id=0, batch_size=1000, max_id=None):
"""Estrazione dati ottimizzata per grandi volumi"""
try:
if max_id:
logging.info(f"Limitazione estrazione fino a ID {max_id}")
# Query di conteggio ottimizzata
if max_id:
query_count = f"SELECT COUNT(*) FROM Esterna WHERE ID > {last_id} AND ID <= {max_id}"
else:
query_count = f"SELECT COUNT(*) FROM Esterna WHERE ID > {last_id}"
with engine.connect() as conn:
count_result = conn.execute(text(query_count)).scalar()
if count_result == 0:
logging.info("Nessun nuovo record da estrarre")
return pd.DataFrame()
logging.info(f"Trovati {count_result} nuovi record da estrarre")
# Ottimizzazione per grandi dataset
effective_batch_size = min(batch_size, CHUNK_SIZE)
if count_result > 100000:
logging.warning(f"Dataset molto grande ({count_result}). Limitando a ultimi 50k record.")
with engine.connect() as conn:
latest_id_query = "SELECT MAX(ID) FROM Esterna"
latest_id = conn.execute(text(latest_id_query)).scalar()
if latest_id:
max_id = latest_id
last_id = max(last_id, latest_id - 50000)
count_result = min(count_result, 50000)
# Estrazione ottimizzata con chunking
frames = []
current_id = last_id
# Colonne essenziali per ridurre memoria
essential_columns = ['ID', 'Data', 'Ora', 'Host', 'IndirizzoIP', 'Messaggio1', 'Messaggio2', 'Messaggio3']
columns_str = ', '.join(essential_columns)
num_batches = (count_result + effective_batch_size - 1) // effective_batch_size
for i in range(num_batches):
# Controllo memoria
if check_memory_usage():
logging.warning("Memoria alta, riducendo batch size")
effective_batch_size = max(100, effective_batch_size // 2)
with engine.connect() as conn:
if max_id:
query = f"""
SELECT {columns_str}
FROM Esterna
WHERE ID > {current_id} AND ID <= {max_id}
ORDER BY ID ASC
LIMIT {effective_batch_size}
"""
else:
query = f"""
SELECT {columns_str}
FROM Esterna
WHERE ID > {current_id}
ORDER BY ID ASC
LIMIT {effective_batch_size}
"""
result = conn.execute(text(query))
chunk = pd.DataFrame(result.fetchall(), columns=result.keys())
if chunk.empty:
break
current_id = chunk['ID'].max()
# Ottimizzazione timestamp
if 'Data' in chunk.columns and 'Ora' in chunk.columns:
try:
chunk['Data'] = pd.to_datetime(chunk['Data'], errors='coerce')
chunk['Ora'] = pd.to_timedelta(chunk['Ora'].astype(str), errors='coerce')
chunk['Timestamp'] = chunk['Data'] + chunk['Ora']
except Exception as e:
logging.warning(f"Impossibile creare colonna Timestamp: {e}")
frames.append(chunk)
logging.info(f"Estratti {len(chunk)} record, batch {i+1}/{num_batches}")
if not frames:
return pd.DataFrame()
result = pd.concat(frames, ignore_index=True)
logging.info(f"Estrazione completata: {len(result)} record totali")
return result
except Exception as e:
logging.error(f"Errore nell'estrazione dei dati: {e}")
return pd.DataFrame()
def prepare_data_optimized(df, preprocessor):
"""Preparazione dati ottimizzata per grandi volumi"""
try:
if df.empty:
return None
# Crea copia ottimizzata
df = df.copy()
# Numero atteso di feature
expected_features = 125
feature_data = {}
feature_count = 0
# 1. Feature temporali essenziali (9 feature) - versione semplificata
time_features = [
'time_since_last', 'events_last_hour', 'events_last_day',
'time_since_last_mean', 'time_since_last_std', 'time_since_last_min',
'time_since_last_max', 'events_last_hour_max', 'events_last_day_max'
]
for feat in time_features:
feature_data[feat] = np.zeros(len(df))
feature_count += 1
# 2. Feature TF-IDF semplificate (21 feature)
if 'Messaggio1' in df.columns:
try:
# Versione semplificata per performance
proto_data = df['Messaggio1'].fillna('').astype(str)
# Usa solo i protocolli più comuni per velocità
common_protocols = ['TCP', 'UDP', 'ICMP', 'HTTP', 'HTTPS', 'SSH', 'FTP']
for i, protocol in enumerate(common_protocols[:21]):
feature_data[f'protocol_tfidf_{i}'] = proto_data.str.contains(protocol, case=False).astype(int)
feature_count += 1
# Riempi rimanenti
for i in range(len(common_protocols), 21):
feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df))
feature_count += 1
except Exception as e:
logging.error(f"Errore nell'estrazione TF-IDF: {e}")
for i in range(21):
feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df))
feature_count += 1
else:
for i in range(21):
feature_data[f'protocol_tfidf_{i}'] = np.zeros(len(df))
feature_count += 1
# 3. Feature Host semplificate (2 feature)
if 'Host' in df.columns:
feature_data['host_FIBRA'] = df['Host'].fillna('').str.contains('FIBRA', case=False).astype(int)
feature_data['host_nan'] = df['Host'].isna().astype(int)
else:
feature_data['host_FIBRA'] = np.zeros(len(df))
feature_data['host_nan'] = np.zeros(len(df))
feature_count += 2
# 4. Encoding IP ottimizzato (15 feature)
if 'IP_Attaccante' not in df.columns and 'Messaggio2' in df.columns:
df['IP_Attaccante'] = df['Messaggio2'].str.split(':').str[0]
try:
# Versione semplificata dell'encoding per performance
if 'IP_Attaccante' in df.columns:
ip_data = df['IP_Attaccante'].fillna('unknown').astype(str)
# Hash semplice per IP
for i in range(15):
feature_data[f'col_{i}'] = pd.util.hash_array(ip_data.values) % (2**16) / (2**16)
feature_count += 1
else:
for i in range(15):
feature_data[f'col_{i}'] = np.zeros(len(df))
feature_count += 1
except Exception as e:
logging.error(f"Errore nell'encoding delle colonne: {e}")
for i in range(15):
feature_data[f'col_{i}'] = np.zeros(len(df))
feature_count += 1
# 5. Feature aggiuntive (36 feature)
for i in range(15):
feature_data[f'additional_col_{i}'] = np.zeros(len(df))
feature_count += 1
for i in range(21):
feature_data[f'additional_tfidf_{i}'] = np.zeros(len(df))
feature_count += 1
# 6. Riempi fino a 125
remaining = expected_features - feature_count
if remaining > 0:
for i in range(remaining):
feature_data[f'extra_col_{i}'] = np.zeros(len(df))
feature_count += 1
# Crea array numpy direttamente per efficienza
X = np.column_stack([feature_data[col] for col in sorted(feature_data.keys())])
logging.debug(f"Generate {feature_count} feature ottimizzate")
return X
except Exception as e:
logging.error(f"Errore nella preparazione dei dati: {e}")
return None
def predict_anomalies_optimized(model, features, sensitivity=5):
"""Predizione anomalie ottimizzata"""
try:
if features.shape[1] != 125:
logging.error(f"Dimensione feature errata: trovate {features.shape[1]}, attese 125")
return np.zeros(features.shape[0])
update_counter('metrics_processed', features.shape[0])
if hasattr(model, 'predict'):
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
# Predizione ottimizzata
predictions = model.predict(features)
# Applica sensibilità se supportata
if hasattr(model, 'decision_function'):
try:
decision_scores = model.decision_function(features)
threshold_multiplier = sensitivity / 5.0
custom_threshold = -0.2 * threshold_multiplier
predictions = np.where(decision_scores < custom_threshold, -1, 1)
num_anomalies = np.sum(predictions == -1)
logging.debug(f"Trovate {num_anomalies} anomalie con sensibilità {sensitivity}")
except Exception as e:
logging.warning(f"Errore con decision_function: {e}")
return predictions
else:
logging.error("Modello non ha il metodo predict")
return np.zeros(features.shape[0])
except Exception as e:
logging.error(f"Errore nella predizione: {e}")
return np.zeros(features.shape[0])
def process_batch_optimized(batch_data, engine, model, preprocessor, whitelist, sensitivity=5):
"""Processamento batch ottimizzato"""
try:
# Prepara i dati
X = prepare_data_optimized(batch_data, preprocessor)
if X is None or X.shape[0] == 0:
return set(), 0
# Predizione
predictions = predict_anomalies_optimized(model, X, sensitivity)
# Trova anomalie
anomaly_indices = np.where(predictions == -1)[0]
anomaly_count = len(anomaly_indices)
if anomaly_count == 0:
return set(), 0
# Estrai IP anomali
processed_ips = set()
# Estrai IP attaccanti se necessario
if 'IP_Attaccante' not in batch_data.columns and 'Messaggio2' in batch_data.columns:
batch_data['IP_Attaccante'] = batch_data['Messaggio2'].str.split(':').str[0]
for idx in anomaly_indices:
if idx < len(batch_data):
row = batch_data.iloc[idx]
ip = row.get('IP_Attaccante')
if pd.notna(ip) and not is_ip_whitelisted(ip, whitelist):
processed_ips.add(ip)
# Gestione anomalia semplificata
try:
handle_anomaly_optimized(engine, ip, 'MEDIO', None, 'Anomalia rilevata', 'ddos_ia')
except Exception as e:
logging.warning(f"Errore gestione anomalia per IP {ip}: {e}")
return processed_ips, anomaly_count
except Exception as e:
logging.error(f"Errore nell'elaborazione del batch: {e}")
return set(), 0
def handle_anomaly_optimized(engine, ip_address, risk_level, port=None, message=None, list_name='ddos_ia'):
"""Gestione anomalia ottimizzata"""
try:
if not ip_address or pd.isna(ip_address):
return False
# Inserimento semplificato per performance
with engine.connect() as conn:
insert_query = text("""
INSERT INTO Fibra (IndirizzoIP, Data, Ora, Attivo, Lista, NumeroAttacchi, LivelloDiRischio)
VALUES (:ip, CURDATE(), CURTIME(), 1, :lista, 1, 2)
ON DUPLICATE KEY UPDATE
NumeroAttacchi = NumeroAttacchi + 1,
Data = CURDATE(),
Ora = CURTIME()
""")
conn.execute(insert_query, {"ip": ip_address, "lista": list_name})
# Aggiorna contatori
update_counter('ip_medium')
return True
except Exception as e:
logging.error(f"Errore nella gestione dell'anomalia per IP {ip_address}: {e}")
return False
def main():
"""Funzione principale ottimizzata"""
parser = argparse.ArgumentParser(description='Rilevamento DDoS ottimizzato per grandi volumi')
parser.add_argument('--debug', action='store_true', help='Abilita logging di debug')
parser.add_argument('--batch-size', type=int, default=5000, help='Dimensione batch ottimizzata')
parser.add_argument('--whitelist', type=str, default=WHITELIST_PATH, help='Percorso whitelist')
parser.add_argument('--ciclo', action='store_true', help='Esecuzione in ciclo')
parser.add_argument('--pausa', type=int, default=60, help='Pausa tra cicli')
parser.add_argument('--parallel', action='store_true', help='Elaborazione parallela')
parser.add_argument('--workers', type=int, default=min(4, multiprocessing.cpu_count()), help='Numero worker')
parser.add_argument('--max-id', type=int, default=None, help='ID massimo')
parser.add_argument('--skip-old', action='store_true', help='Salta record vecchi')
parser.add_argument('--sensibility', type=int, default=5, choices=range(1, 11), help='Sensibilità 1-10')
args = parser.parse_args()
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
log_phase("Avvio sistema di rilevamento DDoS ottimizzato")
# Test connessione
if not test_database_connection():
log_error("Impossibile connettersi al database")
sys.exit(1)
log_result("Connessione database stabilita")
try:
engine = create_engine_with_retry(CONN_STRING)
# Carica modelli
log_phase("Caricamento modelli")
model, preprocessor = load_models()
if model is None:
log_error("Impossibile caricare il modello")
sys.exit(1)
log_result("Modelli caricati con successo")
# Carica whitelist
whitelist = load_whitelist(args.whitelist)
def esegui_analisi_ottimizzata():
start_progress_tracking("rilevamento DDoS ottimizzato")
try:
last_id = load_last_analyzed_id()
# Estrazione dati ottimizzata
log_phase("Estrazione dati ottimizzata")
new_data = extract_data_optimized(engine, last_id, args.batch_size, args.max_id)
if new_data.empty:
log_result("Nessun nuovo dato da analizzare")
end_progress_tracking()
return True
total_records = len(new_data)
last_analyzed_id = new_data['ID'].max()
log_result(f"Estratti {total_records} record")
# Elaborazione ottimizzata
if args.parallel and total_records > 1000:
log_phase(f"Elaborazione parallela con {args.workers} worker")
# Dividi in batch ottimizzati
batch_size = min(CHUNK_SIZE, max(500, total_records // args.workers))
batches = [new_data[i:i+batch_size].copy() for i in range(0, total_records, batch_size)]
all_processed_ips = set()
total_anomalies = 0
with ThreadPoolExecutor(max_workers=args.workers) as executor:
futures = [
executor.submit(process_batch_optimized, batch, engine, model, preprocessor, whitelist, args.sensibility)
for batch in batches
]
for future in as_completed(futures):
try:
processed_ips, anomaly_count = future.result()
all_processed_ips.update(processed_ips)
total_anomalies += anomaly_count
except Exception as e:
log_error(f"Errore nell'elaborazione batch: {e}")
log_result(f"Elaborazione completata: {len(all_processed_ips)} IP anomali, {total_anomalies} anomalie totali")
else:
# Elaborazione sequenziale ottimizzata
log_phase("Elaborazione sequenziale ottimizzata")
processed_ips, anomaly_count = process_batch_optimized(new_data, engine, model, preprocessor, whitelist, args.sensibility)
log_result(f"Elaborate {len(processed_ips)} IP anomali, {anomaly_count} anomalie")
# Salva ultimo ID
save_last_analyzed_id(last_analyzed_id)
log_phase("Analisi completata")
end_progress_tracking()
# Garbage collection
del new_data
gc.collect()
return True
except Exception as e:
log_error(f"Errore durante l'analisi: {e}")
end_progress_tracking()
return False
# Esecuzione
if args.ciclo:
log_phase("Modalità ciclo infinito")
ciclo_count = 0
def handle_interrupt(signum, frame):
log_warning("Interruzione ricevuta. Chiusura...")
sys.exit(0)
signal.signal(signal.SIGINT, handle_interrupt)
while True:
ciclo_count += 1
log_result(f"Inizio ciclo {ciclo_count}")
success = esegui_analisi_ottimizzata()
if success:
log_result(f"Ciclo {ciclo_count} completato. Pausa di {args.pausa} secondi...")
time.sleep(args.pausa)
else:
log_error(f"Errore nel ciclo {ciclo_count}. Pausa di {args.pausa*2} secondi...")
time.sleep(args.pausa * 2)
else:
esegui_analisi_ottimizzata()
except Exception as e:
log_error(f"Errore generale: {e}")
sys.exit(1)
if __name__ == "__main__":
main()