Integrates IP geolocation and Autonomous System (AS) information into detection records: the frontend now displays this data, and the backend performs asynchronous batch lookups for efficiency. The change also includes database schema updates and a new IP geolocation service.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: intermediate_checkpoint
Replit-Commit-Event-Id: e81fd4a1-b7b0-48d2-ae38-f5905e278343
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/SXFWABi
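As a rough sketch of the batch-enrichment flow described above (not code from this commit): a backend ingestion step might attach geo/AS fields to detection records as shown below. The detections list, the src_ip field, and the geo_*/as_* columns are hypothetical names, and the module is assumed to be importable as ip_geolocation_service.

import asyncio

from ip_geolocation_service import get_geo_service  # assumed module name


async def enrich_detections(detections: list) -> None:
    """Attach geo/AS data to detection records in one batched pass."""
    geo = get_geo_service()
    ips = [d["src_ip"] for d in detections if d.get("src_ip")]  # "src_ip" is a hypothetical field
    geo_by_ip = await geo.lookup_batch_async(ips)
    for d in detections:
        info = geo_by_ip.get(d.get("src_ip"))
        if info:
            # Hypothetical detection columns matching the schema update mentioned above
            d["geo_country"] = info["country_code"]
            d["geo_city"] = info["city"]
            d["as_number"] = info["as_number"]
            d["as_name"] = info["as_name"]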
204 lines · 7.5 KiB · Python
"""
|
|
IP Geolocation Service
|
|
Usa ip-api.com per ottenere informazioni geografiche e AS per IP address
|
|
Free tier: 45 richieste/minuto
|
|
Supporta lookup async batch per performance ottimali
|
|
"""
|
|
|
|
import httpx
|
|
from typing import Dict, Optional, List
|
|
import time
|
|
import asyncio
|
|
|
|
|
|
class IPGeolocationService:
    """
    IP geolocation service backed by ip-api.com.
    Includes in-memory caching to reduce API calls.
    """

    def __init__(self):
        self.api_url = "http://ip-api.com/json/{ip}"
        self.batch_api_url = "http://ip-api.com/batch"
        self.cache: Dict[str, Dict] = {}
        self.last_request_time = 0.0
        self.min_delay = 1.5  # seconds between requests (to stay under 45/min)
        self.max_batch_size = 100  # ip-api.com accepts at most 100 IPs per batch

    def lookup(self, ip_address: str) -> Optional[Dict]:
        """
        Get geographic information for a single IP.
        Returns: dict with country, city, organization, AS and ISP fields, or None on error.
        """
        # Check cache
        if ip_address in self.cache:
            return self.cache[ip_address]

        # Rate limiting: wait if requests are coming too fast
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        if time_since_last < self.min_delay:
            time.sleep(self.min_delay - time_since_last)

        try:
            # API request
            url = self.api_url.format(ip=ip_address)
            response = httpx.get(url, timeout=5.0)

            self.last_request_time = time.time()

            if response.status_code == 200:
                data = response.json()

                # Check whether the lookup succeeded
                if data.get('status') == 'success':
                    geo_info = self._parse_geo_data(data)

                    # Store in cache
                    self.cache[ip_address] = geo_info

                    return geo_info
                else:
                    # API error (e.g. private IP address)
                    print(f"[GEO] Lookup error for {ip_address}: {data.get('message', 'Unknown error')}")
                    return None
            else:
                print(f"[GEO] HTTP {response.status_code} for {ip_address}")
                return None

        except Exception as e:
            print(f"[GEO] Lookup error for {ip_address}: {e}")
            return None

    async def lookup_async(self, ip_address: str, client: httpx.AsyncClient) -> Optional[Dict]:
        """
        Async lookup of a single IP.
        """
        # Check cache
        if ip_address in self.cache:
            return self.cache[ip_address]

        try:
            url = self.api_url.format(ip=ip_address)
            response = await client.get(url, timeout=5.0)

            if response.status_code == 200:
                data = response.json()

                if data.get('status') == 'success':
                    geo_info = self._parse_geo_data(data)
                    self.cache[ip_address] = geo_info
                    return geo_info
                else:
                    print(f"[GEO] Lookup error for {ip_address}: {data.get('message', 'Unknown')}")
                    return None
            else:
                print(f"[GEO] HTTP {response.status_code} for {ip_address}")
                return None

        except Exception as e:
            print(f"[GEO] Async lookup error for {ip_address}: {e}")
            return None

    async def lookup_batch_async(self, ip_addresses: List[str]) -> Dict[str, Optional[Dict]]:
        """
        Async batch lookup of multiple IPs (fast: many IPs per request).
        Uses the ip-api.com batch API for maximum efficiency.
        Returns: dict mapping {ip: geo_info}.
        """
        results = {}

        # Collect IPs that are not cached yet
        uncached_ips = [ip for ip in ip_addresses if ip not in self.cache]

        # Serve cached IPs immediately
        for ip in ip_addresses:
            if ip in self.cache:
                results[ip] = self.cache[ip]

        if not uncached_ips:
            return results  # Everything was cached

        try:
            async with httpx.AsyncClient() as client:
                # The batch API accepts at most 100 IPs per request
                for i in range(0, len(uncached_ips), self.max_batch_size):
                    batch = uncached_ips[i:i + self.max_batch_size]

                    # Rate limiting between batch requests (skip the delay before the first one)
                    if i > 0:
                        await asyncio.sleep(1.5)

                    # Batch request
                    response = await client.post(
                        self.batch_api_url,
                        json=batch,
                        timeout=10.0
                    )

                    if response.status_code == 200:
                        batch_data = response.json()

                        for data in batch_data:
                            ip = data.get('query')
                            if data.get('status') == 'success':
                                geo_info = self._parse_geo_data(data)
                                self.cache[ip] = geo_info
                                results[ip] = geo_info
                            else:
                                # Invalid IP or API error
                                results[ip] = None
                    else:
                        print(f"[GEO] Batch API HTTP {response.status_code}")
                        # No per-IP data available for this batch
                        for ip in batch:
                            results[ip] = None

        except Exception as e:
            print(f"[GEO] Batch lookup error: {e}")
            # Mark every IP that was not processed as None
            for ip in uncached_ips:
                if ip not in results:
                    results[ip] = None

        return results

    def _parse_geo_data(self, data: Dict) -> Dict:
        """Parse geo data from an API response."""
        return {
            'country': data.get('country'),
            'country_code': data.get('countryCode'),
            'city': data.get('city'),
            'organization': data.get('org'),
            # e.g. "AS14061" and "DigitalOcean, LLC" from "AS14061 DigitalOcean, LLC"
            'as_number': data.get('as', '').split()[0] if data.get('as') else None,
            'as_name': data.get('as', '').split(maxsplit=1)[1] if data.get('as') and ' ' in data.get('as') else data.get('as'),
            'isp': data.get('isp'),
        }

    def clear_cache(self):
        """Empty the cache."""
        self.cache.clear()

    def get_cache_size(self) -> int:
        """Number of IPs currently cached."""
        return len(self.cache)


# Singleton instance
_geo_service = None


def get_geo_service() -> IPGeolocationService:
    """Get or create the singleton instance."""
    global _geo_service
    if _geo_service is None:
        _geo_service = IPGeolocationService()
    return _geo_service
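
For reference, a minimal usage sketch (not part of the file above; the import assumes the module is named ip_geolocation_service):

import asyncio

from ip_geolocation_service import get_geo_service

geo = get_geo_service()

# One-off synchronous lookup (rate limited and cached)
info = geo.lookup("8.8.8.8")
if info:
    print(info["country"], info["as_number"], info["isp"])

# Batch lookup: up to 100 IPs per HTTP request
results = asyncio.run(geo.lookup_batch_async(["1.1.1.1", "9.9.9.9"]))
for ip, geo_info in results.items():
    print(ip, geo_info["country_code"] if geo_info else "unknown")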