"""
IP Geolocation Service.

Uses ip-api.com to obtain geographic and AS information for IP addresses.
Free tier: 45 requests/minute.
Supports async batch lookups for better throughput.
"""

import asyncio
import time
from typing import Dict, List, Optional

import httpx


class IPGeolocationService:
    """
    IP geolocation client backed by ip-api.com.

    Successful lookups are stored in an in-memory dict (``self.cache``) keyed
    by IP string so that repeated lookups do not hit the API again. Failed
    lookups are not cached. The cache is never evicted automatically; call
    ``clear_cache()`` to empty it.
    """

    def __init__(self):
        self.api_url = "http://ip-api.com/json/{ip}"
        self.batch_api_url = "http://ip-api.com/batch"
        self.cache: Dict[str, Dict] = {}
        # Wall-clock time (time.time()) of the most recent rate-limited request.
        self.last_request_time = 0
        # Seconds between requests (to stay below 45/min).
        self.min_delay = 1.5
        # ip-api.com supports at most 100 IPs per batch request.
        self.max_batch_size = 100

    def _seconds_until_next_request(self) -> float:
        """Return how long to wait so that ``min_delay`` has elapsed since the last request."""
        elapsed = time.time() - self.last_request_time
        return max(0.0, self.min_delay - elapsed)

    def _handle_single_response(self, ip_address: str, response,
                                unknown_message: str) -> Optional[Dict]:
        """
        Turn a single-IP API response into a geo dict, caching it on success.

        Args:
            ip_address: The IP the response belongs to (used as cache key).
            response: The httpx response object returned by the API call.
            unknown_message: Text printed when the API reports a failure
                without a ``message`` field.

        Returns:
            The parsed geo dict, or None on a non-200 status or when the API
            payload's ``status`` is not ``'success'``.
        """
        if response.status_code != 200:
            print(f"[GEO] HTTP {response.status_code} per {ip_address}")
            return None

        data = response.json()
        if data.get('status') != 'success':
            # API-level error (e.g. private IP).
            print(f"[GEO] Errore lookup {ip_address}: {data.get('message', unknown_message)}")
            return None

        geo_info = self._parse_geo_data(data)
        self.cache[ip_address] = geo_info
        return geo_info

    def lookup(self, ip_address: str) -> Optional[Dict]:
        """
        Get geographic information for one IP (blocking).

        Sleeps as needed so that at least ``min_delay`` seconds separate
        consecutive requests made through this instance.

        Returns:
            Dict with country, country_code, city, organization, as_number,
            as_name and isp, or None on any error (errors are printed, not
            raised).
        """
        if ip_address in self.cache:
            return self.cache[ip_address]

        # Rate limiting: wait if the previous request was too recent.
        wait = self._seconds_until_next_request()
        if wait > 0:
            time.sleep(wait)

        try:
            try:
                response = httpx.get(self.api_url.format(ip=ip_address), timeout=5.0)
            finally:
                # Record the attempt even when it raised, so that a failing
                # endpoint is not hit again without the delay.
                self.last_request_time = time.time()
            return self._handle_single_response(ip_address, response, 'Unknown error')
        except Exception as e:
            print(f"[GEO] Errore lookup {ip_address}: {e}")
            return None

    async def lookup_async(self, ip_address: str, client: httpx.AsyncClient) -> Optional[Dict]:
        """
        Async lookup of a single IP using the caller-supplied client.

        This method performs no rate limiting of its own; callers issuing
        many lookups are responsible for pacing them.

        Returns:
            The geo dict (see ``_parse_geo_data``), or None on any error
            (errors are printed, not raised).
        """
        if ip_address in self.cache:
            return self.cache[ip_address]

        try:
            response = await client.get(self.api_url.format(ip=ip_address), timeout=5.0)
            return self._handle_single_response(ip_address, response, 'Unknown')
        except Exception as e:
            print(f"[GEO] Errore async lookup {ip_address}: {e}")
            return None

    async def lookup_batch_async(self, ip_addresses: List[str]) -> Dict[str, Optional[Dict]]:
        """
        Async lookup of many IPs through the ip-api.com batch endpoint.

        Cached IPs are answered from the cache; the remaining ones are
        de-duplicated and sent in chunks of at most ``max_batch_size``.
        Requests are spaced by ``min_delay`` seconds.

        Returns:
            Dict {ip: geo_info}. Every IP in ``ip_addresses`` is present as a
            key; the value is None when the lookup failed for that IP.
        """
        results: Dict[str, Optional[Dict]] = {}
        pending: List[str] = []

        # dict.fromkeys de-duplicates while keeping first-seen order, so a
        # repeated IP does not consume several slots of a batch.
        for ip in dict.fromkeys(ip_addresses):
            if ip in self.cache:
                results[ip] = self.cache[ip]
            else:
                pending.append(ip)

        if not pending:
            return results  # Everything was cached.

        try:
            async with httpx.AsyncClient() as client:
                for start in range(0, len(pending), self.max_batch_size):
                    batch = pending[start:start + self.max_batch_size]

                    # Rate limiting, shared with lookup() via last_request_time.
                    # NOTE(review): ip-api.com documents a separate per-minute
                    # limit for the batch endpoint - confirm min_delay is
                    # sufficient when many batches are sent back to back.
                    wait = self._seconds_until_next_request()
                    if wait > 0:
                        await asyncio.sleep(wait)

                    try:
                        response = await client.post(
                            self.batch_api_url,
                            json=batch,
                            timeout=10.0,
                        )
                    finally:
                        self.last_request_time = time.time()

                    if response.status_code != 200:
                        print(f"[GEO] Batch API HTTP {response.status_code}")
                        # No per-IP fallback is attempted: the whole batch is
                        # reported as failed.
                        for ip in batch:
                            results[ip] = None
                        continue

                    batch_data = response.json()
                    for requested_ip, data in zip(batch, batch_data):
                        # Prefer the IP echoed by the API; fall back to the
                        # requested one so a missing 'query' never produces a
                        # None key.
                        ip = data.get('query') or requested_ip
                        if data.get('status') == 'success':
                            geo_info = self._parse_geo_data(data)
                            self.cache[ip] = geo_info
                            results[ip] = geo_info
                        else:
                            # Invalid IP or API-level error.
                            results[ip] = None
        except Exception as e:
            print(f"[GEO] Errore batch lookup: {e}")

        # Whatever happened above, make sure every requested IP has an entry.
        for ip in pending:
            results.setdefault(ip, None)

        return results

    def _parse_geo_data(self, data: Dict) -> Dict:
        """
        Extract the fields of interest from one API response entry.

        The ``as`` field (e.g. "AS14061 DigitalOcean, LLC") is split into
        ``as_number`` ("AS14061") and ``as_name`` ("DigitalOcean, LLC").
        When ``as`` holds a single token, both fields carry that token; when
        it is empty or missing, ``as_number`` is None and ``as_name`` is the
        raw value.
        """
        raw_as = data.get('as')
        # split() without a separator ignores leading/trailing whitespace, so
        # values such as "AS14061 " or "   " cannot cause an IndexError.
        as_parts = raw_as.split(maxsplit=1) if raw_as else []

        if len(as_parts) > 1:
            as_number, as_name = as_parts
        elif as_parts:
            as_number = as_name = as_parts[0]
        else:
            as_number, as_name = None, raw_as

        return {
            'country': data.get('country'),
            'country_code': data.get('countryCode'),
            'city': data.get('city'),
            'organization': data.get('org'),
            'as_number': as_number,
            'as_name': as_name,
            'isp': data.get('isp'),
        }

    def clear_cache(self):
        """Empty the lookup cache."""
        self.cache.clear()

    def get_cache_size(self) -> int:
        """Return the number of IPs currently cached."""
        return len(self.cache)


# Singleton instance, created lazily by get_geo_service().
_geo_service: Optional[IPGeolocationService] = None


def get_geo_service() -> IPGeolocationService:
    """Return the module-wide service instance, creating it on first use."""
    global _geo_service
    if _geo_service is None:
        _geo_service = IPGeolocationService()
    return _geo_service