Add exception handling to the model training process to log failures and improve robustness.
"""
|
|
IDS Hybrid ML Detector - Production-Grade System
|
|
Combines Extended Isolation Forest, Feature Selection, and Ensemble Classifier
|
|
Validated with CICIDS2017 dataset for high precision and low false positives
|
|
"""
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
|
|
from sklearn.tree import DecisionTreeClassifier
|
|
from sklearn.preprocessing import StandardScaler
|
|
from sklearn.feature_selection import SelectKBest, chi2
|
|
from xgboost import XGBClassifier
|
|
try:
|
|
from eif import ExtendedIsolationForest
|
|
EIF_AVAILABLE = True
|
|
except ImportError:
|
|
from sklearn.ensemble import IsolationForest
|
|
EIF_AVAILABLE = False
|
|
print("[WARNING] Extended Isolation Forest not available, using standard IF")
|
|
|
|
from typing import List, Dict, Tuple, Optional, Literal
|
|
import joblib
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
|
|
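# ---------------------------------------------------------------------------
# Expected input schema (inferred from extract_features below, not enforced):
# a DataFrame with 'timestamp' and 'source_ip' columns, plus optional
# 'dest_ip', 'dest_port', 'protocol', 'packets', 'bytes', and 'action'
# columns. Missing optional columns fall back to neutral defaults during
# feature extraction.
# ---------------------------------------------------------------------------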
class MLHybridDetector:
    """
    Hybrid ML Detector combining multiple techniques:
    1. Extended Isolation Forest for unsupervised anomaly detection
    2. Chi-Square feature selection for optimal feature subset
    3. DRX Ensemble (DT+RF+XGBoost) for robust classification
    4. Confidence scoring system (High/Medium/Low)
    """

    def __init__(self, model_dir: str = "models"):
        self.model_dir = Path(model_dir)
        self.model_dir.mkdir(exist_ok=True)

        # Models
        self.isolation_forest = None
        self.ensemble_classifier = None
        self.feature_selector = None
        self.scaler = None

        # Feature metadata
        self.feature_names = []
        self.selected_feature_names = []
        self.feature_importances = {}

        # Configuration
        self.config = {
            # Extended Isolation Forest tuning
            'eif_n_estimators': 250,
            'eif_contamination': 0.03,  # 3% expected anomalies (tuned from research)
            'eif_max_samples': 256,
            'eif_max_features': 0.8,  # Feature diversity
            'eif_extension_level': 0,  # EIF-specific

            # Feature Selection
            'chi2_top_k': 18,  # Top 18 most relevant features

            # Ensemble configuration
            'dt_max_depth': 10,
            'rf_n_estimators': 100,
            'rf_max_depth': 15,
            'xgb_n_estimators': 100,
            'xgb_max_depth': 7,
            'xgb_learning_rate': 0.1,

            # Voting weights (DT:RF:XGB = 1:2:2)
            'voting_weights': [1, 2, 2],

            # Confidence thresholds
            'confidence_high': 95.0,  # Auto-block
            'confidence_medium': 70.0,  # Alert for review
        }

        # Validation metrics (populated after validation)
        self.metrics = {
            'precision': None,
            'recall': None,
            'f1_score': None,
            'false_positive_rate': None,
            'accuracy': None,
        }

    def extract_features(self, logs_df: pd.DataFrame) -> pd.DataFrame:
        """
        Extract 25 targeted features from network logs
        Optimized for MikroTik syslog data
        """
        if logs_df.empty:
            return pd.DataFrame()

        logs_df['timestamp'] = pd.to_datetime(logs_df['timestamp'])
        features_list = []

        for source_ip, group in logs_df.groupby('source_ip'):
            group = group.sort_values('timestamp')

            # Volume features (5)
            total_packets = group['packets'].sum() if 'packets' in group.columns else len(group)
            total_bytes = group['bytes'].sum() if 'bytes' in group.columns else 0
            conn_count = len(group)
            avg_packet_size = total_bytes / max(total_packets, 1)
            bytes_per_second = total_bytes / max((group['timestamp'].max() - group['timestamp'].min()).total_seconds(), 1)

            # Temporal features (8)
            time_span_seconds = (group['timestamp'].max() - group['timestamp'].min()).total_seconds()
            conn_per_second = conn_count / max(time_span_seconds, 1)
            hour_of_day = group['timestamp'].dt.hour.mode()[0] if len(group) > 0 else 0
            day_of_week = group['timestamp'].dt.dayofweek.mode()[0] if len(group) > 0 else 0

            group['time_bucket'] = group['timestamp'].dt.floor('10s')
            max_burst = group.groupby('time_bucket').size().max()
            avg_burst = group.groupby('time_bucket').size().mean()
            burst_variance = group.groupby('time_bucket').size().std()

            time_diffs = group['timestamp'].diff().dt.total_seconds().dropna()
            avg_interval = time_diffs.mean() if len(time_diffs) > 0 else 0

            # Protocol diversity (6)
            unique_protocols = group['protocol'].nunique() if 'protocol' in group.columns else 1
            unique_dest_ports = group['dest_port'].nunique() if 'dest_port' in group.columns else 1
            unique_dest_ips = group['dest_ip'].nunique() if 'dest_ip' in group.columns else 1

            if 'protocol' in group.columns:
                protocol_counts = group['protocol'].value_counts()
                protocol_probs = protocol_counts / protocol_counts.sum()
                protocol_entropy = -np.sum(protocol_probs * np.log2(protocol_probs + 1e-10))
                tcp_ratio = (group['protocol'] == 'tcp').sum() / len(group)
                udp_ratio = (group['protocol'] == 'udp').sum() / len(group)
            else:
                protocol_entropy = tcp_ratio = udp_ratio = 0

            # Port scanning detection (3)
            if 'dest_port' in group.columns:
                unique_ports_contacted = group['dest_port'].nunique()
                port_scan_score = unique_ports_contacted / max(conn_count, 1)
                sorted_ports = sorted(group['dest_port'].dropna().unique())
                sequential_ports = sum(1 for i in range(len(sorted_ports) - 1) if sorted_ports[i + 1] - sorted_ports[i] == 1)
            else:
                unique_ports_contacted = port_scan_score = sequential_ports = 0

            # Behavioral anomalies (3)
            packets_per_conn = total_packets / max(conn_count, 1)

            if 'bytes' in group.columns and 'packets' in group.columns:
                group['packet_size'] = group['bytes'] / group['packets'].replace(0, 1)
                packet_size_variance = group['packet_size'].std()
            else:
                packet_size_variance = 0

            if 'action' in group.columns:
                blocked_ratio = (group['action'].str.contains('drop|reject|deny', case=False, na=False)).sum() / len(group)
            else:
                blocked_ratio = 0

            features = {
                'source_ip': source_ip,
                'total_packets': total_packets,
                'total_bytes': total_bytes,
                'conn_count': conn_count,
                'avg_packet_size': avg_packet_size,
                'bytes_per_second': bytes_per_second,
                'time_span_seconds': time_span_seconds,
                'conn_per_second': conn_per_second,
                'hour_of_day': hour_of_day,
                'day_of_week': day_of_week,
                'max_burst': max_burst,
                'avg_burst': avg_burst,
                'burst_variance': burst_variance if not np.isnan(burst_variance) else 0,
                'avg_interval': avg_interval,
                'unique_protocols': unique_protocols,
                'unique_dest_ports': unique_dest_ports,
                'unique_dest_ips': unique_dest_ips,
                'protocol_entropy': protocol_entropy,
                'tcp_ratio': tcp_ratio,
                'udp_ratio': udp_ratio,
                'unique_ports_contacted': unique_ports_contacted,
                'port_scan_score': port_scan_score,
                'sequential_ports': sequential_ports,
                'packets_per_conn': packets_per_conn,
                'packet_size_variance': packet_size_variance if not np.isnan(packet_size_variance) else 0,
                'blocked_ratio': blocked_ratio,
            }

            features_list.append(features)

        return pd.DataFrame(features_list)

    def train_unsupervised(self, logs_df: pd.DataFrame) -> Dict:
        """
        Train Hybrid System:
        1. Extended Isolation Forest (unsupervised)
        2. Pseudo-labeling from IF predictions
        3. Ensemble Classifier (DT+RF+XGB) on pseudo-labels
        """
        print(f"[HYBRID] Training hybrid model on {len(logs_df)} logs...")

        features_df = self.extract_features(logs_df)
        if features_df.empty:
            raise ValueError("No features extracted")

        print(f"[HYBRID] Extracted features for {len(features_df)} unique IPs")

        # Separate source_ip
        X = features_df.drop('source_ip', axis=1)
        self.feature_names = X.columns.tolist()

        # STEP 1: Initial IF training for pseudo-labels
        print("[HYBRID] Pre-training Isolation Forest for feature selection...")

        # Ensure non-negative values
        X_positive = X.clip(lower=0) + 1e-10

        # Normalize for initial IF
        temp_scaler = StandardScaler()
        X_temp_scaled = temp_scaler.fit_transform(X_positive)

        # Train temporary IF for pseudo-labeling
        if EIF_AVAILABLE:
            temp_if = ExtendedIsolationForest(
                n_estimators=100,  # Faster pre-training
                contamination=self.config['eif_contamination'],
                random_state=42
            )
        else:
            temp_if = IsolationForest(
                n_estimators=100,
                contamination=self.config['eif_contamination'],
                random_state=42,
                n_jobs=-1
            )

        temp_if.fit(X_temp_scaled)
        temp_predictions = temp_if.predict(X_temp_scaled)

        # Use IF predictions as pseudo-labels for feature selection
        y_pseudo_select = (temp_predictions == -1).astype(int)
        print(f"[HYBRID] Generated {y_pseudo_select.sum()} pseudo-anomalies from pre-training IF")

        # Feature selection with Chi-Square
        print(f"[HYBRID] Feature selection: {len(X.columns)} → {self.config['chi2_top_k']} features")

        # Validate k is not larger than available features
        k_select = min(self.config['chi2_top_k'], X_positive.shape[1])
        if k_select < self.config['chi2_top_k']:
            print(f"[HYBRID] Warning: Reducing k from {self.config['chi2_top_k']} to {k_select} (max available)")

        self.feature_selector = SelectKBest(chi2, k=k_select)
        X_selected = self.feature_selector.fit_transform(X_positive, y_pseudo_select)

        # Get selected feature names
        selected_indices = self.feature_selector.get_support(indices=True)
        self.selected_feature_names = [self.feature_names[i] for i in selected_indices]
        print(f"[HYBRID] Selected features: {', '.join(self.selected_feature_names[:5])}... (+{len(self.selected_feature_names) - 5} more)")

        # STEP 2: Normalize
        print("[HYBRID] Normalizing features...")
        self.scaler = StandardScaler()
        X_scaled = self.scaler.fit_transform(X_selected)

        # STEP 3: Train Extended Isolation Forest
        print(f"[HYBRID] Training Extended Isolation Forest (contamination={self.config['eif_contamination']})...")
        if EIF_AVAILABLE:
            self.isolation_forest = ExtendedIsolationForest(
                n_estimators=self.config['eif_n_estimators'],
                max_samples=self.config['eif_max_samples'],
                contamination=self.config['eif_contamination'],
                extension_level=self.config['eif_extension_level'],
                random_state=42,
            )
        else:
            self.isolation_forest = IsolationForest(
                n_estimators=self.config['eif_n_estimators'],
                max_samples=self.config['eif_max_samples'],
                contamination=self.config['eif_contamination'],
                max_features=self.config['eif_max_features'],
                random_state=42,
                n_jobs=-1
            )

        self.isolation_forest.fit(X_scaled)

        # STEP 4: Generate pseudo-labels from IF predictions
        print("[HYBRID] Generating pseudo-labels from Isolation Forest...")
        if_predictions = self.isolation_forest.predict(X_scaled)
        if_scores = self.isolation_forest.score_samples(X_scaled)

        # Convert IF predictions to pseudo-labels (1=anomaly, 0=normal)
        y_pseudo_train = (if_predictions == -1).astype(int)
        anomalies_count = y_pseudo_train.sum()

        # CRITICAL: Handle zero-anomaly case with ADAPTIVE PERCENTILES
        min_anomalies_required = max(10, int(len(y_pseudo_train) * 0.02))  # At least 2% or 10

        if anomalies_count < min_anomalies_required:
            print(f"[HYBRID] ⚠️ IF found only {anomalies_count} anomalies (need {min_anomalies_required})")
            print("[HYBRID] Applying ADAPTIVE percentile fallback...")

            # Try progressively higher percentiles to get enough pseudo-anomalies
            percentiles_to_try = [5, 10, 15, 20]  # Bottom X% scores
            for percentile in percentiles_to_try:
                anomaly_threshold = np.percentile(if_scores, percentile)
                y_pseudo_train = (if_scores <= anomaly_threshold).astype(int)
                anomalies_count = y_pseudo_train.sum()

                print(f"[HYBRID] Trying {percentile}% percentile → {anomalies_count} anomalies")

                if anomalies_count >= min_anomalies_required:
                    print(f"[HYBRID] ✅ Success with {percentile}% percentile")
                    break

        # Final check: FAIL if ensemble cannot be trained
        if anomalies_count < 2:
            error_msg = (
                f"HYBRID TRAINING FAILED: Insufficient pseudo-anomalies ({anomalies_count}) for ensemble training.\n\n"
                f"Dataset appears too clean for supervised ensemble classifier.\n"
                f"Attempted adaptive percentiles (5%, 10%, 15%, 20%) but still < 2 classes.\n\n"
                f"SOLUTIONS:\n"
                f"  1. Collect more diverse network traffic data\n"
                f"  2. Lower contamination threshold (currently {self.config['eif_contamination']})\n"
                f"  3. Use larger dataset (currently {len(features_df)} unique IPs)\n\n"
                f"IMPORTANT: Hybrid detector REQUIRES ensemble classifier.\n"
                f"Cannot deploy incomplete IF-only system when hybrid was requested."
            )
            print(f"\n[HYBRID] ❌ {error_msg}")

            raise ValueError(error_msg)

        print(f"[HYBRID] Pseudo-labels: {anomalies_count} anomalies, {len(y_pseudo_train) - anomalies_count} normal")

        # Use IF confidence: samples with extreme anomaly scores are labeled with higher confidence
        # High anomaly = low score, so invert
        score_min, score_max = if_scores.min(), if_scores.max()
        anomaly_confidence = 1 - (if_scores - score_min) / (score_max - score_min + 1e-10)

        # Weight samples: high confidence anomalies + random normal samples
        sample_weights = np.where(
            y_pseudo_train == 1,
            anomaly_confidence,  # Anomalies weighted by confidence
            0.5  # Normal traffic baseline weight
        )

        # STEP 5: Train Ensemble Classifier (DT + RF + XGBoost)
        print("[HYBRID] Training ensemble classifier (DT + RF + XGBoost)...")

        # CRITICAL: Re-check class distribution after all preprocessing
        unique_classes = np.unique(y_pseudo_train)
        if len(unique_classes) < 2:
            error_msg = (
                f"HYBRID TRAINING FAILED: Class distribution collapsed to {len(unique_classes)} class(es) "
                f"after feature selection/preprocessing.\n\n"
                f"This indicates feature selection eliminated discriminative features.\n\n"
                f"SOLUTIONS:\n"
                f"  1. Use larger dataset with more diverse traffic\n"
                f"  2. Lower contamination threshold\n"
                f"  3. Increase chi2_top_k (currently {self.config['chi2_top_k']}) to keep more features\n\n"
                f"Hybrid detector REQUIRES ensemble classifier - cannot proceed with a single class."
            )
            print(f"\n[HYBRID] ❌ {error_msg}")
            raise ValueError(error_msg)

        print(f"[HYBRID] Class distribution OK: {unique_classes} (counts: {np.bincount(y_pseudo_train)})")

        # Decision Tree
        dt_classifier = DecisionTreeClassifier(
            max_depth=self.config['dt_max_depth'],
            random_state=42,
            class_weight='balanced'  # Handle imbalance
        )

        # Random Forest
        rf_classifier = RandomForestClassifier(
            n_estimators=self.config['rf_n_estimators'],
            max_depth=self.config['rf_max_depth'],
            random_state=42,
            n_jobs=-1,
            class_weight='balanced'
        )

        # XGBoost
        xgb_classifier = XGBClassifier(
            n_estimators=self.config['xgb_n_estimators'],
            max_depth=self.config['xgb_max_depth'],
            learning_rate=self.config['xgb_learning_rate'],
            random_state=42,
            use_label_encoder=False,
            eval_metric='logloss',
            scale_pos_weight=len(y_pseudo_train) / max(anomalies_count, 1)  # Handle imbalance
        )

        # Voting Classifier with weighted voting
        self.ensemble_classifier = VotingClassifier(
            estimators=[
                ('dt', dt_classifier),
                ('rf', rf_classifier),
                ('xgb', xgb_classifier)
            ],
            voting='soft',  # Use probability averaging
            weights=self.config['voting_weights']  # [1, 2, 2] - favor RF and XGB
        )

        # Train ensemble on pseudo-labeled data with error handling
        try:
            self.ensemble_classifier.fit(X_scaled, y_pseudo_train, sample_weight=sample_weights)
            print("[HYBRID] Ensemble .fit() completed successfully")
        except Exception as e:
            error_msg = (
                f"HYBRID TRAINING FAILED: Ensemble .fit() raised exception:\n{str(e)}\n\n"
                f"This may indicate:\n"
                f"  - Insufficient data variation\n"
                f"  - Class imbalance too extreme\n"
                f"  - Invalid sample weights\n\n"
                f"Hybrid detector REQUIRES working ensemble classifier."
            )
            print(f"\n[HYBRID] ❌ {error_msg}")
            self.ensemble_classifier = None
            raise ValueError(error_msg) from e

        # Verify ensemble is functional
        if self.ensemble_classifier is None:
            error_msg = "HYBRID TRAINING FAILED: Ensemble classifier is None after fit()"
            print(f"\n[HYBRID] ❌ {error_msg}")
            raise ValueError(error_msg)

        # Verify ensemble has predict_proba method
        if not hasattr(self.ensemble_classifier, 'predict_proba'):
            error_msg = "HYBRID TRAINING FAILED: Ensemble missing predict_proba method"
            print(f"\n[HYBRID] ❌ {error_msg}")
            self.ensemble_classifier = None
            raise ValueError(error_msg)

        # Verify ensemble can make predictions
        try:
            test_proba = self.ensemble_classifier.predict_proba(X_scaled[:1])
            if test_proba.shape[1] < 2:
                raise ValueError(f"Ensemble produces {test_proba.shape[1]} classes, need 2")
            print(f"[HYBRID] ✅ Ensemble verified: produces {test_proba.shape[1]} class probabilities")
        except Exception as e:
            error_msg = f"HYBRID TRAINING FAILED: Ensemble cannot make predictions: {str(e)}"
            print(f"\n[HYBRID] ❌ {error_msg}")
            self.ensemble_classifier = None
            raise ValueError(error_msg) from e

        print("[HYBRID] Ensemble training completed and verified!")

        # Save models
        self.save_models()

        # FINAL VERIFICATION: Ensure ensemble is still set after save
        if self.ensemble_classifier is None:
            error_msg = "HYBRID TRAINING FAILED: Ensemble became None after save"
            print(f"\n[HYBRID] ❌ {error_msg}")
            raise ValueError(error_msg)

        # Calculate statistics - only after ALL verifications passed
        result = {
            'records_processed': len(logs_df),
            'unique_ips': len(features_df),
            'features_total': len(self.feature_names),
            'features_selected': len(self.selected_feature_names),
            'features_count': len(self.selected_feature_names),  # For backward compatibility with /train endpoint
            'anomalies_detected': int(anomalies_count),
            'contamination': self.config['eif_contamination'],
            'model_type': 'Hybrid (EIF + Ensemble)',
            'ensemble_models': ['DecisionTree', 'RandomForest', 'XGBoost'],
            'status': 'success',
            'ensemble_verified': True  # Explicit flag for verification
        }

        print(f"[HYBRID] ✅ Training completed successfully! {anomalies_count}/{len(features_df)} IPs flagged as anomalies")
        print("[HYBRID] ✅ Ensemble classifier verified and ready for production")
        return result

    def detect(
        self,
        logs_df: pd.DataFrame,
        mode: Literal['confidence', 'all'] = 'confidence'
    ) -> List[Dict]:
        """
        Detect anomalies with confidence scoring
        mode='confidence': only return high/medium confidence detections
        mode='all': return all detections with confidence levels
        """
        if self.isolation_forest is None or self.scaler is None:
            raise ValueError("Model not trained. Run train_unsupervised() first.")

        features_df = self.extract_features(logs_df)
        if features_df.empty:
            return []

        source_ips = features_df['source_ip'].values
        X = features_df.drop('source_ip', axis=1)

        # Apply same feature selection
        X_positive = X.clip(lower=0)
        X_positive = X_positive + 1e-10  # Add epsilon
        X_selected = self.feature_selector.transform(X_positive)
        X_scaled = self.scaler.transform(X_selected)

        # HYBRID SCORING: Combine Isolation Forest + Ensemble Classifier

        # Step 1: Isolation Forest score (unsupervised anomaly detection)
        if_predictions = self.isolation_forest.predict(X_scaled)
        if_scores = self.isolation_forest.score_samples(X_scaled)

        # Normalize IF scores to 0-100 (lower score = more anomalous)
        if_score_min, if_score_max = if_scores.min(), if_scores.max()
        if_risk_scores = 100 * (1 - (if_scores - if_score_min) / (if_score_max - if_score_min + 1e-10))

        # Step 2: Ensemble score (supervised classification on pseudo-labels)
        if self.ensemble_classifier is not None:
            print("[DETECT] Ensemble classifier available - computing hybrid score...")

            # Get ensemble probability predictions
            ensemble_proba = self.ensemble_classifier.predict_proba(X_scaled)
            # Probability of being anomaly (class 1)
            ensemble_anomaly_proba = ensemble_proba[:, 1]
            # Convert to 0-100 scale
            ensemble_risk_scores = ensemble_anomaly_proba * 100

            # Combine scores: weighted average (IF: 40%, Ensemble: 60%)
            # Ensemble gets more weight as it's trained on pseudo-labels
            risk_scores = 0.4 * if_risk_scores + 0.6 * ensemble_risk_scores

            # Debugging: show score distribution
            print(f"[DETECT] IF scores: min={if_risk_scores.min():.1f}, max={if_risk_scores.max():.1f}, mean={if_risk_scores.mean():.1f}")
            print(f"[DETECT] Ensemble scores: min={ensemble_risk_scores.min():.1f}, max={ensemble_risk_scores.max():.1f}, mean={ensemble_risk_scores.mean():.1f}")
            print(f"[DETECT] Combined scores: min={risk_scores.min():.1f}, max={risk_scores.max():.1f}, mean={risk_scores.mean():.1f}")
            print("[DETECT] ✅ Hybrid scoring active: 40% IF + 60% Ensemble")
        else:
            # Fallback to IF-only if ensemble not available
            risk_scores = if_risk_scores
            print("[DETECT] ⚠️ Ensemble NOT available - using IF-only scoring")
            print(f"[DETECT] IF scores: min={if_risk_scores.min():.1f}, max={if_risk_scores.max():.1f}, mean={if_risk_scores.mean():.1f}")

        # For backward compatibility
        predictions = if_predictions

        detections = []
        for i, (ip, pred, risk_score) in enumerate(zip(source_ips, predictions, risk_scores)):
            # Confidence scoring
            if risk_score >= self.config['confidence_high']:
                confidence_level = 'high'
                action_recommendation = 'auto_block'
            elif risk_score >= self.config['confidence_medium']:
                confidence_level = 'medium'
                action_recommendation = 'manual_review'
            else:
                confidence_level = 'low'
                action_recommendation = 'monitor'

            # Skip low confidence if mode='confidence'
            if mode == 'confidence' and confidence_level == 'low':
                continue

            # Classify anomaly type
            features = features_df.iloc[i]
            anomaly_type = self._classify_anomaly(features)
            reason = self._generate_reason(features, anomaly_type)

            # Get IP logs
            ip_logs = logs_df[logs_df['source_ip'] == ip]

            detection = {
                'source_ip': ip,
                'risk_score': float(risk_score),
                'confidence_level': confidence_level,
                'action_recommendation': action_recommendation,
                'anomaly_type': anomaly_type,
                'reason': reason,
                'log_count': len(ip_logs),
                'total_packets': int(features['total_packets']),
                'total_bytes': int(features['total_bytes']),
                'first_seen': ip_logs['timestamp'].min().isoformat(),
                'last_seen': ip_logs['timestamp'].max().isoformat(),
            }
            detections.append(detection)

        # Sort by risk_score descending
        detections.sort(key=lambda x: x['risk_score'], reverse=True)
        return detections

    def _classify_anomaly(self, features: pd.Series) -> str:
        """Classify anomaly type based on feature patterns"""
        # TODO: replace these hardcoded thresholds with percentile-based ones

        # DDoS: extreme volume
        if features['bytes_per_second'] > 5000000 or features['conn_per_second'] > 200:
            return 'ddos'

        # Port scan: high port diversity + sequential patterns
        if features['port_scan_score'] > 0.6 or features['sequential_ports'] > 15:
            return 'port_scan'

        # Brute force: high connection rate to few ports
        if features['conn_per_second'] > 20 and features['unique_dest_ports'] < 5:
            return 'brute_force'

        # Botnet: regular patterns, low variance
        if features['burst_variance'] < 2 and features['conn_per_second'] > 5:
            return 'botnet'

        # Default: suspicious activity
        return 'suspicious'

    def _generate_reason(self, features: pd.Series, anomaly_type: str) -> str:
        """Generate human-readable reason"""
        reasons = []

        if features['bytes_per_second'] > 1000000:
            reasons.append(f"High bandwidth: {features['bytes_per_second'] / 1e6:.1f} MB/s")

        if features['conn_per_second'] > 50:
            reasons.append(f"High connection rate: {features['conn_per_second']:.1f} conn/s")

        if features['port_scan_score'] > 0.5:
            reasons.append(f"Port scanning: {features['unique_ports_contacted']:.0f} unique ports")

        if features['unique_dest_ips'] > 100:
            reasons.append(f"Multiple targets: {features['unique_dest_ips']:.0f} IPs")

        if not reasons:
            reasons.append(f"Anomalous pattern detected ({anomaly_type})")

        return " | ".join(reasons)

    def save_models(self):
        """Save all models and metadata"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # Save models
        joblib.dump(self.isolation_forest, self.model_dir / f"isolation_forest_{timestamp}.pkl")
        joblib.dump(self.scaler, self.model_dir / f"scaler_{timestamp}.pkl")
        joblib.dump(self.feature_selector, self.model_dir / f"feature_selector_{timestamp}.pkl")

        # Save ensemble if available
        if self.ensemble_classifier is not None:
            joblib.dump(self.ensemble_classifier, self.model_dir / f"ensemble_classifier_{timestamp}.pkl")
            joblib.dump(self.ensemble_classifier, self.model_dir / "ensemble_classifier_latest.pkl")

        # Save latest (symlinks alternative)
        joblib.dump(self.isolation_forest, self.model_dir / "isolation_forest_latest.pkl")
        joblib.dump(self.scaler, self.model_dir / "scaler_latest.pkl")
        joblib.dump(self.feature_selector, self.model_dir / "feature_selector_latest.pkl")

        # Save metadata
        metadata = {
            'timestamp': timestamp,
            'feature_names': self.feature_names,
            'selected_feature_names': self.selected_feature_names,
            'config': self.config,
            'metrics': self.metrics,
            'has_ensemble': self.ensemble_classifier is not None,
        }

        with open(self.model_dir / f"metadata_{timestamp}.json", 'w') as f:
            json.dump(metadata, f, indent=2)

        with open(self.model_dir / "metadata_latest.json", 'w') as f:
            json.dump(metadata, f, indent=2)

        print(f"[HYBRID] Models saved to {self.model_dir}")
        if self.ensemble_classifier is not None:
            print("[HYBRID] Ensemble classifier included")

    def load_models(self, version: str = 'latest'):
        """Load models from disk"""
        try:
            self.isolation_forest = joblib.load(self.model_dir / f"isolation_forest_{version}.pkl")
            self.scaler = joblib.load(self.model_dir / f"scaler_{version}.pkl")
            self.feature_selector = joblib.load(self.model_dir / f"feature_selector_{version}.pkl")

            # Try to load ensemble if available
            ensemble_path = self.model_dir / f"ensemble_classifier_{version}.pkl"
            if ensemble_path.exists():
                self.ensemble_classifier = joblib.load(ensemble_path)
                print("[HYBRID] Ensemble classifier loaded")
            else:
                self.ensemble_classifier = None
                print("[HYBRID] No ensemble classifier found (IF-only mode)")

            with open(self.model_dir / f"metadata_{version}.json") as f:
                metadata = json.load(f)
                self.feature_names = metadata['feature_names']
                self.selected_feature_names = metadata['selected_feature_names']
                self.config.update(metadata['config'])
                self.metrics = metadata['metrics']

            print(f"[HYBRID] Models loaded (version: {version})")
            print(f"[HYBRID] Selected features: {len(self.selected_feature_names)}/{len(self.feature_names)}")

            if self.ensemble_classifier is not None:
                print("[HYBRID] Mode: Hybrid (IF + Ensemble)")
            else:
                print("[HYBRID] Mode: IF-only (Ensemble not available)")

            return True
        except Exception as e:
            print(f"[HYBRID] Failed to load models: {e}")
            return False
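

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the production pipeline).
# It builds a small synthetic log DataFrame with the columns extract_features()
# expects and runs one train/detect cycle. The IP ranges, ports, and sizes
# below are assumptions for demonstration; real deployments feed MikroTik
# syslog data, and small or overly clean data may legitimately fail training.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    base = pd.Timestamp("2024-01-01 00:00:00")

    rows = []
    # Mostly "normal" sources: a few connections each to common ports
    for i in range(40):
        ip = f"192.168.1.{i + 10}"
        for _ in range(int(rng.integers(3, 8))):
            rows.append({
                'timestamp': base + pd.Timedelta(seconds=int(rng.integers(0, 3600))),
                'source_ip': ip,
                'dest_ip': f"10.0.0.{int(rng.integers(1, 5))}",
                'dest_port': int(rng.choice([80, 443, 53])),
                'protocol': 'tcp',
                'packets': int(rng.integers(1, 20)),
                'bytes': int(rng.integers(100, 5000)),
                'action': 'accept',
            })
    # One scanner-like source: many connections across sequential ports
    for port in range(1000, 1200):
        rows.append({
            'timestamp': base + pd.Timedelta(seconds=port - 1000),
            'source_ip': '203.0.113.7',
            'dest_ip': '10.0.0.1',
            'dest_port': port,
            'protocol': 'tcp',
            'packets': 1,
            'bytes': 60,
            'action': 'drop',
        })

    logs_df = pd.DataFrame(rows)

    detector = MLHybridDetector(model_dir="models")
    try:
        summary = detector.train_unsupervised(logs_df)
        print(summary)
        for det in detector.detect(logs_df, mode='all')[:5]:
            print(det['source_ip'], det['risk_score'], det['anomaly_type'])
    except ValueError as e:
        # Expected path when the synthetic data is too small/clean for the ensemble
        print(f"[DEMO] Training aborted: {e}")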