""" Network Analytics Aggregator Aggrega statistiche traffico (normale + attacchi) ogni ora e giorno Mantiene dati permanenti per analytics long-term """ import psycopg2 from psycopg2.extras import RealDictCursor import os from datetime import datetime, timedelta, date import json from typing import Dict, List, Optional from collections import defaultdict import sys class AnalyticsAggregator: """ Aggregatore analytics per traffico normale + attacchi Salva statistiche permanenti in network_analytics SECURITY: Richiede variabili d'ambiente per credenziali DB. - Production: Gestite da systemd EnvironmentFile - Manual: Usare script wrapper run_analytics.sh """ def __init__(self): # Leggi credenziali da variabili d'ambiente (iniettate da systemd o wrapper) self.db_params = { 'host': os.getenv('PGHOST', 'localhost'), 'port': int(os.getenv('PGPORT', 5432)), 'database': os.getenv('PGDATABASE', 'ids_db'), 'user': os.getenv('PGUSER', 'ids'), 'password': os.getenv('PGPASSWORD', ''), } # Fail-fast: verifica credenziali obbligatorie missing = [] for key in ['PGHOST', 'PGDATABASE', 'PGUSER', 'PGPASSWORD']: if not os.getenv(key): missing.append(key) if missing: raise ValueError( f"Credenziali database mancanti: {', '.join(missing)}\n" f"Esecuzione manuale: usa ./deployment/run_analytics.sh\n" f"Systemd: verifica EnvironmentFile in ids-analytics-aggregator.service" ) def get_connection(self): """Crea connessione database""" return psycopg2.connect(**self.db_params) def aggregate_hourly(self, target_hour: Optional[datetime] = None): """ Aggrega statistiche per una specifica ora Se target_hour è None, usa l'ora precedente """ if target_hour is None: # Usa ora precedente (es: se ora sono le 10:30, aggrega le 09:00-10:00) now = datetime.now() target_hour = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1) hour_start = target_hour hour_end = hour_start + timedelta(hours=1) print(f"[ANALYTICS] Aggregazione oraria: {hour_start.strftime('%Y-%m-%d %H:00')}") conn = self.get_connection() cursor = conn.cursor(cursor_factory=RealDictCursor) try: # 1. Analizza network_logs nell'ora target cursor.execute(""" SELECT source_ip, COUNT(*) as packets, SUM(packet_length) as bytes FROM network_logs WHERE timestamp >= %s AND timestamp < %s GROUP BY source_ip """, (hour_start, hour_end)) traffic_by_ip = {row['source_ip']: row for row in cursor.fetchall()} if not traffic_by_ip: print(f"[ANALYTICS] Nessun traffico nell'ora {hour_start.strftime('%H:00')}") cursor.close() conn.close() return # 2. Identifica IP attaccanti (detections nell'ora) # NOTA: SELECT DISTINCT su tutte le colonne per gestire IP con più anomaly_type cursor.execute(""" SELECT source_ip, anomaly_type, risk_score, country FROM detections WHERE detected_at >= %s AND detected_at < %s ORDER BY source_ip, risk_score DESC """, (hour_start, hour_end)) attacker_ips = {} for row in cursor.fetchall(): ip = row['source_ip'] # Mantieni solo la detection con risk_score più alto per IP # (evita duplicati ma tiene traccia del tipo principale) if ip not in attacker_ips: attacker_ips[ip] = row # 3. 

            # 3. Classify traffic: normal vs attack
            total_packets = 0
            total_bytes = 0
            normal_packets = 0
            normal_bytes = 0
            attack_packets = 0
            attack_bytes = 0

            traffic_by_country = defaultdict(lambda: {'normal': 0, 'attacks': 0})
            attacks_by_country = defaultdict(int)
            attacks_by_type = defaultdict(int)

            top_normal = []
            top_attackers = []

            for ip, stats in traffic_by_ip.items():
                packets = stats['packets']
                bytes_count = stats['bytes'] or 0

                total_packets += packets
                total_bytes += bytes_count

                if ip in attacker_ips:
                    # Attacker IP
                    attack_packets += packets
                    attack_bytes += bytes_count

                    # Count packets per attack type (not just detection occurrences!)
                    anomaly_type = attacker_ips[ip].get('anomaly_type')
                    if anomaly_type:
                        attacks_by_type[anomaly_type] += packets
                    else:
                        # Fallback for attacks without a classified type
                        attacks_by_type['unknown'] += packets

                    country = attacker_ips[ip].get('country')
                    if country:
                        traffic_by_country[country]['attacks'] += packets
                        attacks_by_country[country] += packets
                    else:
                        # Fallback for attacks without geolocation
                        traffic_by_country['Unknown']['attacks'] += packets
                        attacks_by_country['Unknown'] += packets

                    top_attackers.append({
                        'ip': ip,
                        'country': country or 'Unknown',
                        'risk_score': float(attacker_ips[ip]['risk_score']),
                        'packets': packets,
                        'bytes': bytes_count
                    })
                else:
                    # Normal IP
                    normal_packets += packets
                    normal_bytes += bytes_count

                    # Country lookup for normal IPs (from earlier detections / geo cache).
                    # Note: this is one query per normal IP; acceptable at hourly volumes,
                    # but a candidate for batching if traffic grows.
                    cursor.execute("""
                        SELECT country FROM detections
                        WHERE source_ip = %s AND country IS NOT NULL
                        ORDER BY detected_at DESC
                        LIMIT 1
                    """, (ip,))
                    geo_row = cursor.fetchone()
                    country = geo_row['country'] if geo_row else None

                    if country:
                        traffic_by_country[country]['normal'] += packets

                    top_normal.append({
                        'ip': ip,
                        'packets': packets,
                        'bytes': bytes_count,
                        'country': country
                    })

            # 4. Top 10 per category
            top_normal = sorted(top_normal, key=lambda x: x['packets'], reverse=True)[:10]
            top_attackers = sorted(top_attackers, key=lambda x: x['risk_score'], reverse=True)[:10]
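
            # Shape sketch (illustrative values) of one top_attackers entry as it is
            # serialized into the JSON column below:
            #   {"ip": "203.0.113.7", "country": "Unknown", "risk_score": 0.91,
            #    "packets": 1200, "bytes": 480000}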

            # 5. Save the aggregation (upsert keyed on date + hour)
            # Count attacker IPs actually seen in this hour's traffic, so the
            # normal/attack split always sums to unique_ips (a detection with no
            # logged traffic would otherwise skew the subtraction).
            attacker_ips_seen = sum(1 for ip in traffic_by_ip if ip in attacker_ips)

            cursor.execute("""
                INSERT INTO network_analytics (
                    date, hour,
                    total_packets, total_bytes, unique_ips,
                    normal_packets, normal_bytes, normal_unique_ips, top_normal_ips,
                    attack_packets, attack_bytes, attack_unique_ips,
                    attacks_by_country, attacks_by_type, top_attackers,
                    traffic_by_country
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (date, hour) DO UPDATE SET
                    total_packets = EXCLUDED.total_packets,
                    total_bytes = EXCLUDED.total_bytes,
                    unique_ips = EXCLUDED.unique_ips,
                    normal_packets = EXCLUDED.normal_packets,
                    normal_bytes = EXCLUDED.normal_bytes,
                    normal_unique_ips = EXCLUDED.normal_unique_ips,
                    top_normal_ips = EXCLUDED.top_normal_ips,
                    attack_packets = EXCLUDED.attack_packets,
                    attack_bytes = EXCLUDED.attack_bytes,
                    attack_unique_ips = EXCLUDED.attack_unique_ips,
                    attacks_by_country = EXCLUDED.attacks_by_country,
                    attacks_by_type = EXCLUDED.attacks_by_type,
                    top_attackers = EXCLUDED.top_attackers,
                    traffic_by_country = EXCLUDED.traffic_by_country
            """, (
                target_hour.date(), target_hour.hour,
                total_packets, total_bytes, len(traffic_by_ip),
                normal_packets, normal_bytes,
                len(traffic_by_ip) - attacker_ips_seen,
                json.dumps(top_normal),
                attack_packets, attack_bytes, attacker_ips_seen,
                json.dumps(dict(attacks_by_country)),
                json.dumps(dict(attacks_by_type)),
                json.dumps(top_attackers),
                json.dumps({k: dict(v) for k, v in traffic_by_country.items()})
            ))

            conn.commit()

            # Consistency check on the breakdowns
            breakdown_total_type = sum(attacks_by_type.values())
            breakdown_total_country = sum(attacks_by_country.values())

            print("[ANALYTICS] ✅ Aggregation completed:")
            print(f"  - Total: {total_packets} packets, {len(traffic_by_ip)} unique IPs")
            print(f"  - Normal: {normal_packets} packets ({normal_packets*100//total_packets if total_packets else 0}%)")
            print(f"  - Attacks: {attack_packets} packets ({attack_packets*100//total_packets if total_packets else 0}%), {attacker_ips_seen} IPs")
            print(f"  - Breakdown by type: {breakdown_total_type} packets (match: {breakdown_total_type == attack_packets})")
            print(f"  - Breakdown by country: {breakdown_total_country} packets (match: {breakdown_total_country == attack_packets})")

            if breakdown_total_type != attack_packets:
                print(f"  ⚠️ Type MISMATCH: {attack_packets - breakdown_total_type} packets not classified")
            if breakdown_total_country != attack_packets:
                print(f"  ⚠️ Country MISMATCH: {attack_packets - breakdown_total_country} packets without geo data")

        except Exception as e:
            print(f"[ANALYTICS] ❌ Hourly aggregation error: {e}")
            conn.rollback()
            raise
        finally:
            cursor.close()
            conn.close()

    def aggregate_daily(self, target_date: Optional[date] = None):
        """
        Aggregate daily statistics (sum of the hourly rows).
        If target_date is None, the previous day is used.
        """
        if target_date is None:
            target_date = datetime.now().date() - timedelta(days=1)

        print(f"[ANALYTICS] Daily aggregation: {target_date}")

        conn = self.get_connection()
        cursor = conn.cursor(cursor_factory=RealDictCursor)

        try:
            # Sum the day's hourly aggregations
            cursor.execute("""
                SELECT
                    SUM(total_packets) AS total_packets,
                    SUM(total_bytes) AS total_bytes,
                    MAX(unique_ips) AS unique_ips,
                    SUM(normal_packets) AS normal_packets,
                    SUM(normal_bytes) AS normal_bytes,
                    MAX(normal_unique_ips) AS normal_unique_ips,
                    SUM(attack_packets) AS attack_packets,
                    SUM(attack_bytes) AS attack_bytes,
                    MAX(attack_unique_ips) AS attack_unique_ips
                FROM network_analytics
                WHERE date = %s AND hour IS NOT NULL
            """, (target_date,))

            daily_stats = cursor.fetchone()
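
            # Note on the unique-IP columns: MAX(...) is used instead of SUM(...) because
            # the same IP can show up in several hourly buckets, so summing would double
            # count; the daily value is therefore a lower bound on the true distinct
            # count (an exact figure would need a re-scan of the day's network_logs).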

            if not daily_stats or not daily_stats['total_packets']:
                print(f"[ANALYTICS] No data for {target_date}")
                return  # cursor and connection are closed by the finally block

            # Merge JSON fields (countries, types, top IPs)
            # TODO: implement a smarter merge if needed

            # Save the daily aggregation (hour = NULL)
            # NOTE: with the PostgreSQL default, a unique index on (date, hour) treats
            # NULLs as distinct, so ON CONFLICT never fires for these daily rows and a
            # re-run inserts a duplicate; NULLS NOT DISTINCT (PG 15+) or a partial
            # unique index on date WHERE hour IS NULL would make the upsert effective.
            cursor.execute("""
                INSERT INTO network_analytics (
                    date, hour,
                    total_packets, total_bytes, unique_ips,
                    normal_packets, normal_bytes, normal_unique_ips,
                    attack_packets, attack_bytes, attack_unique_ips
                ) VALUES (%s, NULL, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (date, hour) DO UPDATE SET
                    total_packets = EXCLUDED.total_packets,
                    total_bytes = EXCLUDED.total_bytes,
                    unique_ips = EXCLUDED.unique_ips,
                    normal_packets = EXCLUDED.normal_packets,
                    normal_bytes = EXCLUDED.normal_bytes,
                    normal_unique_ips = EXCLUDED.normal_unique_ips,
                    attack_packets = EXCLUDED.attack_packets,
                    attack_bytes = EXCLUDED.attack_bytes,
                    attack_unique_ips = EXCLUDED.attack_unique_ips
            """, (
                target_date,
                daily_stats['total_packets'],
                daily_stats['total_bytes'],
                daily_stats['unique_ips'],
                daily_stats['normal_packets'],
                daily_stats['normal_bytes'],
                daily_stats['normal_unique_ips'],
                daily_stats['attack_packets'],
                daily_stats['attack_bytes'],
                daily_stats['attack_unique_ips']
            ))

            conn.commit()
            print("[ANALYTICS] ✅ Daily aggregation completed")

        except Exception as e:
            print(f"[ANALYTICS] ❌ Daily aggregation error: {e}")
            conn.rollback()
            raise
        finally:
            cursor.close()
            conn.close()


def main():
    """Entry point"""
    aggregator = AnalyticsAggregator()

    if len(sys.argv) > 1:
        if sys.argv[1] == 'hourly':
            aggregator.aggregate_hourly()
        elif sys.argv[1] == 'daily':
            aggregator.aggregate_daily()
        else:
            print("Usage: python analytics_aggregator.py [hourly|daily]")
    else:
        # Default: hourly aggregation
        aggregator.aggregate_hourly()


if __name__ == '__main__':
    main()
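

# Backfill sketch (hypothetical dates; assumes the PG* environment variables are
# already exported, e.g. via ./deployment/run_analytics.sh):
#
#   from datetime import datetime, date
#   from analytics_aggregator import AnalyticsAggregator
#
#   agg = AnalyticsAggregator()
#   agg.aggregate_hourly(datetime(2024, 1, 15, 9))  # re-aggregates 09:00-10:00
#   agg.aggregate_daily(date(2024, 1, 15))          # rebuilds that day's rollup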