#!/usr/bin/env python3
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from joblib import load
import logging
import gc
import os
import time
import sys
from collections import defaultdict
from datetime import datetime, timedelta
import numpy as np
import threading
import argparse
import signal
from concurrent.futures import ThreadPoolExecutor, as_completed
import warnings
warnings.filterwarnings('ignore')

# Simplified logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('detect_debug.log')
    ]
)

# Database configuration
try:
    from config_database import DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD
    print(f"✅ Config caricata: {DB_HOST}:{DB_PORT}/{DB_NAME}")
except ImportError:
    # Fallback if config_database.py does not exist
    DB_USER = os.environ.get('DB_USER', 'root')
    DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
    DB_HOST = os.environ.get('DB_HOST', 'localhost')
    DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
    DB_PORT = '3306'

CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
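
# For reference, a minimal config_database.py sketch matching the import above.
# Illustrative only: the variable names come from the import, the values are
# placeholders mirroring the environment-variable fallbacks, not real settings.
#
#     DB_HOST = 'localhost'
#     DB_PORT = '3306'
#     DB_NAME = 'LOG_MIKROTIK'
#     DB_USER = 'root'
#     DB_PASSWORD = 'change-me'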

# File paths
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt')
LAST_ID_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'last_analyzed_id.txt')

# Simplified risk-level parameters
RISK_LEVELS = {
    'NORMALE': 0.1,
    'BASSO': 0.3,
    'MEDIO': 0.6,
    'ALTO': 0.8,
    'CRITICO': 0.95
}

# Colors for terminal output
class Colors:
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    CYAN = '\033[96m'
    MAGENTA = '\033[95m'
    WHITE = '\033[97m'
    END = '\033[0m'

def log_phase(message):
    print(f"\n{Colors.BOLD}{Colors.GREEN}▶ FASE: {message}{Colors.END}\n")
    logging.info(f"FASE: {message}")

def log_result(message):
    print(f"{Colors.BLUE}✓ {message}{Colors.END}")
    logging.info(f"RISULTATO: {message}")

def log_warning(message):
    print(f"{Colors.YELLOW}⚠ {message}{Colors.END}")
    logging.warning(message)

def log_error(message):
    print(f"{Colors.RED}✗ {message}{Colors.END}")
    logging.error(message)

def log_info(message):
    print(f"{Colors.CYAN}i {message}{Colors.END}")
    logging.info(message)

def log_anomaly(message):
    print(f"{Colors.BOLD}{Colors.RED}! {message}{Colors.END}")
    logging.warning(message)

def log_success(message):
    print(f"{Colors.BOLD}{Colors.GREEN}* {message}{Colors.END}")
    logging.info(message)

# Global variables for real-time statistics
live_stats = {
    'records_processed': 0,
    'anomalies_found': 0,
    'ips_analyzed': 0,
    'ips_blocked': 0,
    'start_time': None,
    'last_update': 0,
    'current_batch': 0,
    'total_batches': 0,
    'processing_rate': 0,
    'anomaly_rate': 0
}

def reset_stats():
    """Reset the live statistics"""
    global live_stats
    live_stats['records_processed'] = 0
    live_stats['anomalies_found'] = 0
    live_stats['ips_analyzed'] = 0
    live_stats['ips_blocked'] = 0
    live_stats['start_time'] = time.time()
    live_stats['last_update'] = 0
    live_stats['current_batch'] = 0
    live_stats['total_batches'] = 0
    live_stats['processing_rate'] = 0
    live_stats['anomaly_rate'] = 0

def update_stats(records=0, anomalies=0, ips=0, blocked=0):
    """Update the live statistics"""
    global live_stats
    live_stats['records_processed'] += records
    live_stats['anomalies_found'] += anomalies
    live_stats['ips_analyzed'] += ips
    live_stats['ips_blocked'] += blocked

    # Compute processing and anomaly rates (guard against a near-zero elapsed time)
    elapsed = (time.time() - live_stats['start_time']) if live_stats['start_time'] else 1
    live_stats['processing_rate'] = live_stats['records_processed'] / max(elapsed, 1e-6)
    live_stats['anomaly_rate'] = (live_stats['anomalies_found'] / max(1, live_stats['records_processed'])) * 100

def show_live_stats(force=False):
    """Display real-time statistics"""
    global live_stats
    current_time = time.time()

    # Refresh at most every 3 seconds unless forced
    if not force and (current_time - live_stats['last_update']) < 3:
        return

    elapsed = current_time - live_stats['start_time'] if live_stats['start_time'] else 0

    # Estimate the ETA when batch info is available
    eta_str = "N/A"
    if live_stats['total_batches'] > 0 and live_stats['current_batch'] > 0:
        progress = live_stats['current_batch'] / live_stats['total_batches']
        if progress > 0:
            remaining_time = (elapsed / progress) - elapsed
            if remaining_time > 0:
                eta_str = f"{remaining_time:.0f}s"

    # Colored header
    print(f"\n{Colors.BOLD}{Colors.WHITE}{'='*70}{Colors.END}")
    print(f"{Colors.BOLD}{Colors.CYAN}📊 STATISTICHE RILEVAMENTO DDoS - TEMPO REALE{Colors.END}")
    print(f"{Colors.BOLD}{Colors.WHITE}{'='*70}{Colors.END}")

    # Main statistics
    print(f"{Colors.BOLD}⏱️ Tempo trascorso:{Colors.END} {Colors.GREEN}{elapsed:.1f}s{Colors.END}")
    print(f"{Colors.BOLD}📈 Record processati:{Colors.END} {Colors.BLUE}{live_stats['records_processed']:,}{Colors.END}")
    print(f"{Colors.BOLD}🚨 Anomalie trovate:{Colors.END} {Colors.RED}{live_stats['anomalies_found']:,}{Colors.END}")
    print(f"{Colors.BOLD}🔍 IP analizzati:{Colors.END} {Colors.CYAN}{live_stats['ips_analyzed']:,}{Colors.END}")
    print(f"{Colors.BOLD}🛡️ IP bloccati:{Colors.END} {Colors.YELLOW}{live_stats['ips_blocked']:,}{Colors.END}")

    # Performance metrics
    print(f"{Colors.BOLD}⚡ Velocità:{Colors.END} {Colors.MAGENTA}{live_stats['processing_rate']:.1f} record/sec{Colors.END}")
    print(f"{Colors.BOLD}📊 Tasso anomalie:{Colors.END} {Colors.RED}{live_stats['anomaly_rate']:.2f}%{Colors.END}")

    # Batch progress, if available
    if live_stats['total_batches'] > 0:
        batch_progress = (live_stats['current_batch'] / live_stats['total_batches']) * 100
        print(f"{Colors.BOLD}📦 Batch:{Colors.END} {Colors.GREEN}{live_stats['current_batch']}/{live_stats['total_batches']}{Colors.END} ({batch_progress:.1f}%) - ETA: {eta_str}")

    print(f"{Colors.BOLD}{Colors.WHITE}{'='*70}{Colors.END}\n")

    live_stats['last_update'] = current_time

def show_spinner(message, duration=1):
    """Show an animated spinner for the given duration"""
    spinner_chars = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
    end_time = time.time() + duration
    i = 0

    while time.time() < end_time:
        print(f"\r{Colors.CYAN}{spinner_chars[i % len(spinner_chars)]} {message}{Colors.END}", end='')
        sys.stdout.flush()
        time.sleep(0.1)
        i += 1

    print(f"\r{Colors.GREEN}✓ {message}{Colors.END}")

def create_engine_simple():
    """Create the database engine with user feedback"""
    try:
        log_info("Configurazione connessione database...")
        log_info(f"Host: {DB_HOST}, Database: {DB_NAME}, User: {DB_USER}")

        show_spinner("Creazione engine database...", 1)

        engine = create_engine(
            CONN_STRING,
            pool_size=3,
            max_overflow=5,
            pool_recycle=1800,
            pool_pre_ping=True,
            pool_timeout=30,
            echo=False
        )

        show_spinner("Test connessione...", 1)

        # Connection smoke test
        with engine.connect() as conn:
            conn.execute(text("SELECT 1")).fetchone()

        log_result("Connessione database stabilita")
        return engine
    except Exception as e:
        log_error(f"Errore connessione database: {e}")
        return None

def load_models_simple():
    """Load the machine learning models with detailed feedback"""
    try:
        log_phase("Caricamento modelli di machine learning")

        # Check that the model file exists
        log_info(f"Verifica file modello: {MODEL_PATH}")
        if not os.path.exists(MODEL_PATH):
            log_error(f"Modello non trovato: {MODEL_PATH}")
            return None, None

        file_size = os.path.getsize(MODEL_PATH) / 1024
        log_info(f"Dimensione file modello: {file_size:.1f} KB")

        show_spinner("Caricamento Isolation Forest...", 2)
        model = load(MODEL_PATH)
        log_result("✓ Isolation Forest caricato")

        # Load the preprocessor if available
        preprocessor = None
        if os.path.exists(PREPROCESSOR_PATH):
            show_spinner("Caricamento preprocessor...", 1)
            preprocessor = load(PREPROCESSOR_PATH)
            n_features = len(preprocessor.get('feature_columns', []))
            log_result(f"✓ Preprocessor caricato ({n_features} feature)")
        else:
            log_warning("Preprocessor non trovato, usando fallback")
            preprocessor = {'feature_columns': [f'feature_{i}' for i in range(50)]}

        log_success("Tutti i modelli caricati con successo")
        return model, preprocessor

    except Exception as e:
        log_error(f"Errore caricamento modelli: {e}")
        return None, None

def load_whitelist_simple():
    """Load the IP whitelist with feedback"""
    try:
        log_info("Caricamento whitelist IP...")

        if not os.path.exists(WHITELIST_PATH):
            log_warning(f"File whitelist non trovato: {WHITELIST_PATH}")
            log_info("Creazione whitelist vuota")
            return set()

        show_spinner("Lettura file whitelist...", 1)

        with open(WHITELIST_PATH, 'r') as f:
            lines = f.readlines()

        whitelist = set()
        for line in lines:
            line = line.strip()
            if line and not line.startswith('#'):
                whitelist.add(line)

        log_result(f"Whitelist caricata: {len(whitelist)} IP")

        # Show a few examples if available
        if whitelist:
            examples = list(whitelist)[:3]
            log_info(f"Esempi IP whitelistati: {', '.join(examples)}")

        return whitelist

    except Exception as e:
        log_warning(f"Errore caricamento whitelist: {e}")
        return set()

def load_last_analyzed_id():
    """Load the last analyzed ID with feedback"""
    try:
        if os.path.exists(LAST_ID_PATH):
            with open(LAST_ID_PATH, 'r') as f:
                last_id = int(f.read().strip())
            log_info(f"Ultimo ID analizzato: {last_id:,}")
            return last_id
        else:
            log_info("Nessun ID precedente trovato, partendo da 0")
            return 0
    except Exception as e:
        log_warning(f"Errore caricamento ultimo ID: {e}")
        return 0

def save_last_analyzed_id(last_id):
    """Persist the last analyzed ID"""
    try:
        with open(LAST_ID_PATH, 'w') as f:
            f.write(str(last_id))
        log_info(f"Ultimo ID salvato: {last_id:,}")
    except Exception as e:
        log_warning(f"Errore salvataggio ultimo ID: {e}")

def extract_data_simple(engine, last_id=0, batch_size=10000):
    """Extract data with detailed feedback"""
    try:
        log_phase(f"Estrazione dati da ID {last_id:,}")

        log_info(f"Parametri: batch_size={batch_size:,}, last_id={last_id:,}")

        show_spinner("Preparazione query di estrazione...", 1)

        # Simple incremental query
        query = text("""
            SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3
            FROM Esterna
            WHERE ID > :last_id
            ORDER BY ID ASC
            LIMIT :batch_size
        """)

        log_info("Esecuzione query sul database...")
        start_time = time.time()

        df = pd.read_sql(query, engine, params={
            "last_id": last_id,
            "batch_size": batch_size
        })

        elapsed = time.time() - start_time

        if df.empty:
            log_warning("Nessun nuovo record trovato")
        else:
            log_result(f"Estratti {len(df):,} record in {elapsed:.1f} secondi")
            log_info(f"Range ID: {df['ID'].min():,} - {df['ID'].max():,}")
            log_info(f"Colonne disponibili: {list(df.columns)}")

            # Quick look at the data
            if 'Messaggio2' in df.columns:
                unique_ips = df['Messaggio2'].str.split(':').str[0].nunique()
                log_info(f"IP unici nel batch: {unique_ips:,}")

        return df

    except Exception as e:
        log_error(f"Errore estrazione dati: {e}")
        return pd.DataFrame()

def prepare_data_simple(df):
    """Feature preparation, kept compatible with analisys_02.py"""
    try:
        if df.empty:
            return None

        log_info(f"Preparazione feature per {len(df):,} record...")

        # Same logic as analisys_02.py, for compatibility with the trained model
        feature_data = {}
        n_samples = len(df)

        show_spinner("Estrazione feature temporali...", 1)

        # 1. Temporal features (10 features)
        if 'Data' in df.columns and 'Ora' in df.columns:
            try:
                df['Data'] = pd.to_datetime(df['Data'], errors='coerce')
                df['Ora'] = pd.to_timedelta(df['Ora'].astype(str), errors='coerce')
                df['Timestamp'] = df['Data'] + df['Ora']
                feature_data['hour'] = df['Timestamp'].dt.hour.fillna(0).values
                feature_data['day'] = df['Timestamp'].dt.dayofweek.fillna(0).values
                feature_data['minute'] = df['Timestamp'].dt.minute.fillna(0).values
                log_info("✓ Feature temporali estratte")
            except Exception:
                feature_data['hour'] = np.zeros(n_samples)
                feature_data['day'] = np.zeros(n_samples)
                feature_data['minute'] = np.zeros(n_samples)
                log_warning("⚠ Fallback feature temporali")
        else:
            feature_data['hour'] = np.zeros(n_samples)
            feature_data['day'] = np.zeros(n_samples)
            feature_data['minute'] = np.zeros(n_samples)

        # 7 additional temporal features (filled with low-amplitude noise)
        for i in range(7):
            feature_data[f'time_{i}'] = np.random.random(n_samples) * 0.1

        show_spinner("Analisi protocolli di rete...", 1)

        # 2. Protocol features (15 features)
        if 'Messaggio1' in df.columns:
            proto_data = df['Messaggio1'].fillna('').astype(str)
            protocols = ['TCP', 'UDP', 'ICMP', 'HTTP', 'SSH', 'FTP', 'DNS']

            protocol_counts = {}
            for i, protocol in enumerate(protocols):
                matches = proto_data.str.contains(protocol, case=False).astype(int)
                feature_data[f'proto_{i}'] = matches.values
                protocol_counts[protocol] = matches.sum()

            if any(protocol_counts.values()):
                log_info(f"✓ Protocolli rilevati: {protocol_counts}")

            # Pad the remaining protocol slots with zeros
            for i in range(len(protocols), 15):
                feature_data[f'proto_{i}'] = np.zeros(n_samples)
        else:
            for i in range(15):
                feature_data[f'proto_{i}'] = np.zeros(n_samples)

        show_spinner("Elaborazione host e IP...", 1)

        # 3. Host features (5 features)
        if 'Host' in df.columns:
            host_data = df['Host'].fillna('').astype(str)
            feature_data['host_fibra'] = host_data.str.contains('FIBRA', case=False).astype(int).values
            feature_data['host_empty'] = df['Host'].isna().astype(int).values
            feature_data['host_len'] = host_data.str.len().values / 100.0
        else:
            feature_data['host_fibra'] = np.zeros(n_samples)
            feature_data['host_empty'] = np.zeros(n_samples)
            feature_data['host_len'] = np.zeros(n_samples)

        for i in range(3, 5):
            feature_data[f'host_{i}'] = np.zeros(n_samples)

        # 4. IP features (10 features)
        if 'Messaggio2' in df.columns:
            ip_data = df['Messaggio2'].str.split(':').str[0].fillna('unknown').astype(str)
            for i in range(10):
                feature_data[f'ip_{i}'] = (pd.util.hash_array(ip_data.values) % (2**(i+3))) / (2**(i+3))
        else:
            for i in range(10):
                feature_data[f'ip_{i}'] = np.zeros(n_samples)

        # 5. ID features (10 features)
        if 'ID' in df.columns:
            id_values = df['ID'].fillna(0).values
            id_normalized = (id_values - id_values.min()) / (id_values.max() - id_values.min() + 1)

            for i in range(10):
                feature_data[f'id_{i}'] = np.roll(id_normalized, i) * (0.9 ** i)
        else:
            for i in range(10):
                feature_data[f'id_{i}'] = np.zeros(n_samples)

        # Ensure exactly 50 features in total
        total_features = len(feature_data)
        if total_features < 50:
            for i in range(total_features, 50):
                feature_data[f'extra_{i}'] = np.zeros(n_samples)
        elif total_features > 50:
            keys_to_remove = list(feature_data.keys())[50:]
            for key in keys_to_remove:
                del feature_data[key]

        # Build the numpy feature matrix (columns in sorted name order)
        feature_names = sorted(feature_data.keys())
        X = np.column_stack([feature_data[name] for name in feature_names])

        log_result(f"Matrice feature preparata: {X.shape[0]:,} × {X.shape[1]}")
        return X

    except Exception as e:
        log_error(f"Errore preparazione dati: {e}")
        return None

def predict_anomalies_simple(model, features, sensitivity=5):
    """Run anomaly prediction with feedback"""
    try:
        if features is None or features.shape[0] == 0:
            return np.array([])

        log_info(f"Predizione su {features.shape[0]:,} campioni (sensibilità: {sensitivity}/10)")

        show_spinner("Esecuzione predizione ML...", 1)

        # Baseline prediction (-1 = anomaly, 1 = normal)
        predictions = model.predict(features)

        # Apply the custom sensitivity threshold when supported
        if hasattr(model, 'decision_function'):
            try:
                scores = model.decision_function(features)
                threshold = -0.2 * (sensitivity / 5.0)
                predictions = np.where(scores < threshold, -1, 1)
                log_info(f"✓ Applicata sensibilità personalizzata (threshold: {threshold:.3f})")
            except Exception:
                log_warning("⚠ Fallback a predizione standard")

        anomaly_count = np.sum(predictions == -1)
        normal_count = np.sum(predictions == 1)

        log_result(f"Predizione completata: {anomaly_count:,} anomalie, {normal_count:,} normali")

        return predictions

    except Exception as e:
        log_error(f"Errore predizione: {e}")
        return np.array([])

def handle_anomaly_simple(engine, ip_address, risk_level='ALTO'):
    """Handle an anomalous IP with feedback - uses the ip_list table"""
    try:
        if not ip_address or pd.isna(ip_address):
            return False

        log_anomaly(f"Gestione anomalia per IP: {ip_address}")

        # Insert into the ip_list table (matches its actual structure)
        with engine.connect() as conn:
            insert_query = text("""
                INSERT INTO ip_list (list_name, ip_address, risk_level)
                VALUES ('ddos_detect_v03', :ip, :risk_level)
                ON DUPLICATE KEY UPDATE
                    retrieved_at = CURRENT_TIMESTAMP,
                    risk_level = :risk_level
            """)

            conn.execute(insert_query, {
                "ip": ip_address,
                "risk_level": risk_level
            })
            conn.commit()

        log_success(f"IP {ip_address} inserito/aggiornato nella tabella ip_list")
        return True

    except Exception as e:
        log_warning(f"Errore gestione anomalia per {ip_address}: {e}")
        return False
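
# For reference, a minimal ip_list schema sketch that the upsert above and
# cleanup_old_ips() below assume. Illustrative only: the column names come from
# the queries in this file, while the types and the unique key are assumptions
# (ON DUPLICATE KEY UPDATE needs a unique or primary key on (list_name, ip_address)).
#
#     CREATE TABLE ip_list (
#         list_name    VARCHAR(64)  NOT NULL,
#         ip_address   VARCHAR(45)  NOT NULL,
#         risk_level   VARCHAR(16)  NOT NULL,
#         retrieved_at TIMESTAMP    NOT NULL DEFAULT CURRENT_TIMESTAMP,
#         UNIQUE KEY uq_list_ip (list_name, ip_address)
#     );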

def process_batch_simple(df, engine, model, whitelist, sensitivity=5):
    """Process one batch with detailed feedback"""
    try:
        if df.empty:
            return 0, 0

        log_info(f"Processamento batch di {len(df):,} record...")

        # Prepare the feature matrix
        X = prepare_data_simple(df)
        if X is None:
            log_warning("Preparazione dati fallita")
            return 0, 0

        # Prediction
        predictions = predict_anomalies_simple(model, X, sensitivity)
        if len(predictions) == 0:
            log_warning("Nessuna predizione ottenuta")
            return 0, 0

        # Locate anomalous rows
        anomaly_indices = np.where(predictions == -1)[0]
        anomaly_count = len(anomaly_indices)

        log_info(f"Anomalie rilevate nel batch: {anomaly_count:,}")

        if anomaly_count == 0:
            return len(df), 0

        # Extract source IPs from the anomalous rows
        processed_ips = 0
        blocked_ips = []

        for idx in anomaly_indices:
            if 'Messaggio2' in df.columns:
                msg2 = df.iloc[idx]['Messaggio2']
                if pd.notna(msg2) and ':' in str(msg2):
                    ip = str(msg2).split(':')[0]

                    # Whitelist check
                    if ip not in whitelist:
                        if handle_anomaly_simple(engine, ip, 'ALTO'):
                            processed_ips += 1
                            blocked_ips.append(ip)
                    else:
                        log_info(f"IP {ip} in whitelist, ignorato")

        if blocked_ips:
            log_anomaly(f"IP bloccati in questo batch: {len(blocked_ips)}")
            # Show a few examples
            examples = blocked_ips[:3]
            log_info(f"Esempi IP bloccati: {', '.join(examples)}")

        return len(df), processed_ips

    except Exception as e:
        log_error(f"Errore processamento batch: {e}")
        return 0, 0

def run_detection(args):
    """Main detection run with full feedback"""
    try:
        log_phase("Avvio sistema di rilevamento DDoS v03")

        reset_stats()

        # Load components
        engine = create_engine_simple()
        if engine is None:
            return False

        # Automatic cleanup of old IPs (if requested)
        if args.cleanup:
            log_phase("🧹 PULIZIA AUTOMATICA IP VECCHI")
            removed_count = cleanup_old_ips(engine, args.retention_days)
            if removed_count > 0:
                log_result(f"Pulizia completata: {removed_count} IP rimossi")
            elif removed_count == 0:
                log_info("Pulizia completata: nessun IP da rimuovere")

        model, preprocessor = load_models_simple()
        if model is None:
            return False

        whitelist = load_whitelist_simple()
        last_id = load_last_analyzed_id()

        log_success(f"Sistema inizializzato - Rilevamento da ID {last_id:,}")

        # Extract and process data
        df = extract_data_simple(engine, last_id, args.batch_size)

        if df.empty:
            log_result("Nessun nuovo dato da analizzare")
            show_live_stats(force=True)
            return True

        # Batch info for the statistics display
        live_stats['total_batches'] = 1
        live_stats['current_batch'] = 1

        # Process the batch
        log_phase("Analisi anomalie in corso")

        records_processed, anomalies_found = process_batch_simple(
            df, engine, model, whitelist, args.sensibility
        )

        # Update statistics
        unique_ips = 0
        if 'Messaggio2' in df.columns:
            unique_ips = df['Messaggio2'].str.split(':').str[0].nunique()

        update_stats(records_processed, anomalies_found, unique_ips, anomalies_found)

        # Persist the last analyzed ID
        if not df.empty:
            last_analyzed_id = df['ID'].max()
            save_last_analyzed_id(last_analyzed_id)

        # Show final results
        show_live_stats(force=True)

        log_phase("Rilevamento completato")
        log_success(f"Risultati: {anomalies_found} anomalie su {records_processed:,} record")

        if anomalies_found > 0:
            anomaly_percentage = (anomalies_found / records_processed) * 100
            log_anomaly(f"Tasso di anomalie: {anomaly_percentage:.2f}%")

        return True

    except Exception as e:
        log_error(f"Errore rilevamento: {e}")
        return False

def main():
    """Entry point with an improved command-line interface"""
    parser = argparse.ArgumentParser(description='Rilevamento DDoS v03 - Con feedback dettagliato')
    parser.add_argument('--batch-size', type=int, default=10000, help='Dimensione batch (default: 10k)')
    parser.add_argument('--sensibility', type=int, default=5, choices=range(1, 11), help='Sensibilità 1-10 (1=più sensibile)')
    parser.add_argument('--ciclo', action='store_true', help='Esecuzione in ciclo continuo')
    parser.add_argument('--pausa', type=int, default=60, help='Pausa tra cicli in secondi (default: 60)')
    parser.add_argument('--debug', action='store_true', help='Debug logging')
    parser.add_argument('--cleanup', action='store_true', help='Esegui pulizia IP vecchi prima del rilevamento')
    parser.add_argument('--retention-days', type=int, default=7, help='Giorni di ritenzione IP bloccati (default: 7)')

    args = parser.parse_args()

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    # Header with detailed information
    print(f"\n{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}")
    print(f"{Colors.BOLD}{Colors.CYAN}🛡️ SISTEMA RILEVAMENTO DDoS v03 - FEEDBACK DETTAGLIATO{Colors.END}")
    print(f"{Colors.BOLD}{Colors.CYAN}{'='*70}{Colors.END}")

    log_info(f"Configurazione batch: {args.batch_size:,} record")
    log_info(f"Sensibilita rilevamento: {args.sensibility}/10")
    log_info(f"Debug mode: {'ON' if args.debug else 'OFF'}")
    log_info(f"Modalita ciclo: {'ON' if args.ciclo else 'OFF'}")
    log_info(f"Pulizia automatica: {'ON' if args.cleanup else 'OFF'}")

    if args.cleanup:
        log_info(f"Ritenzione IP: {args.retention_days} giorni")

    if args.ciclo:
        log_info(f"Pausa tra cicli: {args.pausa} secondi")

    # Graceful shutdown on Ctrl+C
    def signal_handler(signum, frame):
        print(f"\n{Colors.BOLD}{Colors.YELLOW}⚠ Interruzione ricevuta{Colors.END}")
        show_live_stats(force=True)
        log_warning("Sistema arrestato dall'utente")
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    # Execution
    if args.ciclo:
        log_success("🔄 Modalità ciclo continuo attivata")
        ciclo = 0

        while True:
            ciclo += 1

            print(f"\n{Colors.BOLD}{Colors.MAGENTA}{'='*50}{Colors.END}")
            print(f"{Colors.BOLD}{Colors.MAGENTA}🔄 CICLO {ciclo}{Colors.END}")
            print(f"{Colors.BOLD}{Colors.MAGENTA}{'='*50}{Colors.END}")

            success = run_detection(args)

            if success:
                log_success(f"Ciclo {ciclo} completato con successo")
                log_info(f"Pausa di {args.pausa} secondi prima del prossimo ciclo...")
            else:
                log_error(f"Errore nel ciclo {ciclo}")
                log_warning(f"Pausa estesa di {args.pausa * 2} secondi...")
                # Extra sleep so the total pause is doubled after a failed cycle
                time.sleep(args.pausa)

            # Visual countdown before the next cycle
            for remaining in range(args.pausa, 0, -1):
                print(f"\r{Colors.CYAN}⏳ Prossimo ciclo tra: {remaining:02d}s{Colors.END}", end='')
                sys.stdout.flush()
                time.sleep(1)
            print()  # new line after the countdown
    else:
        # Single run
        success = run_detection(args)

        if success:
            print(f"\n{Colors.BOLD}{Colors.GREEN}🎉 RILEVAMENTO COMPLETATO CON SUCCESSO!{Colors.END}")
        else:
            print(f"\n{Colors.BOLD}{Colors.RED}❌ RILEVAMENTO FALLITO!{Colors.END}")

        sys.exit(0 if success else 1)

def cleanup_old_ips(engine, retention_days=7):
    """
    Remove stale IPs from the ip_list table for ddos_detect_v03.

    Args:
        engine: Database engine
        retention_days: Remove IPs older than this many days (default: 7)
    """
    try:
        log_info(f"Pulizia IP più vecchi di {retention_days} giorni...")

        with engine.connect() as conn:
            # Count the IPs that would be removed
            count_query = text("""
                SELECT COUNT(*) as count FROM ip_list
                WHERE list_name = 'ddos_detect_v03'
                AND retrieved_at < DATE_SUB(NOW(), INTERVAL :days DAY)
            """)

            old_count = conn.execute(count_query, {"days": retention_days}).fetchone()[0]

            if old_count > 0:
                # Remove the stale IPs
                cleanup_query = text("""
                    DELETE FROM ip_list
                    WHERE list_name = 'ddos_detect_v03'
                    AND retrieved_at < DATE_SUB(NOW(), INTERVAL :days DAY)
                """)

                result = conn.execute(cleanup_query, {"days": retention_days})
                conn.commit()  # commit the DELETE; otherwise it is rolled back when the connection closes
                removed_count = result.rowcount

                log_result(f"Rimossi {removed_count} IP vecchi dalla lista ddos_detect_v03")
                return removed_count
            else:
                log_info("Nessun IP vecchio da rimuovere")
                return 0

    except Exception as e:
        log_anomaly(f"Errore pulizia IP vecchi: {e}")
        return -1

if __name__ == "__main__":
    main()
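
# Example invocations, assuming this file is saved as detect_v03.py (the actual
# filename is not fixed by the script) and the database and models/ directory
# described above are already in place:
#
#     python detect_v03.py --batch-size 5000 --sensibility 7
#     python detect_v03.py --ciclo --pausa 120 --cleanup --retention-days 3
#     python detect_v03.py --debug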