#!/usr/bin/env python3

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from joblib import load
import logging
import gc
import os
import time
import sys
from collections import defaultdict
from datetime import datetime, timedelta
import numpy as np
import threading
import argparse
import signal
from concurrent.futures import ThreadPoolExecutor, as_completed
import warnings

warnings.filterwarnings('ignore')

# Simplified logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('detect_debug.log')
    ]
)

# Database configuration
DB_USER = os.environ.get('DB_USER', 'root')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}'

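# The connection string above is assembled from the environment variables read
# just before it; example overrides (values are illustrative placeholders, not
# taken from the original deployment):
#   export DB_USER=loguser
#   export DB_PASSWORD=secret
#   export DB_HOST=10.0.0.5
#   export DB_DATABASE=LOG_MIKROTIK
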
# File paths
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt')
LAST_ID_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'last_analyzed_id.txt')

# Simplified parameters
RISK_LEVELS = {
    'NORMALE': 0.1,
    'BASSO': 0.3,
    'MEDIO': 0.6,
    'ALTO': 0.8,
    'CRITICO': 0.95
}

# ANSI colors for console output
class Colors:
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    END = '\033[0m'

def log_phase(message):
    print(f"\n{Colors.BOLD}{Colors.GREEN}▶ {message}{Colors.END}\n")
    logging.info(f"PHASE: {message}")

def log_result(message):
    print(f"{Colors.BLUE}✓ {message}{Colors.END}")
    logging.info(f"RESULT: {message}")

def log_warning(message):
    print(f"{Colors.YELLOW}⚠ {message}{Colors.END}")
    logging.warning(message)

def log_error(message):
    print(f"{Colors.RED}✗ {message}{Colors.END}")
    logging.error(message)

# Global statistics
stats = {
    'records_processed': 0,
    'anomalies_found': 0,
    'ips_analyzed': 0,
    'start_time': None
}

def reset_stats():
    global stats
    stats['records_processed'] = 0
    stats['anomalies_found'] = 0
    stats['ips_analyzed'] = 0
    stats['start_time'] = time.time()

def update_stats(records=0, anomalies=0, ips=0):
    global stats
    stats['records_processed'] += records
    stats['anomalies_found'] += anomalies
    stats['ips_analyzed'] += ips

def print_stats():
    global stats
    elapsed = time.time() - stats['start_time'] if stats['start_time'] else 0
    print(f"""
{Colors.BOLD}======== DETECTION STATISTICS ========{Colors.END}
Elapsed time: {elapsed:.1f} seconds
Records processed: {stats['records_processed']}
Anomalies found: {stats['anomalies_found']}
IPs analyzed: {stats['ips_analyzed']}
{Colors.BOLD}======================================{Colors.END}
""")

def create_engine_simple():
    """Create a simplified database connection"""
    try:
        engine = create_engine(
            CONN_STRING,
            pool_size=3,
            max_overflow=5,
            pool_recycle=1800,
            pool_pre_ping=True,
            pool_timeout=30,
            echo=False
        )

        # Test the connection
        with engine.connect() as conn:
            conn.execute(text("SELECT 1")).fetchone()

        return engine
    except Exception as e:
        log_error(f"Database connection error: {e}")
        return None

def load_models_simple():
    """Load the models in a simplified way"""
    try:
        log_phase("Loading models")

        if not os.path.exists(MODEL_PATH):
            log_error(f"Model not found: {MODEL_PATH}")
            return None, None

        model = load(MODEL_PATH)
        log_result("Model loaded")

        # Load the preprocessor if available
        preprocessor = None
        if os.path.exists(PREPROCESSOR_PATH):
            preprocessor = load(PREPROCESSOR_PATH)
            log_result("Preprocessor loaded")
        else:
            log_warning("Preprocessor not found, using fallback")
            preprocessor = {'feature_columns': [f'feature_{i}' for i in range(50)]}

        return model, preprocessor

    except Exception as e:
        log_error(f"Error loading models: {e}")
        return None, None

def load_whitelist_simple():
    """Load the simplified whitelist"""
    try:
        if not os.path.exists(WHITELIST_PATH):
            log_warning("Whitelist not found, using an empty list")
            return set()

        with open(WHITELIST_PATH, 'r') as f:
            whitelist = set(line.strip() for line in f if line.strip() and not line.startswith('#'))

        log_result(f"Whitelist loaded: {len(whitelist)} IPs")
        return whitelist

    except Exception as e:
        log_warning(f"Error loading whitelist: {e}")
        return set()

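# Expected whitelist.txt layout, inferred from the parsing above: one IP per
# line, blank lines ignored, lines beginning with '#' treated as comments.
# The addresses below are illustrative placeholders only:
#   # trusted hosts
#   192.0.2.10
#   198.51.100.7
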
def load_last_analyzed_id():
    """Load the last analyzed ID"""
    try:
        if os.path.exists(LAST_ID_PATH):
            with open(LAST_ID_PATH, 'r') as f:
                return int(f.read().strip())
        return 0
    except Exception as e:
        log_warning(f"Error loading last ID: {e}")
        return 0

def save_last_analyzed_id(last_id):
    """Save the last analyzed ID"""
    try:
        with open(LAST_ID_PATH, 'w') as f:
            f.write(str(last_id))
    except Exception as e:
        log_warning(f"Error saving last ID: {e}")

def extract_data_simple(engine, last_id=0, batch_size=10000):
    """Simplified data extraction"""
    try:
        log_phase(f"Extracting data from ID {last_id}")

        # Simple incremental query
        query = text("""
            SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3
            FROM Esterna
            WHERE ID > :last_id
            ORDER BY ID ASC
            LIMIT :batch_size
        """)

        df = pd.read_sql(query, engine, params={
            "last_id": last_id,
            "batch_size": batch_size
        })

        log_result(f"Extracted {len(df)} records")
        return df

    except Exception as e:
        log_error(f"Data extraction error: {e}")
        return pd.DataFrame()

def prepare_data_simple(df):
    """Data preparation compatible with analisys_01.py"""
    try:
        if df.empty:
            return None

        # Same logic as analisys_01.py, for compatibility
        feature_data = {}
        n_samples = len(df)

        # 1. Temporal features (10 features)
        if 'Data' in df.columns and 'Ora' in df.columns:
            try:
                df['Data'] = pd.to_datetime(df['Data'], errors='coerce')
                df['Ora'] = pd.to_timedelta(df['Ora'].astype(str), errors='coerce')
                df['Timestamp'] = df['Data'] + df['Ora']
                feature_data['hour'] = df['Timestamp'].dt.hour.fillna(0).values
                feature_data['day'] = df['Timestamp'].dt.dayofweek.fillna(0).values
                feature_data['minute'] = df['Timestamp'].dt.minute.fillna(0).values
            except Exception:
                feature_data['hour'] = np.zeros(n_samples)
                feature_data['day'] = np.zeros(n_samples)
                feature_data['minute'] = np.zeros(n_samples)
        else:
            feature_data['hour'] = np.zeros(n_samples)
            feature_data['day'] = np.zeros(n_samples)
            feature_data['minute'] = np.zeros(n_samples)

        # 7 additional temporal features (low-amplitude random placeholders)
        for i in range(7):
            feature_data[f'time_{i}'] = np.random.random(n_samples) * 0.1

        # 2. Protocol features (15 features)
        if 'Messaggio1' in df.columns:
            proto_data = df['Messaggio1'].fillna('').astype(str)
            protocols = ['TCP', 'UDP', 'ICMP', 'HTTP', 'SSH', 'FTP', 'DNS']

            for i, protocol in enumerate(protocols):
                feature_data[f'proto_{i}'] = proto_data.str.contains(protocol, case=False).astype(int).values

            for i in range(len(protocols), 15):
                feature_data[f'proto_{i}'] = np.zeros(n_samples)
        else:
            for i in range(15):
                feature_data[f'proto_{i}'] = np.zeros(n_samples)

        # 3. Host features (5 features)
        if 'Host' in df.columns:
            host_data = df['Host'].fillna('').astype(str)
            feature_data['host_fibra'] = host_data.str.contains('FIBRA', case=False).astype(int).values
            feature_data['host_empty'] = df['Host'].isna().astype(int).values
            feature_data['host_len'] = host_data.str.len().values / 100.0
        else:
            feature_data['host_fibra'] = np.zeros(n_samples)
            feature_data['host_empty'] = np.zeros(n_samples)
            feature_data['host_len'] = np.zeros(n_samples)

        for i in range(3, 5):
            feature_data[f'host_{i}'] = np.zeros(n_samples)

        # 4. IP features (10 features)
        if 'Messaggio2' in df.columns:
            ip_data = df['Messaggio2'].str.split(':').str[0].fillna('unknown').astype(str)
            for i in range(10):
                feature_data[f'ip_{i}'] = (pd.util.hash_array(ip_data.values) % (2**(i+3))) / (2**(i+3))
        else:
            for i in range(10):
                feature_data[f'ip_{i}'] = np.zeros(n_samples)

        # 5. ID features (10 features)
        if 'ID' in df.columns:
            id_values = df['ID'].fillna(0).values
            id_normalized = (id_values - id_values.min()) / (id_values.max() - id_values.min() + 1)

            for i in range(10):
                feature_data[f'id_{i}'] = np.roll(id_normalized, i) * (0.9 ** i)
        else:
            for i in range(10):
                feature_data[f'id_{i}'] = np.zeros(n_samples)

        # Ensure exactly 50 features in total
        total_features = len(feature_data)
        if total_features < 50:
            for i in range(total_features, 50):
                feature_data[f'extra_{i}'] = np.zeros(n_samples)
        elif total_features > 50:
            keys_to_remove = list(feature_data.keys())[50:]
            for key in keys_to_remove:
                del feature_data[key]

        # Build the numpy feature matrix with a deterministic column order
        feature_names = sorted(feature_data.keys())
        X = np.column_stack([feature_data[name] for name in feature_names])

        return X

    except Exception as e:
        log_error(f"Data preparation error: {e}")
        return None

def predict_anomalies_simple(model, features, sensitivity=5):
    """Simplified anomaly prediction"""
    try:
        if features is None or features.shape[0] == 0:
            return np.array([])

        # Base prediction (-1 = anomaly, 1 = normal)
        predictions = model.predict(features)

        # Apply the sensitivity setting if the model supports decision_function
        if hasattr(model, 'decision_function'):
            try:
                scores = model.decision_function(features)
                # Higher sensitivity values push the threshold further negative,
                # so only rows with strongly negative scores are flagged
                threshold = -0.2 * (sensitivity / 5.0)
                predictions = np.where(scores < threshold, -1, 1)
            except Exception:
                pass  # Fall back to the standard predictions

        return predictions

    except Exception as e:
        log_error(f"Prediction error: {e}")
        return np.array([])

def handle_anomaly_simple(engine, ip_address, risk_level='MEDIO'):
    """Simplified anomaly handling"""
    try:
        if not ip_address or pd.isna(ip_address):
            return False

        # Simplified upsert into the Fibra table
        with engine.connect() as conn:
            insert_query = text("""
                INSERT INTO Fibra (IndirizzoIP, Data, Ora, Host, Attivo, Lista, NumeroAttacchi, LivelloDiRischio)
                VALUES (:ip, CURDATE(), CURTIME(), '', 1, 'ddos_detect', 1, 2)
                ON DUPLICATE KEY UPDATE
                    Attivo = 1,
                    NumeroAttacchi = NumeroAttacchi + 1,
                    Data = CURDATE(),
                    Ora = CURTIME()
            """)

            conn.execute(insert_query, {"ip": ip_address})
            conn.commit()

        return True

    except Exception as e:
        log_warning(f"Error handling anomaly for {ip_address}: {e}")
        return False

def process_batch_simple(df, engine, model, whitelist, sensitivity=5):
    """Simplified batch processing"""
    try:
        if df.empty:
            return 0, 0

        # Prepare the feature matrix
        X = prepare_data_simple(df)
        if X is None:
            return 0, 0

        # Prediction
        predictions = predict_anomalies_simple(model, X, sensitivity)
        if len(predictions) == 0:
            return 0, 0

        # Locate anomalous rows
        anomaly_indices = np.where(predictions == -1)[0]
        anomaly_count = len(anomaly_indices)

        if anomaly_count == 0:
            return len(df), 0

        # Extract the source IPs of the anomalies
        processed_ips = 0
        for idx in anomaly_indices:
            if 'Messaggio2' in df.columns:
                msg2 = df.iloc[idx]['Messaggio2']
                if pd.notna(msg2) and ':' in str(msg2):
                    ip = str(msg2).split(':')[0]

                    # Skip whitelisted IPs
                    if ip not in whitelist:
                        if handle_anomaly_simple(engine, ip):
                            processed_ips += 1

        return len(df), processed_ips

    except Exception as e:
        log_error(f"Batch processing error: {e}")
        return 0, 0

def run_detection(args):
    """Main detection run"""
    try:
        reset_stats()

        # Load the components
        engine = create_engine_simple()
        if not engine:
            return False

        model, preprocessor = load_models_simple()
        if not model:
            return False

        whitelist = load_whitelist_simple()
        last_id = load_last_analyzed_id()

        log_result(f"Starting detection from ID {last_id}")

        # Extract and process the data
        df = extract_data_simple(engine, last_id, args.batch_size)

        if df.empty:
            log_result("No new data to analyze")
            return True

        # Process the batch
        records_processed, anomalies_found = process_batch_simple(
            df, engine, model, whitelist, args.sensibility
        )

        # Update statistics
        update_stats(records_processed, anomalies_found, len(df['Messaggio2'].dropna().unique()) if 'Messaggio2' in df.columns else 0)

        # Save the last analyzed ID
        if not df.empty:
            last_analyzed_id = df['ID'].max()
            save_last_analyzed_id(last_analyzed_id)

        # Show the results
        print_stats()
        log_result(f"Detection completed: {anomalies_found} anomalies out of {records_processed} records")

        return True

    except Exception as e:
        log_error(f"Detection error: {e}")
        return False

def main():
    """Simplified main entry point"""
    parser = argparse.ArgumentParser(description='Simplified DDoS detection v02')
    parser.add_argument('--batch-size', type=int, default=10000, help='Batch size (default: 10k)')
    parser.add_argument('--sensibility', type=int, default=5, choices=range(1, 11), help='Sensitivity 1-10 (default: 5)')
    parser.add_argument('--ciclo', action='store_true', help='Run in a continuous loop')
    parser.add_argument('--pausa', type=int, default=60, help='Pause between cycles in seconds (default: 60)')
    parser.add_argument('--debug', action='store_true', help='Debug logging')

    args = parser.parse_args()

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    log_phase("SIMPLIFIED DDoS detection system v02")
    log_result(f"Config: batch {args.batch_size}, sensitivity {args.sensibility}")

    # Handle interruption
    def signal_handler(signum, frame):
        log_warning("Interrupt received")
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    # Execution
    if args.ciclo:
        log_result("Continuous loop mode enabled")
        ciclo = 0

        while True:
            ciclo += 1
            log_phase(f"Cycle {ciclo}")

            success = run_detection(args)

            if success:
                log_result(f"Cycle {ciclo} completed. Pausing {args.pausa} seconds...")
            else:
                # On failure, sleep an extra interval here (total pause: 2 x args.pausa)
                log_error(f"Error in cycle {ciclo}. Pausing {args.pausa * 2} seconds...")
                time.sleep(args.pausa)

            time.sleep(args.pausa)
    else:
        # Single run
        success = run_detection(args)
        sys.exit(0 if success else 1)

if __name__ == "__main__":
    main()
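
# Example invocations, based only on the command-line flags defined in main()
# above; the script filename and the values shown are illustrative placeholders:
#
#   python detect.py                                  # single run with defaults
#   python detect.py --batch-size 50000 --sensibility 8
#   python detect.py --ciclo --pausa 120 --debug      # continuous loop, debug logging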