ids.alfacom.it/python_ml/analytics_aggregator.py
marco370 cbd03d9e64 Add network analytics and live dashboard features
Introduce new network analytics capabilities with persistent storage, hourly and daily aggregations, and enhanced frontend visualizations. This includes API endpoints for retrieving analytics data, systemd services for automated aggregation, and UI updates for live and historical dashboards. Additionally, country flag emojis are now displayed on the detections page.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: intermediate_checkpoint
Replit-Commit-Event-Id: 3c14f651-7633-4128-8526-314b4942b3a0
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/oGXAoP7
2025-11-22 11:34:36 +00:00

319 lines
12 KiB
Python

"""
Network Analytics Aggregator
Aggrega statistiche traffico (normale + attacchi) ogni ora e giorno
Mantiene dati permanenti per analytics long-term
"""
import psycopg2
from psycopg2.extras import RealDictCursor
import os
from datetime import datetime, timedelta
import json
from typing import Dict, List, Optional
from collections import defaultdict
import sys
class AnalyticsAggregator:
"""
Aggregatore analytics per traffico normale + attacchi
Salva statistiche permanenti in network_analytics
"""
def __init__(self):
self.db_params = {
'host': os.getenv('PGHOST', 'localhost'),
'port': int(os.getenv('PGPORT', 5432)),
'database': os.getenv('PGDATABASE', 'ids_db'),
'user': os.getenv('PGUSER', 'ids'),
'password': os.getenv('PGPASSWORD', ''),
}
def get_connection(self):
"""Crea connessione database"""
return psycopg2.connect(**self.db_params)
def aggregate_hourly(self, target_hour: Optional[datetime] = None):
"""
Aggrega statistiche per una specifica ora
Se target_hour è None, usa l'ora precedente
"""
if target_hour is None:
# Usa ora precedente (es: se ora sono le 10:30, aggrega le 09:00-10:00)
now = datetime.now()
target_hour = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
hour_start = target_hour
hour_end = hour_start + timedelta(hours=1)
print(f"[ANALYTICS] Aggregazione oraria: {hour_start.strftime('%Y-%m-%d %H:00')}")
conn = self.get_connection()
cursor = conn.cursor(cursor_factory=RealDictCursor)
try:
# 1. Analizza network_logs nell'ora target
cursor.execute("""
SELECT
source_ip,
COUNT(*) as packets,
SUM(packet_length) as bytes
FROM network_logs
WHERE timestamp >= %s AND timestamp < %s
GROUP BY source_ip
""", (hour_start, hour_end))
traffic_by_ip = {row['source_ip']: row for row in cursor.fetchall()}
if not traffic_by_ip:
print(f"[ANALYTICS] Nessun traffico nell'ora {hour_start.strftime('%H:00')}")
cursor.close()
conn.close()
return
# 2. Identifica IP attaccanti (detections nell'ora)
cursor.execute("""
SELECT DISTINCT source_ip, anomaly_type, risk_score, country
FROM detections
WHERE detected_at >= %s AND detected_at < %s
""", (hour_start, hour_end))
attacker_ips = {}
attacks_by_type = defaultdict(int)
for row in cursor.fetchall():
ip = row['source_ip']
attacker_ips[ip] = row
attacks_by_type[row['anomaly_type']] += 1
# 3. Classifica traffico: normale vs attacco
total_packets = 0
total_bytes = 0
normal_packets = 0
normal_bytes = 0
attack_packets = 0
attack_bytes = 0
traffic_by_country = defaultdict(lambda: {'normal': 0, 'attacks': 0})
attacks_by_country = defaultdict(int)
top_normal = []
top_attackers = []
for ip, stats in traffic_by_ip.items():
packets = stats['packets']
bytes_count = stats['bytes'] or 0
total_packets += packets
total_bytes += bytes_count
if ip in attacker_ips:
# IP attaccante
attack_packets += packets
attack_bytes += bytes_count
country = attacker_ips[ip].get('country')
if country:
traffic_by_country[country]['attacks'] += packets
attacks_by_country[country] += 1
top_attackers.append({
'ip': ip,
'country': country,
'risk_score': float(attacker_ips[ip]['risk_score']),
'packets': packets,
'bytes': bytes_count
})
else:
# IP normale
normal_packets += packets
normal_bytes += bytes_count
# Lookup paese per IP normale (da detections precedenti o geo cache)
cursor.execute("""
SELECT country FROM detections
WHERE source_ip = %s AND country IS NOT NULL
ORDER BY detected_at DESC LIMIT 1
""", (ip,))
geo_row = cursor.fetchone()
country = geo_row['country'] if geo_row else None
if country:
traffic_by_country[country]['normal'] += packets
top_normal.append({
'ip': ip,
'packets': packets,
'bytes': bytes_count,
'country': country
})
# 4. Top 10 per categoria
top_normal = sorted(top_normal, key=lambda x: x['packets'], reverse=True)[:10]
top_attackers = sorted(top_attackers, key=lambda x: x['risk_score'], reverse=True)[:10]
# 5. Salva aggregazione
cursor.execute("""
INSERT INTO network_analytics (
date, hour,
total_packets, total_bytes, unique_ips,
normal_packets, normal_bytes, normal_unique_ips, top_normal_ips,
attack_packets, attack_bytes, attack_unique_ips,
attacks_by_country, attacks_by_type, top_attackers,
traffic_by_country
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (date, hour) DO UPDATE SET
total_packets = EXCLUDED.total_packets,
total_bytes = EXCLUDED.total_bytes,
unique_ips = EXCLUDED.unique_ips,
normal_packets = EXCLUDED.normal_packets,
normal_bytes = EXCLUDED.normal_bytes,
normal_unique_ips = EXCLUDED.normal_unique_ips,
top_normal_ips = EXCLUDED.top_normal_ips,
attack_packets = EXCLUDED.attack_packets,
attack_bytes = EXCLUDED.attack_bytes,
attack_unique_ips = EXCLUDED.attack_unique_ips,
attacks_by_country = EXCLUDED.attacks_by_country,
attacks_by_type = EXCLUDED.attacks_by_type,
top_attackers = EXCLUDED.top_attackers,
traffic_by_country = EXCLUDED.traffic_by_country
""", (
target_hour.date(),
target_hour.hour,
total_packets,
total_bytes,
len(traffic_by_ip),
normal_packets,
normal_bytes,
len(traffic_by_ip) - len(attacker_ips),
json.dumps(top_normal),
attack_packets,
attack_bytes,
len(attacker_ips),
json.dumps(dict(attacks_by_country)),
json.dumps(dict(attacks_by_type)),
json.dumps(top_attackers),
json.dumps({k: dict(v) for k, v in traffic_by_country.items()})
))
conn.commit()
print(f"[ANALYTICS] ✅ Aggregazione completata:")
print(f" - Totale: {total_packets} pacchetti, {len(traffic_by_ip)} IP unici")
print(f" - Normale: {normal_packets} pacchetti ({normal_packets*100//total_packets if total_packets else 0}%)")
print(f" - Attacchi: {attack_packets} pacchetti ({attack_packets*100//total_packets if total_packets else 0}%), {len(attacker_ips)} IP")
except Exception as e:
print(f"[ANALYTICS] ❌ Errore aggregazione oraria: {e}")
conn.rollback()
raise
finally:
cursor.close()
conn.close()
def aggregate_daily(self, target_date: Optional[datetime] = None):
"""
Aggrega statistiche giornaliere (somma delle ore)
Se target_date è None, usa giorno precedente
"""
if target_date is None:
target_date = datetime.now().date() - timedelta(days=1)
print(f"[ANALYTICS] Aggregazione giornaliera: {target_date}")
conn = self.get_connection()
cursor = conn.cursor(cursor_factory=RealDictCursor)
try:
# Somma aggregazioni orarie del giorno
cursor.execute("""
SELECT
SUM(total_packets) as total_packets,
SUM(total_bytes) as total_bytes,
MAX(unique_ips) as unique_ips,
SUM(normal_packets) as normal_packets,
SUM(normal_bytes) as normal_bytes,
MAX(normal_unique_ips) as normal_unique_ips,
SUM(attack_packets) as attack_packets,
SUM(attack_bytes) as attack_bytes,
MAX(attack_unique_ips) as attack_unique_ips
FROM network_analytics
WHERE date = %s AND hour IS NOT NULL
""", (target_date,))
daily_stats = cursor.fetchone()
if not daily_stats or not daily_stats['total_packets']:
print(f"[ANALYTICS] Nessun dato per {target_date}")
cursor.close()
conn.close()
return
# Merge JSON fields (countries, types, top IPs)
# TODO: Implementare merge intelligente se necessario
# Salva aggregazione giornaliera (hour = NULL)
cursor.execute("""
INSERT INTO network_analytics (
date, hour,
total_packets, total_bytes, unique_ips,
normal_packets, normal_bytes, normal_unique_ips,
attack_packets, attack_bytes, attack_unique_ips
)
VALUES (%s, NULL, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (date, hour) DO UPDATE SET
total_packets = EXCLUDED.total_packets,
total_bytes = EXCLUDED.total_bytes,
unique_ips = EXCLUDED.unique_ips,
normal_packets = EXCLUDED.normal_packets,
normal_bytes = EXCLUDED.normal_bytes,
normal_unique_ips = EXCLUDED.normal_unique_ips,
attack_packets = EXCLUDED.attack_packets,
attack_bytes = EXCLUDED.attack_bytes,
attack_unique_ips = EXCLUDED.attack_unique_ips
""", (
target_date,
daily_stats['total_packets'],
daily_stats['total_bytes'],
daily_stats['unique_ips'],
daily_stats['normal_packets'],
daily_stats['normal_bytes'],
daily_stats['normal_unique_ips'],
daily_stats['attack_packets'],
daily_stats['attack_bytes'],
daily_stats['attack_unique_ips']
))
conn.commit()
print(f"[ANALYTICS] ✅ Aggregazione giornaliera completata")
except Exception as e:
print(f"[ANALYTICS] ❌ Errore aggregazione giornaliera: {e}")
conn.rollback()
raise
finally:
cursor.close()
conn.close()
def main():
"""Entry point"""
aggregator = AnalyticsAggregator()
if len(sys.argv) > 1:
if sys.argv[1] == 'hourly':
aggregator.aggregate_hourly()
elif sys.argv[1] == 'daily':
aggregator.aggregate_daily()
else:
print("Usage: python analytics_aggregator.py [hourly|daily]")
else:
# Default: aggregazione oraria
aggregator.aggregate_hourly()
if __name__ == '__main__':
main()