Adds a Python script that loads old detection data, re-analyzes each IP with the new hybrid detector, and compares the results to identify differences and improvements.
#!/usr/bin/env python3
"""
IDS Model Comparison Script

Compares detections from the old model (v1.0.0) against the new Hybrid Detector (v2.0.0).
"""

import os
from datetime import datetime

import pandas as pd
import psycopg2
from psycopg2.extras import RealDictCursor
from dotenv import load_dotenv

from ml_hybrid_detector import MLHybridDetector

load_dotenv()


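# Connection parameters come from the standard PG* environment variables,
# loaded from a local .env file by load_dotenv() above.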
def get_db_connection():
    """Connect to PostgreSQL database"""
    return psycopg2.connect(
        host=os.getenv('PGHOST', 'localhost'),
        port=os.getenv('PGPORT', 5432),
        database=os.getenv('PGDATABASE', 'ids'),
        user=os.getenv('PGUSER', 'postgres'),
        password=os.getenv('PGPASSWORD')
    )


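# Old detections are fetched highest-score-first, so the comparison focuses
# on the IPs that v1.0.0 considered most anomalous.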
def load_old_detections(limit=100):
    """
    Load detections produced by the old model (model_version 1.0.0).
    """
    print("\n[1] Loading old model detections (v1.0.0)...")

    conn = get_db_connection()
    cursor = conn.cursor(cursor_factory=RealDictCursor)

    query = """
        SELECT
            d.id,
            d.ip_address,
            d.anomaly_score,
            d.risk_level,
            d.detection_count,
            d.last_detected,
            d.blocked,
            d.model_version
        FROM detections d
        WHERE d.model_version = '1.0.0'
        ORDER BY d.anomaly_score DESC
        LIMIT %s
    """

    cursor.execute(query, (limit,))
    detections = cursor.fetchall()
    cursor.close()
    conn.close()

    print(f"    Found {len(detections)} detections from model v1.0.0")

    return detections


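# Raw logs are re-fetched per IP on demand; the LIMIT keeps memory bounded
# for very chatty sources.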
def get_network_logs_for_ip(ip_address, days=7):
    """
    Fetch the network logs for a specific IP (last N days).
    """
    conn = get_db_connection()
    cursor = conn.cursor(cursor_factory=RealDictCursor)

    # INTERVAL '1 day' * %s keeps the look-back window parameterized
    # instead of string-formatted into the query.
    query = """
        SELECT
            timestamp,
            source_ip,
            destination_ip as dest_ip,
            destination_port as dest_port,
            protocol,
            packet_length,
            action
        FROM network_logs
        WHERE source_ip = %s
          AND timestamp > NOW() - INTERVAL '1 day' * %s
        ORDER BY timestamp DESC
        LIMIT 10000
    """

    cursor.execute(query, (ip_address, days))
    rows = cursor.fetchall()
    cursor.close()
    conn.close()

    return rows


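# NOTE (assumption about the MLHybridDetector API): extract_features() is
# expected to aggregate the raw log rows into one feature row per source IP,
# and detect() to return a list of dicts carrying anomaly_score, risk_level,
# confidence and is_anomaly. reanalyze_with_hybrid() below relies on that
# contract.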
def reanalyze_with_hybrid(detector, ip_address, old_detection):
    """
    Re-analyze an IP with the new Hybrid Detector.
    """
    # Fetch the logs for this IP
    logs = get_network_logs_for_ip(ip_address, days=7)

    if not logs:
        return None

    df = pd.DataFrame(logs)

    # Feature extraction (same pipeline as the detector)
    features_df = detector.extract_features(df)

    if len(features_df) == 0:
        return None

    # Take the aggregated feature row for this IP
    ip_features = features_df.iloc[0:1]

    # Re-score with the new model
    result = detector.detect(ip_features)

    if not result or len(result) == 0:
        return None

    new_detection = result[0]

    # Comparison record
    comparison = {
        'ip_address': ip_address,
        'logs_count': len(logs),

        # Old model (v1.0.0)
        'old_score': old_detection['anomaly_score'],
        'old_risk_level': old_detection['risk_level'],
        'old_blocked': old_detection['blocked'],

        # New model (v2.0.0)
        'new_score': new_detection.get('anomaly_score', 0),
        'new_risk_level': new_detection.get('risk_level', 'unknown'),
        'new_confidence': new_detection.get('confidence', 'unknown'),
        'new_is_anomaly': new_detection.get('is_anomaly', False),

        # Delta
        'score_delta': new_detection.get('anomaly_score', 0) - old_detection['anomaly_score'],
        'risk_changed': old_detection['risk_level'] != new_detection.get('risk_level', 'unknown'),
    }

    return comparison


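# Sign convention: a negative score_delta means the new model rates the IP
# as less anomalous than v1.0.0 did (a candidate corrected false positive);
# a positive delta means it rates the IP as more anomalous.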
def main():
    print("\n" + "="*80)
    print("  IDS MODEL COMPARISON - v1.0.0 vs v2.0.0")
    print("="*80)

    # Load the old detections
    old_detections = load_old_detections(limit=50)

    if not old_detections:
        print("\n❌ No detections from the old model found!")
        return

    # Load the new Hybrid model
    print("\n[2] Loading new Hybrid Detector (v2.0.0)...")
    detector = MLHybridDetector(model_dir="models")

    if not detector.load_models():
        print("\n❌ Hybrid models not found! Run the training first:")
        print("   sudo /opt/ids/deployment/run_ml_training.sh")
        return

    print("  ✅ Hybrid Detector loaded (18 selected features)")

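    # Each stored v1.0.0 detection is re-scored from its raw logs; IPs whose
    # logs have aged out of the 7-day window are skipped (see the warning below).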
    # Re-analyze every IP with the new model
    print(f"\n[3] Re-analyzing {len(old_detections)} IPs with the new model...")
    print("    (This may take a few minutes...)")

    comparisons = []

    for i, old_det in enumerate(old_detections):
        ip = old_det['ip_address']

        print(f"\n  [{i+1}/{len(old_detections)}] Analyzing IP: {ip}")
        print(f"    Old: score={old_det['anomaly_score']:.1f}, risk={old_det['risk_level']}, blocked={old_det['blocked']}")

        comparison = reanalyze_with_hybrid(detector, ip, old_det)

        if comparison:
            comparisons.append(comparison)
            print(f"    New: score={comparison['new_score']:.1f}, risk={comparison['new_risk_level']}, confidence={comparison['new_confidence']}")
            print(f"    Δ: {comparison['score_delta']:+.1f} score")
        else:
            print("    ⚠ No recent logs found for this IP")

    # Summary
    print("\n" + "="*80)
    print("  COMPARISON RESULTS")
    print("="*80)

    if not comparisons:
        print("\n❌ No IPs re-analyzed (no logs available)")
        return

    df_comp = pd.DataFrame(comparisons)

    # Statistics
    print(f"\nIPs re-analyzed: {len(comparisons)}/{len(old_detections)}")
    print("\nMean score:")
    print(f"  Old model: {df_comp['old_score'].mean():.1f}")
    print(f"  New model: {df_comp['new_score'].mean():.1f}")
    print(f"  Mean delta: {df_comp['score_delta'].mean():+.1f}")

    # Possible false positives (old model flagged, new model does not);
    # old_score >= 80 is a heuristic cutoff for "flagged by v1.0.0"
    false_positives = df_comp[
        (df_comp['old_score'] >= 80) &
        (~df_comp['new_is_anomaly'])
    ]

    print(f"\n🎯 Possible false positives reduced: {len(false_positives)}")
    if len(false_positives) > 0:
        print("\n  IPs the old model blocked that the new model considers normal:")
        for _, row in false_positives.iterrows():
            print(f"    • {row['ip_address']} (old={row['old_score']:.0f}, new={row['new_score']:.0f})")

    # True positives confirmed by both models
    true_positives = df_comp[
        (df_comp['old_score'] >= 80) &
        (df_comp['new_is_anomaly'])
    ]

    print(f"\n✅ Anomalies confirmed by both models: {len(true_positives)}")

    # Confidence breakdown (new model only)
    if 'new_confidence' in df_comp.columns:
        print("\n📊 Confidence level distribution (new model):")
        conf_counts = df_comp['new_confidence'].value_counts()
        for conf, count in conf_counts.items():
            print(f"  • {conf}: {count} IPs")

    # Risk level changes
    risk_changes = df_comp[df_comp['risk_changed']]
    print(f"\n🔄 IPs with a changed risk level: {len(risk_changes)}")

    # Top 10 largest score reductions
    print("\n📉 Top 10 score reductions (possible corrected FPs):")
    top_reductions = df_comp.nsmallest(10, 'score_delta')
    for i, row in enumerate(top_reductions.itertuples(), 1):
        print(f"  {i}. {row.ip_address}: {row.old_score:.0f} → {row.new_score:.0f} ({row.score_delta:+.0f})")

    # Top 10 largest score increases
    print("\n📈 Top 10 score increases (newly surfaced anomalies):")
    top_increases = df_comp.nlargest(10, 'score_delta')
    for i, row in enumerate(top_increases.itertuples(), 1):
        print(f"  {i}. {row.ip_address}: {row.old_score:.0f} → {row.new_score:.0f} ({row.score_delta:+.0f})")

    # Save a CSV for detailed analysis
    output_file = f"model_comparison_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    df_comp.to_csv(output_file, index=False)
    print(f"\n💾 Full results saved to: {output_file}")

    print("\n" + "="*80)
    print("✅ Comparison complete!")
    print("="*80 + "\n")


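# Run this file directly (no CLI arguments); progress is printed to stdout
# and the full comparison is written to a timestamped CSV in the working
# directory.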
if __name__ == "__main__":
    main()