ids.alfacom.it/python_ml/analytics_aggregator.py
marco370 2b802397d1 Improve analytics data aggregation security and reliability
Enhance security by removing hardcoded paths, implementing a wrapper script for manual execution, and adding robust credential validation in the analytics aggregator.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 47661cde-4285-4ce6-8d00-fb236a5a01b7
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/oGXAoP7
2025-11-24 08:35:46 +00:00


"""
Network Analytics Aggregator
Aggregates traffic statistics (normal + attack) every hour and every day.
Keeps permanent data for long-term analytics.
"""
import psycopg2
from psycopg2.extras import RealDictCursor
import os
from datetime import datetime, timedelta
import json
from typing import Optional
from collections import defaultdict
import sys
class AnalyticsAggregator:
"""
    Analytics aggregator for normal + attack traffic.
    Stores permanent statistics in network_analytics.
    SECURITY: requires environment variables for the DB credentials.
    - Production: managed via the systemd EnvironmentFile
    - Manual runs: use the wrapper script run_analytics.sh
"""
def __init__(self):
        # Read credentials from environment variables (injected by systemd or the wrapper script)
self.db_params = {
'host': os.getenv('PGHOST', 'localhost'),
'port': int(os.getenv('PGPORT', 5432)),
'database': os.getenv('PGDATABASE', 'ids_db'),
'user': os.getenv('PGUSER', 'ids'),
'password': os.getenv('PGPASSWORD', ''),
}
        # Fail fast: verify that the required credentials are set
missing = []
for key in ['PGHOST', 'PGDATABASE', 'PGUSER', 'PGPASSWORD']:
if not os.getenv(key):
missing.append(key)
if missing:
raise ValueError(
f"Credenziali database mancanti: {', '.join(missing)}\n"
f"Esecuzione manuale: usa ./deployment/run_analytics.sh\n"
f"Systemd: verifica EnvironmentFile in ids-analytics-aggregator.service"
)
def get_connection(self):
"""Crea connessione database"""
return psycopg2.connect(**self.db_params)
def aggregate_hourly(self, target_hour: Optional[datetime] = None):
"""
        Aggregate statistics for a specific hour.
        If target_hour is None, the previous hour is used.
"""
if target_hour is None:
            # Use the previous hour (e.g. if it is now 10:30, aggregate 09:00-10:00)
now = datetime.now()
target_hour = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
hour_start = target_hour
hour_end = hour_start + timedelta(hours=1)
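        # The window is half-open, [hour_start, hour_end), matching the >= / < bounds used in the queries below.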
print(f"[ANALYTICS] Aggregazione oraria: {hour_start.strftime('%Y-%m-%d %H:00')}")
conn = self.get_connection()
cursor = conn.cursor(cursor_factory=RealDictCursor)
try:
            # 1. Analyse network_logs within the target hour
cursor.execute("""
SELECT
source_ip,
COUNT(*) as packets,
SUM(packet_length) as bytes
FROM network_logs
WHERE timestamp >= %s AND timestamp < %s
GROUP BY source_ip
""", (hour_start, hour_end))
traffic_by_ip = {row['source_ip']: row for row in cursor.fetchall()}
if not traffic_by_ip:
print(f"[ANALYTICS] Nessun traffico nell'ora {hour_start.strftime('%H:00')}")
cursor.close()
conn.close()
return
            # 2. Identify attacker IPs (detections within the hour)
cursor.execute("""
SELECT DISTINCT source_ip, anomaly_type, risk_score, country
FROM detections
WHERE detected_at >= %s AND detected_at < %s
""", (hour_start, hour_end))
attacker_ips = {}
attacks_by_type = defaultdict(int)
for row in cursor.fetchall():
ip = row['source_ip']
attacker_ips[ip] = row
attacks_by_type[row['anomaly_type']] += 1
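            # attacker_ips keeps one detection row per IP (later rows overwrite earlier ones);
            # attacks_by_type counts detection rows, not packets.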
            # 3. Classify traffic: normal vs attack
total_packets = 0
total_bytes = 0
normal_packets = 0
normal_bytes = 0
attack_packets = 0
attack_bytes = 0
traffic_by_country = defaultdict(lambda: {'normal': 0, 'attacks': 0})
attacks_by_country = defaultdict(int)
top_normal = []
top_attackers = []
for ip, stats in traffic_by_ip.items():
packets = stats['packets']
bytes_count = stats['bytes'] or 0
total_packets += packets
total_bytes += bytes_count
if ip in attacker_ips:
                    # Attacker IP
attack_packets += packets
attack_bytes += bytes_count
country = attacker_ips[ip].get('country')
if country:
traffic_by_country[country]['attacks'] += packets
attacks_by_country[country] += 1
top_attackers.append({
'ip': ip,
'country': country,
'risk_score': float(attacker_ips[ip]['risk_score']),
'packets': packets,
'bytes': bytes_count
})
else:
                    # Normal IP
normal_packets += packets
normal_bytes += bytes_count
                    # Country lookup for the normal IP (from earlier detections or the geo cache)
cursor.execute("""
SELECT country FROM detections
WHERE source_ip = %s AND country IS NOT NULL
ORDER BY detected_at DESC LIMIT 1
""", (ip,))
geo_row = cursor.fetchone()
country = geo_row['country'] if geo_row else None
if country:
traffic_by_country[country]['normal'] += packets
top_normal.append({
'ip': ip,
'packets': packets,
'bytes': bytes_count,
'country': country
})
            # 4. Top 10 per category
top_normal = sorted(top_normal, key=lambda x: x['packets'], reverse=True)[:10]
top_attackers = sorted(top_attackers, key=lambda x: x['risk_score'], reverse=True)[:10]
            # 5. Store the aggregation
cursor.execute("""
INSERT INTO network_analytics (
date, hour,
total_packets, total_bytes, unique_ips,
normal_packets, normal_bytes, normal_unique_ips, top_normal_ips,
attack_packets, attack_bytes, attack_unique_ips,
attacks_by_country, attacks_by_type, top_attackers,
traffic_by_country
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (date, hour) DO UPDATE SET
total_packets = EXCLUDED.total_packets,
total_bytes = EXCLUDED.total_bytes,
unique_ips = EXCLUDED.unique_ips,
normal_packets = EXCLUDED.normal_packets,
normal_bytes = EXCLUDED.normal_bytes,
normal_unique_ips = EXCLUDED.normal_unique_ips,
top_normal_ips = EXCLUDED.top_normal_ips,
attack_packets = EXCLUDED.attack_packets,
attack_bytes = EXCLUDED.attack_bytes,
attack_unique_ips = EXCLUDED.attack_unique_ips,
attacks_by_country = EXCLUDED.attacks_by_country,
attacks_by_type = EXCLUDED.attacks_by_type,
top_attackers = EXCLUDED.top_attackers,
traffic_by_country = EXCLUDED.traffic_by_country
""", (
target_hour.date(),
target_hour.hour,
total_packets,
total_bytes,
len(traffic_by_ip),
normal_packets,
normal_bytes,
len(traffic_by_ip) - len(attacker_ips),
json.dumps(top_normal),
attack_packets,
attack_bytes,
len(attacker_ips),
json.dumps(dict(attacks_by_country)),
json.dumps(dict(attacks_by_type)),
json.dumps(top_attackers),
json.dumps({k: dict(v) for k, v in traffic_by_country.items()})
))
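            # ON CONFLICT (date, hour) turns the insert into an upsert, so re-running the same hour overwrites the existing row.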
conn.commit()
print(f"[ANALYTICS] ✅ Aggregazione completata:")
print(f" - Totale: {total_packets} pacchetti, {len(traffic_by_ip)} IP unici")
print(f" - Normale: {normal_packets} pacchetti ({normal_packets*100//total_packets if total_packets else 0}%)")
print(f" - Attacchi: {attack_packets} pacchetti ({attack_packets*100//total_packets if total_packets else 0}%), {len(attacker_ips)} IP")
except Exception as e:
print(f"[ANALYTICS] ❌ Errore aggregazione oraria: {e}")
conn.rollback()
raise
finally:
cursor.close()
conn.close()
def aggregate_daily(self, target_date: Optional[datetime] = None):
"""
        Aggregate daily statistics (sum of the hourly rows).
        If target_date is None, the previous day is used.
"""
if target_date is None:
target_date = datetime.now().date() - timedelta(days=1)
print(f"[ANALYTICS] Aggregazione giornaliera: {target_date}")
conn = self.get_connection()
cursor = conn.cursor(cursor_factory=RealDictCursor)
try:
            # Sum the day's hourly aggregations
cursor.execute("""
SELECT
SUM(total_packets) as total_packets,
SUM(total_bytes) as total_bytes,
MAX(unique_ips) as unique_ips,
SUM(normal_packets) as normal_packets,
SUM(normal_bytes) as normal_bytes,
MAX(normal_unique_ips) as normal_unique_ips,
SUM(attack_packets) as attack_packets,
SUM(attack_bytes) as attack_bytes,
MAX(attack_unique_ips) as attack_unique_ips
FROM network_analytics
WHERE date = %s AND hour IS NOT NULL
""", (target_date,))
daily_stats = cursor.fetchone()
if not daily_stats or not daily_stats['total_packets']:
print(f"[ANALYTICS] Nessun dato per {target_date}")
cursor.close()
conn.close()
return
# Merge JSON fields (countries, types, top IPs)
            # TODO: implement a smarter merge if it becomes necessary
            # Store the daily aggregation (hour = NULL)
cursor.execute("""
INSERT INTO network_analytics (
date, hour,
total_packets, total_bytes, unique_ips,
normal_packets, normal_bytes, normal_unique_ips,
attack_packets, attack_bytes, attack_unique_ips
)
VALUES (%s, NULL, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (date, hour) DO UPDATE SET
total_packets = EXCLUDED.total_packets,
total_bytes = EXCLUDED.total_bytes,
unique_ips = EXCLUDED.unique_ips,
normal_packets = EXCLUDED.normal_packets,
normal_bytes = EXCLUDED.normal_bytes,
normal_unique_ips = EXCLUDED.normal_unique_ips,
attack_packets = EXCLUDED.attack_packets,
attack_bytes = EXCLUDED.attack_bytes,
attack_unique_ips = EXCLUDED.attack_unique_ips
""", (
target_date,
daily_stats['total_packets'],
daily_stats['total_bytes'],
daily_stats['unique_ips'],
daily_stats['normal_packets'],
daily_stats['normal_bytes'],
daily_stats['normal_unique_ips'],
daily_stats['attack_packets'],
daily_stats['attack_bytes'],
daily_stats['attack_unique_ips']
))
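            # Note: in PostgreSQL, NULLs do not conflict under a plain UNIQUE (date, hour) constraint,
            # so this upsert assumes the schema handles the hour IS NULL row (e.g. NULLS NOT DISTINCT
            # or a partial unique index); otherwise re-runs could insert duplicate daily rows.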
conn.commit()
print(f"[ANALYTICS] ✅ Aggregazione giornaliera completata")
except Exception as e:
print(f"[ANALYTICS] ❌ Errore aggregazione giornaliera: {e}")
conn.rollback()
raise
finally:
cursor.close()
conn.close()
def main():
"""Entry point"""
aggregator = AnalyticsAggregator()
if len(sys.argv) > 1:
if sys.argv[1] == 'hourly':
aggregator.aggregate_hourly()
elif sys.argv[1] == 'daily':
aggregator.aggregate_daily()
else:
print("Usage: python analytics_aggregator.py [hourly|daily]")
else:
        # Default: hourly aggregation
aggregator.aggregate_hourly()
if __name__ == '__main__':
main()
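
# Usage sketch (the wrapper's argument handling is an assumption; only the CLI modes above are defined here):
#   ./deployment/run_analytics.sh            # manual run, exports the PG* credentials before invoking this module
#   python analytics_aggregator.py hourly    # requires PGHOST/PGDATABASE/PGUSER/PGPASSWORD in the environment
#   python analytics_aggregator.py daily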