ids.alfacom.it/python_ml/compare_models.py
marco370 a32700c149 Add script to compare old and new detection models
Creates a Python script that loads old detection data, reanalyzes IPs with the new hybrid detector, and compares the results to identify differences and improvements.

2025-11-25 08:36:32 +00:00

266 lines
8.4 KiB
Python

#!/usr/bin/env python3
"""
IDS Model Comparison Script
Compares detections from the old model (1.0.0) against the new Hybrid Detector (2.0.0)
"""
import psycopg2
from psycopg2.extras import RealDictCursor
import pandas as pd
from datetime import datetime
import os
from dotenv import load_dotenv
from ml_hybrid_detector import MLHybridDetector
load_dotenv()
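
# Usage:
#   python3 compare_models.py
# The database connection is read from the PG* environment variables
# (optionally loaded from a .env file via python-dotenv above).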

def get_db_connection():
    """Connect to the PostgreSQL database"""
    return psycopg2.connect(
        host=os.getenv('PGHOST', 'localhost'),
        port=os.getenv('PGPORT', 5432),
        database=os.getenv('PGDATABASE', 'ids'),
        user=os.getenv('PGUSER', 'postgres'),
        password=os.getenv('PGPASSWORD')
    )
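# NOTE: PGPASSWORD has no fallback above; if it is unset, psycopg2 passes
# password=None and libpq falls back to its own auth sources (e.g. ~/.pgpass),
# which may or may not be configured on this host.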

def load_old_detections(limit=100):
    """
    Load detections from the old model (model_version 1.0.0)
    """
    print("\n[1] Loading old model detections (v1.0.0)...")
    conn = get_db_connection()
    cursor = conn.cursor(cursor_factory=RealDictCursor)
    query = """
        SELECT
            d.id,
            d.ip_address,
            d.anomaly_score,
            d.risk_level,
            d.detection_count,
            d.last_detected,
            d.blocked,
            d.model_version
        FROM detections d
        WHERE d.model_version = '1.0.0'
        ORDER BY d.anomaly_score DESC
        LIMIT %s
    """
    cursor.execute(query, (limit,))
    detections = cursor.fetchall()
    cursor.close()
    conn.close()
    print(f"    Found {len(detections)} detections from model v1.0.0")
    return detections
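# Illustrative call (rows are RealDictCursor dicts keyed by the selected
# columns; actual values depend on the detections table):
#   top = load_old_detections(limit=10)
#   print(top[0]['ip_address'], top[0]['anomaly_score'])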

def get_network_logs_for_ip(ip_address, days=7):
    """
    Fetch the network logs for a specific IP (last N days)
    """
    conn = get_db_connection()
    cursor = conn.cursor(cursor_factory=RealDictCursor)
    query = """
        SELECT
            timestamp,
            source_ip,
            destination_ip AS dest_ip,
            destination_port AS dest_port,
            protocol,
            packet_length,
            action
        FROM network_logs
        WHERE source_ip = %s
          AND timestamp > NOW() - INTERVAL '1 day' * %s
        ORDER BY timestamp DESC
        LIMIT 10000
    """
    cursor.execute(query, (ip_address, days))
    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows
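# Illustrative call (192.0.2.10 is a placeholder documentation IP):
#   logs = get_network_logs_for_ip('192.0.2.10', days=3)
#   pd.DataFrame(logs)  # columns: timestamp, source_ip, dest_ip, dest_port, ...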

def reanalyze_with_hybrid(detector, ip_address, old_detection):
    """
    Reanalyze an IP with the new Hybrid Detector
    """
    # Fetch the logs for this IP
    logs = get_network_logs_for_ip(ip_address, days=7)
    if not logs:
        return None
    df = pd.DataFrame(logs)
    # Feature extraction (same pipeline as the detector)
    features_df = detector.extract_features(df)
    if len(features_df) == 0:
        return None
    # Take the aggregated features for this IP
    ip_features = features_df.iloc[0:1]
    # Re-score with the new model
    result = detector.detect(ip_features)
    if not result or len(result) == 0:
        return None
    new_detection = result[0]
    # Build the comparison record
    comparison = {
        'ip_address': ip_address,
        'logs_count': len(logs),
        # Old model (v1.0.0)
        'old_score': old_detection['anomaly_score'],
        'old_risk_level': old_detection['risk_level'],
        'old_blocked': old_detection['blocked'],
        # New model (v2.0.0)
        'new_score': new_detection.get('anomaly_score', 0),
        'new_risk_level': new_detection.get('risk_level', 'unknown'),
        'new_confidence': new_detection.get('confidence', 'unknown'),
        'new_is_anomaly': new_detection.get('is_anomaly', False),
        # Delta
        'score_delta': new_detection.get('anomaly_score', 0) - old_detection['anomaly_score'],
        'risk_changed': old_detection['risk_level'] != new_detection.get('risk_level', 'unknown'),
    }
    return comparison
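# NOTE: detector.extract_features() and detector.detect() come from
# ml_hybrid_detector; detect() is assumed to return a list of per-IP dicts
# exposing 'anomaly_score', 'risk_level', 'confidence' and 'is_anomaly'
# (the .get() defaults above guard against missing keys).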

def main():
    print("\n" + "=" * 80)
    print(" IDS MODEL COMPARISON - v1.0.0 vs v2.0.0")
    print("=" * 80)
    # Load the old model's detections
    old_detections = load_old_detections(limit=50)
    if not old_detections:
        print("\n❌ No detections from the old model found!")
        return
    # Load the new Hybrid model
    print("\n[2] Loading new Hybrid Detector (v2.0.0)...")
    detector = MLHybridDetector(model_dir="models")
    if not detector.load_models():
        print("\n❌ Hybrid models not found! Run training first:")
        print("    sudo /opt/ids/deployment/run_ml_training.sh")
        return
    print("    ✅ Hybrid Detector loaded (18 selected features)")
    # Reanalyze each IP with the new model
    print(f"\n[3] Reanalyzing {len(old_detections)} IPs with the new model...")
    print("    (This may take a few minutes...)")
    comparisons = []
    for i, old_det in enumerate(old_detections):
        ip = old_det['ip_address']
        print(f"\n    [{i+1}/{len(old_detections)}] Analyzing IP: {ip}")
        print(f"        Old: score={old_det['anomaly_score']:.1f}, risk={old_det['risk_level']}, blocked={old_det['blocked']}")
        comparison = reanalyze_with_hybrid(detector, ip, old_det)
        if comparison:
            comparisons.append(comparison)
            print(f"        New: score={comparison['new_score']:.1f}, risk={comparison['new_risk_level']}, confidence={comparison['new_confidence']}")
            print(f"        Δ: {comparison['score_delta']:+.1f} score")
        else:
            print("        ⚠ No recent logs found for this IP")
    # Summary
    print("\n" + "=" * 80)
    print(" COMPARISON RESULTS")
    print("=" * 80)
    if not comparisons:
        print("\n❌ No IPs were reanalyzed (no logs available)")
        return
    df_comp = pd.DataFrame(comparisons)
    # Statistics
    print(f"\nIPs reanalyzed: {len(comparisons)}/{len(old_detections)}")
    print("\nAverage score:")
    print(f"    Old model: {df_comp['old_score'].mean():.1f}")
    print(f"    New model: {df_comp['new_score'].mean():.1f}")
    print(f"    Average delta: {df_comp['score_delta'].mean():+.1f}")
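    # Heuristic: scores >= 80 are treated as "flagged by the old model".
    # The cut-off is hard-coded for this report, not read from the old
    # model's configuration.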
    # False positives (flagged by the old model, cleared by the new one)
    false_positives = df_comp[
        (df_comp['old_score'] >= 80) &
        (~df_comp['new_is_anomaly'])
    ]
    print(f"\n🎯 Possible false positives eliminated: {len(false_positives)}")
    if len(false_positives) > 0:
        print("\n    IPs the old model blocked that the new model considers normal:")
        for _, row in false_positives.iterrows():
            print(f"        {row['ip_address']} (old={row['old_score']:.0f}, new={row['new_score']:.0f})")
    # Confirmed true positives
    true_positives = df_comp[
        (df_comp['old_score'] >= 80) &
        (df_comp['new_is_anomaly'])
    ]
    print(f"\n✅ Anomalies confirmed by both models: {len(true_positives)}")
    # Confidence breakdown (new model only)
    if 'new_confidence' in df_comp.columns:
        print("\n📊 Confidence level distribution (new model):")
        conf_counts = df_comp['new_confidence'].value_counts()
        for conf, count in conf_counts.items():
            print(f"    {conf}: {count} IPs")
    # Risk level changes
    risk_changes = df_comp[df_comp['risk_changed']]
    print(f"\n🔄 IPs whose risk level changed: {len(risk_changes)}")
    # Top 10 largest score reductions
    print("\n📉 Top 10 score reductions (possible corrected FPs):")
    top_reductions = df_comp.nsmallest(10, 'score_delta')
    for i, row in enumerate(top_reductions.itertuples(), 1):
        print(f"    {i}. {row.ip_address}: {row.old_score:.0f} → {row.new_score:.0f} ({row.score_delta:+.0f})")
    # Top 10 largest score increases
    print("\n📈 Top 10 score increases (newly surfaced anomalies):")
    top_increases = df_comp.nlargest(10, 'score_delta')
    for i, row in enumerate(top_increases.itertuples(), 1):
        print(f"    {i}. {row.ip_address}: {row.old_score:.0f} → {row.new_score:.0f} ({row.score_delta:+.0f})")
    # Save a CSV for detailed analysis
    output_file = f"model_comparison_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    df_comp.to_csv(output_file, index=False)
    print(f"\n💾 Full results saved to: {output_file}")
    print("\n" + "=" * 80)
    print("✅ Comparison complete!")
    print("=" * 80 + "\n")


if __name__ == "__main__":
    main()