Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
1140 lines
48 KiB
Python
1140 lines
48 KiB
Python
#!/usr/bin/env python3
|
|
import pandas as pd
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.sql import text
|
|
from joblib import load
|
|
import logging
|
|
import gc
|
|
import os
|
|
import time
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta, timezone
|
|
import ipaddress
|
|
import numpy as np
|
|
from sklearn.ensemble import IsolationForest
|
|
import threading
|
|
import argparse
|
|
import signal
|
|
|
|
# Configurazione del logging avanzata per il debug
|
|
logging.basicConfig(
|
|
level=logging.INFO, # Modificato da DEBUG a INFO per default
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.StreamHandler(sys.stdout),
|
|
logging.FileHandler('ddetect_debug.log')
|
|
]
|
|
)
|
|
|
|
# Configurazione del database
|
|
DB_USER = os.environ.get('DB_USER', 'root')
|
|
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
|
|
DB_HOST = os.environ.get('DB_HOST', 'localhost')
|
|
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
|
|
CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}'
|
|
|
|
# Percorsi dei file
|
|
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'models')
|
|
os.makedirs(MODEL_DIR, exist_ok=True)
|
|
MODEL_PATH = os.path.join(MODEL_DIR, 'isolation_forest.joblib')
|
|
PREPROCESSOR_PATH = os.path.join(MODEL_DIR, 'preprocessor.joblib')
|
|
WHITELIST_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'whitelist.txt')
|
|
LAST_ID_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'last_analyzed_id.txt')
|
|
|
|
# Definizione dei livelli di rischio e soglie
|
|
RISK_LEVELS = {
|
|
'NORMALE': 0.1,
|
|
'BASSO': 0.3,
|
|
'MEDIO': 0.6,
|
|
'ALTO': 0.8,
|
|
'CRITICO': 0.95
|
|
}
|
|
|
|
logging.debug(f"Percorsi: MODEL_DIR={MODEL_DIR}, WHITELIST_PATH={WHITELIST_PATH}")
|
|
logging.debug(f"Livelli di rischio configurati: {RISK_LEVELS}")
|
|
|
|
# Variabili globali per il tracciamento dell'avanzamento
|
|
progress_counters = {
|
|
'ip_whitelisted': 0,
|
|
'ip_analyzed': 0,
|
|
'ip_normal': 0,
|
|
'ip_low': 0,
|
|
'ip_medium': 0,
|
|
'ip_high': 0,
|
|
'ip_critical': 0,
|
|
'metrics_processed': 0,
|
|
'last_update': 0,
|
|
'in_progress': False,
|
|
'operation': '',
|
|
'start_time': None
|
|
}
|
|
|
|
def reset_counters():
|
|
"""
|
|
Resetta i contatori per una nuova esecuzione
|
|
"""
|
|
global progress_counters
|
|
progress_counters['ip_whitelisted'] = 0
|
|
progress_counters['ip_analyzed'] = 0
|
|
progress_counters['ip_normal'] = 0
|
|
progress_counters['ip_low'] = 0
|
|
progress_counters['ip_medium'] = 0
|
|
progress_counters['ip_high'] = 0
|
|
progress_counters['ip_critical'] = 0
|
|
progress_counters['metrics_processed'] = 0
|
|
progress_counters['last_update'] = 0
|
|
progress_counters['in_progress'] = False
|
|
progress_counters['operation'] = ''
|
|
progress_counters['start_time'] = None
|
|
|
|
def start_progress_tracking(operation):
|
|
"""
|
|
Inizia il tracciamento dell'operazione
|
|
"""
|
|
global progress_counters
|
|
reset_counters()
|
|
progress_counters['in_progress'] = True
|
|
progress_counters['operation'] = operation
|
|
progress_counters['start_time'] = time.time()
|
|
|
|
# Avvia un thread per il reporting
|
|
threading.Thread(target=progress_reporter, daemon=True).start()
|
|
logging.info(f"Avvio monitoraggio operazione: {operation}")
|
|
|
|
def update_counter(counter_name, increment=1):
|
|
"""
|
|
Aggiorna un contatore specifico
|
|
"""
|
|
global progress_counters
|
|
if counter_name in progress_counters:
|
|
progress_counters[counter_name] += increment
|
|
|
|
def end_progress_tracking():
|
|
"""
|
|
Termina il tracciamento e mostra il report finale
|
|
"""
|
|
global progress_counters
|
|
if not progress_counters['in_progress']:
|
|
return
|
|
|
|
progress_counters['in_progress'] = False
|
|
report_progress(force=True)
|
|
logging.info(f"Monitoraggio completato per: {progress_counters['operation']}")
|
|
|
|
def report_progress(force=False):
|
|
"""
|
|
Riporta lo stato attuale dei contatori
|
|
"""
|
|
global progress_counters
|
|
if not progress_counters['in_progress'] and not force:
|
|
return
|
|
|
|
current_time = time.time()
|
|
if not force and (current_time - progress_counters['last_update']) < 10: # Aggiorna ogni 10 secondi
|
|
return
|
|
|
|
elapsed = current_time - progress_counters['start_time'] if progress_counters['start_time'] else 0
|
|
|
|
report = f"""
|
|
======== REPORT DI PROGRESSO - {progress_counters['operation']} ========
|
|
Tempo trascorso: {elapsed:.1f} secondi
|
|
IP Whitelistati esclusi: {progress_counters['ip_whitelisted']}
|
|
Metriche elaborate: {progress_counters['metrics_processed']}
|
|
IP Analizzati: {progress_counters['ip_analyzed']}
|
|
Classificazione rischio:
|
|
- IP NORMALI: {progress_counters['ip_normal']}
|
|
- IP BASSI: {progress_counters['ip_low']}
|
|
- IP MEDI: {progress_counters['ip_medium']}
|
|
- IP ALTI: {progress_counters['ip_high']}
|
|
- IP CRITICI: {progress_counters['ip_critical']}
|
|
================================================================
|
|
"""
|
|
logging.info(report)
|
|
progress_counters['last_update'] = current_time
|
|
|
|
def progress_reporter():
|
|
"""
|
|
Thread che riporta periodicamente i progressi
|
|
"""
|
|
while progress_counters['in_progress']:
|
|
report_progress()
|
|
time.sleep(2) # Controlla ogni 2 secondi, ma riporta solo ogni 10
|
|
|
|
def test_database_connection():
|
|
"""
|
|
Test di connessione al database
|
|
"""
|
|
try:
|
|
logging.debug("Tentativo di connessione al database...")
|
|
engine = create_engine(CONN_STRING)
|
|
with engine.connect() as conn:
|
|
result = conn.execute(text("SELECT 1")).fetchone()
|
|
if result[0] == 1:
|
|
logging.debug("Test connessione al database riuscito!")
|
|
tables = conn.execute(text("SHOW TABLES")).fetchall()
|
|
logging.debug(f"Tabelle disponibili: {[t[0] for t in tables]}")
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
logging.error(f"Errore nel test di connessione al database: {e}")
|
|
return False
|
|
|
|
def create_engine_with_retry(conn_string, max_retries=3, retry_delay=2):
|
|
"""
|
|
Crea una connessione al database con tentativi multipli
|
|
"""
|
|
for attempt in range(max_retries):
|
|
try:
|
|
# Configurazione ottimizzata per SQLAlchemy
|
|
engine = create_engine(
|
|
conn_string,
|
|
pool_size=5,
|
|
max_overflow=10,
|
|
pool_recycle=3600,
|
|
pool_pre_ping=True,
|
|
pool_timeout=30,
|
|
echo=False,
|
|
isolation_level="READ COMMITTED"
|
|
)
|
|
|
|
# Test di connessione
|
|
with engine.connect() as conn:
|
|
conn.execute(text("SELECT 1")).fetchone()
|
|
|
|
logging.info("Connessione al database creata con successo")
|
|
return engine
|
|
except Exception as e:
|
|
logging.error(f"Tentativo {attempt+1} fallito: {e}")
|
|
if attempt < max_retries - 1:
|
|
logging.info(f"Nuovo tentativo tra {retry_delay} secondi...")
|
|
time.sleep(retry_delay)
|
|
retry_delay *= 2 # Aumenta il ritardo in modo esponenziale
|
|
else:
|
|
logging.error("Impossibile connettersi al database dopo tutti i tentativi")
|
|
raise
|
|
|
|
def load_models():
|
|
"""
|
|
Carica i modelli salvati e gli oggetti per il preprocessing
|
|
"""
|
|
try:
|
|
# Carica il modello Isolation Forest
|
|
model = load(MODEL_PATH)
|
|
logging.info("Modello Isolation Forest caricato con successo")
|
|
|
|
# Tenta di caricare il preprocessor
|
|
try:
|
|
preprocessor = load(PREPROCESSOR_PATH)
|
|
# Verifica che il preprocessor abbia la struttura attesa
|
|
if isinstance(preprocessor, dict) and 'feature_columns' in preprocessor:
|
|
logging.info(f"Preprocessor caricato con successo: {len(preprocessor['feature_columns'])} feature")
|
|
return model, preprocessor
|
|
else:
|
|
logging.error("Preprocessor non ha la struttura attesa. Utilizzo fallback.")
|
|
except Exception as e:
|
|
logging.error(f"Errore nel caricamento del preprocessor: {e}")
|
|
|
|
# Se siamo qui, il preprocessor non è valido o ha dato errore
|
|
logging.error("Preprocessor non valido o non contiene le informazioni sulle feature. Utilizzo metodo fallback.")
|
|
|
|
# Crea un preprocessor fallback con 53 colonne (numero atteso dal modello)
|
|
fallback_preprocessor = {
|
|
'feature_columns': [f'col_{i}' for i in range(53)],
|
|
'categorical_features': {},
|
|
'text_vectorizer': None
|
|
}
|
|
|
|
logging.info("Creato preprocessor fallback con 53 feature")
|
|
return model, fallback_preprocessor
|
|
except Exception as e:
|
|
logging.error(f"Errore nel caricamento dei modelli: {e}")
|
|
return None, None
|
|
|
|
def load_whitelist(whitelist_path=None):
|
|
"""
|
|
Carica la whitelist da file
|
|
"""
|
|
if whitelist_path is None:
|
|
whitelist_path = '/root/whitelist.txt'
|
|
|
|
try:
|
|
if not os.path.exists(whitelist_path):
|
|
logging.warning(f"File whitelist non trovato: {whitelist_path}")
|
|
return {'exact_ips': set(), 'networks': []}
|
|
|
|
with open(whitelist_path, 'r') as f:
|
|
whitelist_entries = [line.strip() for line in f if line.strip() and not line.startswith('#')]
|
|
|
|
exact_ips = set()
|
|
networks = []
|
|
|
|
for entry in whitelist_entries:
|
|
try:
|
|
if '/' in entry:
|
|
# È una rete
|
|
network = ipaddress.ip_network(entry, strict=False)
|
|
networks.append(network)
|
|
else:
|
|
# È un IP singolo
|
|
exact_ips.add(entry)
|
|
except Exception as e:
|
|
logging.warning(f"Ignorata entry nella whitelist non valida: {entry} - {e}")
|
|
|
|
whitelist = {
|
|
'exact_ips': exact_ips,
|
|
'networks': networks
|
|
}
|
|
|
|
logging.info(f"Whitelisted {len(exact_ips)} IP esatti e {len(networks)} network.")
|
|
return whitelist
|
|
except Exception as e:
|
|
logging.error(f"Errore nel caricamento della whitelist: {e}")
|
|
return {'exact_ips': set(), 'networks': []}
|
|
|
|
def is_ip_whitelisted(ip, whitelist):
|
|
"""
|
|
Verifica se un IP è nella whitelist
|
|
"""
|
|
if pd.isna(ip) or not ip:
|
|
return False
|
|
|
|
try:
|
|
# Ottimizzazione: utilizziamo un set per la verifica di appartenenza diretta
|
|
if ip in whitelist.get('exact_ips', set()):
|
|
update_counter('ip_whitelisted')
|
|
return True
|
|
|
|
# Per le reti dobbiamo convertire l'IP in un oggetto
|
|
try:
|
|
ip_obj = ipaddress.ip_address(ip)
|
|
except ValueError:
|
|
logging.debug(f"IP non valido: {ip}")
|
|
return False
|
|
|
|
# Limitiamo la verifica a massimo 5000 reti per evitare blocchi
|
|
# Verifichiamo le reti solo se l'IP non è già stato trovato come esatto
|
|
for i, network in enumerate(whitelist.get('networks', [])):
|
|
if i >= 5000: # Limitiamo per evitare blocchi
|
|
# Usiamo una variabile globale per loggare questo warning solo una volta per IP
|
|
if not hasattr(is_ip_whitelisted, 'warned_ips'):
|
|
is_ip_whitelisted.warned_ips = set()
|
|
|
|
if ip not in is_ip_whitelisted.warned_ips:
|
|
logging.debug(f"Limite di 5000 reti raggiunto nella verifica whitelist per IP {ip}")
|
|
is_ip_whitelisted.warned_ips.add(ip)
|
|
|
|
# Limita la dimensione del set per evitare crescita incontrollata
|
|
if len(is_ip_whitelisted.warned_ips) > 100:
|
|
is_ip_whitelisted.warned_ips.clear()
|
|
break
|
|
|
|
if ip_obj in network:
|
|
update_counter('ip_whitelisted')
|
|
return True
|
|
|
|
return False
|
|
except Exception as e:
|
|
logging.error(f"Errore nel controllo whitelist per IP {ip}: {e}")
|
|
return False
|
|
|
|
def load_last_analyzed_id():
|
|
"""
|
|
Carica l'ultimo ID analizzato dal file
|
|
"""
|
|
try:
|
|
if os.path.exists(LAST_ID_PATH):
|
|
with open(LAST_ID_PATH, 'r') as f:
|
|
last_id = int(f.read().strip())
|
|
return last_id
|
|
else:
|
|
logging.info(f"File {LAST_ID_PATH} non trovato. Inizializzo last_analyzed_id a 0.")
|
|
return 0
|
|
except Exception as e:
|
|
logging.error(f"Errore nel caricamento dell'ultimo ID analizzato: {e}")
|
|
return 0
|
|
|
|
def save_last_analyzed_id(last_id):
|
|
"""
|
|
Salva l'ultimo ID analizzato nel file
|
|
"""
|
|
try:
|
|
with open(LAST_ID_PATH, 'w') as f:
|
|
f.write(str(last_id))
|
|
logging.info(f"Ultimo ID analizzato salvato: {last_id}")
|
|
except Exception as e:
|
|
logging.error(f"Errore nel salvataggio dell'ultimo ID analizzato: {e}")
|
|
|
|
def extract_data(engine, last_analyzed_id, batch_size=10000):
|
|
"""
|
|
Estrae i dati dal database a partire dall'ultimo ID analizzato
|
|
Utilizzando la tabella Esterna
|
|
"""
|
|
try:
|
|
query = text("""
|
|
SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3, Messaggio4
|
|
FROM Esterna
|
|
WHERE ID > :last_id
|
|
ORDER BY ID ASC
|
|
LIMIT :batch_size
|
|
""")
|
|
|
|
new_data = pd.read_sql(query, engine, params={"last_id": last_analyzed_id, "batch_size": batch_size})
|
|
logging.info(f"Dati estratti: {len(new_data)} record dalla tabella Esterna.")
|
|
return new_data
|
|
except Exception as e:
|
|
logging.error(f"Errore durante l'esecuzione della query SQL: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def prepare_data(new_data, preprocessor=None):
|
|
"""
|
|
Prepara i dati per la predizione con un approccio compatibile con il modello
|
|
addestrato in analisys_fixed.py, che si aspetta 83 feature
|
|
"""
|
|
logging.info("--DEBUG-- Inizio preparazione dati compatibile...")
|
|
try:
|
|
# Ottieni i nomi delle feature dal preprocessor se disponibile
|
|
feature_names = []
|
|
if isinstance(preprocessor, dict) and 'feature_columns' in preprocessor:
|
|
feature_names = preprocessor['feature_columns']
|
|
logging.info(f"Trovate {len(feature_names)} feature nel preprocessor")
|
|
else:
|
|
# Feature predefinite osservate nei log
|
|
feature_names = [
|
|
'time_since_last', 'events_last_hour', 'events_last_day', 'time_since_last_mean',
|
|
'time_since_last_std', 'time_since_last_min', 'time_since_last_max',
|
|
'events_last_hour_max', 'events_last_day_max'
|
|
]
|
|
|
|
# Aggiungi feature TF-IDF del protocollo
|
|
for i in range(21):
|
|
feature_names.append(f'protocol_tfidf_{i}')
|
|
|
|
# Aggiungi colonne numeriche
|
|
for i in range(15):
|
|
feature_names.append(f'col_{i}')
|
|
|
|
# Aggiungi feature categoriche per Host
|
|
feature_names.extend(['host_FIBRA', 'host_nan'])
|
|
|
|
# Aggiungi altre colonne come osservato nei log
|
|
for i in range(15):
|
|
feature_names.append(f'col_{i}')
|
|
|
|
# Aggiungi di nuovo le TF-IDF
|
|
for i in range(21):
|
|
feature_names.append(f'protocol_tfidf_{i}')
|
|
|
|
logging.info(f"Usando {len(feature_names)} feature predefinite")
|
|
|
|
# Crea un dataframe con tutte le feature richieste, inizializzate a zero
|
|
import numpy as np
|
|
X = pd.DataFrame(np.zeros((len(new_data), len(feature_names))), index=new_data.index)
|
|
|
|
# Imposta i nomi delle colonne
|
|
X.columns = feature_names
|
|
|
|
# Estrai IP attaccante da Messaggio2 se disponibile
|
|
if 'Messaggio2' in new_data.columns:
|
|
new_data['IP_Attaccante'] = new_data['Messaggio2'].apply(lambda x: x.split(':')[0] if pd.notna(x) and ':' in str(x) else None)
|
|
logging.info(f"Estratti {len(new_data[new_data['IP_Attaccante'].notna()])} IP attaccanti")
|
|
|
|
# Se possibile, aggiungiamo alcune informazioni
|
|
if 'IP_Attaccante' in new_data.columns and 'Host' in new_data.columns:
|
|
try:
|
|
from category_encoders import HashingEncoder
|
|
|
|
# Converti i valori essenziali (IP e Host)
|
|
logging.info("--DEBUG-- Encoding valori essenziali...")
|
|
he = HashingEncoder(n_components=10, hash_method='md5')
|
|
|
|
# Attento a come gestiamo i valori nulli
|
|
encoded_values = he.fit_transform(
|
|
new_data[['Host', 'IP_Attaccante']].fillna('unknown')
|
|
)
|
|
|
|
# Popola le prime colonne disponibili con i valori codificati
|
|
for i in range(min(10, encoded_values.shape[1])):
|
|
col_name = f'col_{i}'
|
|
if col_name in X.columns:
|
|
X[col_name] = encoded_values.iloc[:, i].values
|
|
|
|
# Imposta il valore della colonna host_FIBRA se presente
|
|
if 'host_FIBRA' in X.columns:
|
|
X['host_FIBRA'] = new_data['Host'].fillna('').str.contains('FIBRA').astype(int)
|
|
|
|
# Imposta il valore della colonna host_nan se presente
|
|
if 'host_nan' in X.columns:
|
|
X['host_nan'] = new_data['Host'].isna().astype(int)
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'encoding dei valori: {e}")
|
|
|
|
logging.info(f"--DEBUG-- Preparazione dati completata: {X.shape} (dovrebbe essere {len(new_data)} righe, 83 colonne)")
|
|
return X
|
|
except Exception as e:
|
|
logging.error(f"Errore nella preparazione dei dati: {e}")
|
|
import traceback
|
|
logging.error(f"Traceback: {traceback.format_exc()}")
|
|
return None
|
|
|
|
def predict_anomalies(model, features):
|
|
"""
|
|
Predice le anomalie utilizzando il modello caricato
|
|
"""
|
|
try:
|
|
logging.info(f"--DEBUG-- Predizione su {features.shape[0]} esempi con {features.shape[1]} feature")
|
|
|
|
# Verifica se i nomi delle feature sono corretti
|
|
expected_names = [f'col_{i}' for i in range(53)]
|
|
if not all(col in features.columns for col in expected_names):
|
|
logging.error("Nomi delle colonne non compatibili con il modello")
|
|
# Rinomina le colonne se necessario
|
|
features.columns = [f'col_{i}' for i in range(features.shape[1])]
|
|
logging.info("Colonne rinominate per compatibilità")
|
|
|
|
# Aggiorna il contatore PRIMA di fare la predizione
|
|
update_counter('metrics_processed', features.shape[0])
|
|
|
|
if hasattr(model, 'predict'):
|
|
# Esegui la predizione con timeout
|
|
start_time = time.time()
|
|
max_time = 60 # Massimo 60 secondi
|
|
|
|
# Imposta X_sparse per compatibilità con il modello
|
|
try:
|
|
# Sopprimiamo il warning sui nomi delle feature
|
|
import warnings
|
|
with warnings.catch_warnings():
|
|
warnings.filterwarnings("ignore", category=UserWarning, message="X does not have valid feature names")
|
|
from scipy import sparse
|
|
|
|
# Salva i nomi delle colonne prima di convertire in matrice sparse
|
|
feature_names = features.columns.tolist()
|
|
X_sparse = sparse.csr_matrix(features.values)
|
|
logging.info(f"--DEBUG-- Matrice sparse creata: {X_sparse.shape}")
|
|
|
|
# Prova a fare la predizione con la matrice sparse
|
|
predictions = model.predict(X_sparse)
|
|
logging.info(f"--DEBUG-- Predizione su matrice sparse completata: {len(predictions)} risultati")
|
|
except Exception as e:
|
|
logging.warning(f"Errore sulla matrice sparse: {e}. Provo con DataFrame normale.")
|
|
# Fallback: usa il DataFrame originale
|
|
predictions = model.predict(features)
|
|
logging.info(f"--DEBUG-- Predizione su DataFrame completata: {len(predictions)} risultati")
|
|
|
|
# Conta le anomalie
|
|
anomaly_count = sum(1 for p in predictions if p == -1)
|
|
logging.info(f"Trovate {anomaly_count} anomalie su {len(predictions)} predizioni")
|
|
|
|
return predictions
|
|
else:
|
|
logging.error("Il modello non ha il metodo predict")
|
|
# Fallback - considera tutto normale
|
|
return np.zeros(features.shape[0])
|
|
except Exception as e:
|
|
logging.error(f"Errore durante la predizione: {e}")
|
|
import traceback
|
|
logging.error(traceback.format_exc())
|
|
# Fallback - considera tutto normale
|
|
return np.zeros(features.shape[0])
|
|
|
|
def get_details(engine, ids):
|
|
"""
|
|
Ottieni i dettagli completi per gli ID specificati dalla tabella Esterna
|
|
"""
|
|
try:
|
|
if not ids:
|
|
return pd.DataFrame()
|
|
|
|
id_list = ','.join(map(str, ids))
|
|
query = text("""
|
|
SELECT ID, Data, Ora, Host, IndirizzoIP, Messaggio1, Messaggio2, Messaggio3, Messaggio4
|
|
FROM Esterna
|
|
WHERE ID IN ({})
|
|
""".format(id_list))
|
|
|
|
details = pd.read_sql(query, engine)
|
|
|
|
# Converti timestamp
|
|
if 'Data' in details.columns and 'Ora' in details.columns:
|
|
details['Data'] = pd.to_datetime(details['Data'], errors='coerce')
|
|
details['Ora'] = pd.to_timedelta(details['Ora'].astype(str), errors='coerce')
|
|
details['Timestamp'] = details['Data'] + details['Ora']
|
|
|
|
return details
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'ottenere i dettagli dalla tabella Esterna: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def classify_risk(anomaly_score):
|
|
"""
|
|
Classifica il livello di rischio in base allo score di anomalia
|
|
"""
|
|
# Normalizza lo score (potrebbe essere già normalizzato)
|
|
score = abs(anomaly_score)
|
|
|
|
if score < RISK_LEVELS['NORMALE']:
|
|
update_counter('ip_normal')
|
|
return 'NORMALE'
|
|
elif score < RISK_LEVELS['BASSO']:
|
|
update_counter('ip_low')
|
|
return 'BASSO'
|
|
elif score < RISK_LEVELS['MEDIO']:
|
|
update_counter('ip_medium')
|
|
return 'MEDIO'
|
|
elif score < RISK_LEVELS['ALTO']:
|
|
update_counter('ip_high')
|
|
return 'ALTO'
|
|
else:
|
|
update_counter('ip_critical')
|
|
return 'CRITICO'
|
|
|
|
def is_known_attacker(engine, ip_address):
|
|
"""
|
|
Verifica se un IP è un attaccante noto
|
|
Crea la tabella se non esiste
|
|
"""
|
|
try:
|
|
with engine.connect() as conn:
|
|
# Crea la tabella se non esiste
|
|
logging.debug(f"Verifica tabella known_attackers per IP {ip_address}")
|
|
create_table_query = text("""
|
|
CREATE TABLE IF NOT EXISTS known_attackers (
|
|
id INT AUTO_INCREMENT PRIMARY KEY,
|
|
ip_address VARCHAR(45) NOT NULL,
|
|
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
attack_count INT DEFAULT 1,
|
|
risk_level VARCHAR(20) DEFAULT 'NORMALE',
|
|
ports_used TEXT DEFAULT NULL,
|
|
attack_patterns TEXT DEFAULT NULL,
|
|
is_blocked TINYINT(1) DEFAULT 0,
|
|
UNIQUE KEY unique_ip (ip_address)
|
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
|
""")
|
|
|
|
# Assicurati che la transazione sia valida
|
|
conn.execute(create_table_query)
|
|
|
|
# Verifica se l'IP esiste
|
|
check_query = text("SELECT * FROM known_attackers WHERE ip_address = :ip")
|
|
logging.debug(f"Esecuzione query: {check_query} con parametri: {{'ip': {ip_address}}}")
|
|
result = conn.execute(check_query, {"ip": ip_address}).fetchone()
|
|
|
|
exists = result is not None
|
|
logging.debug(f"IP {ip_address} è un attaccante noto: {exists}")
|
|
return exists
|
|
except Exception as e:
|
|
logging.error(f"Errore nel verificare se l'IP {ip_address} è un attaccante noto: {e}")
|
|
return False
|
|
|
|
def update_known_attacker(engine, ip_address, risk_level, port=None, message=None):
|
|
"""
|
|
Aggiorna o inserisce un attaccante noto
|
|
Incrementa il livello di rischio se l'IP viene visto più volte
|
|
"""
|
|
try:
|
|
conn = engine.connect()
|
|
trans = conn.begin() # Inizia una transazione esplicita
|
|
|
|
try:
|
|
# Verifica se l'IP esiste già
|
|
logging.debug(f"Aggiornamento attaccante noto: {ip_address}, rischio iniziale: {risk_level}")
|
|
query = text("SELECT * FROM known_attackers WHERE ip_address = :ip")
|
|
result = conn.execute(query, {"ip": ip_address}).fetchone()
|
|
|
|
if result:
|
|
# Ottieni il conteggio attacchi e il rischio attuale
|
|
attack_count = result[4] + 1 # Indice 4 = attack_count
|
|
current_risk = result[5] # Indice 5 = risk_level
|
|
current_ports = result[6] # Indice 6 = ports_used
|
|
|
|
logging.debug(f"IP {ip_address}: conteggio attuale={attack_count-1}, rischio attuale={current_risk}")
|
|
|
|
# Aggiorna la lista delle porte
|
|
new_ports = current_ports or ""
|
|
if port and port not in new_ports:
|
|
new_ports = f"{new_ports},{port}" if new_ports else port
|
|
|
|
# Incrementa il livello di rischio basato sul numero di rilevamenti
|
|
new_risk = risk_level
|
|
old_risk = current_risk # Teniamo traccia del rischio precedente
|
|
|
|
# Escalation del rischio in base al numero di rilevamenti
|
|
if risk_level == 'NORMALE' and attack_count >= 10:
|
|
new_risk = 'BASSO'
|
|
logging.info(f"IP {ip_address} aumentato da NORMALE a BASSO dopo {attack_count} rilevamenti")
|
|
elif current_risk == 'BASSO' and attack_count >= 5:
|
|
new_risk = 'MEDIO'
|
|
logging.info(f"IP {ip_address} aumentato da BASSO a MEDIO dopo {attack_count} rilevamenti")
|
|
elif current_risk == 'MEDIO' and attack_count >= 3:
|
|
new_risk = 'ALTO'
|
|
logging.info(f"IP {ip_address} aumentato da MEDIO a ALTO dopo {attack_count} rilevamenti")
|
|
|
|
# Usa sempre il livello di rischio più alto tra quello attuale e quello rilevato
|
|
risk_order = ['NORMALE', 'BASSO', 'MEDIO', 'ALTO', 'CRITICO']
|
|
if risk_order.index(current_risk) > risk_order.index(new_risk):
|
|
new_risk = current_risk # Mantiene il rischio più alto
|
|
|
|
# Se il rischio è cambiato, aggiorna i contatori
|
|
if new_risk != old_risk:
|
|
# Decrementa il contatore del vecchio livello se possibile
|
|
if old_risk == 'NORMALE':
|
|
update_counter('ip_normal', -1)
|
|
elif old_risk == 'BASSO':
|
|
update_counter('ip_low', -1)
|
|
elif old_risk == 'MEDIO':
|
|
update_counter('ip_medium', -1)
|
|
elif old_risk == 'ALTO':
|
|
update_counter('ip_high', -1)
|
|
elif old_risk == 'CRITICO':
|
|
update_counter('ip_critical', -1)
|
|
|
|
# Incrementa il contatore del nuovo livello
|
|
if new_risk == 'NORMALE':
|
|
update_counter('ip_normal')
|
|
elif new_risk == 'BASSO':
|
|
update_counter('ip_low')
|
|
elif new_risk == 'MEDIO':
|
|
update_counter('ip_medium')
|
|
elif new_risk == 'ALTO':
|
|
update_counter('ip_high')
|
|
elif new_risk == 'CRITICO':
|
|
update_counter('ip_critical')
|
|
|
|
logging.debug(f"IP {ip_address}: nuovo conteggio={attack_count}, nuovo rischio={new_risk}, porte={new_ports}")
|
|
|
|
# Aggiorna l'esistente
|
|
update_query = text("""
|
|
UPDATE known_attackers
|
|
SET last_seen = NOW(),
|
|
attack_count = attack_count + 1,
|
|
risk_level = :risk,
|
|
ports_used = :ports
|
|
WHERE ip_address = :ip
|
|
""")
|
|
|
|
conn.execute(update_query, {"ip": ip_address, "risk": new_risk, "ports": new_ports})
|
|
|
|
# Commit della transazione
|
|
trans.commit()
|
|
|
|
# Restituisci il nuovo livello di rischio
|
|
return new_risk
|
|
else:
|
|
# Inserisci nuovo
|
|
logging.debug(f"Inserimento nuovo attaccante: {ip_address}, rischio={risk_level}, porta={port}")
|
|
insert_query = text("""
|
|
INSERT INTO known_attackers
|
|
(ip_address, risk_level, ports_used, attack_patterns, is_blocked)
|
|
VALUES (:ip, :risk, :port, :message, 0)
|
|
""")
|
|
|
|
result = conn.execute(insert_query, {"ip": ip_address, "risk": risk_level, "port": port, "message": message})
|
|
logging.debug(f"Risultato inserimento: {result.rowcount} righe inserite")
|
|
|
|
# Commit della transazione
|
|
trans.commit()
|
|
|
|
# Inizializza il contatore al nuovo livello di rischio
|
|
if risk_level == 'NORMALE':
|
|
update_counter('ip_normal')
|
|
elif risk_level == 'BASSO':
|
|
update_counter('ip_low')
|
|
elif risk_level == 'MEDIO':
|
|
update_counter('ip_medium')
|
|
elif risk_level == 'ALTO':
|
|
update_counter('ip_high')
|
|
elif risk_level == 'CRITICO':
|
|
update_counter('ip_critical')
|
|
|
|
return risk_level
|
|
except Exception as e:
|
|
# Rollback in caso di errore
|
|
trans.rollback()
|
|
logging.error(f"Errore nell'aggiornare l'attaccante noto {ip_address}: {e}")
|
|
logging.error(f"Dettagli errore: {str(e)}")
|
|
return risk_level
|
|
finally:
|
|
# Chiudi la connessione
|
|
conn.close()
|
|
except Exception as e:
|
|
logging.error(f"Errore nel creare la connessione per {ip_address}: {e}")
|
|
return risk_level
|
|
|
|
def should_block_ip(risk_level):
|
|
"""
|
|
Determina se un IP dovrebbe essere bloccato in base al suo livello di rischio
|
|
"""
|
|
should_block = risk_level in ['ALTO', 'CRITICO']
|
|
logging.debug(f"IP con rischio {risk_level} dovrebbe essere bloccato: {should_block}")
|
|
return should_block
|
|
|
|
def insert_anomaly_to_db(engine, ip_address, risk_level, list_name):
|
|
"""
|
|
Inserisce o aggiorna un'anomalia nel database
|
|
"""
|
|
try:
|
|
with engine.connect() as conn:
|
|
# Verifica se la colonna risk_level esiste nella tabella ip_list
|
|
try:
|
|
# Verifica la struttura della tabella
|
|
check_column_query = text("""
|
|
SELECT COUNT(*) AS column_exists
|
|
FROM information_schema.COLUMNS
|
|
WHERE TABLE_SCHEMA = :db_name
|
|
AND TABLE_NAME = 'ip_list'
|
|
AND COLUMN_NAME = 'risk_level'
|
|
""")
|
|
|
|
db_name = os.environ.get('MYSQL_DATABASE', 'LOG_MIKROTIK')
|
|
result = conn.execute(check_column_query, {"db_name": db_name}).fetchone()
|
|
|
|
if result[0] == 0:
|
|
# La colonna non esiste, aggiungiamola
|
|
logging.info("La colonna risk_level non esiste nella tabella ip_list. Aggiunta in corso...")
|
|
alter_query = text("""
|
|
ALTER TABLE ip_list
|
|
ADD COLUMN risk_level VARCHAR(20) DEFAULT 'MEDIO'
|
|
""")
|
|
conn.execute(alter_query)
|
|
logging.info("Colonna risk_level aggiunta con successo.")
|
|
else:
|
|
logging.debug("La colonna risk_level esiste già nella tabella ip_list.")
|
|
|
|
except Exception as e:
|
|
logging.error(f"Errore nella verifica/aggiunta della colonna risk_level: {e}")
|
|
# Continua comunque, potrebbe fallire nell'insert ma almeno ci abbiamo provato
|
|
|
|
# Ora procediamo con l'inserimento
|
|
upsert_query = text("""
|
|
INSERT INTO ip_list (list_name, ip_address, retrieved_at, risk_level)
|
|
VALUES (:list_name, :ip_address, NOW(), :risk_level)
|
|
ON DUPLICATE KEY UPDATE
|
|
retrieved_at = NOW(),
|
|
risk_level = :risk_level
|
|
""")
|
|
|
|
conn.execute(upsert_query, {
|
|
"list_name": list_name,
|
|
"ip_address": ip_address,
|
|
"risk_level": risk_level
|
|
})
|
|
|
|
logging.info(f"IP {ip_address} inserito/aggiornato nella lista {list_name} con rischio {risk_level}")
|
|
return True
|
|
except Exception as e:
|
|
logging.error(f"Errore nell'inserire l'IP {ip_address} nel database: {e}")
|
|
return False
|
|
|
|
def handle_anomaly(engine, ip_address, risk_level, port=None, message=None, list_name='ddos_ia'):
|
|
"""
|
|
Gestisce un'anomalia rilevata aggiornando le informazioni nel database
|
|
"""
|
|
logging.debug(f"Gestione anomalia per IP: {ip_address}, rischio: {risk_level}")
|
|
|
|
try:
|
|
# Aggiorna le informazioni sull'attaccante nel database
|
|
final_risk = update_known_attacker(engine, ip_address, risk_level, port, message)
|
|
|
|
# Se il rischio è ALTO o CRITICO, aggiungi IP alla lista di blocco
|
|
if should_block_ip(final_risk):
|
|
insert_anomaly_to_db(engine, ip_address, final_risk, list_name)
|
|
except Exception as e:
|
|
logging.error(f"Errore nella gestione dell'anomalia per {ip_address}: {e}")
|
|
|
|
def main():
|
|
"""
|
|
Funzione principale per il rilevamento DDoS
|
|
"""
|
|
# Parsing degli argomenti
|
|
parser = argparse.ArgumentParser(description='Rilevamento DDoS in tempo reale')
|
|
parser.add_argument('--debug', action='store_true', help='Abilita logging di debug dettagliato')
|
|
parser.add_argument('--batch-size', type=int, default=10000, help='Dimensione del batch di dati da analizzare')
|
|
parser.add_argument('--whitelist', type=str, default=WHITELIST_PATH, help='Percorso del file whitelist')
|
|
parser.add_argument('--ciclo', action='store_true', help='Esegui in un ciclo infinito fino all\'interruzione')
|
|
parser.add_argument('--pausa', type=int, default=60, help='Secondi di pausa tra un ciclo e l\'altro (con --ciclo)')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Imposta il livello di logging in base agli argomenti
|
|
if args.debug:
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
logging.debug("Modalità debug attivata")
|
|
|
|
# Gestisce l'interruzione con CTRL+C
|
|
def handle_interrupt(signum, frame):
|
|
logging.info("Ricevuto segnale di interruzione. Chiusura del programma...")
|
|
end_progress_tracking()
|
|
exit(0)
|
|
|
|
# Registra il gestore per SIGINT (CTRL+C)
|
|
signal.signal(signal.SIGINT, handle_interrupt)
|
|
|
|
# Ciclo infinito quando --ciclo è specificato
|
|
ciclo_count = 0
|
|
|
|
def esegui_analisi():
|
|
# Visualizza informazioni di avvio
|
|
if args.ciclo:
|
|
ciclo_txt = f" (ciclo {ciclo_count})"
|
|
else:
|
|
ciclo_txt = ""
|
|
|
|
logging.info(f"Avvio rilevamento DDoS{ciclo_txt}...")
|
|
start_progress_tracking(f"rilevamento DDoS{ciclo_txt}")
|
|
|
|
# Verifica percorsi e autorizzazioni dei file di modello
|
|
logging.info("Verifica dei percorsi dei modelli...")
|
|
model_files = {
|
|
"Modello principale": MODEL_PATH,
|
|
"Preprocessor": PREPROCESSOR_PATH
|
|
}
|
|
|
|
for name, path in model_files.items():
|
|
if os.path.exists(path):
|
|
logging.info(f"✅ {name} trovato: {path}")
|
|
try:
|
|
# Verifica che il file sia leggibile
|
|
with open(path, 'rb') as f:
|
|
f.read(1)
|
|
logging.info(f"✅ {name} è leggibile")
|
|
except Exception as e:
|
|
logging.error(f"❌ {name} esiste ma non è leggibile: {e}")
|
|
else:
|
|
logging.warning(f"❌ {name} non trovato: {path}")
|
|
|
|
# Test connessione database
|
|
if not test_database_connection():
|
|
logging.error("Impossibile connettersi al database. Verificare le credenziali e la disponibilità del server.")
|
|
end_progress_tracking()
|
|
return False
|
|
|
|
try:
|
|
# Connessione al database
|
|
logging.info("Connessione al database...")
|
|
engine = create_engine_with_retry(CONN_STRING)
|
|
|
|
# Caricamento del modello
|
|
model, preprocessor = load_models()
|
|
if model is None:
|
|
logging.error("Impossibile caricare il modello. Arresto del programma.")
|
|
end_progress_tracking()
|
|
return False
|
|
|
|
# Verifica che il modello sia valido
|
|
if not hasattr(model, 'predict'):
|
|
logging.error("Il modello caricato non è valido (manca il metodo predict). Arresto del programma.")
|
|
end_progress_tracking()
|
|
return False
|
|
|
|
# Carica la whitelist e processa i dati
|
|
whitelist = load_whitelist(args.whitelist)
|
|
|
|
# Verifica se ci sono dati nuovi
|
|
last_id = load_last_analyzed_id()
|
|
logging.info(f"Last analyzed ID: {last_id}")
|
|
|
|
# SEMPLIFICA IL FLUSSO PRINCIPALE
|
|
try:
|
|
# 1. Estrai dati
|
|
logging.info("--DEBUG-- Estrazione dati dal database...")
|
|
new_data = extract_data(engine, last_id, args.batch_size)
|
|
|
|
if new_data.empty:
|
|
logging.info("Nessun nuovo dato da analizzare.")
|
|
end_progress_tracking()
|
|
return True
|
|
|
|
# 2. Estrai e prepara gli IP attaccanti
|
|
logging.info("--DEBUG-- Preparazione IP attaccanti...")
|
|
# Converti in pochi passaggi per evitare blocchi
|
|
new_data['IP_Attaccante'] = new_data['Messaggio2'].apply(
|
|
lambda x: x.split(':')[0] if pd.notna(x) and isinstance(x, str) and ':' in x else None
|
|
)
|
|
|
|
# 3. Conta solo un campione di IP whitelistati per evitare blocchi
|
|
whitelisted_ips = set()
|
|
if len(new_data) > 100:
|
|
# Verifichiamo tutti gli IP per la whitelist
|
|
all_ips = new_data['IP_Attaccante'].dropna().unique()
|
|
logging.info(f"--DEBUG-- Verifica di {len(all_ips)} IP unici contro la whitelist...")
|
|
|
|
for ip in all_ips:
|
|
if is_ip_whitelisted(ip, whitelist):
|
|
whitelisted_ips.add(ip)
|
|
|
|
logging.info(f"Trovati {len(whitelisted_ips)} IP in whitelist su {len(all_ips)} IP unici")
|
|
|
|
# 4. Prepara i dati con metodo minimalista
|
|
logging.info("--DEBUG-- Preparazione dati per predizione...")
|
|
features = prepare_data(new_data, preprocessor)
|
|
|
|
if features is None or features.shape[0] == 0:
|
|
logging.error("Nessuna feature generata per la predizione.")
|
|
end_progress_tracking()
|
|
return True
|
|
|
|
# 5. Fai la predizione
|
|
logging.info("--DEBUG-- Predizione anomalie...")
|
|
predictions = predict_anomalies(model, features)
|
|
if len(predictions) == 0:
|
|
logging.warning("Nessuna predizione generata, uso array di zeri")
|
|
predictions = np.zeros(new_data.shape[0])
|
|
|
|
# 6. Aggiusta le dimensioni se necessario
|
|
if len(predictions) != len(new_data):
|
|
logging.warning(f"Dimensioni differenti: predizioni {len(predictions)}, dati {len(new_data)}")
|
|
if len(predictions) < len(new_data):
|
|
# Estendi l'array delle predizioni
|
|
predictions = np.append(predictions, np.zeros(len(new_data) - len(predictions)))
|
|
else:
|
|
# Tronca l'array delle predizioni
|
|
predictions = predictions[:len(new_data)]
|
|
|
|
# 7. Applica risultati e conta
|
|
new_data['anomaly'] = predictions
|
|
|
|
# 8. Aggiorna i contatori per ogni IP (senza duplicati)
|
|
ip_counter = {}
|
|
for idx, row in new_data.iterrows():
|
|
ip = row.get('IP_Attaccante')
|
|
if pd.notna(ip) and ip not in ip_counter:
|
|
ip_counter[ip] = True
|
|
|
|
if ip not in whitelisted_ips:
|
|
# Un nuovo IP analizzato
|
|
update_counter('ip_analyzed')
|
|
|
|
# Classifica in base al risultato della predizione
|
|
if row['anomaly'] == -1:
|
|
# Anomalo - non incrementiamo qui, lo farà update_known_attacker
|
|
pass
|
|
else:
|
|
# Normale
|
|
update_counter('ip_normal')
|
|
|
|
logging.info(f"Analizzati {len(ip_counter)} IP unici, {len(whitelisted_ips)} in whitelist")
|
|
|
|
# 9. Gestisci anomalie
|
|
anomalies = new_data[new_data['anomaly'] == -1]
|
|
logging.info(f"Rilevate {len(anomalies)} anomalie su {len(new_data)} eventi")
|
|
|
|
if not anomalies.empty:
|
|
# Mostra un esempio delle anomalie nei log
|
|
sample_size = min(5, len(anomalies))
|
|
logging.info(f"Esempio di {sample_size} anomalie rilevate:")
|
|
for idx, row in anomalies.head(sample_size).iterrows():
|
|
ip = row.get('IP_Attaccante')
|
|
msg = row.get('Messaggio2', 'N/A')
|
|
logging.info(f" - [{idx}] IP: {ip}, Messaggio: {msg}")
|
|
|
|
# Conteggia gli IP anomali per IP
|
|
anomaly_ips = {}
|
|
for idx, row in anomalies.iterrows():
|
|
ip = row.get('IP_Attaccante')
|
|
if pd.notna(ip):
|
|
if ip not in anomaly_ips:
|
|
anomaly_ips[ip] = 0
|
|
anomaly_ips[ip] += 1
|
|
|
|
logging.info(f"Trovati {len(anomaly_ips)} IP unici con anomalie")
|
|
|
|
# Mostra i top 10 IP con più anomalie
|
|
top_anomalies = sorted(anomaly_ips.items(), key=lambda x: x[1], reverse=True)[:10]
|
|
if top_anomalies:
|
|
logging.info("Top 10 IP con più anomalie:")
|
|
for ip, count in top_anomalies:
|
|
logging.info(f" - IP: {ip}, Anomalie: {count}")
|
|
|
|
# Processa ciascun IP una sola volta
|
|
processed_ips = set()
|
|
for idx, row in anomalies.iterrows():
|
|
ip = row.get('IP_Attaccante')
|
|
if pd.notna(ip) and ip not in processed_ips and not is_ip_whitelisted(ip, whitelist):
|
|
processed_ips.add(ip)
|
|
|
|
# Assegna un livello di rischio base
|
|
risk_level = 'MEDIO' # Default
|
|
port = None
|
|
|
|
if 'Porta_Attaccante' in row and pd.notna(row['Porta_Attaccante']):
|
|
port = row['Porta_Attaccante']
|
|
|
|
# Crea un messaggio informativo
|
|
msg = f"Anomalia rilevata da {row.get('Messaggio2', 'N/A')}"
|
|
|
|
# Gestisci l'anomalia
|
|
handle_anomaly(engine, ip, risk_level, port, msg, 'ddos_ia')
|
|
|
|
logging.info(f"Gestite anomalie per {len(processed_ips)} IP unici")
|
|
|
|
# 10. Salva l'ultimo ID analizzato
|
|
if not new_data.empty:
|
|
last_analyzed_id = new_data['ID'].max()
|
|
save_last_analyzed_id(last_analyzed_id)
|
|
|
|
# 11. Segnala il completamento
|
|
logging.info(f"Analisi completata: processati {len(new_data)} eventi, trovate {len(anomalies)} anomalie")
|
|
end_progress_tracking()
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logging.error(f"Errore durante l'analisi: {e}")
|
|
import traceback
|
|
logging.error(f"Traceback completo: {traceback.format_exc()}")
|
|
end_progress_tracking()
|
|
return False
|
|
|
|
except Exception as e:
|
|
logging.error(f"Errore generale: {e}")
|
|
import traceback
|
|
logging.error(f"Traceback completo: {traceback.format_exc()}")
|
|
end_progress_tracking()
|
|
return False
|
|
|
|
# Esegui una singola analisi o in ciclo
|
|
if args.ciclo:
|
|
logging.info(f"Esecuzione in modalità ciclo. Per interrompere premere CTRL+C")
|
|
|
|
while True:
|
|
ciclo_count += 1
|
|
success = esegui_analisi()
|
|
|
|
if success:
|
|
logging.info(f"Ciclo {ciclo_count} completato. Pausa di {args.pausa} secondi...")
|
|
time.sleep(args.pausa)
|
|
else:
|
|
logging.error(f"Errore nel ciclo {ciclo_count}. Pausa di {args.pausa*2} secondi prima di riprovare...")
|
|
time.sleep(args.pausa * 2) # Pausa più lunga in caso di errore
|
|
else:
|
|
# Modalità singola
|
|
esegui_analisi()
|
|
|
|
if __name__ == "__main__":
|
|
if len(sys.argv) > 1 and sys.argv[1] == "--test":
|
|
logging.info("MODALITÀ TEST: verifica delle funzioni principali")
|
|
try:
|
|
engine = create_engine_with_retry(CONN_STRING)
|
|
test_ip = "192.168.1.1"
|
|
|
|
logging.info(f"Test 1: Verifica tabella known_attackers")
|
|
is_known = is_known_attacker(engine, test_ip)
|
|
logging.info(f"IP {test_ip} è un attaccante noto: {is_known}")
|
|
|
|
logging.info(f"Test 2: Aggiornamento known_attacker")
|
|
new_risk = update_known_attacker(engine, test_ip, "NORMALE", "80", "Test message")
|
|
logging.info(f"Nuovo livello di rischio: {new_risk}")
|
|
|
|
logging.info(f"Test 3: Verifica se ora è un attaccante noto")
|
|
is_known = is_known_attacker(engine, test_ip)
|
|
logging.info(f"IP {test_ip} è ora un attaccante noto: {is_known}")
|
|
|
|
logging.info("MODALITÀ TEST completata")
|
|
sys.exit(0)
|
|
except Exception as e:
|
|
logging.error(f"Errore nei test: {e}")
|
|
sys.exit(1)
|
|
else:
|
|
main() |