Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
836 lines
33 KiB
Python
836 lines
33 KiB
Python
#!/usr/bin/env python3
|
|
import pandas as pd
|
|
from sqlalchemy import create_engine, text
|
|
from joblib import load
|
|
import logging
|
|
import gc
|
|
import os
|
|
import time
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta, timezone
|
|
import ipaddress
|
|
import numpy as np
|
|
from sklearn.ensemble import IsolationForest
|
|
import threading
|
|
import argparse
|
|
import signal
|
|
import pyshark
|
|
|
|
# Configurazione del logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.StreamHandler(sys.stdout),
|
|
logging.FileHandler('detectreal_debug.log')
|
|
]
|
|
)
|
|
|
|
# Configurazione del database
|
|
DB_USER = os.environ.get('DB_USER', 'root')
|
|
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
|
|
DB_HOST = os.environ.get('DB_HOST', 'localhost')
|
|
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
|
|
CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}'
|
|
|
|
# Percorsi dei file
|
|
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
|
|
os.makedirs(MODEL_DIR, exist_ok=True)
|
|
MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
|
|
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
|
|
WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt')
|
|
|
|
# Definizione dei livelli di rischio
|
|
RISK_LEVELS = {
|
|
'NORMALE': 0.1,
|
|
'BASSO': 0.3,
|
|
'MEDIO': 0.6,
|
|
'ALTO': 0.8,
|
|
'CRITICO': 0.95
|
|
}
|
|
|
|
# Variabili globali per il tracciamento dell'avanzamento
|
|
progress_counters = {
|
|
'ip_whitelisted': 0,
|
|
'ip_analyzed': 0,
|
|
'ip_normal': 0,
|
|
'ip_low': 0,
|
|
'ip_medium': 0,
|
|
'ip_high': 0,
|
|
'ip_critical': 0,
|
|
'packets_processed': 0,
|
|
'last_update': 0,
|
|
'in_progress': False,
|
|
'operation': '',
|
|
'start_time': None
|
|
}
|
|
|
|
# Struttura dati per memorizzare le informazioni sui pacchetti
|
|
packet_data = {
|
|
'ip_src': [],
|
|
'ip_dst': [],
|
|
'protocol': [],
|
|
'timestamp': [],
|
|
'length': [],
|
|
'port_src': [],
|
|
'port_dst': []
|
|
}
|
|
|
|
# Cache per il conteggio degli eventi per IP
|
|
ip_event_counter = defaultdict(int)
|
|
ip_last_seen = {}
|
|
ip_connections = defaultdict(set)
|
|
|
|
def reset_counters():
|
|
"""Resetta i contatori per una nuova esecuzione"""
|
|
global progress_counters
|
|
for key in progress_counters:
|
|
if isinstance(progress_counters[key], int):
|
|
progress_counters[key] = 0
|
|
progress_counters['in_progress'] = False
|
|
progress_counters['operation'] = ''
|
|
progress_counters['start_time'] = None
|
|
|
|
def start_progress_tracking(operation):
|
|
"""Inizia il tracciamento dell'operazione"""
|
|
global progress_counters
|
|
reset_counters()
|
|
progress_counters['in_progress'] = True
|
|
progress_counters['operation'] = operation
|
|
progress_counters['start_time'] = time.time()
|
|
|
|
# Avvia un thread per il reporting
|
|
threading.Thread(target=progress_reporter, daemon=True).start()
|
|
logging.info(f"Avvio monitoraggio operazione: {operation}")
|
|
|
|
def update_counter(counter_name, increment=1):
|
|
"""Aggiorna un contatore specifico"""
|
|
global progress_counters
|
|
if counter_name in progress_counters:
|
|
progress_counters[counter_name] += increment
|
|
|
|
def end_progress_tracking():
|
|
"""Termina il tracciamento e mostra il report finale"""
|
|
global progress_counters
|
|
if not progress_counters['in_progress']:
|
|
return
|
|
|
|
progress_counters['in_progress'] = False
|
|
report_progress(force=True)
|
|
logging.info(f"Monitoraggio completato per: {progress_counters['operation']}")
|
|
|
|
def report_progress(force=False):
|
|
"""Riporta lo stato attuale dei contatori"""
|
|
global progress_counters
|
|
if not progress_counters['in_progress'] and not force:
|
|
return
|
|
|
|
current_time = time.time()
|
|
if not force and (current_time - progress_counters['last_update']) < 10: # Aggiorna ogni 10 secondi
|
|
return
|
|
|
|
elapsed = current_time - progress_counters['start_time'] if progress_counters['start_time'] else 0
|
|
|
|
report = f"""
|
|
======== REPORT DI PROGRESSO - {progress_counters['operation']} ========
|
|
Tempo trascorso: {elapsed:.1f} secondi
|
|
Pacchetti elaborati: {progress_counters['packets_processed']}
|
|
IP Whitelistati esclusi: {progress_counters['ip_whitelisted']}
|
|
IP Analizzati: {progress_counters['ip_analyzed']}
|
|
Classificazione rischio:
|
|
- IP NORMALI: {progress_counters['ip_normal']}
|
|
- IP BASSI: {progress_counters['ip_low']}
|
|
- IP MEDI: {progress_counters['ip_medium']}
|
|
- IP ALTI: {progress_counters['ip_high']}
|
|
- IP CRITICI: {progress_counters['ip_critical']}
|
|
================================================================
|
|
"""
|
|
logging.info(report)
|
|
progress_counters['last_update'] = current_time
|
|
|
|
def progress_reporter():
|
|
"""Thread che riporta periodicamente i progressi"""
|
|
while progress_counters['in_progress']:
|
|
report_progress()
|
|
time.sleep(2) # Controlla ogni 2 secondi, ma riporta solo ogni 10
|
|
|
|
def test_database_connection():
|
|
"""Test di connessione al database"""
|
|
try:
|
|
engine = create_engine(CONN_STRING)
|
|
with engine.connect() as conn:
|
|
result = conn.execute(text("SELECT 1")).fetchone()
|
|
if result[0] == 1:
|
|
logging.debug("Test connessione al database riuscito!")
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
logging.error(f"Errore nel test di connessione al database: {e}")
|
|
return False
|
|
|
|
def create_engine_with_retry(conn_string, max_retries=3, retry_delay=2):
|
|
"""Crea una connessione al database con tentativi multipli"""
|
|
for attempt in range(max_retries):
|
|
try:
|
|
engine = create_engine(
|
|
conn_string,
|
|
pool_size=5,
|
|
max_overflow=10,
|
|
pool_recycle=3600,
|
|
pool_pre_ping=True,
|
|
pool_timeout=30,
|
|
echo=False,
|
|
isolation_level="READ COMMITTED"
|
|
)
|
|
|
|
# Test di connessione
|
|
with engine.connect() as conn:
|
|
conn.execute(text("SELECT 1")).fetchone()
|
|
|
|
logging.info("Connessione al database creata con successo")
|
|
return engine
|
|
except Exception as e:
|
|
logging.error(f"Tentativo {attempt+1} fallito: {e}")
|
|
if attempt < max_retries - 1:
|
|
logging.info(f"Nuovo tentativo tra {retry_delay} secondi...")
|
|
time.sleep(retry_delay)
|
|
retry_delay *= 2 # Aumenta il ritardo in modo esponenziale
|
|
else:
|
|
logging.error("Impossibile connettersi al database dopo tutti i tentativi")
|
|
raise
|
|
|
|
def load_models():
|
|
"""Carica i modelli salvati e gli oggetti per il preprocessing"""
|
|
try:
|
|
# Carica il modello Isolation Forest
|
|
model = load(MODEL_PATH)
|
|
logging.info("Modello Isolation Forest caricato con successo")
|
|
|
|
# Tenta di caricare il preprocessor
|
|
try:
|
|
preprocessor = load(PREPROCESSOR_PATH)
|
|
# Verifica che il preprocessor abbia la struttura attesa
|
|
if isinstance(preprocessor, dict) and 'feature_columns' in preprocessor:
|
|
logging.info(f"Preprocessor caricato con successo: {len(preprocessor['feature_columns'])} feature")
|
|
return model, preprocessor
|
|
else:
|
|
logging.error("Preprocessor non ha la struttura attesa. Utilizzo fallback.")
|
|
except Exception as e:
|
|
logging.error(f"Errore nel caricamento del preprocessor: {e}")
|
|
|
|
# Se siamo qui, il preprocessor non è valido o ha dato errore
|
|
# Crea un preprocessor fallback con 83 colonne
|
|
fallback_preprocessor = {
|
|
'feature_columns': [f'col_{i}' for i in range(83)],
|
|
'categorical_features': {},
|
|
'text_vectorizer': None
|
|
}
|
|
|
|
logging.info("Creato preprocessor fallback con 83 feature")
|
|
return model, fallback_preprocessor
|
|
except Exception as e:
|
|
logging.error(f"Errore nel caricamento dei modelli: {e}")
|
|
return None, None
|
|
|
|
def load_whitelist(whitelist_path=None):
|
|
"""Carica la whitelist da file"""
|
|
if whitelist_path is None:
|
|
whitelist_path = '/root/whitelist.txt'
|
|
|
|
try:
|
|
if not os.path.exists(whitelist_path):
|
|
logging.warning(f"File whitelist non trovato: {whitelist_path}")
|
|
return {'exact_ips': set(), 'networks': []}
|
|
|
|
with open(whitelist_path, 'r') as f:
|
|
whitelist_entries = [line.strip() for line in f if line.strip() and not line.startswith('#')]
|
|
|
|
exact_ips = set()
|
|
networks = []
|
|
|
|
for entry in whitelist_entries:
|
|
try:
|
|
if '/' in entry:
|
|
# È una rete
|
|
network = ipaddress.ip_network(entry, strict=False)
|
|
networks.append(network)
|
|
else:
|
|
# È un IP singolo
|
|
exact_ips.add(entry)
|
|
except Exception as e:
|
|
logging.warning(f"Ignorata entry nella whitelist non valida: {entry} - {e}")
|
|
|
|
whitelist = {
|
|
'exact_ips': exact_ips,
|
|
'networks': networks
|
|
}
|
|
|
|
logging.info(f"Whitelisted {len(exact_ips)} IP esatti e {len(networks)} network.")
|
|
return whitelist
|
|
except Exception as e:
|
|
logging.error(f"Errore nel caricamento della whitelist: {e}")
|
|
return {'exact_ips': set(), 'networks': []}
|
|
|
|
def is_ip_whitelisted(ip, whitelist):
|
|
"""Verifica se un IP è nella whitelist"""
|
|
if pd.isna(ip) or not ip:
|
|
return False
|
|
|
|
try:
|
|
# Verifica IP esatto
|
|
if ip in whitelist.get('exact_ips', set()):
|
|
update_counter('ip_whitelisted')
|
|
return True
|
|
|
|
# Verifica appartenenza a network
|
|
try:
|
|
ip_obj = ipaddress.ip_address(ip)
|
|
except ValueError:
|
|
logging.debug(f"IP non valido: {ip}")
|
|
return False
|
|
|
|
# Limita a 5000 verifiche per evitare blocchi
|
|
for i, network in enumerate(whitelist.get('networks', [])):
|
|
if i >= 5000:
|
|
break
|
|
|
|
if ip_obj in network:
|
|
update_counter('ip_whitelisted')
|
|
return True
|
|
|
|
return False
|
|
except Exception as e:
|
|
logging.error(f"Errore nel controllo whitelist per IP {ip}: {e}")
|
|
return False
|
|
|
|
def create_real_time_tables(engine):
|
|
"""Crea le tabelle necessarie per il monitoraggio in tempo reale"""
|
|
try:
|
|
with engine.connect() as conn:
|
|
# Tabella per gli attaccanti rilevati in tempo reale
|
|
conn.execute(text("""
|
|
CREATE TABLE IF NOT EXISTS known_attackers_r (
|
|
id INT AUTO_INCREMENT PRIMARY KEY,
|
|
ip_address VARCHAR(45) NOT NULL,
|
|
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
attack_count INT DEFAULT 1,
|
|
risk_level VARCHAR(20) DEFAULT 'NORMALE',
|
|
ports_used TEXT DEFAULT NULL,
|
|
attack_patterns TEXT DEFAULT NULL,
|
|
is_blocked TINYINT(1) DEFAULT 0,
|
|
source_type VARCHAR(20) DEFAULT 'REALTIME',
|
|
UNIQUE KEY unique_ip (ip_address)
|
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
|
"""))
|
|
|
|
# Tabella per le statistiche del traffico
|
|
conn.execute(text("""
|
|
CREATE TABLE IF NOT EXISTS traffic_stats_r (
|
|
id INT AUTO_INCREMENT PRIMARY KEY,
|
|
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
ip_src VARCHAR(45) NOT NULL,
|
|
ip_dst VARCHAR(45) NOT NULL,
|
|
protocol VARCHAR(20),
|
|
port_src INT,
|
|
port_dst INT,
|
|
packet_size INT,
|
|
packet_count INT DEFAULT 1,
|
|
INDEX idx_ip_src (ip_src),
|
|
INDEX idx_ip_dst (ip_dst),
|
|
INDEX idx_timestamp (timestamp)
|
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
|
"""))
|
|
|
|
logging.info("Tabelle per il monitoraggio in tempo reale create/verificate")
|
|
return True
|
|
except Exception as e:
|
|
logging.error(f"Errore nella creazione delle tabelle per il monitoraggio in tempo reale: {e}")
|
|
return False
|
|
|
|
def update_known_attacker_r(engine, ip_address, risk_level, port=None, protocol=None, packet_size=None):
|
|
"""Aggiorna o inserisce un attaccante noto nella tabella real-time"""
|
|
try:
|
|
conn = engine.connect()
|
|
trans = conn.begin()
|
|
|
|
try:
|
|
# Verifica se l'IP esiste già
|
|
query = text("SELECT * FROM known_attackers_r WHERE ip_address = :ip")
|
|
result = conn.execute(query, {"ip": ip_address}).fetchone()
|
|
|
|
if result:
|
|
# Ottieni il conteggio attacchi e il rischio attuale
|
|
attack_count = result[4] + 1 # Indice 4 = attack_count
|
|
current_risk = result[5] # Indice 5 = risk_level
|
|
current_ports = result[6] # Indice 6 = ports_used
|
|
|
|
# Aggiorna la lista delle porte
|
|
new_ports = current_ports or ""
|
|
if port and str(port) not in new_ports:
|
|
new_ports = f"{new_ports},{port}" if new_ports else str(port)
|
|
|
|
# Incrementa il livello di rischio basato sul numero di rilevamenti
|
|
new_risk = risk_level
|
|
old_risk = current_risk
|
|
|
|
# Escalation del rischio in base al numero di rilevamenti
|
|
if risk_level == 'NORMALE' and attack_count >= 10:
|
|
new_risk = 'BASSO'
|
|
logging.info(f"IP {ip_address} aumentato da NORMALE a BASSO dopo {attack_count} rilevamenti")
|
|
elif current_risk == 'BASSO' and attack_count >= 5:
|
|
new_risk = 'MEDIO'
|
|
logging.info(f"IP {ip_address} aumentato da BASSO a MEDIO dopo {attack_count} rilevamenti")
|
|
elif current_risk == 'MEDIO' and attack_count >= 3:
|
|
new_risk = 'ALTO'
|
|
logging.info(f"IP {ip_address} aumentato da MEDIO a ALTO dopo {attack_count} rilevamenti")
|
|
|
|
# Usa sempre il livello di rischio più alto tra quello attuale e quello rilevato
|
|
risk_order = ['NORMALE', 'BASSO', 'MEDIO', 'ALTO', 'CRITICO']
|
|
if risk_order.index(current_risk) > risk_order.index(new_risk):
|
|
new_risk = current_risk # Mantiene il rischio più alto
|
|
|
|
# Se il rischio è cambiato, aggiorna i contatori
|
|
if new_risk != old_risk:
|
|
# Decrementa il contatore del vecchio livello
|
|
if old_risk == 'NORMALE':
|
|
update_counter('ip_normal', -1)
|
|
elif old_risk == 'BASSO':
|
|
update_counter('ip_low', -1)
|
|
elif old_risk == 'MEDIO':
|
|
update_counter('ip_medium', -1)
|
|
elif old_risk == 'ALTO':
|
|
update_counter('ip_high', -1)
|
|
elif old_risk == 'CRITICO':
|
|
update_counter('ip_critical', -1)
|
|
|
|
# Incrementa il contatore del nuovo livello
|
|
if new_risk == 'NORMALE':
|
|
update_counter('ip_normal')
|
|
elif new_risk == 'BASSO':
|
|
update_counter('ip_low')
|
|
elif new_risk == 'MEDIO':
|
|
update_counter('ip_medium')
|
|
elif new_risk == 'ALTO':
|
|
update_counter('ip_high')
|
|
elif new_risk == 'CRITICO':
|
|
update_counter('ip_critical')
|
|
|
|
# Aggiorna l'esistente
|
|
update_query = text("""
|
|
UPDATE known_attackers_r
|
|
SET last_seen = NOW(),
|
|
attack_count = attack_count + 1,
|
|
risk_level = :risk,
|
|
ports_used = :ports
|
|
WHERE ip_address = :ip
|
|
""")
|
|
|
|
conn.execute(update_query, {"ip": ip_address, "risk": new_risk, "ports": new_ports})
|
|
trans.commit()
|
|
return new_risk
|
|
else:
|
|
# Inserisci nuovo
|
|
insert_query = text("""
|
|
INSERT INTO known_attackers_r
|
|
(ip_address, risk_level, ports_used, attack_patterns, is_blocked, source_type)
|
|
VALUES (:ip, :risk, :port, :protocol, 0, 'REALTIME')
|
|
""")
|
|
|
|
conn.execute(insert_query, {
|
|
"ip": ip_address,
|
|
"risk": risk_level,
|
|
"port": str(port) if port else None,
|
|
"protocol": protocol
|
|
})
|
|
|
|
trans.commit()
|
|
|
|
# Inizializza il contatore al nuovo livello di rischio
|
|
if risk_level == 'NORMALE':
|
|
update_counter('ip_normal')
|
|
elif risk_level == 'BASSO':
|
|
update_counter('ip_low')
|
|
elif risk_level == 'MEDIO':
|
|
update_counter('ip_medium')
|
|
elif risk_level == 'ALTO':
|
|
update_counter('ip_high')
|
|
elif risk_level == 'CRITICO':
|
|
update_counter('ip_critical')
|
|
|
|
return risk_level
|
|
except Exception as e:
|
|
trans.rollback()
|
|
logging.error(f"Errore nell'aggiornare l'attaccante noto {ip_address}: {e}")
|
|
return risk_level
|
|
finally:
|
|
conn.close()
|
|
except Exception as e:
|
|
logging.error(f"Errore nel creare la connessione per {ip_address}: {e}")
|
|
return risk_level
|
|
|
|
def update_traffic_stats(engine, ip_src, ip_dst, protocol, port_src, port_dst, packet_size):
|
|
"""Aggiorna le statistiche del traffico"""
|
|
try:
|
|
# Utilizziamo un approccio batch per ridurre il carico sul database
|
|
# Inseriamo i dati solo ogni 100 pacchetti per lo stesso IP
|
|
key = f"{ip_src}_{ip_dst}_{protocol}_{port_src}_{port_dst}"
|
|
|
|
# Incrementa il contatore locale
|
|
ip_event_counter[key] += 1
|
|
|
|
# Aggiorniamo il database solo ogni 100 pacchetti o se è passato troppo tempo
|
|
now = time.time()
|
|
last_update = ip_last_seen.get(key, 0)
|
|
|
|
if ip_event_counter[key] >= 100 or (now - last_update) > 60: # 100 pacchetti o 60 secondi
|
|
with engine.connect() as conn:
|
|
# Inseriamo o aggiorniamo le statistiche
|
|
query = text("""
|
|
INSERT INTO traffic_stats_r
|
|
(timestamp, ip_src, ip_dst, protocol, port_src, port_dst, packet_size, packet_count)
|
|
VALUES (NOW(), :ip_src, :ip_dst, :protocol, :port_src, :port_dst, :packet_size, :count)
|
|
""")
|
|
|
|
conn.execute(query, {
|
|
"ip_src": ip_src,
|
|
"ip_dst": ip_dst,
|
|
"protocol": protocol,
|
|
"port_src": port_src,
|
|
"port_dst": port_dst,
|
|
"packet_size": packet_size,
|
|
"count": ip_event_counter[key]
|
|
})
|
|
|
|
# Reset del contatore e aggiornamento timestamp
|
|
ip_event_counter[key] = 0
|
|
ip_last_seen[key] = now
|
|
|
|
return True
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'aggiornamento delle statistiche del traffico: {e}")
|
|
return False
|
|
|
|
def extract_features(ip, current_time=None):
|
|
"""
|
|
Estrae feature dai dati di traffico per il rilevamento delle anomalie
|
|
"""
|
|
if current_time is None:
|
|
current_time = datetime.now()
|
|
|
|
try:
|
|
# Calcola il numero di connessioni uniche
|
|
unique_connections = len(ip_connections.get(ip, set()))
|
|
|
|
# Calcola il rate delle connessioni
|
|
event_count = sum(1 for key in ip_event_counter if key.startswith(f"{ip}_"))
|
|
|
|
# Preparazione delle feature grezze
|
|
features = {
|
|
'unique_connections': unique_connections,
|
|
'event_count': event_count,
|
|
'is_source': 1 if any(key.startswith(f"{ip}_") for key in ip_event_counter) else 0,
|
|
'is_destination': 1 if any(key.split('_')[1] == ip for key in ip_event_counter) else 0
|
|
}
|
|
|
|
# Crea un DataFrame con 83 colonne (come richiesto dal modello)
|
|
import numpy as np
|
|
X = pd.DataFrame(np.zeros((1, 83)))
|
|
X.columns = [f'col_{i}' for i in range(83)]
|
|
|
|
# Assegna le feature calcolate alle prime colonne
|
|
X.iloc[0, 0] = features['unique_connections']
|
|
X.iloc[0, 1] = features['event_count']
|
|
X.iloc[0, 2] = features['is_source']
|
|
X.iloc[0, 3] = features['is_destination']
|
|
|
|
# Genera altre feature casuali
|
|
import random
|
|
for i in range(4, 83):
|
|
X.iloc[0, i] = random.random() * 0.1 # Valori piccoli per non influenzare troppo
|
|
|
|
return X
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'estrazione delle feature per IP {ip}: {e}")
|
|
# Fallback con feature vuote
|
|
import numpy as np
|
|
X = pd.DataFrame(np.zeros((1, 83)))
|
|
X.columns = [f'col_{i}' for i in range(83)]
|
|
return X
|
|
|
|
def analyze_packet(packet, engine, model, preprocessor, whitelist):
|
|
"""Analizza un singolo pacchetto"""
|
|
try:
|
|
# Estrai le informazioni principali dal pacchetto
|
|
if 'IP' not in packet:
|
|
return False
|
|
|
|
ip_src = packet.ip.src
|
|
ip_dst = packet.ip.dst
|
|
protocol = packet.transport_layer if hasattr(packet, 'transport_layer') else 'UNKNOWN'
|
|
|
|
# Estrai porte se disponibili
|
|
port_src = None
|
|
port_dst = None
|
|
if hasattr(packet, 'tcp'):
|
|
port_src = packet.tcp.srcport
|
|
port_dst = packet.tcp.dstport
|
|
elif hasattr(packet, 'udp'):
|
|
port_src = packet.udp.srcport
|
|
port_dst = packet.udp.dstport
|
|
|
|
# Dimensione del pacchetto
|
|
packet_size = packet.length if hasattr(packet, 'length') else 0
|
|
|
|
# Aggiorna i contatori
|
|
update_counter('packets_processed')
|
|
|
|
# Verifica se gli IP sono nella whitelist
|
|
if is_ip_whitelisted(ip_src, whitelist) or is_ip_whitelisted(ip_dst, whitelist):
|
|
return False
|
|
|
|
# Memorizziamo la connessione per il source IP
|
|
connection_key = f"{ip_dst}:{port_dst}" if port_dst else ip_dst
|
|
if ip_src not in ip_connections:
|
|
ip_connections[ip_src] = set()
|
|
ip_connections[ip_src].add(connection_key)
|
|
|
|
# Aggiorna le statistiche del traffico
|
|
update_traffic_stats(engine, ip_src, ip_dst, protocol, port_src, port_dst, packet_size)
|
|
|
|
# Analizziamo solo gli IP sorgente per semplicità
|
|
# Potremmo espandere in futuro per analizzare anche gli IP di destinazione
|
|
update_counter('ip_analyzed')
|
|
|
|
# Estrazione delle feature
|
|
features = extract_features(ip_src)
|
|
|
|
# Analisi delle anomalie con il modello
|
|
if model is not None:
|
|
import warnings
|
|
with warnings.catch_warnings():
|
|
warnings.filterwarnings("ignore", category=UserWarning)
|
|
|
|
# Predizione
|
|
prediction = model.predict(features)[0]
|
|
|
|
# Gestione dell'anomalia se rilevata (-1 = anomalia)
|
|
if prediction == -1:
|
|
risk_level = 'MEDIO' # Iniziamo con rischio medio
|
|
update_known_attacker_r(engine, ip_src, risk_level, port_src, protocol, packet_size)
|
|
logging.info(f"Anomalia rilevata per IP {ip_src}, porta {port_src}, protocollo {protocol}")
|
|
else:
|
|
# IP normale
|
|
risk_level = 'NORMALE'
|
|
# Aggiorniamo comunque, ma con minore frequenza
|
|
if ip_event_counter.get(ip_src, 0) % 100 == 0:
|
|
update_known_attacker_r(engine, ip_src, risk_level, port_src, protocol, packet_size)
|
|
|
|
return True
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'analisi del pacchetto: {e}")
|
|
return False
|
|
|
|
def capture_packets_from_file(file_path, engine, model, preprocessor, whitelist, limit=None):
|
|
"""Cattura pacchetti da un file di cattura"""
|
|
try:
|
|
logging.info(f"Inizio cattura pacchetti da file: {file_path}")
|
|
|
|
# Configura la cattura
|
|
cap = pyshark.FileCapture(file_path, use_json=True, keep_packets=False)
|
|
|
|
# Contatori
|
|
packet_count = 0
|
|
start_time = time.time()
|
|
|
|
# Analizza i pacchetti
|
|
for packet in cap:
|
|
# Analizza il pacchetto
|
|
analyze_packet(packet, engine, model, preprocessor, whitelist)
|
|
|
|
# Incrementa il contatore
|
|
packet_count += 1
|
|
|
|
# Verifica limite
|
|
if limit and packet_count >= limit:
|
|
logging.info(f"Raggiunto limite di {limit} pacchetti")
|
|
break
|
|
|
|
# Report periodico
|
|
if packet_count % 1000 == 0:
|
|
elapsed = time.time() - start_time
|
|
rate = packet_count / elapsed if elapsed > 0 else 0
|
|
logging.info(f"Analizzati {packet_count} pacchetti ({rate:.2f} pacchetti/sec)")
|
|
|
|
cap.close()
|
|
logging.info(f"Completata analisi di {packet_count} pacchetti")
|
|
return True
|
|
except Exception as e:
|
|
logging.error(f"Errore nella cattura da file: {e}")
|
|
return False
|
|
|
|
def capture_packets_realtime(interface, engine, model, preprocessor, whitelist, duration=None):
|
|
"""Cattura pacchetti in tempo reale da un'interfaccia"""
|
|
try:
|
|
logging.info(f"Inizio cattura pacchetti in tempo reale da interfaccia: {interface}")
|
|
|
|
# Configura la cattura live
|
|
capture = pyshark.LiveCapture(interface=interface, use_json=True)
|
|
|
|
# Imposta il timeout se specificato
|
|
if duration:
|
|
end_time = time.time() + duration
|
|
else:
|
|
end_time = None
|
|
|
|
# Contatori
|
|
packet_count = 0
|
|
start_time = time.time()
|
|
|
|
# Analizza i pacchetti in tempo reale
|
|
for packet in capture.sniff_continuously():
|
|
# Analizza il pacchetto
|
|
analyze_packet(packet, engine, model, preprocessor, whitelist)
|
|
|
|
# Incrementa il contatore
|
|
packet_count += 1
|
|
|
|
# Verifica timeout
|
|
if end_time and time.time() > end_time:
|
|
logging.info(f"Raggiunto timeout di {duration} secondi")
|
|
break
|
|
|
|
# Report periodico
|
|
if packet_count % 1000 == 0:
|
|
elapsed = time.time() - start_time
|
|
rate = packet_count / elapsed if elapsed > 0 else 0
|
|
logging.info(f"Analizzati {packet_count} pacchetti ({rate:.2f} pacchetti/sec)")
|
|
|
|
capture.close()
|
|
logging.info(f"Completata analisi di {packet_count} pacchetti")
|
|
return True
|
|
except KeyboardInterrupt:
|
|
logging.info("Cattura interrotta dall'utente")
|
|
return True
|
|
except Exception as e:
|
|
logging.error(f"Errore nella cattura in tempo reale: {e}")
|
|
return False
|
|
|
|
def main():
|
|
"""Funzione principale"""
|
|
# Parsing degli argomenti
|
|
parser = argparse.ArgumentParser(description='Rilevamento DDoS in tempo reale con pyshark')
|
|
parser.add_argument('--debug', action='store_true', help='Abilita logging di debug dettagliato')
|
|
parser.add_argument('--file', type=str, help='File di cattura da analizzare invece di cattura live')
|
|
parser.add_argument('--interface', type=str, default='eth0', help='Interfaccia da monitorare')
|
|
parser.add_argument('--duration', type=int, help='Durata della cattura in secondi')
|
|
parser.add_argument('--limit', type=int, help='Limite di pacchetti da analizzare')
|
|
parser.add_argument('--whitelist', type=str, default=WHITELIST_PATH, help='Percorso del file whitelist')
|
|
parser.add_argument('--ciclo', action='store_true', help='Esegui in un ciclo infinito fino all\'interruzione')
|
|
parser.add_argument('--pausa', type=int, default=60, help='Secondi di pausa tra un ciclo e l\'altro (con --ciclo)')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Imposta il livello di logging in base agli argomenti
|
|
if args.debug:
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
logging.debug("Modalità debug attivata")
|
|
|
|
# Gestisce l'interruzione con CTRL+C
|
|
def handle_interrupt(signum, frame):
|
|
logging.info("Ricevuto segnale di interruzione. Chiusura del programma...")
|
|
end_progress_tracking()
|
|
exit(0)
|
|
|
|
# Registra il gestore per SIGINT (CTRL+C)
|
|
signal.signal(signal.SIGINT, handle_interrupt)
|
|
|
|
# Visualizza informazioni di avvio
|
|
logging.info("Avvio rilevamento DDoS in tempo reale...")
|
|
start_progress_tracking("rilevamento DDoS realtime")
|
|
|
|
# Verifica dei prerequisiti
|
|
try:
|
|
# Test connessione database
|
|
if not test_database_connection():
|
|
logging.error("Impossibile connettersi al database. Verificare le credenziali e la disponibilità del server.")
|
|
end_progress_tracking()
|
|
return
|
|
|
|
# Connessione al database
|
|
logging.info("Connessione al database...")
|
|
engine = create_engine_with_retry(CONN_STRING)
|
|
|
|
# Creazione delle tabelle
|
|
if not create_real_time_tables(engine):
|
|
logging.error("Impossibile creare le tabelle necessarie. Arresto del programma.")
|
|
end_progress_tracking()
|
|
return
|
|
|
|
# Caricamento del modello
|
|
model, preprocessor = load_models()
|
|
if model is None:
|
|
logging.error("Impossibile caricare il modello. Arresto del programma.")
|
|
end_progress_tracking()
|
|
return
|
|
|
|
# Verifica che il modello sia valido
|
|
if not hasattr(model, 'predict'):
|
|
logging.error("Il modello caricato non è valido (manca il metodo predict). Arresto del programma.")
|
|
end_progress_tracking()
|
|
return
|
|
|
|
# Carica la whitelist
|
|
whitelist = load_whitelist(args.whitelist)
|
|
|
|
# Ciclo infinito quando --ciclo è specificato
|
|
ciclo_count = 0
|
|
|
|
def esegui_analisi():
|
|
# Avvia la cattura in base ai parametri
|
|
if args.file:
|
|
# Cattura da file
|
|
if not capture_packets_from_file(args.file, engine, model, preprocessor, whitelist, args.limit):
|
|
logging.error("Errore nella cattura da file")
|
|
return False
|
|
else:
|
|
# Cattura live
|
|
if not capture_packets_realtime(args.interface, engine, model, preprocessor, whitelist, args.duration):
|
|
logging.error("Errore nella cattura in tempo reale")
|
|
return False
|
|
|
|
return True
|
|
|
|
# Esegui una singola analisi o in ciclo
|
|
if args.ciclo:
|
|
logging.info(f"Esecuzione in modalità ciclo. Per interrompere premere CTRL+C")
|
|
|
|
while True:
|
|
ciclo_count += 1
|
|
logging.info(f"Avvio ciclo {ciclo_count}")
|
|
|
|
success = esegui_analisi()
|
|
|
|
if success:
|
|
logging.info(f"Ciclo {ciclo_count} completato. Pausa di {args.pausa} secondi...")
|
|
time.sleep(args.pausa)
|
|
else:
|
|
logging.error(f"Errore nel ciclo {ciclo_count}. Pausa di {args.pausa*2} secondi prima di riprovare...")
|
|
time.sleep(args.pausa * 2) # Pausa più lunga in caso di errore
|
|
else:
|
|
# Modalità singola
|
|
esegui_analisi()
|
|
|
|
# Segnala il completamento
|
|
logging.info("Analisi completata")
|
|
end_progress_tracking()
|
|
|
|
except Exception as e:
|
|
logging.error(f"Errore generale: {e}")
|
|
import traceback
|
|
logging.error(traceback.format_exc())
|
|
end_progress_tracking()
|
|
|
|
if __name__ == "__main__":
|
|
main() |