import requests import ipaddress import json import logging import os import time import random from typing import List, Set # Configurazione del logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) WHITELIST_FILE = 'whitelist.txt' AZURE_SERVICE_TAGS_FILE = 'AzureServiceTags.json' def fetch_aws_ip_ranges() -> List[str]: """ Scarica e restituisce gli intervalli di IP di AWS. """ url = "https://ip-ranges.amazonaws.com/ip-ranges.json" logging.info("Scaricando gli IP ranges di AWS...") try: response = requests.get(url, timeout=10) response.raise_for_status() data = response.json() aws_ips = [prefix['ip_prefix'] for prefix in data.get('prefixes', [])] logging.info(f"Scaricati {len(aws_ips)} IP ranges di AWS.") return aws_ips except Exception as e: logging.error(f"Errore durante il download degli IP di AWS: {e}") return [] def fetch_azure_ip_ranges() -> List[str]: """ Carica e restituisce gli intervalli di IP di Azure dai Service Tags JSON. """ logging.info("Caricando gli IP ranges di Azure dai Service Tags JSON...") if not os.path.exists(AZURE_SERVICE_TAGS_FILE): logging.error(f"Il file {AZURE_SERVICE_TAGS_FILE} non esiste. Per favore, scarica manualmente il file JSON dei Service Tags di Azure dalla pagina ufficiale e salvalo come '{AZURE_SERVICE_TAGS_FILE}' nella stessa directory dello script.") return [] try: with open(AZURE_SERVICE_TAGS_FILE, 'r') as f: data = json.load(f) azure_ips = [] for service in data.get('values', []): for prefix in service.get('properties', {}).get('addressPrefixes', []): azure_ips.append(prefix) logging.info(f"Scaricati {len(azure_ips)} IP ranges di Azure dai Service Tags.") return azure_ips except Exception as e: logging.error(f"Errore durante il caricamento degli IP di Azure dal file JSON: {e}") return [] def fetch_gcp_ip_ranges() -> List[str]: """ Scarica e restituisce gli intervalli di IP di GCP. """ url = "https://www.gstatic.com/ipranges/cloud.json" logging.info("Scaricando gli IP ranges di GCP...") try: response = requests.get(url, timeout=10) response.raise_for_status() data = response.json() gcp_ips = [prefix['ipv4Prefix'] for prefix in data.get('prefixes', []) if 'ipv4Prefix' in prefix] logging.info(f"Scaricati {len(gcp_ips)} IP ranges di GCP.") return gcp_ips except Exception as e: logging.error(f"Errore durante il download degli IP di GCP: {e}") return [] def fetch_bgpview_ip_ranges(asns: List[int]) -> List[str]: """ Scarica e restituisce gli intervalli di IP associati agli ASN specificati da BGPView. Implementa un sistema di retry e backoff esponenziale per gestire i limiti dell'API. """ ripe_ips = [] for asn in asns: url = f"https://api.bgpview.io/asn/{asn}/prefixes" logging.info(f"Scaricando gli IP ranges per ASN {asn} da BGPView...") # Parametri di retry max_attempts = 3 base_delay = 3 # secondi for attempt in range(max_attempts): try: # Aggiungi un ritardo casuale per evitare richieste sincronizzate if attempt > 0: delay = base_delay * (2 ** attempt) + random.uniform(0, 1) logging.info(f"Ritentativo {attempt+1}/{max_attempts} dopo {delay:.2f} secondi...") time.sleep(delay) response = requests.get(url, timeout=10) # Se riceviamo un 429, aspettiamo e ritentiamo if response.status_code == 429: retry_after = int(response.headers.get('Retry-After', base_delay * (2 ** attempt))) logging.warning(f"Rate limit raggiunto per ASN {asn}. Attesa di {retry_after} secondi...") time.sleep(retry_after) continue response.raise_for_status() data = response.json() # Verifica se ci sono prefix nella risposta prefixes = data.get('data', {}).get('ipv4_prefixes', []) + data.get('data', {}).get('ipv6_prefixes', []) for prefix in prefixes: ripe_ips.append(prefix['prefix']) logging.info(f"Scaricati {len(prefixes)} IP ranges per ASN {asn}.") # Aggiungi un ritardo tra le richieste successive per rispettare il rate limit time.sleep(1) break # Esci dal ciclo di retry se abbiamo successo except requests.exceptions.HTTPError as e: if e.response.status_code == 429 and attempt < max_attempts - 1: retry_after = int(e.response.headers.get('Retry-After', base_delay * (2 ** attempt))) logging.warning(f"Rate limit raggiunto per ASN {asn}. Attesa di {retry_after} secondi...") time.sleep(retry_after) else: logging.error(f"Errore durante il download degli IP per ASN {asn}: {e}") break except Exception as e: logging.error(f"Errore durante il download degli IP per ASN {asn}: {e}") break return ripe_ips def write_whitelist(ip_ranges: Set[str], filename: str): """ Scrive gli intervalli di IP nella whitelist.txt, separando IPv4 e IPv6. """ try: ipv4_networks = [] ipv6_networks = [] for ip in ip_ranges: try: network = ipaddress.ip_network(ip, strict=False) if isinstance(network, ipaddress.IPv4Network): ipv4_networks.append(network) elif isinstance(network, ipaddress.IPv6Network): ipv6_networks.append(network) except ValueError as ve: logging.warning(f"Formato IP non valido: {ip}. Errore: {ve}") # Ordina IPv4 e IPv6 separatamente ipv4_networks_sorted = sorted(ipv4_networks, key=lambda x: x.network_address) ipv6_networks_sorted = sorted(ipv6_networks, key=lambda x: x.network_address) with open(filename, 'w') as f: for ip in ipv4_networks_sorted: f.write(f"{ip}\n") for ip in ipv6_networks_sorted: f.write(f"{ip}\n") logging.info(f"Whitelist aggiornata con {len(ip_ranges)} IP ranges (IPv4: {len(ipv4_networks_sorted)}, IPv6: {len(ipv6_networks_sorted)}).") except Exception as e: logging.error(f"Errore durante la scrittura della whitelist: {e}") def add_manual_oracle_ips() -> List[str]: """ Aggiunge manualmente blocchi IP di Oracle che potrebbero non essere inclusi negli ASN. Questi IP sono stati identificati come appartenenti a Oracle ma non sono stati rilevati tramite le query ASN standard. """ logging.info("Aggiungendo manualmente blocchi IP di Oracle...") oracle_ips = [ # Oracle Cloud IP noti ma non inclusi negli ASN standard "64.181.0.0/17", # Blocco che include 64.181.233.158 "129.152.0.0/16", "132.145.0.0/16", "138.1.0.0/16", "138.2.0.0/16", "140.91.0.0/16", "147.154.0.0/16", "152.67.0.0/16", "158.101.0.0/16", "192.29.0.0/16" ] logging.info(f"Aggiunti manualmente {len(oracle_ips)} blocchi IP di Oracle.") return oracle_ips def main(): # Lista di ASN per le aziende affidabili # AWS: 16509, 14618 # Microsoft Azure: 8075 # Google Cloud: 15169 # Facebook (Meta): 32934 # Oracle Cloud: 31898, 43454 # Cloudflare: 13335, 209242 # Akamai: 16625, 20940, 35994 # IBM Cloud: 36351 # OVH: 16276 # Digital Ocean: 14061 # Fastly: 54113 # Alibaba Cloud: 45102 # GitHub: 36459 # Linode/Akamai: 63949, 398101 # Hetzner: 24940 # Rackspace: 33070 # CDNJS: 13949 # Equinix: 394354, 395089 # Baidu Cloud: 55967 asns = [ # AWS 16509, 14618, # Microsoft Azure 8075, # Google Cloud 15169, # Facebook (Meta) 32934, # Akamai 16625, 20940, 35994, # Level3/Lumen 6762, 3356, 3549, # Oracle Cloud - aggiunti altri ASN di Oracle 31898, 43454, 7160, 6507, 3944, # Cloudflare 13335, 209242, # IBM Cloud 36351, # OVH 16276, # Digital Ocean 14061, # Fastly 54113, # Alibaba Cloud 45102, # GitHub 36459, # Linode/Akamai 63949, 398101, # Hetzner 24940, # Rackspace 33070, # Equinix 394354, 395089, # Baidu Cloud 55967 ] # Fetch IP ranges da diverse fonti aws_ips = fetch_aws_ip_ranges() azure_ips = fetch_azure_ip_ranges() gcp_ips = fetch_gcp_ip_ranges() bgpview_ips = fetch_bgpview_ip_ranges(asns) # Aggiungi manualmente blocchi IP di Oracle oracle_manual_ips = add_manual_oracle_ips() # Combina tutti gli IP in un set per rimuovere duplicati combined_ips = set(aws_ips + azure_ips + gcp_ips + bgpview_ips + oracle_manual_ips) # Scrivi il tutto nel file whitelist.txt write_whitelist(combined_ips, WHITELIST_FILE) if __name__ == "__main__": main()