#!/usr/bin/env python3
"""
IDS auto-blocking script.

Detects and automatically blocks IPs whose risk_score is >= 80.
Meant to be run periodically by a systemd timer (every 5 minutes).

Flow:
1. POST to the ML backend ``/detect`` endpoint to run ML detection.
2. POST to the Node.js ``/api/ml/block-all-critical`` endpoint to block
   critical IPs on the routers.
"""

import sys
from datetime import datetime

import requests

NODE_API_URL = "http://localhost:5000"
ML_API_URL = "http://localhost:8000"

# Request timeouts in seconds for the two HTTP calls.
DETECT_TIMEOUT_S = 120
BLOCK_TIMEOUT_S = 300


def _log(message: str) -> None:
    """Print *message* prefixed with the current local time.

    The timestamp is taken at call time so that lines emitted after a long
    HTTP request carry the time they were actually written.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{now}] {message}")


def _run_detection() -> None:
    """Step 1: ask the ML backend to run detection (best effort).

    Every failure is logged and swallowed on purpose: blocking of the IPs
    that are already flagged must go ahead even when detection is down.
    """
    try:
        _log("Step 1: Detection ML...")
        response = requests.post(
            f"{ML_API_URL}/detect",
            json={
                "max_records": 50000,
                "hours_back": 1.0,
                "risk_threshold": 75.0,
                "auto_block": False,
            },
            timeout=DETECT_TIMEOUT_S,
        )

        if response.status_code == 200:
            data = response.json()
            # "or []" also covers an explicit JSON null for "detections".
            detections = len(data.get("detections") or [])
            _log(f"Detection completata: {detections} anomalie rilevate")
        else:
            _log(f"Detection API error: HTTP {response.status_code}")
    except requests.exceptions.ConnectionError:
        _log("ML Backend non raggiungibile, skip detection (blocco IP esistenti continua)")
    except requests.exceptions.Timeout:
        _log("ML Detection timeout, skip (blocco IP esistenti continua)")
    except Exception as e:
        # Top-level boundary: detection problems must never abort the cycle.
        _log(f"Detection error: {e}")


def _block_critical_ips() -> int:
    """Step 2: ask the Node.js backend to block critical IPs (score >= 80).

    Returns:
        0 when the backend answered HTTP 200, 1 on any error.
    """
    try:
        _log("Step 2: Blocco IP critici sui router...")
        response = requests.post(
            f"{NODE_API_URL}/api/ml/block-all-critical",
            json={
                "min_score": 80,
                "limit": 200,
                "list_name": "ddos_blocked",
            },
            timeout=BLOCK_TIMEOUT_S,
        )

        if response.status_code != 200:
            _log(f"Block API error: HTTP {response.status_code} - {response.text[:200]}")
            return 1

        data = response.json()
        # "or 0" keeps the comparisons below safe if a counter comes back null.
        blocked = data.get("blocked") or 0
        failed = data.get("failed") or 0
        skipped = data.get("skipped") or 0
        remaining = data.get("remaining") or 0

        if blocked > 0:
            _log(f"{blocked} IP bloccati sui router, {failed} falliti, {skipped} gia' bloccati")
        else:
            _log(f"Nessun nuovo IP da bloccare ({skipped} gia' bloccati)")
            if failed > 0:
                # Without this line failures would be invisible whenever
                # nothing was blocked in this cycle.
                _log(f"ATTENZIONE: {failed} blocchi falliti")
        if remaining > 0:
            _log(f"Rimangono {remaining} IP critici da bloccare")
        return 0
    except requests.exceptions.ConnectionError:
        _log(f"ERRORE: Node.js backend non raggiungibile su {NODE_API_URL}")
        return 1
    except requests.exceptions.Timeout:
        _log(f"ERRORE: Timeout blocco IP ({BLOCK_TIMEOUT_S}s)")
        return 1
    except Exception as e:
        # Top-level boundary: report anything unexpected and signal failure
        # through the exit code instead of a traceback.
        _log(f"ERRORE imprevisto: {type(e).__name__}: {e}")
        return 1


def auto_block() -> int:
    """Run one detection + automatic blocking cycle for critical IPs.

    Detection (step 1) is best effort and never changes the result; the
    return value reflects only the blocking request (step 2).

    Returns:
        Process exit code: 0 on success, 1 if the blocking step failed.
    """
    _log("Starting auto-block cycle...")
    _run_detection()
    return _block_critical_ips()


if __name__ == "__main__":
    exit_code = auto_block()
    sys.exit(exit_code)