ids.alfacom.it/python_ml/ml_analyzer.py

"""
IDS ML Analyzer - Sistema di analisi semplificato e veloce
Usa solo 25 feature mirate invece di 150+ per migliore performance e accuratezza
"""
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from datetime import datetime, timedelta
from typing import List, Dict, Tuple
import joblib
import json
from pathlib import Path
class MLAnalyzer:
def __init__(self, model_dir: str = "models"):
self.model_dir = Path(model_dir)
self.model_dir.mkdir(exist_ok=True)
self.model = None
self.scaler = None
self.feature_names = []
def extract_features(self, logs_df: pd.DataFrame) -> pd.DataFrame:
"""
Estrae 25 feature mirate e performanti dai log
Focus su: volume, pattern temporali, comportamento protocolli
"""
if logs_df.empty:
return pd.DataFrame()
# Ensure the timestamp column is datetime (converted in place on the caller's DataFrame)
logs_df['timestamp'] = pd.to_datetime(logs_df['timestamp'])
# Group by source_ip
features_list = []
for source_ip, group in logs_df.groupby('source_ip'):
group = group.sort_values('timestamp')
# 1. VOLUME FEATURES (5 features) - critical for DDoS detection
total_packets = group['packets'].sum() if 'packets' in group.columns else len(group)
total_bytes = group['bytes'].sum() if 'bytes' in group.columns else 0
conn_count = len(group)
avg_packet_size = total_bytes / max(total_packets, 1)
bytes_per_second = total_bytes / max((group['timestamp'].max() - group['timestamp'].min()).total_seconds(), 1)
# 2. TEMPORAL FEATURES (8 features) - timing patterns
time_span_seconds = (group['timestamp'].max() - group['timestamp'].min()).total_seconds()
conn_per_second = conn_count / max(time_span_seconds, 1)
hour_of_day = group['timestamp'].dt.hour.mode()[0] if len(group) > 0 else 0
day_of_week = group['timestamp'].dt.dayofweek.mode()[0] if len(group) > 0 else 0
# Burst detection - how many connections fall into each 10-second window
group['time_bucket'] = group['timestamp'].dt.floor('10s')
max_burst = group.groupby('time_bucket').size().max()
avg_burst = group.groupby('time_bucket').size().mean()
burst_variance = group.groupby('time_bucket').size().std()
# Intervals between consecutive connections
time_diffs = group['timestamp'].diff().dt.total_seconds().dropna()
avg_interval = time_diffs.mean() if len(time_diffs) > 0 else 0
# 3. PROTOCOL DIVERSITY (6 features) - protocol variety
unique_protocols = group['protocol'].nunique() if 'protocol' in group.columns else 1
unique_dest_ports = group['dest_port'].nunique() if 'dest_port' in group.columns else 1
unique_dest_ips = group['dest_ip'].nunique() if 'dest_ip' in group.columns else 1
# Compute protocol entropy (higher = more varied)
if 'protocol' in group.columns:
protocol_counts = group['protocol'].value_counts()
protocol_probs = protocol_counts / protocol_counts.sum()
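# Shannon entropy over the protocol distribution, in bits; the 1e-10 epsilon
# guards against log2(0), even though value_counts never yields zero-probability bins.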
protocol_entropy = -np.sum(protocol_probs * np.log2(protocol_probs + 1e-10))
else:
protocol_entropy = 0
# TCP/UDP ratios
if 'protocol' in group.columns:
tcp_ratio = (group['protocol'] == 'tcp').sum() / len(group)
udp_ratio = (group['protocol'] == 'udp').sum() / len(group)
else:
tcp_ratio = udp_ratio = 0
# 4. PORT SCANNING DETECTION (3 features)
if 'dest_port' in group.columns:
unique_ports_contacted = group['dest_port'].nunique()
port_scan_score = unique_ports_contacted / max(conn_count, 1)
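# port_scan_score close to 1 means nearly every connection hit a different port;
# sequential_ports counts adjacent port numbers (difference of exactly 1),
# a typical signature of linear scans.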
sequential_ports = 0
sorted_ports = sorted(group['dest_port'].dropna().unique())
for i in range(len(sorted_ports) - 1):
if sorted_ports[i+1] - sorted_ports[i] == 1:
sequential_ports += 1
else:
unique_ports_contacted = 0
port_scan_score = 0
sequential_ports = 0
# 5. BEHAVIORAL ANOMALIES (3 features)
# Packets-per-connection ratio
packets_per_conn = total_packets / max(conn_count, 1)
# Packet size variability
if 'bytes' in group.columns and 'packets' in group.columns:
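# Per-row mean packet size; replace(0, 1) avoids division by zero for zero-packet rows.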
group['packet_size'] = group['bytes'] / group['packets'].replace(0, 1)
packet_size_variance = group['packet_size'].std()
else:
packet_size_variance = 0
# Blocked vs. allowed actions
if 'action' in group.columns:
blocked_ratio = (group['action'].str.contains('drop|reject|deny', case=False, na=False)).sum() / len(group)
else:
blocked_ratio = 0
features = {
'source_ip': source_ip,
# Volume
'total_packets': total_packets,
'total_bytes': total_bytes,
'conn_count': conn_count,
'avg_packet_size': avg_packet_size,
'bytes_per_second': bytes_per_second,
# Temporal
'time_span_seconds': time_span_seconds,
'conn_per_second': conn_per_second,
'hour_of_day': hour_of_day,
'day_of_week': day_of_week,
'max_burst': max_burst,
'avg_burst': avg_burst,
'burst_variance': burst_variance if not np.isnan(burst_variance) else 0,
'avg_interval': avg_interval,
# Protocol diversity
'unique_protocols': unique_protocols,
'unique_dest_ports': unique_dest_ports,
'unique_dest_ips': unique_dest_ips,
'protocol_entropy': protocol_entropy,
'tcp_ratio': tcp_ratio,
'udp_ratio': udp_ratio,
# Port scanning
'unique_ports_contacted': unique_ports_contacted,
'port_scan_score': port_scan_score,
'sequential_ports': sequential_ports,
# Behavioral
'packets_per_conn': packets_per_conn,
'packet_size_variance': packet_size_variance if not np.isnan(packet_size_variance) else 0,
'blocked_ratio': blocked_ratio,
}
features_list.append(features)
return pd.DataFrame(features_list)
def train(self, logs_df: pd.DataFrame, contamination: float = 0.01) -> Dict:
"""
Addestra il modello Isolation Forest
contamination: percentuale attesa di anomalie (default 1%)
"""
print(f"[TRAINING] Estrazione feature da {len(logs_df)} log...")
features_df = self.extract_features(logs_df)
if features_df.empty:
raise ValueError("Nessuna feature estratta dai log")
print(f"[TRAINING] Feature estratte per {len(features_df)} IP unici")
# Keep source_ip aside (IP addresses themselves are not used as model features)
source_ips = features_df['source_ip'].values
# Drop the source_ip column for training
X = features_df.drop('source_ip', axis=1)
self.feature_names = X.columns.tolist()
# Normalise the features
print("[TRAINING] Normalising features...")
self.scaler = StandardScaler()
X_scaled = self.scaler.fit_transform(X)
# Train the Isolation Forest
print(f"[TRAINING] Training Isolation Forest (contamination={contamination})...")
self.model = IsolationForest(
contamination=contamination,
random_state=42,
n_estimators=100,
max_samples='auto',
n_jobs=-1
)
self.model.fit(X_scaled)
# Persist the model
self.save_model()
# Compute summary statistics
predictions = self.model.predict(X_scaled)
anomalies = (predictions == -1).sum()
result = {
'records_processed': len(logs_df),
'unique_ips': len(features_df),
'features_count': len(self.feature_names),
'anomalies_detected': int(anomalies),
'contamination': contamination,
'status': 'success'
}
print(f"[TRAINING] Completato! {anomalies}/{len(features_df)} IP anomali rilevati")
return result
def detect(self, logs_df: pd.DataFrame, risk_threshold: float = 60.0) -> List[Dict]:
"""
Rileva anomalie nei log
risk_threshold: soglia minima di rischio per segnalare (0-100)
"""
if self.model is None or self.scaler is None:
raise ValueError("Modello non addestrato. Esegui train() prima.")
features_df = self.extract_features(logs_df)
if features_df.empty:
return []
source_ips = features_df['source_ip'].values
X = features_df.drop('source_ip', axis=1)
X_scaled = self.scaler.transform(X)
# Predictions
predictions = self.model.predict(X_scaled)
scores = self.model.score_samples(X_scaled)
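# score_samples returns the opposite of the anomaly score: lower (more negative) values
# are more anomalous, which is why the normalisation below inverts the scale.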
# Normalise scores to 0-100 (more negative score = more anomalous)
score_min, score_max = scores.min(), scores.max()
risk_scores = 100 * (1 - (scores - score_min) / (score_max - score_min + 1e-10))
detections = []
for i, (ip, pred, risk_score) in enumerate(zip(source_ips, predictions, risk_scores)):
if pred == -1 and risk_score >= risk_threshold:
# Collect the logs for this IP
ip_logs = logs_df[logs_df['source_ip'] == ip]
# Determine the anomaly type from the features
features = features_df.iloc[i]
anomaly_type = self._classify_anomaly(features)
reason = self._generate_reason(features, anomaly_type)
# Confidence mirrors the risk score (already on a 0-100 scale, capped below)
confidence = min(100, risk_score)
detection = {
'source_ip': ip,
'risk_score': float(risk_score),
'confidence': float(confidence),
'anomaly_type': anomaly_type,
'reason': reason,
'log_count': len(ip_logs),
'first_seen': ip_logs['timestamp'].min().isoformat(),
'last_seen': ip_logs['timestamp'].max().isoformat(),
}
detections.append(detection)
# Sort by risk_score, descending
detections.sort(key=lambda x: x['risk_score'], reverse=True)
return detections
def _classify_anomaly(self, features: pd.Series) -> str:
"""Classifica il tipo di anomalia basato sulle feature"""
# DDoS: alto volume, alta frequenza
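# Note: the cut-offs below are fixed heuristics rather than learned thresholds;
# tune them to the traffic profile of the monitored network.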
if features['bytes_per_second'] > 1000000 or features['conn_per_second'] > 100:
return 'ddos'
# Port scanning: many unique ports, sequential port numbers
if features['port_scan_score'] > 0.5 or features['sequential_ports'] > 10:
return 'port_scan'
# Brute force: many connections to the same few ports
if features['conn_per_second'] > 10 and features['unique_dest_ports'] < 3:
return 'brute_force'
# Botnet: regular timing patterns, low burst variance
if features['burst_variance'] < 1 and features['conn_count'] > 50:
return 'botnet'
return 'suspicious'
def _generate_reason(self, features: pd.Series, anomaly_type: str) -> str:
"""Genera una ragione leggibile per l'anomalia"""
reasons = []
if features['bytes_per_second'] > 1000000:
reasons.append(f"Alto throughput ({features['bytes_per_second']/1e6:.1f} MB/s)")
if features['conn_per_second'] > 100:
reasons.append(f"Alta frequenza connessioni ({features['conn_per_second']:.0f} conn/s)")
if features['port_scan_score'] > 0.5:
reasons.append(f"Scanning {features['unique_ports_contacted']:.0f} porte")
if features['max_burst'] > 100:
reasons.append(f"Burst anomali (max {features['max_burst']:.0f} conn/10s)")
if not reasons:
reasons.append(f"Comportamento anomalo ({anomaly_type})")
return "; ".join(reasons)
def save_model(self):
"""Salva modello e scaler su disco"""
model_path = self.model_dir / "isolation_forest.joblib"
scaler_path = self.model_dir / "scaler.joblib"
features_path = self.model_dir / "feature_names.json"
joblib.dump(self.model, model_path)
joblib.dump(self.scaler, scaler_path)
with open(features_path, 'w') as f:
json.dump({'features': self.feature_names}, f)
print(f"[SAVE] Modello salvato in {self.model_dir}")
def load_model(self) -> bool:
"""Carica modello e scaler da disco"""
model_path = self.model_dir / "isolation_forest.joblib"
scaler_path = self.model_dir / "scaler.joblib"
features_path = self.model_dir / "feature_names.json"
if not all(p.exists() for p in [model_path, scaler_path, features_path]):
return False
self.model = joblib.load(model_path)
self.scaler = joblib.load(scaler_path)
with open(features_path, 'r') as f:
data = json.load(f)
self.feature_names = data['features']
print(f"[LOAD] Modello caricato da {self.model_dir}")
return True
if __name__ == "__main__":
# Test with demo data
print("=== TEST ML ANALYZER ===\n")
# Create demo data
demo_logs = []
base_time = datetime.now()
# Normal traffic
for i in range(100):
demo_logs.append({
'source_ip': f'192.168.1.{i % 10 + 1}',
'timestamp': base_time + timedelta(seconds=i * 10),
'dest_ip': '8.8.8.8',
'dest_port': 80 if i % 2 == 0 else 443,
'protocol': 'tcp',
'bytes': 1000 + i * 10,
'packets': 10 + i,
'action': 'accept'
})
# Anomalous traffic (DDoS simulation)
for i in range(1000):
demo_logs.append({
'source_ip': '10.0.0.100',
'timestamp': base_time + timedelta(seconds=i),
'dest_ip': '192.168.1.1',
'dest_port': 80,
'protocol': 'tcp',
'bytes': 100,
'packets': 1,
'action': 'accept'
})
df = pd.DataFrame(demo_logs)
# Test training
analyzer = MLAnalyzer()
result = analyzer.train(df, contamination=0.05)
print(f"\n✅ Training completato: {result}")
# Test detection
detections = analyzer.detect(df, risk_threshold=60)
print(f"\n🚨 Rilevate {len(detections)} anomalie:")
for det in detections[:5]:
print(f" - {det['source_ip']}: {det['anomaly_type']} (risk: {det['risk_score']:.1f})")
print(f" Motivo: {det['reason']}")