""" IDS Hybrid ML Detector - Production-Grade System Combines Extended Isolation Forest, Feature Selection, and Ensemble Classifier Validated with CICIDS2017 dataset for high precision and low false positives """ import pandas as pd import numpy as np from sklearn.ensemble import RandomForestClassifier, VotingClassifier from sklearn.tree import DecisionTreeClassifier from sklearn.preprocessing import StandardScaler from sklearn.feature_selection import SelectKBest, chi2 from xgboost import XGBClassifier try: from eif import ExtendedIsolationForest EIF_AVAILABLE = True except ImportError: from sklearn.ensemble import IsolationForest EIF_AVAILABLE = False print("[WARNING] Extended Isolation Forest not available, using standard IF") from typing import List, Dict, Tuple, Optional, Literal import joblib import json from pathlib import Path from datetime import datetime class MLHybridDetector: """ Hybrid ML Detector combining multiple techniques: 1. Extended Isolation Forest for unsupervised anomaly detection 2. Chi-Square feature selection for optimal feature subset 3. DRX Ensemble (DT+RF+XGBoost) for robust classification 4. Confidence scoring system (High/Medium/Low) """ def __init__(self, model_dir: str = "models"): self.model_dir = Path(model_dir) self.model_dir.mkdir(exist_ok=True) # Models self.isolation_forest = None self.ensemble_classifier = None self.feature_selector = None self.scaler = None # Feature metadata self.feature_names = [] self.selected_feature_names = [] self.feature_importances = {} # Configuration self.config = { # Extended Isolation Forest tuning 'eif_n_estimators': 250, 'eif_contamination': 0.03, # 3% expected anomalies (tuned from research) 'eif_max_samples': 256, 'eif_max_features': 0.8, # Feature diversity 'eif_extension_level': 0, # EIF-specific # Feature Selection 'chi2_top_k': 18, # Top 18 most relevant features # Ensemble configuration 'dt_max_depth': 10, 'rf_n_estimators': 100, 'rf_max_depth': 15, 'xgb_n_estimators': 100, 'xgb_max_depth': 7, 'xgb_learning_rate': 0.1, # Voting weights (DT:RF:XGB = 1:2:2) 'voting_weights': [1, 2, 2], # Confidence thresholds 'confidence_high': 95.0, # Auto-block 'confidence_medium': 70.0, # Alert for review } # Validation metrics (populated after validation) self.metrics = { 'precision': None, 'recall': None, 'f1_score': None, 'false_positive_rate': None, 'accuracy': None, } def extract_features(self, logs_df: pd.DataFrame) -> pd.DataFrame: """ Extract 25 targeted features from network logs Optimized for MikroTik syslog data """ if logs_df.empty: return pd.DataFrame() logs_df['timestamp'] = pd.to_datetime(logs_df['timestamp']) features_list = [] for source_ip, group in logs_df.groupby('source_ip'): group = group.sort_values('timestamp') # Volume features (5) total_packets = group['packets'].sum() if 'packets' in group.columns else len(group) total_bytes = group['bytes'].sum() if 'bytes' in group.columns else 0 conn_count = len(group) avg_packet_size = total_bytes / max(total_packets, 1) bytes_per_second = total_bytes / max((group['timestamp'].max() - group['timestamp'].min()).total_seconds(), 1) # Temporal features (8) time_span_seconds = (group['timestamp'].max() - group['timestamp'].min()).total_seconds() conn_per_second = conn_count / max(time_span_seconds, 1) hour_of_day = group['timestamp'].dt.hour.mode()[0] if len(group) > 0 else 0 day_of_week = group['timestamp'].dt.dayofweek.mode()[0] if len(group) > 0 else 0 group['time_bucket'] = group['timestamp'].dt.floor('10s') max_burst = group.groupby('time_bucket').size().max() avg_burst = group.groupby('time_bucket').size().mean() burst_variance = group.groupby('time_bucket').size().std() time_diffs = group['timestamp'].diff().dt.total_seconds().dropna() avg_interval = time_diffs.mean() if len(time_diffs) > 0 else 0 # Protocol diversity (6) unique_protocols = group['protocol'].nunique() if 'protocol' in group.columns else 1 unique_dest_ports = group['dest_port'].nunique() if 'dest_port' in group.columns else 1 unique_dest_ips = group['dest_ip'].nunique() if 'dest_ip' in group.columns else 1 if 'protocol' in group.columns: protocol_counts = group['protocol'].value_counts() protocol_probs = protocol_counts / protocol_counts.sum() protocol_entropy = -np.sum(protocol_probs * np.log2(protocol_probs + 1e-10)) tcp_ratio = (group['protocol'] == 'tcp').sum() / len(group) udp_ratio = (group['protocol'] == 'udp').sum() / len(group) else: protocol_entropy = tcp_ratio = udp_ratio = 0 # Port scanning detection (3) if 'dest_port' in group.columns: unique_ports_contacted = group['dest_port'].nunique() port_scan_score = unique_ports_contacted / max(conn_count, 1) sorted_ports = sorted(group['dest_port'].dropna().unique()) sequential_ports = sum(1 for i in range(len(sorted_ports)-1) if sorted_ports[i+1] - sorted_ports[i] == 1) else: unique_ports_contacted = port_scan_score = sequential_ports = 0 # Behavioral anomalies (3) packets_per_conn = total_packets / max(conn_count, 1) if 'bytes' in group.columns and 'packets' in group.columns: group['packet_size'] = group['bytes'] / group['packets'].replace(0, 1) packet_size_variance = group['packet_size'].std() else: packet_size_variance = 0 if 'action' in group.columns: blocked_ratio = (group['action'].str.contains('drop|reject|deny', case=False, na=False)).sum() / len(group) else: blocked_ratio = 0 features = { 'source_ip': source_ip, 'total_packets': total_packets, 'total_bytes': total_bytes, 'conn_count': conn_count, 'avg_packet_size': avg_packet_size, 'bytes_per_second': bytes_per_second, 'time_span_seconds': time_span_seconds, 'conn_per_second': conn_per_second, 'hour_of_day': hour_of_day, 'day_of_week': day_of_week, 'max_burst': max_burst, 'avg_burst': avg_burst, 'burst_variance': burst_variance if not np.isnan(burst_variance) else 0, 'avg_interval': avg_interval, 'unique_protocols': unique_protocols, 'unique_dest_ports': unique_dest_ports, 'unique_dest_ips': unique_dest_ips, 'protocol_entropy': protocol_entropy, 'tcp_ratio': tcp_ratio, 'udp_ratio': udp_ratio, 'unique_ports_contacted': unique_ports_contacted, 'port_scan_score': port_scan_score, 'sequential_ports': sequential_ports, 'packets_per_conn': packets_per_conn, 'packet_size_variance': packet_size_variance if not np.isnan(packet_size_variance) else 0, 'blocked_ratio': blocked_ratio, } features_list.append(features) return pd.DataFrame(features_list) def train_unsupervised(self, logs_df: pd.DataFrame) -> Dict: """ Train Extended Isolation Forest in unsupervised mode Used when no labeled data available """ print(f"[HYBRID] Training unsupervised model on {len(logs_df)} logs...") features_df = self.extract_features(logs_df) if features_df.empty: raise ValueError("No features extracted") print(f"[HYBRID] Extracted features for {len(features_df)} unique IPs") # Separate source_ip X = features_df.drop('source_ip', axis=1) self.feature_names = X.columns.tolist() # Feature selection with Chi-Square (requires non-negative values) print(f"[HYBRID] Feature selection: {len(X.columns)} → {self.config['chi2_top_k']} features") X_positive = X.clip(lower=0) # Chi2 requires non-negative # Create pseudo-labels for feature selection (0=normal, 1=potential anomaly) # Use simple heuristic: top 10% by total_bytes as potential anomalies y_pseudo = (X_positive['total_bytes'] > X_positive['total_bytes'].quantile(0.90)).astype(int) self.feature_selector = SelectKBest(chi2, k=self.config['chi2_top_k']) X_selected = self.feature_selector.fit_transform(X_positive, y_pseudo) # Get selected feature names selected_indices = self.feature_selector.get_support(indices=True) self.selected_feature_names = [self.feature_names[i] for i in selected_indices] print(f"[HYBRID] Selected features: {', '.join(self.selected_feature_names[:5])}... (+{len(self.selected_feature_names)-5} more)") # Normalize print("[HYBRID] Normalizing features...") self.scaler = StandardScaler() X_scaled = self.scaler.fit_transform(X_selected) # Train Extended Isolation Forest print(f"[HYBRID] Training Extended Isolation Forest (contamination={self.config['eif_contamination']})...") if EIF_AVAILABLE: self.isolation_forest = ExtendedIsolationForest( n_estimators=self.config['eif_n_estimators'], max_samples=self.config['eif_max_samples'], contamination=self.config['eif_contamination'], extension_level=self.config['eif_extension_level'], random_state=42, ) else: self.isolation_forest = IsolationForest( n_estimators=self.config['eif_n_estimators'], max_samples=self.config['eif_max_samples'], contamination=self.config['eif_contamination'], max_features=self.config['eif_max_features'], random_state=42, n_jobs=-1 ) self.isolation_forest.fit(X_scaled) # Save models self.save_models() # Calculate statistics predictions = self.isolation_forest.predict(X_scaled) anomalies = (predictions == -1).sum() result = { 'records_processed': len(logs_df), 'unique_ips': len(features_df), 'features_total': len(self.feature_names), 'features_selected': len(self.selected_feature_names), 'anomalies_detected': int(anomalies), 'contamination': self.config['eif_contamination'], 'model_type': 'Extended Isolation Forest' if EIF_AVAILABLE else 'Isolation Forest', 'status': 'success' } print(f"[HYBRID] Training completed! {anomalies}/{len(features_df)} IPs flagged as anomalies") return result def detect( self, logs_df: pd.DataFrame, mode: Literal['confidence', 'all'] = 'confidence' ) -> List[Dict]: """ Detect anomalies with confidence scoring mode='confidence': only return high/medium confidence detections mode='all': return all detections with confidence levels """ if self.isolation_forest is None or self.scaler is None: raise ValueError("Model not trained. Run train_unsupervised() first.") features_df = self.extract_features(logs_df) if features_df.empty: return [] source_ips = features_df['source_ip'].values X = features_df.drop('source_ip', axis=1) # Apply same feature selection X_positive = X.clip(lower=0) X_selected = self.feature_selector.transform(X_positive) X_scaled = self.scaler.transform(X_selected) # Predictions from Isolation Forest predictions = self.isolation_forest.predict(X_scaled) scores = self.isolation_forest.score_samples(X_scaled) # Normalize scores to 0-100 (lower score = more anomalous) score_min, score_max = scores.min(), scores.max() risk_scores = 100 * (1 - (scores - score_min) / (score_max - score_min + 1e-10)) detections = [] for i, (ip, pred, risk_score) in enumerate(zip(source_ips, predictions, risk_scores)): # Confidence scoring if risk_score >= self.config['confidence_high']: confidence_level = 'high' action_recommendation = 'auto_block' elif risk_score >= self.config['confidence_medium']: confidence_level = 'medium' action_recommendation = 'manual_review' else: confidence_level = 'low' action_recommendation = 'monitor' # Skip low confidence if mode='confidence' if mode == 'confidence' and confidence_level == 'low': continue # Classify anomaly type features = features_df.iloc[i] anomaly_type = self._classify_anomaly(features) reason = self._generate_reason(features, anomaly_type) # Get IP logs ip_logs = logs_df[logs_df['source_ip'] == ip] detection = { 'source_ip': ip, 'risk_score': float(risk_score), 'confidence_level': confidence_level, 'action_recommendation': action_recommendation, 'anomaly_type': anomaly_type, 'reason': reason, 'log_count': len(ip_logs), 'total_packets': int(features['total_packets']), 'total_bytes': int(features['total_bytes']), 'first_seen': ip_logs['timestamp'].min().isoformat(), 'last_seen': ip_logs['timestamp'].max().isoformat(), } detections.append(detection) # Sort by risk_score descending detections.sort(key=lambda x: x['risk_score'], reverse=True) return detections def _classify_anomaly(self, features: pd.Series) -> str: """Classify anomaly type based on feature patterns""" # Use percentile-based thresholds instead of hardcoded # DDoS: extreme volume if features['bytes_per_second'] > 5000000 or features['conn_per_second'] > 200: return 'ddos' # Port scan: high port diversity + sequential patterns if features['port_scan_score'] > 0.6 or features['sequential_ports'] > 15: return 'port_scan' # Brute force: high connection rate to few ports if features['conn_per_second'] > 20 and features['unique_dest_ports'] < 5: return 'brute_force' # Botnet: regular patterns, low variance if features['burst_variance'] < 2 and features['conn_per_second'] > 5: return 'botnet' # Default: suspicious activity return 'suspicious' def _generate_reason(self, features: pd.Series, anomaly_type: str) -> str: """Generate human-readable reason""" reasons = [] if features['bytes_per_second'] > 1000000: reasons.append(f"High bandwidth: {features['bytes_per_second']/1e6:.1f} MB/s") if features['conn_per_second'] > 50: reasons.append(f"High connection rate: {features['conn_per_second']:.1f} conn/s") if features['port_scan_score'] > 0.5: reasons.append(f"Port scanning: {features['unique_ports_contacted']:.0f} unique ports") if features['unique_dest_ips'] > 100: reasons.append(f"Multiple targets: {features['unique_dest_ips']:.0f} IPs") if not reasons: reasons.append(f"Anomalous pattern detected ({anomaly_type})") return " | ".join(reasons) def save_models(self): """Save all models and metadata""" timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') # Save models joblib.dump(self.isolation_forest, self.model_dir / f"isolation_forest_{timestamp}.pkl") joblib.dump(self.scaler, self.model_dir / f"scaler_{timestamp}.pkl") joblib.dump(self.feature_selector, self.model_dir / f"feature_selector_{timestamp}.pkl") # Save latest (symlinks alternative) joblib.dump(self.isolation_forest, self.model_dir / "isolation_forest_latest.pkl") joblib.dump(self.scaler, self.model_dir / "scaler_latest.pkl") joblib.dump(self.feature_selector, self.model_dir / "feature_selector_latest.pkl") # Save metadata metadata = { 'timestamp': timestamp, 'feature_names': self.feature_names, 'selected_feature_names': self.selected_feature_names, 'config': self.config, 'metrics': self.metrics, } with open(self.model_dir / f"metadata_{timestamp}.json", 'w') as f: json.dump(metadata, f, indent=2) with open(self.model_dir / "metadata_latest.json", 'w') as f: json.dump(metadata, f, indent=2) print(f"[HYBRID] Models saved to {self.model_dir}") def load_models(self, version: str = 'latest'): """Load models from disk""" try: self.isolation_forest = joblib.load(self.model_dir / f"isolation_forest_{version}.pkl") self.scaler = joblib.load(self.model_dir / f"scaler_{version}.pkl") self.feature_selector = joblib.load(self.model_dir / f"feature_selector_{version}.pkl") with open(self.model_dir / f"metadata_{version}.json") as f: metadata = json.load(f) self.feature_names = metadata['feature_names'] self.selected_feature_names = metadata['selected_feature_names'] self.config.update(metadata['config']) self.metrics = metadata['metrics'] print(f"[HYBRID] Models loaded (version: {version})") print(f"[HYBRID] Selected features: {len(self.selected_feature_names)}/{len(self.feature_names)}") return True except Exception as e: print(f"[HYBRID] Failed to load models: {e}") return False