ids.alfacom.it/extracted_idf/white.py
marco370 0bfe3258b5 Saved progress at the end of the loop
Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
2025-11-11 09:15:10 +00:00

270 lines
9.5 KiB
Python

import requests
import ipaddress
import json
import logging
import os
import time
import random
from typing import List, Set
# Configurazione del logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
WHITELIST_FILE = 'whitelist.txt'
AZURE_SERVICE_TAGS_FILE = 'AzureServiceTags.json'
def fetch_aws_ip_ranges() -> List[str]:
"""
Scarica e restituisce gli intervalli di IP di AWS.
"""
url = "https://ip-ranges.amazonaws.com/ip-ranges.json"
logging.info("Scaricando gli IP ranges di AWS...")
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
data = response.json()
aws_ips = [prefix['ip_prefix'] for prefix in data.get('prefixes', [])]
logging.info(f"Scaricati {len(aws_ips)} IP ranges di AWS.")
return aws_ips
except Exception as e:
logging.error(f"Errore durante il download degli IP di AWS: {e}")
return []
def fetch_azure_ip_ranges() -> List[str]:
"""
Carica e restituisce gli intervalli di IP di Azure dai Service Tags JSON.
"""
logging.info("Caricando gli IP ranges di Azure dai Service Tags JSON...")
if not os.path.exists(AZURE_SERVICE_TAGS_FILE):
logging.error(f"Il file {AZURE_SERVICE_TAGS_FILE} non esiste. Per favore, scarica manualmente il file JSON dei Service Tags di Azure dalla pagina ufficiale e salvalo come '{AZURE_SERVICE_TAGS_FILE}' nella stessa directory dello script.")
return []
try:
with open(AZURE_SERVICE_TAGS_FILE, 'r') as f:
data = json.load(f)
azure_ips = []
for service in data.get('values', []):
for prefix in service.get('properties', {}).get('addressPrefixes', []):
azure_ips.append(prefix)
logging.info(f"Scaricati {len(azure_ips)} IP ranges di Azure dai Service Tags.")
return azure_ips
except Exception as e:
logging.error(f"Errore durante il caricamento degli IP di Azure dal file JSON: {e}")
return []
def fetch_gcp_ip_ranges() -> List[str]:
"""
Scarica e restituisce gli intervalli di IP di GCP.
"""
url = "https://www.gstatic.com/ipranges/cloud.json"
logging.info("Scaricando gli IP ranges di GCP...")
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
data = response.json()
gcp_ips = [prefix['ipv4Prefix'] for prefix in data.get('prefixes', []) if 'ipv4Prefix' in prefix]
logging.info(f"Scaricati {len(gcp_ips)} IP ranges di GCP.")
return gcp_ips
except Exception as e:
logging.error(f"Errore durante il download degli IP di GCP: {e}")
return []
def fetch_bgpview_ip_ranges(asns: List[int]) -> List[str]:
"""
Scarica e restituisce gli intervalli di IP associati agli ASN specificati da BGPView.
Implementa un sistema di retry e backoff esponenziale per gestire i limiti dell'API.
"""
ripe_ips = []
for asn in asns:
url = f"https://api.bgpview.io/asn/{asn}/prefixes"
logging.info(f"Scaricando gli IP ranges per ASN {asn} da BGPView...")
# Parametri di retry
max_attempts = 3
base_delay = 3 # secondi
for attempt in range(max_attempts):
try:
# Aggiungi un ritardo casuale per evitare richieste sincronizzate
if attempt > 0:
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
logging.info(f"Ritentativo {attempt+1}/{max_attempts} dopo {delay:.2f} secondi...")
time.sleep(delay)
response = requests.get(url, timeout=10)
# Se riceviamo un 429, aspettiamo e ritentiamo
if response.status_code == 429:
retry_after = int(response.headers.get('Retry-After', base_delay * (2 ** attempt)))
logging.warning(f"Rate limit raggiunto per ASN {asn}. Attesa di {retry_after} secondi...")
time.sleep(retry_after)
continue
response.raise_for_status()
data = response.json()
# Verifica se ci sono prefix nella risposta
prefixes = data.get('data', {}).get('ipv4_prefixes', []) + data.get('data', {}).get('ipv6_prefixes', [])
for prefix in prefixes:
ripe_ips.append(prefix['prefix'])
logging.info(f"Scaricati {len(prefixes)} IP ranges per ASN {asn}.")
# Aggiungi un ritardo tra le richieste successive per rispettare il rate limit
time.sleep(1)
break # Esci dal ciclo di retry se abbiamo successo
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429 and attempt < max_attempts - 1:
retry_after = int(e.response.headers.get('Retry-After', base_delay * (2 ** attempt)))
logging.warning(f"Rate limit raggiunto per ASN {asn}. Attesa di {retry_after} secondi...")
time.sleep(retry_after)
else:
logging.error(f"Errore durante il download degli IP per ASN {asn}: {e}")
break
except Exception as e:
logging.error(f"Errore durante il download degli IP per ASN {asn}: {e}")
break
return ripe_ips
def write_whitelist(ip_ranges: Set[str], filename: str):
"""
Scrive gli intervalli di IP nella whitelist.txt, separando IPv4 e IPv6.
"""
try:
ipv4_networks = []
ipv6_networks = []
for ip in ip_ranges:
try:
network = ipaddress.ip_network(ip, strict=False)
if isinstance(network, ipaddress.IPv4Network):
ipv4_networks.append(network)
elif isinstance(network, ipaddress.IPv6Network):
ipv6_networks.append(network)
except ValueError as ve:
logging.warning(f"Formato IP non valido: {ip}. Errore: {ve}")
# Ordina IPv4 e IPv6 separatamente
ipv4_networks_sorted = sorted(ipv4_networks, key=lambda x: x.network_address)
ipv6_networks_sorted = sorted(ipv6_networks, key=lambda x: x.network_address)
with open(filename, 'w') as f:
for ip in ipv4_networks_sorted:
f.write(f"{ip}\n")
for ip in ipv6_networks_sorted:
f.write(f"{ip}\n")
logging.info(f"Whitelist aggiornata con {len(ip_ranges)} IP ranges (IPv4: {len(ipv4_networks_sorted)}, IPv6: {len(ipv6_networks_sorted)}).")
except Exception as e:
logging.error(f"Errore durante la scrittura della whitelist: {e}")
def add_manual_oracle_ips() -> List[str]:
"""
Aggiunge manualmente blocchi IP di Oracle che potrebbero non essere inclusi negli ASN.
Questi IP sono stati identificati come appartenenti a Oracle ma non sono stati
rilevati tramite le query ASN standard.
"""
logging.info("Aggiungendo manualmente blocchi IP di Oracle...")
oracle_ips = [
# Oracle Cloud IP noti ma non inclusi negli ASN standard
"64.181.0.0/17", # Blocco che include 64.181.233.158
"129.152.0.0/16",
"132.145.0.0/16",
"138.1.0.0/16",
"138.2.0.0/16",
"140.91.0.0/16",
"147.154.0.0/16",
"152.67.0.0/16",
"158.101.0.0/16",
"192.29.0.0/16"
]
logging.info(f"Aggiunti manualmente {len(oracle_ips)} blocchi IP di Oracle.")
return oracle_ips
def main():
# Lista di ASN per le aziende affidabili
# AWS: 16509, 14618
# Microsoft Azure: 8075
# Google Cloud: 15169
# Facebook (Meta): 32934
# Oracle Cloud: 31898, 43454
# Cloudflare: 13335, 209242
# Akamai: 16625, 20940, 35994
# IBM Cloud: 36351
# OVH: 16276
# Digital Ocean: 14061
# Fastly: 54113
# Alibaba Cloud: 45102
# GitHub: 36459
# Linode/Akamai: 63949, 398101
# Hetzner: 24940
# Rackspace: 33070
# CDNJS: 13949
# Equinix: 394354, 395089
# Baidu Cloud: 55967
asns = [
# AWS
16509, 14618,
# Microsoft Azure
8075,
# Google Cloud
15169,
# Facebook (Meta)
32934,
# Akamai
16625, 20940, 35994,
# Level3/Lumen
6762, 3356, 3549,
# Oracle Cloud - aggiunti altri ASN di Oracle
31898, 43454, 7160, 6507, 3944,
# Cloudflare
13335, 209242,
# IBM Cloud
36351,
# OVH
16276,
# Digital Ocean
14061,
# Fastly
54113,
# Alibaba Cloud
45102,
# GitHub
36459,
# Linode/Akamai
63949, 398101,
# Hetzner
24940,
# Rackspace
33070,
# Equinix
394354, 395089,
# Baidu Cloud
55967
]
# Fetch IP ranges da diverse fonti
aws_ips = fetch_aws_ip_ranges()
azure_ips = fetch_azure_ip_ranges()
gcp_ips = fetch_gcp_ip_ranges()
bgpview_ips = fetch_bgpview_ip_ranges(asns)
# Aggiungi manualmente blocchi IP di Oracle
oracle_manual_ips = add_manual_oracle_ips()
# Combina tutti gli IP in un set per rimuovere duplicati
combined_ips = set(aws_ips + azure_ips + gcp_ips + bgpview_ips + oracle_manual_ips)
# Scrivi il tutto nel file whitelist.txt
write_whitelist(combined_ips, WHITELIST_FILE)
if __name__ == "__main__":
main()