Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
270 lines
9.5 KiB
Python
270 lines
9.5 KiB
Python
import requests
|
|
import ipaddress
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
import random
|
|
from typing import List, Set
|
|
|
|
# Configurazione del logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
|
)
|
|
|
|
WHITELIST_FILE = 'whitelist.txt'
|
|
AZURE_SERVICE_TAGS_FILE = 'AzureServiceTags.json'
|
|
|
|
def fetch_aws_ip_ranges() -> List[str]:
|
|
"""
|
|
Scarica e restituisce gli intervalli di IP di AWS.
|
|
"""
|
|
url = "https://ip-ranges.amazonaws.com/ip-ranges.json"
|
|
logging.info("Scaricando gli IP ranges di AWS...")
|
|
try:
|
|
response = requests.get(url, timeout=10)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
aws_ips = [prefix['ip_prefix'] for prefix in data.get('prefixes', [])]
|
|
logging.info(f"Scaricati {len(aws_ips)} IP ranges di AWS.")
|
|
return aws_ips
|
|
except Exception as e:
|
|
logging.error(f"Errore durante il download degli IP di AWS: {e}")
|
|
return []
|
|
|
|
def fetch_azure_ip_ranges() -> List[str]:
|
|
"""
|
|
Carica e restituisce gli intervalli di IP di Azure dai Service Tags JSON.
|
|
"""
|
|
logging.info("Caricando gli IP ranges di Azure dai Service Tags JSON...")
|
|
|
|
if not os.path.exists(AZURE_SERVICE_TAGS_FILE):
|
|
logging.error(f"Il file {AZURE_SERVICE_TAGS_FILE} non esiste. Per favore, scarica manualmente il file JSON dei Service Tags di Azure dalla pagina ufficiale e salvalo come '{AZURE_SERVICE_TAGS_FILE}' nella stessa directory dello script.")
|
|
return []
|
|
|
|
try:
|
|
with open(AZURE_SERVICE_TAGS_FILE, 'r') as f:
|
|
data = json.load(f)
|
|
azure_ips = []
|
|
for service in data.get('values', []):
|
|
for prefix in service.get('properties', {}).get('addressPrefixes', []):
|
|
azure_ips.append(prefix)
|
|
logging.info(f"Scaricati {len(azure_ips)} IP ranges di Azure dai Service Tags.")
|
|
return azure_ips
|
|
except Exception as e:
|
|
logging.error(f"Errore durante il caricamento degli IP di Azure dal file JSON: {e}")
|
|
return []
|
|
|
|
def fetch_gcp_ip_ranges() -> List[str]:
|
|
"""
|
|
Scarica e restituisce gli intervalli di IP di GCP.
|
|
"""
|
|
url = "https://www.gstatic.com/ipranges/cloud.json"
|
|
logging.info("Scaricando gli IP ranges di GCP...")
|
|
try:
|
|
response = requests.get(url, timeout=10)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
gcp_ips = [prefix['ipv4Prefix'] for prefix in data.get('prefixes', []) if 'ipv4Prefix' in prefix]
|
|
logging.info(f"Scaricati {len(gcp_ips)} IP ranges di GCP.")
|
|
return gcp_ips
|
|
except Exception as e:
|
|
logging.error(f"Errore durante il download degli IP di GCP: {e}")
|
|
return []
|
|
|
|
def fetch_bgpview_ip_ranges(asns: List[int]) -> List[str]:
|
|
"""
|
|
Scarica e restituisce gli intervalli di IP associati agli ASN specificati da BGPView.
|
|
Implementa un sistema di retry e backoff esponenziale per gestire i limiti dell'API.
|
|
"""
|
|
ripe_ips = []
|
|
|
|
for asn in asns:
|
|
url = f"https://api.bgpview.io/asn/{asn}/prefixes"
|
|
logging.info(f"Scaricando gli IP ranges per ASN {asn} da BGPView...")
|
|
|
|
# Parametri di retry
|
|
max_attempts = 3
|
|
base_delay = 3 # secondi
|
|
|
|
for attempt in range(max_attempts):
|
|
try:
|
|
# Aggiungi un ritardo casuale per evitare richieste sincronizzate
|
|
if attempt > 0:
|
|
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
|
|
logging.info(f"Ritentativo {attempt+1}/{max_attempts} dopo {delay:.2f} secondi...")
|
|
time.sleep(delay)
|
|
|
|
response = requests.get(url, timeout=10)
|
|
|
|
# Se riceviamo un 429, aspettiamo e ritentiamo
|
|
if response.status_code == 429:
|
|
retry_after = int(response.headers.get('Retry-After', base_delay * (2 ** attempt)))
|
|
logging.warning(f"Rate limit raggiunto per ASN {asn}. Attesa di {retry_after} secondi...")
|
|
time.sleep(retry_after)
|
|
continue
|
|
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
# Verifica se ci sono prefix nella risposta
|
|
prefixes = data.get('data', {}).get('ipv4_prefixes', []) + data.get('data', {}).get('ipv6_prefixes', [])
|
|
for prefix in prefixes:
|
|
ripe_ips.append(prefix['prefix'])
|
|
|
|
logging.info(f"Scaricati {len(prefixes)} IP ranges per ASN {asn}.")
|
|
|
|
# Aggiungi un ritardo tra le richieste successive per rispettare il rate limit
|
|
time.sleep(1)
|
|
break # Esci dal ciclo di retry se abbiamo successo
|
|
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 429 and attempt < max_attempts - 1:
|
|
retry_after = int(e.response.headers.get('Retry-After', base_delay * (2 ** attempt)))
|
|
logging.warning(f"Rate limit raggiunto per ASN {asn}. Attesa di {retry_after} secondi...")
|
|
time.sleep(retry_after)
|
|
else:
|
|
logging.error(f"Errore durante il download degli IP per ASN {asn}: {e}")
|
|
break
|
|
except Exception as e:
|
|
logging.error(f"Errore durante il download degli IP per ASN {asn}: {e}")
|
|
break
|
|
|
|
return ripe_ips
|
|
|
|
def write_whitelist(ip_ranges: Set[str], filename: str):
|
|
"""
|
|
Scrive gli intervalli di IP nella whitelist.txt, separando IPv4 e IPv6.
|
|
"""
|
|
try:
|
|
ipv4_networks = []
|
|
ipv6_networks = []
|
|
|
|
for ip in ip_ranges:
|
|
try:
|
|
network = ipaddress.ip_network(ip, strict=False)
|
|
if isinstance(network, ipaddress.IPv4Network):
|
|
ipv4_networks.append(network)
|
|
elif isinstance(network, ipaddress.IPv6Network):
|
|
ipv6_networks.append(network)
|
|
except ValueError as ve:
|
|
logging.warning(f"Formato IP non valido: {ip}. Errore: {ve}")
|
|
|
|
# Ordina IPv4 e IPv6 separatamente
|
|
ipv4_networks_sorted = sorted(ipv4_networks, key=lambda x: x.network_address)
|
|
ipv6_networks_sorted = sorted(ipv6_networks, key=lambda x: x.network_address)
|
|
|
|
with open(filename, 'w') as f:
|
|
for ip in ipv4_networks_sorted:
|
|
f.write(f"{ip}\n")
|
|
for ip in ipv6_networks_sorted:
|
|
f.write(f"{ip}\n")
|
|
|
|
logging.info(f"Whitelist aggiornata con {len(ip_ranges)} IP ranges (IPv4: {len(ipv4_networks_sorted)}, IPv6: {len(ipv6_networks_sorted)}).")
|
|
except Exception as e:
|
|
logging.error(f"Errore durante la scrittura della whitelist: {e}")
|
|
|
|
def add_manual_oracle_ips() -> List[str]:
|
|
"""
|
|
Aggiunge manualmente blocchi IP di Oracle che potrebbero non essere inclusi negli ASN.
|
|
Questi IP sono stati identificati come appartenenti a Oracle ma non sono stati
|
|
rilevati tramite le query ASN standard.
|
|
"""
|
|
logging.info("Aggiungendo manualmente blocchi IP di Oracle...")
|
|
oracle_ips = [
|
|
# Oracle Cloud IP noti ma non inclusi negli ASN standard
|
|
"64.181.0.0/17", # Blocco che include 64.181.233.158
|
|
"129.152.0.0/16",
|
|
"132.145.0.0/16",
|
|
"138.1.0.0/16",
|
|
"138.2.0.0/16",
|
|
"140.91.0.0/16",
|
|
"147.154.0.0/16",
|
|
"152.67.0.0/16",
|
|
"158.101.0.0/16",
|
|
"192.29.0.0/16"
|
|
]
|
|
logging.info(f"Aggiunti manualmente {len(oracle_ips)} blocchi IP di Oracle.")
|
|
return oracle_ips
|
|
|
|
def main():
|
|
# Lista di ASN per le aziende affidabili
|
|
# AWS: 16509, 14618
|
|
# Microsoft Azure: 8075
|
|
# Google Cloud: 15169
|
|
# Facebook (Meta): 32934
|
|
# Oracle Cloud: 31898, 43454
|
|
# Cloudflare: 13335, 209242
|
|
# Akamai: 16625, 20940, 35994
|
|
# IBM Cloud: 36351
|
|
# OVH: 16276
|
|
# Digital Ocean: 14061
|
|
# Fastly: 54113
|
|
# Alibaba Cloud: 45102
|
|
# GitHub: 36459
|
|
# Linode/Akamai: 63949, 398101
|
|
# Hetzner: 24940
|
|
# Rackspace: 33070
|
|
# CDNJS: 13949
|
|
# Equinix: 394354, 395089
|
|
# Baidu Cloud: 55967
|
|
asns = [
|
|
# AWS
|
|
16509, 14618,
|
|
# Microsoft Azure
|
|
8075,
|
|
# Google Cloud
|
|
15169,
|
|
# Facebook (Meta)
|
|
32934,
|
|
# Akamai
|
|
16625, 20940, 35994,
|
|
# Level3/Lumen
|
|
6762, 3356, 3549,
|
|
# Oracle Cloud - aggiunti altri ASN di Oracle
|
|
31898, 43454, 7160, 6507, 3944,
|
|
# Cloudflare
|
|
13335, 209242,
|
|
# IBM Cloud
|
|
36351,
|
|
# OVH
|
|
16276,
|
|
# Digital Ocean
|
|
14061,
|
|
# Fastly
|
|
54113,
|
|
# Alibaba Cloud
|
|
45102,
|
|
# GitHub
|
|
36459,
|
|
# Linode/Akamai
|
|
63949, 398101,
|
|
# Hetzner
|
|
24940,
|
|
# Rackspace
|
|
33070,
|
|
# Equinix
|
|
394354, 395089,
|
|
# Baidu Cloud
|
|
55967
|
|
]
|
|
|
|
# Fetch IP ranges da diverse fonti
|
|
aws_ips = fetch_aws_ip_ranges()
|
|
azure_ips = fetch_azure_ip_ranges()
|
|
gcp_ips = fetch_gcp_ip_ranges()
|
|
bgpview_ips = fetch_bgpview_ip_ranges(asns)
|
|
|
|
# Aggiungi manualmente blocchi IP di Oracle
|
|
oracle_manual_ips = add_manual_oracle_ips()
|
|
|
|
# Combina tutti gli IP in un set per rimuovere duplicati
|
|
combined_ips = set(aws_ips + azure_ips + gcp_ips + bgpview_ips + oracle_manual_ips)
|
|
|
|
# Scrivi il tutto nel file whitelist.txt
|
|
write_whitelist(combined_ips, WHITELIST_FILE)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|