Enhance security by removing hardcoded paths, implementing a wrapper script for manual execution, and adding robust credential validation in the analytics aggregator.
"""
|
|
Network Analytics Aggregator
|
|
Aggrega statistiche traffico (normale + attacchi) ogni ora e giorno
|
|
Mantiene dati permanenti per analytics long-term
|
|
"""

import psycopg2
from psycopg2.extras import RealDictCursor
import os
from datetime import datetime, timedelta
import json
from typing import Optional
from collections import defaultdict
import sys


class AnalyticsAggregator:
    """
    Analytics aggregator for normal + attack traffic.
    Stores permanent statistics in network_analytics.

    SECURITY: requires environment variables for the DB credentials.
    - Production: managed via the systemd EnvironmentFile
    - Manual runs: use the wrapper script run_analytics.sh
    """

    def __init__(self):
        # Read credentials from environment variables (injected by systemd or the wrapper script)
        self.db_params = {
            'host': os.getenv('PGHOST', 'localhost'),
            'port': int(os.getenv('PGPORT', 5432)),
            'database': os.getenv('PGDATABASE', 'ids_db'),
            'user': os.getenv('PGUSER', 'ids'),
            'password': os.getenv('PGPASSWORD', ''),
        }

        # Fail fast: verify that the required credentials are set
        missing = []
        for key in ['PGHOST', 'PGDATABASE', 'PGUSER', 'PGPASSWORD']:
            if not os.getenv(key):
                missing.append(key)

        if missing:
            raise ValueError(
                f"Missing database credentials: {', '.join(missing)}\n"
                f"Manual execution: use ./deployment/run_analytics.sh\n"
                f"Systemd: check the EnvironmentFile in ids-analytics-aggregator.service"
            )

    def get_connection(self):
        """Create a database connection"""
        return psycopg2.connect(**self.db_params)
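
    # Connections are opened per aggregation run and closed in the matching
    # finally blocks below; nothing is pooled or reused across runs.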

    def aggregate_hourly(self, target_hour: Optional[datetime] = None):
        """
        Aggregate statistics for a specific hour.
        If target_hour is None, the previous hour is used.
        """
        if target_hour is None:
            # Use the previous hour (e.g. at 10:30 this aggregates 09:00-10:00)
            now = datetime.now()
            target_hour = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)

        hour_start = target_hour
        hour_end = hour_start + timedelta(hours=1)

        print(f"[ANALYTICS] Hourly aggregation: {hour_start.strftime('%Y-%m-%d %H:00')}")

        conn = self.get_connection()
        cursor = conn.cursor(cursor_factory=RealDictCursor)

        try:
            # 1. Analyze network_logs within the target hour
            cursor.execute("""
                SELECT
                    source_ip,
                    COUNT(*) as packets,
                    SUM(packet_length) as bytes
                FROM network_logs
                WHERE timestamp >= %s AND timestamp < %s
                GROUP BY source_ip
            """, (hour_start, hour_end))

            traffic_by_ip = {row['source_ip']: row for row in cursor.fetchall()}

            if not traffic_by_ip:
                print(f"[ANALYTICS] No traffic in hour {hour_start.strftime('%H:00')}")
                cursor.close()
                conn.close()
                return

            # 2. Identify attacker IPs (detections within the hour)
            cursor.execute("""
                SELECT DISTINCT source_ip, anomaly_type, risk_score, country
                FROM detections
                WHERE detected_at >= %s AND detected_at < %s
            """, (hour_start, hour_end))

            attacker_ips = {}
            attacks_by_type = defaultdict(int)

            for row in cursor.fetchall():
                ip = row['source_ip']
                attacker_ips[ip] = row
                attacks_by_type[row['anomaly_type']] += 1
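
            # Any source IP with at least one detection in this hour is treated as an
            # attacker below; all of its traffic for the hour counts as attack traffic.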

            # 3. Classify traffic: normal vs attack
            total_packets = 0
            total_bytes = 0
            normal_packets = 0
            normal_bytes = 0
            attack_packets = 0
            attack_bytes = 0

            traffic_by_country = defaultdict(lambda: {'normal': 0, 'attacks': 0})
            attacks_by_country = defaultdict(int)

            top_normal = []
            top_attackers = []

            for ip, stats in traffic_by_ip.items():
                packets = stats['packets']
                bytes_count = stats['bytes'] or 0

                total_packets += packets
                total_bytes += bytes_count

                if ip in attacker_ips:
                    # Attacker IP
                    attack_packets += packets
                    attack_bytes += bytes_count

                    country = attacker_ips[ip].get('country')
                    if country:
                        traffic_by_country[country]['attacks'] += packets
                        attacks_by_country[country] += 1

                    top_attackers.append({
                        'ip': ip,
                        'country': country,
                        'risk_score': float(attacker_ips[ip]['risk_score']),
                        'packets': packets,
                        'bytes': bytes_count
                    })
                else:
                    # Normal IP
                    normal_packets += packets
                    normal_bytes += bytes_count

                    # Country lookup for a normal IP (from earlier detections or the geo cache)
                    cursor.execute("""
                        SELECT country FROM detections
                        WHERE source_ip = %s AND country IS NOT NULL
                        ORDER BY detected_at DESC LIMIT 1
                    """, (ip,))

                    geo_row = cursor.fetchone()
                    country = geo_row['country'] if geo_row else None

                    if country:
                        traffic_by_country[country]['normal'] += packets

                    top_normal.append({
                        'ip': ip,
                        'packets': packets,
                        'bytes': bytes_count,
                        'country': country
                    })
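
            # Note: the country lookup above runs one query per normal IP (N+1 pattern)
            # and falls back to None for IPs that never appeared in detections.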

            # 4. Top 10 per category
            top_normal = sorted(top_normal, key=lambda x: x['packets'], reverse=True)[:10]
            top_attackers = sorted(top_attackers, key=lambda x: x['risk_score'], reverse=True)[:10]

            # 5. Save the aggregation (upsert keyed on (date, hour), so re-running the same hour overwrites)
            cursor.execute("""
                INSERT INTO network_analytics (
                    date, hour,
                    total_packets, total_bytes, unique_ips,
                    normal_packets, normal_bytes, normal_unique_ips, top_normal_ips,
                    attack_packets, attack_bytes, attack_unique_ips,
                    attacks_by_country, attacks_by_type, top_attackers,
                    traffic_by_country
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (date, hour) DO UPDATE SET
                    total_packets = EXCLUDED.total_packets,
                    total_bytes = EXCLUDED.total_bytes,
                    unique_ips = EXCLUDED.unique_ips,
                    normal_packets = EXCLUDED.normal_packets,
                    normal_bytes = EXCLUDED.normal_bytes,
                    normal_unique_ips = EXCLUDED.normal_unique_ips,
                    top_normal_ips = EXCLUDED.top_normal_ips,
                    attack_packets = EXCLUDED.attack_packets,
                    attack_bytes = EXCLUDED.attack_bytes,
                    attack_unique_ips = EXCLUDED.attack_unique_ips,
                    attacks_by_country = EXCLUDED.attacks_by_country,
                    attacks_by_type = EXCLUDED.attacks_by_type,
                    top_attackers = EXCLUDED.top_attackers,
                    traffic_by_country = EXCLUDED.traffic_by_country
            """, (
                target_hour.date(),
                target_hour.hour,
                total_packets,
                total_bytes,
                len(traffic_by_ip),
                normal_packets,
                normal_bytes,
                len(traffic_by_ip) - len(attacker_ips),
                json.dumps(top_normal),
                attack_packets,
                attack_bytes,
                len(attacker_ips),
                json.dumps(dict(attacks_by_country)),
                json.dumps(dict(attacks_by_type)),
                json.dumps(top_attackers),
                json.dumps({k: dict(v) for k, v in traffic_by_country.items()})
            ))

            conn.commit()

            print(f"[ANALYTICS] ✅ Aggregation complete:")
            print(f"  - Total: {total_packets} packets, {len(traffic_by_ip)} unique IPs")
            print(f"  - Normal: {normal_packets} packets ({normal_packets*100//total_packets if total_packets else 0}%)")
            print(f"  - Attacks: {attack_packets} packets ({attack_packets*100//total_packets if total_packets else 0}%), {len(attacker_ips)} IPs")

        except Exception as e:
            print(f"[ANALYTICS] ❌ Hourly aggregation error: {e}")
            conn.rollback()
            raise
        finally:
            cursor.close()
            conn.close()

    def aggregate_daily(self, target_date: Optional[datetime] = None):
        """
        Aggregate daily statistics (sum of the day's hourly rows).
        If target_date is None, the previous day is used.
        """
        if target_date is None:
            target_date = datetime.now().date() - timedelta(days=1)

        print(f"[ANALYTICS] Daily aggregation: {target_date}")

        conn = self.get_connection()
        cursor = conn.cursor(cursor_factory=RealDictCursor)

        try:
            # Sum the day's hourly aggregations
            cursor.execute("""
                SELECT
                    SUM(total_packets) as total_packets,
                    SUM(total_bytes) as total_bytes,
                    MAX(unique_ips) as unique_ips,
                    SUM(normal_packets) as normal_packets,
                    SUM(normal_bytes) as normal_bytes,
                    MAX(normal_unique_ips) as normal_unique_ips,
                    SUM(attack_packets) as attack_packets,
                    SUM(attack_bytes) as attack_bytes,
                    MAX(attack_unique_ips) as attack_unique_ips
                FROM network_analytics
                WHERE date = %s AND hour IS NOT NULL
            """, (target_date,))

            daily_stats = cursor.fetchone()

            if not daily_stats or not daily_stats['total_packets']:
                print(f"[ANALYTICS] No data for {target_date}")
                cursor.close()
                conn.close()
                return

            # Merge JSON fields (countries, types, top IPs)
            # TODO: implement a smarter merge if needed

            # Save the daily aggregation (hour = NULL)
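            # Caveat: ON CONFLICT (date, hour) only replaces an existing daily row if the
            # unique index treats NULL hours as equal (e.g. an expression index, or
            # NULLS NOT DISTINCT on PostgreSQL 15+); with a plain UNIQUE (date, hour),
            # repeated daily runs would insert duplicate NULL-hour rows.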
            cursor.execute("""
                INSERT INTO network_analytics (
                    date, hour,
                    total_packets, total_bytes, unique_ips,
                    normal_packets, normal_bytes, normal_unique_ips,
                    attack_packets, attack_bytes, attack_unique_ips
                )
                VALUES (%s, NULL, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (date, hour) DO UPDATE SET
                    total_packets = EXCLUDED.total_packets,
                    total_bytes = EXCLUDED.total_bytes,
                    unique_ips = EXCLUDED.unique_ips,
                    normal_packets = EXCLUDED.normal_packets,
                    normal_bytes = EXCLUDED.normal_bytes,
                    normal_unique_ips = EXCLUDED.normal_unique_ips,
                    attack_packets = EXCLUDED.attack_packets,
                    attack_bytes = EXCLUDED.attack_bytes,
                    attack_unique_ips = EXCLUDED.attack_unique_ips
            """, (
                target_date,
                daily_stats['total_packets'],
                daily_stats['total_bytes'],
                daily_stats['unique_ips'],
                daily_stats['normal_packets'],
                daily_stats['normal_bytes'],
                daily_stats['normal_unique_ips'],
                daily_stats['attack_packets'],
                daily_stats['attack_bytes'],
                daily_stats['attack_unique_ips']
            ))

            conn.commit()
            print(f"[ANALYTICS] ✅ Daily aggregation complete")

        except Exception as e:
            print(f"[ANALYTICS] ❌ Daily aggregation error: {e}")
            conn.rollback()
            raise
        finally:
            cursor.close()
            conn.close()


def main():
    """Entry point"""
    aggregator = AnalyticsAggregator()

    if len(sys.argv) > 1:
        if sys.argv[1] == 'hourly':
            aggregator.aggregate_hourly()
        elif sys.argv[1] == 'daily':
            aggregator.aggregate_daily()
        else:
            print("Usage: python analytics_aggregator.py [hourly|daily]")
    else:
        # Default: hourly aggregation
        aggregator.aggregate_hourly()


if __name__ == '__main__':
    main()
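
# Illustrative manual runs (assuming the DB credentials are already exported, e.g. via
# ./deployment/run_analytics.sh or a sourced EnvironmentFile):
#   python analytics_aggregator.py hourly   # aggregate the previous hour
#   python analytics_aggregator.py daily    # aggregate the previous day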