"""
|
|
IDS ML Analyzer - Sistema di analisi semplificato e veloce
|
|
Usa solo 25 feature mirate invece di 150+ per migliore performance e accuratezza
|
|
"""
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from sklearn.ensemble import IsolationForest
|
|
from sklearn.preprocessing import StandardScaler
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Dict, Tuple
|
|
import joblib
|
|
import json
|
|
from pathlib import Path
|
|
|
|
class MLAnalyzer:
    def __init__(self, model_dir: str = "models"):
        self.model_dir = Path(model_dir)
        self.model_dir.mkdir(exist_ok=True)

        self.model = None
        self.scaler = None
        self.feature_names = []

    def extract_features(self, logs_df: pd.DataFrame) -> pd.DataFrame:
        """
        Extracts 25 targeted, high-signal features from the logs.
        Focus on: volume, temporal patterns, protocol behavior.
        """
        if logs_df.empty:
            return pd.DataFrame()

        # Work on a copy so the caller's DataFrame is not mutated,
        # and make sure timestamp is a datetime
        logs_df = logs_df.copy()
        logs_df['timestamp'] = pd.to_datetime(logs_df['timestamp'])

        # Group by source_ip
        features_list = []

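        # Assumed input schema: 'source_ip' and 'timestamp' are required;
        # 'dest_ip', 'dest_port', 'protocol', 'bytes', 'packets' and 'action'
        # are optional - each block below falls back to a neutral default
        # when a column is missing.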
        for source_ip, group in logs_df.groupby('source_ip'):
            group = group.sort_values('timestamp')

            # 1. VOLUME FEATURES (5 features) - critical for DDoS
            total_packets = group['packets'].sum() if 'packets' in group.columns else len(group)
            total_bytes = group['bytes'].sum() if 'bytes' in group.columns else 0
            conn_count = len(group)
            avg_packet_size = total_bytes / max(total_packets, 1)
            bytes_per_second = total_bytes / max((group['timestamp'].max() - group['timestamp'].min()).total_seconds(), 1)

            # 2. TEMPORAL FEATURES (8 features) - timing patterns
            time_span_seconds = (group['timestamp'].max() - group['timestamp'].min()).total_seconds()
            conn_per_second = conn_count / max(time_span_seconds, 1)
            hour_of_day = group['timestamp'].dt.hour.mode()[0] if len(group) > 0 else 0
            day_of_week = group['timestamp'].dt.dayofweek.mode()[0] if len(group) > 0 else 0

            # Burst detection - how many connections fall in each 10-second window
            group['time_bucket'] = group['timestamp'].dt.floor('10s')
            bucket_sizes = group.groupby('time_bucket').size()
            max_burst = bucket_sizes.max()
            avg_burst = bucket_sizes.mean()
            burst_variance = bucket_sizes.std()

            # Intervals between consecutive connections
            time_diffs = group['timestamp'].diff().dt.total_seconds().dropna()
            avg_interval = time_diffs.mean() if len(time_diffs) > 0 else 0

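            # Worked example: 1000 connections spread evenly over 100 s fall
            # into ten 10 s buckets of 100 each, so max_burst = avg_burst = 100
            # and burst_variance ~ 0 - the machine-regular signature the botnet
            # heuristic in _classify_anomaly keys on.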
            # 3. PROTOCOL DIVERSITY (6 features) - protocol variety
            unique_protocols = group['protocol'].nunique() if 'protocol' in group.columns else 1
            unique_dest_ports = group['dest_port'].nunique() if 'dest_port' in group.columns else 1
            unique_dest_ips = group['dest_ip'].nunique() if 'dest_ip' in group.columns else 1

            # Protocol entropy (higher = more varied traffic mix)
            if 'protocol' in group.columns:
                protocol_counts = group['protocol'].value_counts()
                protocol_probs = protocol_counts / protocol_counts.sum()
                protocol_entropy = -np.sum(protocol_probs * np.log2(protocol_probs + 1e-10))
            else:
                protocol_entropy = 0

            # TCP/UDP ratio
            if 'protocol' in group.columns:
                tcp_ratio = (group['protocol'] == 'tcp').sum() / len(group)
                udp_ratio = (group['protocol'] == 'udp').sum() / len(group)
            else:
                tcp_ratio = udp_ratio = 0

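            # Worked example: a 50/50 tcp-udp mix gives
            # protocol_entropy = -(0.5*log2(0.5) + 0.5*log2(0.5)) = 1.0 bit,
            # while single-protocol traffic scores ~0.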
            # 4. PORT SCANNING DETECTION (3 features)
            if 'dest_port' in group.columns:
                unique_ports_contacted = group['dest_port'].nunique()
                port_scan_score = unique_ports_contacted / max(conn_count, 1)
                sequential_ports = 0
                sorted_ports = sorted(group['dest_port'].dropna().unique())
                for i in range(len(sorted_ports) - 1):
                    if sorted_ports[i + 1] - sorted_ports[i] == 1:
                        sequential_ports += 1
            else:
                unique_ports_contacted = 0
                port_scan_score = 0
                sequential_ports = 0

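            # Worked example: 100 connections touching 90 distinct ports give
            # port_scan_score = 0.9, well past the 0.5 threshold used by
            # _classify_anomaly.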
            # 5. BEHAVIORAL ANOMALIES (3 features)
            # Packets-per-connection ratio
            packets_per_conn = total_packets / max(conn_count, 1)

            # Packet size variation
            if 'bytes' in group.columns and 'packets' in group.columns:
                packet_size = group['bytes'] / group['packets'].replace(0, 1)
                packet_size_variance = packet_size.std()
            else:
                packet_size_variance = 0

            # Blocked vs allowed actions
            if 'action' in group.columns:
                blocked_ratio = (group['action'].str.contains('drop|reject|deny', case=False, na=False)).sum() / len(group)
            else:
                blocked_ratio = 0

            features = {
                'source_ip': source_ip,
                # Volume
                'total_packets': total_packets,
                'total_bytes': total_bytes,
                'conn_count': conn_count,
                'avg_packet_size': avg_packet_size,
                'bytes_per_second': bytes_per_second,
                # Temporal
                'time_span_seconds': time_span_seconds,
                'conn_per_second': conn_per_second,
                'hour_of_day': hour_of_day,
                'day_of_week': day_of_week,
                'max_burst': max_burst,
                'avg_burst': avg_burst,
                'burst_variance': burst_variance if not np.isnan(burst_variance) else 0,
                'avg_interval': avg_interval,
                # Protocol diversity
                'unique_protocols': unique_protocols,
                'unique_dest_ports': unique_dest_ports,
                'unique_dest_ips': unique_dest_ips,
                'protocol_entropy': protocol_entropy,
                'tcp_ratio': tcp_ratio,
                'udp_ratio': udp_ratio,
                # Port scanning
                'unique_ports_contacted': unique_ports_contacted,
                'port_scan_score': port_scan_score,
                'sequential_ports': sequential_ports,
                # Behavioral
                'packets_per_conn': packets_per_conn,
                'packet_size_variance': packet_size_variance if not np.isnan(packet_size_variance) else 0,
                'blocked_ratio': blocked_ratio,
            }

            features_list.append(features)

        return pd.DataFrame(features_list)

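    # Illustrative extract_features output (one row per source IP; numbers
    # match the DDoS source in the __main__ demo below):
    #
    #     source_ip   total_packets  conn_count  port_scan_score  ...
    #     10.0.0.100           1000        1000            0.001  ...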
    def train(self, logs_df: pd.DataFrame, contamination: float = 0.01) -> Dict:
        """
        Trains the Isolation Forest model.
        contamination: expected fraction of anomalies (default 1%)
        """
        print(f"[TRAINING] Extracting features from {len(logs_df)} logs...")
        features_df = self.extract_features(logs_df)

        if features_df.empty:
            raise ValueError("No features extracted from the logs")

        print(f"[TRAINING] Features extracted for {len(features_df)} unique IPs")

        # Drop the source_ip column for training
        X = features_df.drop('source_ip', axis=1)
        self.feature_names = X.columns.tolist()

        # Normalize features
        print("[TRAINING] Normalizing features...")
        self.scaler = StandardScaler()
        X_scaled = self.scaler.fit_transform(X)

        # Train the Isolation Forest
        print(f"[TRAINING] Training Isolation Forest (contamination={contamination})...")
        self.model = IsolationForest(
            contamination=contamination,
            random_state=42,
            n_estimators=100,
            max_samples='auto',
            n_jobs=-1
        )
        self.model.fit(X_scaled)

        # Persist the model
        self.save_model()

        # Compute statistics
        predictions = self.model.predict(X_scaled)
        anomalies = (predictions == -1).sum()

        result = {
            'records_processed': len(logs_df),
            'unique_ips': len(features_df),
            'features_count': len(self.feature_names),
            'anomalies_detected': int(anomalies),
            'contamination': contamination,
            'status': 'success'
        }

        print(f"[TRAINING] Done! {anomalies}/{len(features_df)} anomalous IPs flagged")
        return result

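    # Note: contamination sets the anomaly-score threshold as a quantile, so
    # train() flags roughly contamination * unique_ips IPs as anomalous even
    # on perfectly clean data - tune it to the expected attacker fraction.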
    def detect(self, logs_df: pd.DataFrame, risk_threshold: float = 60.0) -> List[Dict]:
        """
        Detects anomalies in the logs.
        risk_threshold: minimum risk score to report (0-100)
        """
        if self.model is None or self.scaler is None:
            raise ValueError("Model not trained. Run train() first.")

        # Work on a copy with parsed timestamps (needed for first_seen/last_seen below)
        logs_df = logs_df.copy()
        logs_df['timestamp'] = pd.to_datetime(logs_df['timestamp'])

        features_df = self.extract_features(logs_df)

        if features_df.empty:
            return []

        source_ips = features_df['source_ip'].values
        X = features_df.drop('source_ip', axis=1)
        X_scaled = self.scaler.transform(X)

        # Predictions
        predictions = self.model.predict(X_scaled)
        scores = self.model.score_samples(X_scaled)

        # Normalize scores to 0-100 (more negative score = more anomalous)
        score_min, score_max = scores.min(), scores.max()
        risk_scores = 100 * (1 - (scores - score_min) / (score_max - score_min + 1e-10))

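        # Note: this min-max rescaling is relative to the current batch - the
        # most anomalous IP always maps to ~100 and the least anomalous to ~0,
        # so risk_threshold filters within a batch, not on an absolute scale.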
        detections = []
        for i, (ip, pred, risk_score) in enumerate(zip(source_ips, predictions, risk_scores)):
            if pred == -1 and risk_score >= risk_threshold:
                # Collect the logs for this IP
                ip_logs = logs_df[logs_df['source_ip'] == ip]

                # Determine the anomaly type from the features
                features = features_df.iloc[i]
                anomaly_type = self._classify_anomaly(features)
                reason = self._generate_reason(features, anomaly_type)

                # Confidence mirrors the risk score, capped at 100
                confidence = min(100, risk_score)

                detection = {
                    'source_ip': ip,
                    'risk_score': float(risk_score),
                    'confidence': float(confidence),
                    'anomaly_type': anomaly_type,
                    'reason': reason,
                    'log_count': len(ip_logs),
                    'first_seen': ip_logs['timestamp'].min().isoformat(),
                    'last_seen': ip_logs['timestamp'].max().isoformat(),
                }
                detections.append(detection)

        # Sort by descending risk_score
        detections.sort(key=lambda x: x['risk_score'], reverse=True)
        return detections

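    # The heuristics below are evaluated in priority order (ddos, port_scan,
    # brute_force, botnet); the first matching rule wins, and anything that
    # matches none of them falls through to 'suspicious'.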
    def _classify_anomaly(self, features: pd.Series) -> str:
        """Classifies the anomaly type based on the features"""
        # DDoS: high volume, high frequency
        if features['bytes_per_second'] > 1000000 or features['conn_per_second'] > 100:
            return 'ddos'

        # Port scanning: many unique ports, sequential ports
        if features['port_scan_score'] > 0.5 or features['sequential_ports'] > 10:
            return 'port_scan'

        # Brute force: many connections to the same few ports
        if features['conn_per_second'] > 10 and features['unique_dest_ports'] < 3:
            return 'brute_force'

        # Botnet: regular timing patterns, low burst variance
        if features['burst_variance'] < 1 and features['conn_count'] > 50:
            return 'botnet'

        return 'suspicious'

    def _generate_reason(self, features: pd.Series, anomaly_type: str) -> str:
        """Builds a human-readable reason for the anomaly"""
        reasons = []

        if features['bytes_per_second'] > 1000000:
            reasons.append(f"High throughput ({features['bytes_per_second']/1e6:.1f} MB/s)")

        if features['conn_per_second'] > 100:
            reasons.append(f"High connection rate ({features['conn_per_second']:.0f} conn/s)")

        if features['port_scan_score'] > 0.5:
            reasons.append(f"Scanning {features['unique_ports_contacted']:.0f} ports")

        if features['max_burst'] > 100:
            reasons.append(f"Anomalous bursts (max {features['max_burst']:.0f} conn/10s)")

        if not reasons:
            reasons.append(f"Anomalous behavior ({anomaly_type})")

        return "; ".join(reasons)

    def save_model(self):
        """Saves the model and scaler to disk"""
        model_path = self.model_dir / "isolation_forest.joblib"
        scaler_path = self.model_dir / "scaler.joblib"
        features_path = self.model_dir / "feature_names.json"

        joblib.dump(self.model, model_path)
        joblib.dump(self.scaler, scaler_path)

        with open(features_path, 'w') as f:
            json.dump({'features': self.feature_names}, f)

        print(f"[SAVE] Model saved to {self.model_dir}")

    def load_model(self) -> bool:
        """Loads the model and scaler from disk"""
        model_path = self.model_dir / "isolation_forest.joblib"
        scaler_path = self.model_dir / "scaler.joblib"
        features_path = self.model_dir / "feature_names.json"

        if not all(p.exists() for p in [model_path, scaler_path, features_path]):
            return False

        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path)

        with open(features_path, 'r') as f:
            data = json.load(f)
            self.feature_names = data['features']

        print(f"[LOAD] Model loaded from {self.model_dir}")
        return True

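# Typical reuse sketch (assumed calling pattern; `historical_logs` and
# `incoming_logs` stand in for whatever DataFrames your ingestion layer
# produces):
#
#     analyzer = MLAnalyzer(model_dir="models")
#     if not analyzer.load_model():
#         analyzer.train(historical_logs)
#     detections = analyzer.detect(incoming_logs, risk_threshold=70)
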
if __name__ == "__main__":
    # Smoke test with demo data
    print("=== TEST ML ANALYZER ===\n")

    # Build demo data
    demo_logs = []
    base_time = datetime.now()

    # Normal traffic
    for i in range(100):
        demo_logs.append({
            'source_ip': f'192.168.1.{i % 10 + 1}',
            'timestamp': base_time + timedelta(seconds=i * 10),
            'dest_ip': '8.8.8.8',
            'dest_port': 80 if i % 2 == 0 else 443,
            'protocol': 'tcp',
            'bytes': 1000 + i * 10,
            'packets': 10 + i,
            'action': 'accept'
        })

    # Anomalous traffic (DDoS simulation)
    for i in range(1000):
        demo_logs.append({
            'source_ip': '10.0.0.100',
            'timestamp': base_time + timedelta(seconds=i),
            'dest_ip': '192.168.1.1',
            'dest_port': 80,
            'protocol': 'tcp',
            'bytes': 100,
            'packets': 1,
            'action': 'accept'
        })

    df = pd.DataFrame(demo_logs)

    # Test training
    analyzer = MLAnalyzer()
    result = analyzer.train(df, contamination=0.05)
    print(f"\n✅ Training completed: {result}")

    # Test detection
    detections = analyzer.detect(df, risk_threshold=60)
    print(f"\n🚨 Detected {len(detections)} anomalies:")
    for det in detections[:5]:
        print(f"  - {det['source_ip']}: {det['anomaly_type']} (risk: {det['risk_score']:.1f})")
        print(f"    Reason: {det['reason']}")