ids.alfacom.it/python_ml/cleanup_detections.py
marco370 791b7caa4d Add automatic cleanup for old detections and IP blocks
Implement automated detection cleanup after 48 hours and IP unblocking after 2 hours using systemd timers and Python scripts.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: intermediate_checkpoint
Replit-Commit-Event-Id: 3809a8a0-8dd5-4b5a-9e32-9e075dab335e
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/L6QSDnx
2025-11-25 10:40:44 +00:00

170 lines
5.2 KiB
Python

#!/usr/bin/env python3
"""
IDS - Cleanup Detections Script
================================
Automatizza la pulizia delle detections e lo sblocco degli IP secondo le regole:
1. Cancella detections non anomale dopo 48 ore
2. Sblocca IP bloccati se non più anomali dopo 2 ore
Esecuzione: Ogni ora via cron/systemd timer
"""
import os
import sys
import logging
from datetime import datetime, timedelta
import psycopg2
from psycopg2.extras import RealDictCursor
from dotenv import load_dotenv
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)s: %(message)s',
handlers=[
logging.FileHandler('/var/log/ids/cleanup.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# Load environment
load_dotenv()
def get_db_connection():
"""Connessione al database PostgreSQL"""
return psycopg2.connect(
host=os.getenv('PGHOST', 'localhost'),
port=int(os.getenv('PGPORT', 5432)),
user=os.getenv('PGUSER'),
password=os.getenv('PGPASSWORD'),
database=os.getenv('PGDATABASE')
)
def cleanup_old_detections(conn, hours=48):
"""
Cancella detections vecchie di più di N ore.
Logica: Se un IP è stato rilevato ma dopo 48 ore non è più
considerato anomalo (non appare in nuove detections), eliminalo.
"""
cursor = conn.cursor(cursor_factory=RealDictCursor)
cutoff_time = datetime.now() - timedelta(hours=hours)
# Conta detections da eliminare
cursor.execute("""
SELECT COUNT(*) as count
FROM detections
WHERE detected_at < %s
AND blocked = false
""", (cutoff_time,))
count = cursor.fetchone()['count']
if count > 0:
logger.info(f"Trovate {count} detections da eliminare (più vecchie di {hours}h)")
# Elimina
cursor.execute("""
DELETE FROM detections
WHERE detected_at < %s
AND blocked = false
""", (cutoff_time,))
conn.commit()
logger.info(f"✅ Eliminate {cursor.rowcount} detections vecchie")
else:
logger.info(f"Nessuna detection da eliminare (soglia: {hours}h)")
cursor.close()
return count
def unblock_old_ips(conn, hours=2):
"""
Sblocca IP bloccati da più di N ore.
Logica: Se un IP è stato bloccato ma dopo 2 ore non è più
anomalo (nessuna nuova detection), sbloccalo dal DB.
NOTA: Questo NON rimuove l'IP dalle firewall list dei router MikroTik.
Per quello serve chiamare l'API /unblock-ip del ML backend.
"""
cursor = conn.cursor(cursor_factory=RealDictCursor)
cutoff_time = datetime.now() - timedelta(hours=hours)
# Trova IP bloccati da più di N ore senza nuove detections
cursor.execute("""
SELECT d.source_ip, d.blocked_at, d.anomaly_type, d.risk_score
FROM detections d
WHERE d.blocked = true
AND d.blocked_at < %s
AND NOT EXISTS (
SELECT 1 FROM detections d2
WHERE d2.source_ip = d.source_ip
AND d2.detected_at > %s
)
""", (cutoff_time, cutoff_time))
ips_to_unblock = cursor.fetchall()
if ips_to_unblock:
logger.info(f"Trovati {len(ips_to_unblock)} IP da sbloccare (bloccati da più di {hours}h)")
for ip_data in ips_to_unblock:
ip = ip_data['source_ip']
logger.info(f" - {ip} (tipo: {ip_data['anomaly_type']}, score: {ip_data['risk_score']})")
# Aggiorna DB
cursor.execute("""
UPDATE detections
SET blocked = false, blocked_at = NULL
WHERE source_ip = %s
""", (ip,))
conn.commit()
logger.info(f"✅ Sbloccati {len(ips_to_unblock)} IP nel database")
logger.warning("⚠️ ATTENZIONE: IP ancora presenti nelle firewall list MikroTik!")
logger.info("💡 Per rimuoverli dai router, usa: curl -X POST http://localhost:8000/unblock-ip -d '{\"ip_address\": \"X.X.X.X\"}'")
else:
logger.info(f"Nessun IP da sbloccare (soglia: {hours}h)")
cursor.close()
return len(ips_to_unblock)
def main():
"""Esecuzione cleanup completo"""
logger.info("=" * 60)
logger.info("CLEANUP DETECTIONS - Avvio")
logger.info("=" * 60)
try:
conn = get_db_connection()
logger.info("✅ Connesso al database")
# 1. Cleanup detections vecchie (48h)
logger.info("\n[1/2] Cleanup detections vecchie...")
deleted_count = cleanup_old_detections(conn, hours=48)
# 2. Sblocco IP vecchi (2h)
logger.info("\n[2/2] Sblocco IP vecchi...")
unblocked_count = unblock_old_ips(conn, hours=2)
conn.close()
logger.info("\n" + "=" * 60)
logger.info("CLEANUP COMPLETATO")
logger.info(f" - Detections eliminate: {deleted_count}")
logger.info(f" - IP sbloccati (DB): {unblocked_count}")
logger.info("=" * 60)
return 0
except Exception as e:
logger.error(f"❌ Errore durante cleanup: {e}", exc_info=True)
return 1
if __name__ == "__main__":
sys.exit(main())