ids.alfacom.it/python_ml/ip_geolocation.py
marco370 1b9df79d56 Add IP geolocation and AS information to detection records
Integrates IP geolocation and Autonomous System (AS) information into detection records by modifying the frontend to display this data and updating the backend to perform asynchronous batch lookups for efficiency. This enhancement includes database schema updates and the creation of a new IP geolocation service.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: intermediate_checkpoint
Replit-Commit-Event-Id: e81fd4a1-b7b0-48d2-ae38-f5905e278343
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/SXFWABi
2025-11-22 10:59:50 +00:00

204 lines
7.5 KiB
Python

"""
IP Geolocation Service
Usa ip-api.com per ottenere informazioni geografiche e AS per IP address
Free tier: 45 richieste/minuto
Supporta lookup async batch per performance ottimali
"""
import httpx
from typing import Dict, Optional, List
import time
import asyncio
class IPGeolocationService:
    """
    IP geolocation service backed by ip-api.com.

    Results are cached in memory so each IP is looked up at most once per
    process. The free tier allows 45 requests/minute, hence the client-side
    rate limiting applied to both single and batch lookups.
    """

    def __init__(self):
        # NOTE: ip-api.com's free tier is served over plain HTTP only.
        self.api_url = "http://ip-api.com/json/{ip}"
        self.batch_api_url = "http://ip-api.com/batch"
        # In-memory cache: ip -> parsed geo record.
        self.cache: Dict[str, Dict] = {}
        self.last_request_time = 0
        self.min_delay = 1.5  # seconds between requests (stays under 45/min)
        self.max_batch_size = 100  # ip-api.com accepts at most 100 IPs per batch call

    def lookup(self, ip_address: str) -> Optional[Dict]:
        """
        Synchronously fetch geolocation info for a single IP.

        Returns a dict with country, country_code, city, organization,
        as_number, as_name and isp, or None on any error (private IP,
        non-200 response, network exception).
        """
        # Serve from cache when possible (no rate-limit cost).
        if ip_address in self.cache:
            return self.cache[ip_address]

        # Rate limiting: wait if the previous request was too recent.
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_delay:
            time.sleep(self.min_delay - elapsed)

        try:
            response = httpx.get(self.api_url.format(ip=ip_address), timeout=5.0)
            self.last_request_time = time.time()
            if response.status_code == 200:
                data = response.json()
                if data.get('status') == 'success':
                    # Reuse the shared parser so single and batch lookups
                    # always produce identically-shaped records.
                    geo_info = self._parse_geo_data(data)
                    self.cache[ip_address] = geo_info
                    return geo_info
                # API-level error (e.g. private/reserved IP).
                print(f"[GEO] Errore lookup {ip_address}: {data.get('message', 'Unknown error')}")
                return None
            print(f"[GEO] HTTP {response.status_code} per {ip_address}")
            return None
        except Exception as e:
            print(f"[GEO] Errore lookup {ip_address}: {e}")
            return None

    async def lookup_async(self, ip_address: str, client: httpx.AsyncClient) -> Optional[Dict]:
        """
        Async lookup of a single IP using a caller-provided AsyncClient.

        Returns the same record shape as lookup(), or None on error.
        The client is only touched on a cache miss.
        """
        if ip_address in self.cache:
            return self.cache[ip_address]
        try:
            response = await client.get(self.api_url.format(ip=ip_address), timeout=5.0)
            if response.status_code == 200:
                data = response.json()
                if data.get('status') == 'success':
                    geo_info = self._parse_geo_data(data)
                    self.cache[ip_address] = geo_info
                    return geo_info
                print(f"[GEO] Errore lookup {ip_address}: {data.get('message', 'Unknown')}")
                return None
            print(f"[GEO] HTTP {response.status_code} per {ip_address}")
            return None
        except Exception as e:
            print(f"[GEO] Errore async lookup {ip_address}: {e}")
            return None

    async def lookup_batch_async(self, ip_addresses: List[str]) -> Dict[str, Optional[Dict]]:
        """
        Async batch lookup of multiple IPs via ip-api.com's batch endpoint.

        Cached IPs are answered immediately; only the remainder hit the
        network, in chunks of at most max_batch_size. IPs that fail to
        resolve (invalid, private, or whole-batch HTTP/network error) map
        to None in the result.

        Returns: dict {ip: geo_info-or-None} covering every input IP.
        """
        results: Dict[str, Optional[Dict]] = {}

        # Answer cached IPs up front and collect the ones needing a request.
        uncached_ips = [ip for ip in ip_addresses if ip not in self.cache]
        for ip in ip_addresses:
            if ip in self.cache:
                results[ip] = self.cache[ip]
        if not uncached_ips:
            return results  # everything was cached — no network traffic

        try:
            async with httpx.AsyncClient() as client:
                # The batch API accepts at most max_batch_size IPs per call.
                for i in range(0, len(uncached_ips), self.max_batch_size):
                    batch = uncached_ips[i:i + self.max_batch_size]
                    # Client-side rate limiting before each batch request.
                    await asyncio.sleep(self.min_delay)
                    response = await client.post(
                        self.batch_api_url,
                        json=batch,
                        timeout=10.0
                    )
                    if response.status_code == 200:
                        # Each entry echoes the queried IP in 'query'.
                        for data in response.json():
                            ip = data.get('query')
                            if data.get('status') == 'success':
                                geo_info = self._parse_geo_data(data)
                                self.cache[ip] = geo_info
                                results[ip] = geo_info
                            else:
                                # Invalid IP or per-entry API error.
                                results[ip] = None
                    else:
                        print(f"[GEO] Batch API HTTP {response.status_code}")
                        # Whole batch failed: record None for each of its IPs.
                        # (No per-IP retry is attempted.)
                        for ip in batch:
                            results[ip] = None
        except Exception as e:
            print(f"[GEO] Errore batch lookup: {e}")
            # Mark any IPs not yet processed as failed.
            for ip in uncached_ips:
                if ip not in results:
                    results[ip] = None
        return results

    def _parse_geo_data(self, data: Dict) -> Dict:
        """Normalize an ip-api.com success payload into our record shape.

        The 'as' field arrives as e.g. "AS14061 DigitalOcean, LLC"; it is
        split into as_number ("AS14061") and as_name ("DigitalOcean, LLC").
        """
        as_field = data.get('as')
        return {
            'country': data.get('country'),
            'country_code': data.get('countryCode'),
            'city': data.get('city'),
            'organization': data.get('org'),
            'as_number': as_field.split()[0] if as_field else None,
            'as_name': as_field.split(maxsplit=1)[1] if as_field and ' ' in as_field else as_field,
            'isp': data.get('isp'),
        }

    def clear_cache(self):
        """Drop every cached lookup result."""
        self.cache.clear()

    def get_cache_size(self) -> int:
        """Number of IPs currently cached."""
        return len(self.cache)
# Lazily-created process-wide service instance.
_geo_service: "IPGeolocationService | None" = None


def get_geo_service() -> IPGeolocationService:
    """Return the shared IPGeolocationService, building it on first use."""
    global _geo_service
    if _geo_service is None:
        # First caller pays the construction cost; everyone shares the cache.
        _geo_service = IPGeolocationService()
    return _geo_service