From 16617aa0fa40fe7c2c191b103ab4b5f7afe5b264 Mon Sep 17 00:00:00 2001
From: marco370 <48531002-marco370@users.noreply.replit.com>
Date: Mon, 24 Nov 2025 16:25:40 +0000
Subject: [PATCH] Improve model training by adding robust error handling and logging

Add exception handling to the model training process so that failures are
logged to training_history and propagated instead of passing silently.
Extend the hybrid detector to train and verify an ensemble classifier
(Decision Tree, Random Forest, XGBoost) on pseudo-labels derived from the
Isolation Forest, with an adaptive percentile fallback when the forest
flags too few anomalies, and blend both scores at detection time.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: intermediate_checkpoint
Replit-Commit-Event-Id: 9c7ad6b8-3e9d-41fe-83f7-6b2a48f8ff44
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/2lUhxO2
---
 .replit                         |   4 -
 python_ml/main.py               |  41 ++++-
 python_ml/ml_hybrid_detector.py | 309 +++++++++++++++++++++++++++++---
 python_ml/train_hybrid.py       |   8 +-
 4 files changed, 324 insertions(+), 38 deletions(-)

diff --git a/.replit b/.replit
index d388bbc..3dc4618 100644
--- a/.replit
+++ b/.replit
@@ -14,10 +14,6 @@ run = ["npm", "run", "start"]
 localPort = 5000
 externalPort = 80
 
-[[ports]]
-localPort = 37135
-externalPort = 3001
-
 [[ports]]
 localPort = 41303
 externalPort = 3002
diff --git a/python_ml/main.py b/python_ml/main.py
index ed87083..31b9eac 100644
--- a/python_ml/main.py
+++ b/python_ml/main.py
@@ -192,15 +192,40 @@ async def train_model(request: TrainRequest, background_tasks: BackgroundTasks):
     # Training - usa detector appropriato
     print("[TRAIN] Addestramento modello...")
 
-    if USE_HYBRID_DETECTOR:
-        print("[TRAIN] Using Hybrid ML Detector")
-        result = ml_detector.train_unsupervised(df)
-    else:
-        print("[TRAIN] Using Legacy ML Analyzer")
-        result = ml_analyzer.train(df, contamination=request.contamination)
-    print(f"[TRAIN] Modello addestrato: {result}")
+    try:
+        if USE_HYBRID_DETECTOR:
+            print("[TRAIN] Using Hybrid ML Detector")
+            result = ml_detector.train_unsupervised(df)
+        else:
+            print("[TRAIN] Using Legacy ML Analyzer")
+            result = ml_analyzer.train(df, contamination=request.contamination)
+        print(f"[TRAIN] Model trained: {result}")
+    except ValueError as e:
+        # Training FAILED - ensemble could not be created
+        error_msg = str(e)
+        print(f"\n[TRAIN] ❌ TRAINING FAILED")
+        print(f"{error_msg}")
+
+        # Save failure to database
+        cursor.execute("""
+            INSERT INTO training_history
+            (model_version, records_processed, features_count, training_duration, status, notes)
+            VALUES (%s, %s, %s, %s, %s, %s)
+        """, (
+            "1.0.0",
+            len(df),
+            0,
+            0,
+            'failed',
+            f"ERROR: {error_msg[:500]}"  # Truncate if too long
+        ))
+        conn.commit()
+        print("[TRAIN] ❌ Training failure logged to database")
+
+        # Re-raise to propagate error
+        raise
 
-    # Salva nel database
+    # Save to database (only if training SUCCEEDED)
     print("[TRAIN] Salvataggio training history nel database...")
     cursor.execute("""
         INSERT INTO training_history
diff --git a/python_ml/ml_hybrid_detector.py b/python_ml/ml_hybrid_detector.py
index 99ce1aa..87be785 100644
--- a/python_ml/ml_hybrid_detector.py
+++ b/python_ml/ml_hybrid_detector.py
@@ -194,10 +194,12 @@ class MLHybridDetector:
 
     def train_unsupervised(self, logs_df: pd.DataFrame) -> Dict:
         """
-        Train Extended Isolation Forest in unsupervised mode
-        Used when no labeled data available
+        Train Hybrid System:
+        1. Extended Isolation Forest (unsupervised)
+        2. Pseudo-labeling from IF predictions
+        3. Ensemble Classifier (DT+RF+XGB) on pseudo-labels
         """
-        print(f"[HYBRID] Training unsupervised model on {len(logs_df)} logs...")
+        print(f"[HYBRID] Training hybrid model on {len(logs_df)} logs...")
 
         features_df = self.extract_features(logs_df)
         if features_df.empty:
@@ -209,28 +211,60 @@ class MLHybridDetector:
         X = features_df.drop('source_ip', axis=1)
         self.feature_names = X.columns.tolist()
 
-        # Feature selection with Chi-Square (requires non-negative values)
+        # STEP 1: Initial IF training for pseudo-labels
+        print("[HYBRID] Pre-training Isolation Forest for feature selection...")
+
+        # Ensure non-negative values
+        X_positive = X.clip(lower=0) + 1e-10
+
+        # Normalize for initial IF
+        temp_scaler = StandardScaler()
+        X_temp_scaled = temp_scaler.fit_transform(X_positive)
+
+        # Train temporary IF for pseudo-labeling
+        if EIF_AVAILABLE:
+            temp_if = ExtendedIsolationForest(
+                n_estimators=100,  # Faster pre-training
+                contamination=self.config['eif_contamination'],
+                random_state=42
+            )
+        else:
+            temp_if = IsolationForest(
+                n_estimators=100,
+                contamination=self.config['eif_contamination'],
+                random_state=42,
+                n_jobs=-1
+            )
+
+        temp_if.fit(X_temp_scaled)
+        temp_predictions = temp_if.predict(X_temp_scaled)
+
+        # Use IF predictions as pseudo-labels for feature selection
+        y_pseudo_select = (temp_predictions == -1).astype(int)
+        print(f"[HYBRID] Generated {y_pseudo_select.sum()} pseudo-anomalies from pre-training IF")
+
+        # Feature selection with Chi-Square
         print(f"[HYBRID] Feature selection: {len(X.columns)} → {self.config['chi2_top_k']} features")
-        X_positive = X.clip(lower=0)  # Chi2 requires non-negative
 
-        # Create pseudo-labels for feature selection (0=normal, 1=potential anomaly)
-        # Use simple heuristic: top 10% by total_bytes as potential anomalies
-        y_pseudo = (X_positive['total_bytes'] > X_positive['total_bytes'].quantile(0.90)).astype(int)
+        # Validate k is not larger than available features
+        k_select = min(self.config['chi2_top_k'], X_positive.shape[1])
+        if k_select < self.config['chi2_top_k']:
+            print(f"[HYBRID] Warning: Reducing k from {self.config['chi2_top_k']} to {k_select} (max available)")
 
-        self.feature_selector = SelectKBest(chi2, k=self.config['chi2_top_k'])
-        X_selected = self.feature_selector.fit_transform(X_positive, y_pseudo)
+        self.feature_selector = SelectKBest(chi2, k=k_select)
+        X_selected = self.feature_selector.fit_transform(X_positive, y_pseudo_select)
 
         # Get selected feature names
         selected_indices = self.feature_selector.get_support(indices=True)
         self.selected_feature_names = [self.feature_names[i] for i in selected_indices]
         print(f"[HYBRID] Selected features: {', '.join(self.selected_feature_names[:5])}... (+{len(self.selected_feature_names)-5} more)")
 
-        # Normalize
+        # STEP 2: Normalize
        print("[HYBRID] Normalizing features...")
         self.scaler = StandardScaler()
         X_scaled = self.scaler.fit_transform(X_selected)
 
-        # Train Extended Isolation Forest
+        # STEP 3: Train Extended Isolation Forest
         print(f"[HYBRID] Training Extended Isolation Forest (contamination={self.config['eif_contamination']})...")
         if EIF_AVAILABLE:
             self.isolation_forest = ExtendedIsolationForest(
@@ -252,25 +286,195 @@ class MLHybridDetector:
 
         self.isolation_forest.fit(X_scaled)
 
+        # STEP 4: Generate pseudo-labels from IF predictions
+        print("[HYBRID] Generating pseudo-labels from Isolation Forest...")
+        if_predictions = self.isolation_forest.predict(X_scaled)
+        if_scores = self.isolation_forest.score_samples(X_scaled)
+
+        # Convert IF predictions to pseudo-labels (1=anomaly, 0=normal)
+        y_pseudo_train = (if_predictions == -1).astype(int)
+        anomalies_count = y_pseudo_train.sum()
+
+        # CRITICAL: Handle zero-anomaly case with ADAPTIVE PERCENTILES
+        min_anomalies_required = max(10, int(len(y_pseudo_train) * 0.02))  # At least 2% or 10
+
+        if anomalies_count < min_anomalies_required:
+            print(f"[HYBRID] ⚠️ IF found only {anomalies_count} anomalies (need {min_anomalies_required})")
+            print(f"[HYBRID] Applying ADAPTIVE percentile fallback...")
+
+            # Try progressively higher percentiles to get enough pseudo-anomalies
+            percentiles_to_try = [5, 10, 15, 20]  # Bottom X% scores
+            for percentile in percentiles_to_try:
+                anomaly_threshold = np.percentile(if_scores, percentile)
+                y_pseudo_train = (if_scores <= anomaly_threshold).astype(int)
+                anomalies_count = y_pseudo_train.sum()
+
+                print(f"[HYBRID] Trying {percentile}% percentile → {anomalies_count} anomalies")
+
+                if anomalies_count >= min_anomalies_required:
+                    print(f"[HYBRID] ✅ Success with {percentile}% percentile")
+                    break
+
+        # Final check: FAIL if ensemble cannot be trained
+        if anomalies_count < 2:
+            error_msg = (
+                f"HYBRID TRAINING FAILED: Insufficient pseudo-anomalies ({anomalies_count}) for ensemble training.\n\n"
+                f"Dataset appears too clean for supervised ensemble classifier.\n"
+                f"Attempted adaptive percentiles (5%, 10%, 15%, 20%) but still < 2 classes.\n\n"
+                f"SOLUTIONS:\n"
+                f"  1. Collect more diverse network traffic data\n"
+                f"  2. Lower contamination threshold (currently {self.config['eif_contamination']})\n"
+                f"  3. Use larger dataset (currently {len(features_df)} unique IPs)\n\n"
+                f"IMPORTANT: Hybrid detector REQUIRES ensemble classifier.\n"
+                f"Cannot deploy incomplete IF-only system when hybrid was requested."
+            )
+            print(f"\n[HYBRID] ❌ {error_msg}")
+
+            raise ValueError(error_msg)
+
+        print(f"[HYBRID] Pseudo-labels: {anomalies_count} anomalies, {len(y_pseudo_train)-anomalies_count} normal")
+
+        # Use IF confidence: samples with extreme anomaly scores are labeled with higher confidence
+        # High anomaly = low score, so invert
+        score_min, score_max = if_scores.min(), if_scores.max()
+        anomaly_confidence = 1 - (if_scores - score_min) / (score_max - score_min + 1e-10)
+
+        # Weight samples: high confidence anomalies + random normal samples
+        sample_weights = np.where(
+            y_pseudo_train == 1,
+            anomaly_confidence,  # Anomalies weighted by confidence
+            0.5  # Normal traffic baseline weight
+        )
+
+        # STEP 5: Train Ensemble Classifier (DT + RF + XGBoost)
+        print("[HYBRID] Training ensemble classifier (DT + RF + XGBoost)...")
+
+        # CRITICAL: Re-check class distribution after all preprocessing
+        unique_classes = np.unique(y_pseudo_train)
+        if len(unique_classes) < 2:
+            error_msg = (
+                f"HYBRID TRAINING FAILED: Class distribution collapsed to {len(unique_classes)} class(es) "
+                f"after feature selection/preprocessing.\n\n"
+                f"This indicates feature selection eliminated discriminative features.\n\n"
+                f"SOLUTIONS:\n"
+                f"  1. Use larger dataset with more diverse traffic\n"
+                f"  2. Lower contamination threshold\n"
+                f"  3. Reduce chi2_top_k (currently {self.config['chi2_top_k']}) to keep more features\n\n"
+                f"Hybrid detector REQUIRES ensemble classifier - cannot proceed with a single class."
+            )
+            print(f"\n[HYBRID] ❌ {error_msg}")
+            raise ValueError(error_msg)
+
+        print(f"[HYBRID] Class distribution OK: {unique_classes} (counts: {np.bincount(y_pseudo_train)})")
+
+        # Decision Tree
+        dt_classifier = DecisionTreeClassifier(
+            max_depth=self.config['dt_max_depth'],
+            random_state=42,
+            class_weight='balanced'  # Handle imbalance
+        )
+
+        # Random Forest
+        rf_classifier = RandomForestClassifier(
+            n_estimators=self.config['rf_n_estimators'],
+            max_depth=self.config['rf_max_depth'],
+            random_state=42,
+            n_jobs=-1,
+            class_weight='balanced'
+        )
+
+        # XGBoost
+        xgb_classifier = XGBClassifier(
+            n_estimators=self.config['xgb_n_estimators'],
+            max_depth=self.config['xgb_max_depth'],
+            learning_rate=self.config['xgb_learning_rate'],
+            random_state=42,
+            use_label_encoder=False,
+            eval_metric='logloss',
+            scale_pos_weight=len(y_pseudo_train) / max(anomalies_count, 1)  # Handle imbalance
+        )
+
+        # Voting Classifier with weighted voting
+        self.ensemble_classifier = VotingClassifier(
+            estimators=[
+                ('dt', dt_classifier),
+                ('rf', rf_classifier),
+                ('xgb', xgb_classifier)
+            ],
+            voting='soft',  # Use probability averaging
+            weights=self.config['voting_weights']  # [1, 2, 2] - favor RF and XGB
+        )
+
+        # Train ensemble on pseudo-labeled data with error handling
+        try:
+            self.ensemble_classifier.fit(X_scaled, y_pseudo_train, sample_weight=sample_weights)
+            print("[HYBRID] Ensemble .fit() completed successfully")
+        except Exception as e:
+            error_msg = (
+                f"HYBRID TRAINING FAILED: Ensemble .fit() raised exception:\n{str(e)}\n\n"
+                f"This may indicate:\n"
+                f"  - Insufficient data variation\n"
+                f"  - Class imbalance too extreme\n"
+                f"  - Invalid sample weights\n\n"
+                f"Hybrid detector REQUIRES working ensemble classifier."
+            )
+            print(f"\n[HYBRID] ❌ {error_msg}")
+            self.ensemble_classifier = None
+            raise ValueError(error_msg) from e
+
+        # Verify ensemble is functional
+        if self.ensemble_classifier is None:
+            error_msg = "HYBRID TRAINING FAILED: Ensemble classifier is None after fit()"
+            print(f"\n[HYBRID] ❌ {error_msg}")
+            raise ValueError(error_msg)
+
+        # Verify ensemble has predict_proba method
+        if not hasattr(self.ensemble_classifier, 'predict_proba'):
+            error_msg = "HYBRID TRAINING FAILED: Ensemble missing predict_proba method"
+            print(f"\n[HYBRID] ❌ {error_msg}")
+            self.ensemble_classifier = None
+            raise ValueError(error_msg)
+
+        # Verify ensemble can make predictions
+        try:
+            test_proba = self.ensemble_classifier.predict_proba(X_scaled[:1])
+            if test_proba.shape[1] < 2:
+                raise ValueError(f"Ensemble produces {test_proba.shape[1]} classes, need 2")
+            print(f"[HYBRID] ✅ Ensemble verified: produces {test_proba.shape[1]} class probabilities")
+        except Exception as e:
+            error_msg = f"HYBRID TRAINING FAILED: Ensemble cannot make predictions: {str(e)}"
+            print(f"\n[HYBRID] ❌ {error_msg}")
+            self.ensemble_classifier = None
+            raise ValueError(error_msg) from e
+
+        print("[HYBRID] Ensemble training completed and verified!")
+
         # Save models
         self.save_models()
 
-        # Calculate statistics
-        predictions = self.isolation_forest.predict(X_scaled)
-        anomalies = (predictions == -1).sum()
+        # FINAL VERIFICATION: Ensure ensemble is still set after save
+        if self.ensemble_classifier is None:
+            error_msg = "HYBRID TRAINING FAILED: Ensemble became None after save"
+            print(f"\n[HYBRID] ❌ {error_msg}")
+            raise ValueError(error_msg)
 
+        # Calculate statistics - only after ALL verifications passed
         result = {
             'records_processed': len(logs_df),
             'unique_ips': len(features_df),
             'features_total': len(self.feature_names),
             'features_selected': len(self.selected_feature_names),
-            'anomalies_detected': int(anomalies),
+            'features_count': len(self.selected_feature_names),  # For backward compatibility with /train endpoint
+            'anomalies_detected': int(anomalies_count),
             'contamination': self.config['eif_contamination'],
-            'model_type': 'Extended Isolation Forest' if EIF_AVAILABLE else 'Isolation Forest',
-            'status': 'success'
+            'model_type': 'Hybrid (EIF + Ensemble)',
+            'ensemble_models': ['DecisionTree', 'RandomForest', 'XGBoost'],
+            'status': 'success',
+            'ensemble_verified': True  # Explicit flag for verification
         }
 
-        print(f"[HYBRID] Training completed! {anomalies}/{len(features_df)} IPs flagged as anomalies")
+        print(f"[HYBRID] ✅ Training completed successfully! {anomalies_count}/{len(features_df)} IPs flagged as anomalies")
+        print(f"[HYBRID] ✅ Ensemble classifier verified and ready for production")
         return result
 
     def detect(
@@ -295,16 +499,48 @@ class MLHybridDetector:
 
         # Apply same feature selection
         X_positive = X.clip(lower=0)
+        X_positive = X_positive + 1e-10  # Add epsilon
         X_selected = self.feature_selector.transform(X_positive)
         X_scaled = self.scaler.transform(X_selected)
 
-        # Predictions from Isolation Forest
-        predictions = self.isolation_forest.predict(X_scaled)
-        scores = self.isolation_forest.score_samples(X_scaled)
+        # HYBRID SCORING: Combine Isolation Forest + Ensemble Classifier
 
-        # Normalize scores to 0-100 (lower score = more anomalous)
-        score_min, score_max = scores.min(), scores.max()
-        risk_scores = 100 * (1 - (scores - score_min) / (score_max - score_min + 1e-10))
+        # Step 1: Isolation Forest score (unsupervised anomaly detection)
+        if_predictions = self.isolation_forest.predict(X_scaled)
+        if_scores = self.isolation_forest.score_samples(X_scaled)
+
+        # Normalize IF scores to 0-100 (lower score = more anomalous)
+        if_score_min, if_score_max = if_scores.min(), if_scores.max()
+        if_risk_scores = 100 * (1 - (if_scores - if_score_min) / (if_score_max - if_score_min + 1e-10))
+
+        # Step 2: Ensemble score (supervised classification on pseudo-labels)
+        if self.ensemble_classifier is not None:
+            print(f"[DETECT] Ensemble classifier available - computing hybrid score...")
+
+            # Get ensemble probability predictions
+            ensemble_proba = self.ensemble_classifier.predict_proba(X_scaled)
+            # Probability of being anomaly (class 1)
+            ensemble_anomaly_proba = ensemble_proba[:, 1]
+            # Convert to 0-100 scale
+            ensemble_risk_scores = ensemble_anomaly_proba * 100
+
+            # Combine scores: weighted average (IF: 40%, Ensemble: 60%)
+            # Ensemble gets more weight as it's trained on pseudo-labels
+            risk_scores = 0.4 * if_risk_scores + 0.6 * ensemble_risk_scores
+
+            # Debugging: show score distribution
+            print(f"[DETECT] IF scores: min={if_risk_scores.min():.1f}, max={if_risk_scores.max():.1f}, mean={if_risk_scores.mean():.1f}")
+            print(f"[DETECT] Ensemble scores: min={ensemble_risk_scores.min():.1f}, max={ensemble_risk_scores.max():.1f}, mean={ensemble_risk_scores.mean():.1f}")
+            print(f"[DETECT] Combined scores: min={risk_scores.min():.1f}, max={risk_scores.max():.1f}, mean={risk_scores.mean():.1f}")
+            print(f"[DETECT] ✅ Hybrid scoring active: 40% IF + 60% Ensemble")
+        else:
+            # Fallback to IF-only if ensemble not available
+            risk_scores = if_risk_scores
+            print(f"[DETECT] ⚠️ Ensemble NOT available - using IF-only scoring")
+            print(f"[DETECT] IF scores: min={if_risk_scores.min():.1f}, max={if_risk_scores.max():.1f}, mean={if_risk_scores.mean():.1f}")
+
+        # For backward compatibility
+        predictions = if_predictions
 
         detections = []
         for i, (ip, pred, risk_score) in enumerate(zip(source_ips, predictions, risk_scores)):
@@ -402,6 +638,11 @@ class MLHybridDetector:
         joblib.dump(self.scaler, self.model_dir / f"scaler_{timestamp}.pkl")
         joblib.dump(self.feature_selector, self.model_dir / f"feature_selector_{timestamp}.pkl")
 
+        # Save ensemble if available
+        if self.ensemble_classifier is not None:
+            joblib.dump(self.ensemble_classifier, self.model_dir / f"ensemble_classifier_{timestamp}.pkl")
+            joblib.dump(self.ensemble_classifier, self.model_dir / "ensemble_classifier_latest.pkl")
+
         # Save latest (symlinks alternative)
         joblib.dump(self.isolation_forest, self.model_dir / "isolation_forest_latest.pkl")
         joblib.dump(self.scaler, self.model_dir / "scaler_latest.pkl")
@@ -414,6 +655,7 @@ class MLHybridDetector:
             'selected_feature_names': self.selected_feature_names,
             'config': self.config,
             'metrics': self.metrics,
+            'has_ensemble': self.ensemble_classifier is not None,
         }
 
         with open(self.model_dir / f"metadata_{timestamp}.json", 'w') as f:
@@ -423,6 +665,8 @@ class MLHybridDetector:
             json.dump(metadata, f, indent=2)
 
         print(f"[HYBRID] Models saved to {self.model_dir}")
+        if self.ensemble_classifier is not None:
+            print(f"[HYBRID] Ensemble classifier included")
 
     def load_models(self, version: str = 'latest'):
         """Load models from disk"""
@@ -431,6 +675,15 @@ class MLHybridDetector:
             self.scaler = joblib.load(self.model_dir / f"scaler_{version}.pkl")
             self.feature_selector = joblib.load(self.model_dir / f"feature_selector_{version}.pkl")
 
+            # Try to load ensemble if available
+            ensemble_path = self.model_dir / f"ensemble_classifier_{version}.pkl"
+            if ensemble_path.exists():
+                self.ensemble_classifier = joblib.load(ensemble_path)
+                print(f"[HYBRID] Ensemble classifier loaded")
+            else:
+                self.ensemble_classifier = None
+                print(f"[HYBRID] No ensemble classifier found (IF-only mode)")
+
             with open(self.model_dir / f"metadata_{version}.json") as f:
                 metadata = json.load(f)
                 self.feature_names = metadata['feature_names']
@@ -440,6 +693,12 @@ class MLHybridDetector:
 
             print(f"[HYBRID] Models loaded (version: {version})")
             print(f"[HYBRID] Selected features: {len(self.selected_feature_names)}/{len(self.feature_names)}")
+
+            if self.ensemble_classifier is not None:
+                print(f"[HYBRID] Mode: Hybrid (IF + Ensemble)")
+            else:
+                print(f"[HYBRID] Mode: IF-only (Ensemble not available)")
+
             return True
         except Exception as e:
             print(f"[HYBRID] Failed to load models: {e}")
diff --git a/python_ml/train_hybrid.py b/python_ml/train_hybrid.py
index e676f94..9950bdd 100644
--- a/python_ml/train_hybrid.py
+++ b/python_ml/train_hybrid.py
@@ -286,7 +286,13 @@ def test_on_synthetic(args):
 
     metrics = validator.calculate(y_true, y_pred)
     validator.print_summary(metrics, title="Synthetic Test Results")
 
-    print("\n✅ System test completed successfully!")
+    print("\n✅ System test completed!")
+
+    # Check if ensemble was trained
+    if detector.ensemble_classifier is None:
+        print("\n⚠️ WARNING: System running in IF-only mode (no ensemble)")
+        print("   This may occur with very clean datasets")
+        print("   Expected metrics will be lower than hybrid mode")
 
     return detector, metrics
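
A note for reviewers on the pre-training step: chi-square selection is supervised, so the patch bootstraps a target from the Isolation Forest's own predictions. A minimal self-contained sketch of that idea, assuming scikit-learn and synthetic non-negative features (the shapes, contamination, and k below are illustrative, not the project's real values):

import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(42)
X = np.abs(rng.normal(size=(500, 12)))        # chi2 requires non-negative inputs

# Unsupervised IF predictions stand in for labels: -1 (anomaly) -> class 1
X_scaled = StandardScaler().fit_transform(X)
pre_if = IsolationForest(n_estimators=100, contamination=0.1, random_state=42)
y_pseudo = (pre_if.fit_predict(X_scaled) == -1).astype(int)

k = min(8, X.shape[1])                        # guard k against the feature count
selector = SelectKBest(chi2, k=k)
X_selected = selector.fit_transform(X, y_pseudo)
print(selector.get_support(indices=True))     # indices of the surviving features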
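
The adaptive fallback in STEP 4 widens the anomaly cut-off over the bottom 5/10/15/20 score percentiles until enough pseudo-anomalies exist for a two-class fit, and gives up below two. Condensed into a standalone sketch (the helper name is made up for this note):

import numpy as np

def pseudo_labels_with_fallback(if_scores, if_preds, min_frac=0.02, min_abs=10):
    """Label the lowest-scoring points as anomalies, widening the percentile
    cut-off until enough pseudo-anomalies exist for a two-class fit."""
    labels = (if_preds == -1).astype(int)
    required = max(min_abs, int(len(labels) * min_frac))
    if labels.sum() >= required:
        return labels
    for pct in (5, 10, 15, 20):               # bottom X% of scores -> anomalies
        labels = (if_scores <= np.percentile(if_scores, pct)).astype(int)
        if labels.sum() >= required:
            break
    if labels.sum() < 2:                      # ensemble needs both classes
        raise ValueError("insufficient pseudo-anomalies for ensemble training")
    return labels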
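
Sample weighting follows the inversion trick used in the patch: lower Isolation Forest scores mean more anomalous, so inverted min-max-normalised scores serve as anomaly confidence, while normal traffic gets a flat baseline weight. A sketch of the soft-voting ensemble under those weights, with illustrative hyperparameters standing in for the self.config values:

import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from xgboost import XGBClassifier

def fit_weighted_ensemble(X_scaled, y_pseudo, if_scores):
    # Lower IF score = more anomalous: invert and min-max normalise
    conf = 1 - (if_scores - if_scores.min()) / (if_scores.max() - if_scores.min() + 1e-10)
    weights = np.where(y_pseudo == 1, conf, 0.5)  # anomalies by confidence, normals flat
    ensemble = VotingClassifier(
        estimators=[
            ('dt', DecisionTreeClassifier(max_depth=10, class_weight='balanced',
                                          random_state=42)),
            ('rf', RandomForestClassifier(n_estimators=100, class_weight='balanced',
                                          random_state=42, n_jobs=-1)),
            ('xgb', XGBClassifier(n_estimators=100, eval_metric='logloss',
                                  random_state=42)),
        ],
        voting='soft', weights=[1, 2, 2],         # probability averaging, RF/XGB favoured
    )
    ensemble.fit(X_scaled, y_pseudo, sample_weight=weights)
    return ensemble

VotingClassifier forwards sample_weight to each estimator, which is why all three base models here are ones that accept it.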
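
At detection time the two signals are blended on a common 0-100 risk scale, 40% Isolation Forest and 60% ensemble, with an IF-only fallback when no ensemble was loaded. The core of that scoring, reduced to a sketch (the function name is invented for illustration):

import numpy as np

def hybrid_risk_scores(iso_forest, ensemble, X_scaled, if_w=0.4, ens_w=0.6):
    s = iso_forest.score_samples(X_scaled)            # lower = more anomalous
    if_risk = 100 * (1 - (s - s.min()) / (s.max() - s.min() + 1e-10))
    if ensemble is None:                              # IF-only fallback
        return if_risk
    ens_risk = ensemble.predict_proba(X_scaled)[:, 1] * 100
    return if_w * if_risk + ens_w * ens_risk          # combined 0-100 risk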