ids.alfacom.it/python_ml/auto_block.py
marco370 a7967260b1 Improve IP blocking by separating detection and blocking steps
Refactor auto_block.py to call the Node.js backend for blocking critical IPs and adjust the auto-block service configuration.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: aef8a3be-adf0-4bdc-942f-3e7b19be7d72
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/4aeldgV
2026-02-16 15:04:35 +00:00

97 lines
3.4 KiB
Python

#!/usr/bin/env python3
"""
IDS Auto-Blocking Script
Rileva e blocca automaticamente IP con risk_score >= 80
Eseguito periodicamente da systemd timer (ogni 5 minuti)
Flusso:
1. Chiama Node.js /api/ml/detect per eseguire detection ML
2. Chiama Node.js /api/ml/block-all-critical per bloccare IP critici sui router
"""
import requests
import sys
from datetime import datetime
NODE_API_URL = "http://localhost:5000"
ML_API_URL = "http://localhost:8000"
def auto_block():
"""Esegue detection e blocking automatico degli IP critici"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] Starting auto-block cycle...")
# Step 1: Esegui detection via ML Backend (se disponibile)
try:
print(f"[{timestamp}] Step 1: Detection ML...")
response = requests.post(
f"{ML_API_URL}/detect",
json={
"max_records": 50000,
"hours_back": 1.0,
"risk_threshold": 75.0,
"auto_block": False
},
timeout=120
)
if response.status_code == 200:
data = response.json()
detections = len(data.get("detections", []))
print(f"[{timestamp}] Detection completata: {detections} anomalie rilevate")
else:
print(f"[{timestamp}] Detection API error: HTTP {response.status_code}")
except requests.exceptions.ConnectionError:
print(f"[{timestamp}] ML Backend non raggiungibile, skip detection (blocco IP esistenti continua)")
except requests.exceptions.Timeout:
print(f"[{timestamp}] ML Detection timeout, skip (blocco IP esistenti continua)")
except Exception as e:
print(f"[{timestamp}] Detection error: {e}")
# Step 2: Blocca IP critici (score >= 80) via Node.js
try:
print(f"[{timestamp}] Step 2: Blocco IP critici sui router...")
response = requests.post(
f"{NODE_API_URL}/api/ml/block-all-critical",
json={
"min_score": 80,
"limit": 200,
"list_name": "ddos_blocked"
},
timeout=120
)
if response.status_code == 200:
data = response.json()
blocked = data.get("blocked", 0)
failed = data.get("failed", 0)
skipped = data.get("skipped", 0)
remaining = data.get("remaining", 0)
if blocked > 0:
print(f"[{timestamp}] {blocked} IP bloccati sui router, {failed} falliti, {skipped} gia' bloccati")
else:
print(f"[{timestamp}] Nessun nuovo IP da bloccare ({skipped} gia' bloccati)")
if remaining > 0:
print(f"[{timestamp}] Rimangono {remaining} IP critici da bloccare")
return 0
else:
print(f"[{timestamp}] Block API error: HTTP {response.status_code} - {response.text[:200]}")
return 1
except requests.exceptions.ConnectionError:
print(f"[{timestamp}] ERRORE: Node.js backend non raggiungibile su {NODE_API_URL}")
return 1
except requests.exceptions.Timeout:
print(f"[{timestamp}] ERRORE: Timeout blocco IP (120s)")
return 1
except Exception as e:
print(f"[{timestamp}] ERRORE imprevisto: {type(e).__name__}: {e}")
return 1
if __name__ == "__main__":
exit_code = auto_block()
sys.exit(exit_code)