ids.alfacom.it/python_ml/auto_block.py
marco370 b45b810eb9 Improve IP blocking process by increasing timeouts and adding detailed logging
Increase auto-block timeout to 300s, update systemd service timeout to 480s, and reduce individual MikroTik request timeout to 8s. Add per-router logging for blocking operations.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: intermediate_checkpoint
Replit-Commit-Event-Id: 455f4d8c-e90c-45d5-a7f1-e5f98b1345d3
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/cJuycQ5
2026-02-16 18:35:39 +00:00

97 lines
3.4 KiB
Python

#!/usr/bin/env python3
"""
IDS Auto-Blocking Script
Rileva e blocca automaticamente IP con risk_score >= 80
Eseguito periodicamente da systemd timer (ogni 5 minuti)
Flusso:
1. Chiama Node.js /api/ml/detect per eseguire detection ML
2. Chiama Node.js /api/ml/block-all-critical per bloccare IP critici sui router
"""
import requests
import sys
from datetime import datetime
NODE_API_URL = "http://localhost:5000"
ML_API_URL = "http://localhost:8000"
def auto_block():
"""Esegue detection e blocking automatico degli IP critici"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] Starting auto-block cycle...")
# Step 1: Esegui detection via ML Backend (se disponibile)
try:
print(f"[{timestamp}] Step 1: Detection ML...")
response = requests.post(
f"{ML_API_URL}/detect",
json={
"max_records": 50000,
"hours_back": 1.0,
"risk_threshold": 75.0,
"auto_block": False
},
timeout=120
)
if response.status_code == 200:
data = response.json()
detections = len(data.get("detections", []))
print(f"[{timestamp}] Detection completata: {detections} anomalie rilevate")
else:
print(f"[{timestamp}] Detection API error: HTTP {response.status_code}")
except requests.exceptions.ConnectionError:
print(f"[{timestamp}] ML Backend non raggiungibile, skip detection (blocco IP esistenti continua)")
except requests.exceptions.Timeout:
print(f"[{timestamp}] ML Detection timeout, skip (blocco IP esistenti continua)")
except Exception as e:
print(f"[{timestamp}] Detection error: {e}")
# Step 2: Blocca IP critici (score >= 80) via Node.js
try:
print(f"[{timestamp}] Step 2: Blocco IP critici sui router...")
response = requests.post(
f"{NODE_API_URL}/api/ml/block-all-critical",
json={
"min_score": 80,
"limit": 200,
"list_name": "ddos_blocked"
},
timeout=300
)
if response.status_code == 200:
data = response.json()
blocked = data.get("blocked", 0)
failed = data.get("failed", 0)
skipped = data.get("skipped", 0)
remaining = data.get("remaining", 0)
if blocked > 0:
print(f"[{timestamp}] {blocked} IP bloccati sui router, {failed} falliti, {skipped} gia' bloccati")
else:
print(f"[{timestamp}] Nessun nuovo IP da bloccare ({skipped} gia' bloccati)")
if remaining > 0:
print(f"[{timestamp}] Rimangono {remaining} IP critici da bloccare")
return 0
else:
print(f"[{timestamp}] Block API error: HTTP {response.status_code} - {response.text[:200]}")
return 1
except requests.exceptions.ConnectionError:
print(f"[{timestamp}] ERRORE: Node.js backend non raggiungibile su {NODE_API_URL}")
return 1
except requests.exceptions.Timeout:
print(f"[{timestamp}] ERRORE: Timeout blocco IP (300s)")
return 1
except Exception as e:
print(f"[{timestamp}] ERRORE imprevisto: {type(e).__name__}: {e}")
return 1
if __name__ == "__main__":
exit_code = auto_block()
sys.exit(exit_code)