"""
MikroTik Manager - router management through the RouterOS REST API.

Faster and more reliable than SSH when driving 10+ routers.
REST API ports: 80 (HTTP) or 443 (HTTPS).
"""

import httpx
import asyncio
import ssl
import inspect
from typing import List, Dict, Optional, Set
from datetime import datetime
import base64


class MikroTikManager:
    """
    Talk to MikroTik routers through the REST API.

    Supports parallel operations across multiple routers.
    Default ports: 80 (HTTP REST) or 443 (HTTPS REST).

    One ``httpx.AsyncClient`` is cached per ``ip:port:ssl`` combination in
    ``self.clients``; call :meth:`close_all` to release them.
    """

    # REST path of the firewall address-list menu used by every operation.
    _ADDRESS_LIST_PATH = "/rest/ip/firewall/address-list"

    # HTTP statuses accepted as a successful DELETE (empty 204 or plain 200).
    _DELETE_OK = (200, 204)

    def __init__(self, timeout: int = 15):
        """
        Args:
            timeout: per-request timeout handed to every HTTP client.
        """
        self.timeout = timeout
        self.clients = {}

    def _get_client(self, router_ip: str, username: str, password: str,
                    port: int = 80, use_ssl: bool = False) -> httpx.AsyncClient:
        """
        Return the cached HTTP client for a router, creating it on first use.

        Port 443 always implies HTTPS. When a cached client exists but the
        credentials differ from the ones it was created with, its
        ``Authorization`` header is refreshed so that rotated credentials
        take effect without recreating the client.
        """
        use_ssl = use_ssl or port == 443
        key = f"{router_ip}:{port}:{use_ssl}"

        token = base64.b64encode(f"{username}:{password}".encode()).decode()
        authorization = f"Basic {token}"

        cached = self.clients.get(key)
        if cached is not None:
            # The cache key carries no credentials, so keep the header current.
            if cached.headers.get("Authorization") != authorization:
                cached.headers["Authorization"] = authorization
            return cached

        protocol = "https" if use_ssl else "http"
        headers = {
            "Authorization": authorization,
            "Content-Type": "application/json"
        }

        ssl_context = None
        if protocol == "https":
            # NOTE(review): certificate and hostname verification are disabled
            # and legacy TLS/ciphers are allowed, presumably because routers
            # ship self-signed certificates. This exposes the credentials to
            # man-in-the-middle attacks - confirm this is acceptable.
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE
            try:
                ssl_context.minimum_version = ssl.TLSVersion.TLSv1
            except AttributeError:
                # Interpreter without ssl.TLSVersion: keep the default minimum.
                pass
            ssl_context.set_ciphers('DEFAULT@SECLEVEL=1')

        self.clients[key] = httpx.AsyncClient(
            base_url=f"{protocol}://{router_ip}:{port}",
            headers=headers,
            timeout=self.timeout,
            verify=ssl_context if ssl_context else True
        )
        return self.clients[key]

    async def _query_address_list(
        self,
        client: httpx.AsyncClient,
        list_name: Optional[str] = None,
        ip_address: Optional[str] = None
    ) -> Optional[List[Dict]]:
        """
        Fetch address-list entries, filtered by list name and/or address.

        The filters are sent as query parameters and re-applied locally, so
        the result is correct even if the router ignores them.

        Returns:
            The matching entries, or ``None`` when the router did not answer
            with HTTP 200.
        """
        params = {}
        if list_name:
            params["list"] = list_name
        if ip_address:
            params["address"] = ip_address

        response = await client.get(self._ADDRESS_LIST_PATH, params=params or None)
        if response.status_code != 200:
            return None

        return [
            entry for entry in response.json()
            if (not list_name or entry.get('list') == list_name)
            and (not ip_address or entry.get('address') == ip_address)
        ]

    @staticmethod
    def _as_success(result) -> bool:
        """
        Normalise one ``asyncio.gather(..., return_exceptions=True)`` item.

        ``BaseException`` is checked (not ``Exception``) so that a cancelled
        task, whose ``CancelledError`` is truthy, is not taken for a success.
        """
        if isinstance(result, BaseException):
            return False
        return bool(result)

    async def test_connection(self, router_ip: str, username: str, password: str,
                              port: int = 80, use_ssl: bool = False) -> bool:
        """
        Check that a router answers on its REST API.

        Returns:
            True when ``GET /rest/system/identity`` answers HTTP 200, False
            on any other status or on any exception (which is logged).
        """
        try:
            client = self._get_client(router_ip, username, password, port, use_ssl)
            response = await client.get("/rest/system/identity")
            return response.status_code == 200
        except Exception as e:
            print(f"[ERROR] Connessione a {router_ip}:{port} fallita: {e}")
            return False

    async def _get_existing_ips_set(
        self,
        router_ip: str,
        username: str,
        password: str,
        list_name: str,
        port: int = 80,
        use_ssl: bool = False
    ) -> Set[str]:
        """
        Download the address-list ONCE and return the addresses it contains.

        Returns:
            The set of addresses in ``list_name``; an empty set when the
            router answers with a non-200 status or the request fails.
        """
        try:
            client = self._get_client(router_ip, username, password, port, use_ssl)
            response = await client.get(self._ADDRESS_LIST_PATH, params={"list": list_name})
            if response.status_code != 200:
                return set()
            return {
                entry.get('address', '')
                for entry in response.json()
                if entry.get('list') == list_name
            }
        except Exception as e:
            print(f"[ERROR] Lettura address-list da {router_ip}: {e}")
            return set()

    async def add_address_list(
        self,
        router_ip: str,
        username: str,
        password: str,
        ip_address: str,
        list_name: str = "ddos_blocked",
        comment: str = "",
        timeout_duration: str = "1h",
        port: int = 80,
        use_ssl: bool = False,
        skip_check: bool = False,
        existing_ips: Optional[Set[str]] = None
    ) -> bool:
        """
        Add an IP to a router address-list.

        Args:
            comment: entry comment; defaults to an "IDS block <timestamp>" text.
            timeout_duration: value sent as the entry ``timeout`` field.
            skip_check: when True, do not check whether the IP is already
                present (used by bulk operations).
            existing_ips: addresses already known to be in the list; when
                given it replaces the per-IP GET request.

        Returns:
            True when the IP was added or is already present, False on any
            error (errors are logged, never raised).
        """
        try:
            client = self._get_client(router_ip, username, password, port, use_ssl)

            if not skip_check:
                if existing_ips is not None:
                    if ip_address in existing_ips:
                        return True
                elif await self._query_address_list(client, list_name, ip_address):
                    return True

            data = {
                "list": list_name,
                "address": ip_address,
                "comment": comment or f"IDS block {datetime.now().isoformat()}",
                "timeout": timeout_duration
            }

            response = await client.post(f"{self._ADDRESS_LIST_PATH}/add", json=data)

            if response.status_code in (200, 201):
                print(f"[SUCCESS] IP {ip_address} aggiunto a {list_name} su {router_ip}")
                return True

            if response.status_code in (400, 409):
                # The router rejects duplicates; treat "already there" as success.
                resp_text = response.text.lower()
                if any(marker in resp_text for marker in ("already", "exists", "duplicate")):
                    return True
                # Unknown rejection text: ask the router whether the entry exists.
                try:
                    if await self._query_address_list(client, list_name, ip_address):
                        return True
                except Exception as verify_error:
                    # Best-effort check: report it and fall through to the error path.
                    print(f"[WARN] Verifica IP {ip_address} su {router_ip} fallita: {verify_error}")
                print(f"[ERROR] IP {ip_address} su {router_ip}: {response.status_code} - {response.text}")
                return False

            print(f"[ERROR] Aggiunta IP {ip_address} su {router_ip}: {response.status_code} - {response.text}")
            return False

        except Exception as e:
            print(f"[ERROR] Eccezione aggiunta IP {ip_address} su {router_ip}: {e}")
            return False

    async def remove_address_list(
        self,
        router_ip: str,
        username: str,
        password: str,
        ip_address: str,
        list_name: str = "ddos_blocked",
        port: int = 80,
        use_ssl: bool = False
    ) -> bool:
        """
        Remove an IP from a router address-list.

        Returns:
            True when a matching entry was deleted; False when the entry was
            not found, the router refused the request, or an exception
            occurred (all cases are logged, never raised).
        """
        try:
            client = self._get_client(router_ip, username, password, port, use_ssl)

            entries = await self._query_address_list(client, list_name, ip_address)
            if entries is None:
                return False

            delete_failed = False
            for entry in entries:
                entry_id = entry.get('.id')
                if not entry_id:
                    # Without an id there is nothing addressable to delete.
                    continue
                response = await client.delete(f"{self._ADDRESS_LIST_PATH}/{entry_id}")
                # A successful DELETE may come back as an empty 204, not only 200.
                if response.status_code in self._DELETE_OK:
                    print(f"[SUCCESS] IP {ip_address} rimosso da {list_name} su {router_ip}")
                    return True
                delete_failed = True
                print(f"[ERROR] Rimozione IP {ip_address} su {router_ip}: {response.status_code} - {response.text}")

            if not delete_failed:
                print(f"[INFO] IP {ip_address} non trovato in {list_name} su {router_ip}")
            return False

        except Exception as e:
            print(f"[ERROR] Eccezione rimozione IP {ip_address} da {router_ip}: {e}")
            return False

    async def get_address_list(
        self,
        router_ip: str,
        username: str,
        password: str,
        list_name: Optional[str] = None,
        port: int = 80,
        use_ssl: bool = False
    ) -> List[Dict]:
        """
        Read the address-list of a router.

        Args:
            list_name: when given, only entries of that list are returned.

        Returns:
            The entries as returned by the router; an empty list on a
            non-200 answer or on any exception (which is logged).
        """
        try:
            client = self._get_client(router_ip, username, password, port, use_ssl)
            entries = await self._query_address_list(client, list_name)
            return entries if entries is not None else []
        except Exception as e:
            print(f"[ERROR] Eccezione lettura address-list da {router_ip}: {e}")
            return []

    async def block_ip_on_all_routers(
        self,
        routers: List[Dict],
        ip_address: str,
        list_name: str = "ddos_blocked",
        comment: str = "",
        timeout_duration: str = "1h"
    ) -> Dict[str, bool]:
        """
        Block an IP on every enabled router in parallel.

        Each router dict provides ``ip_address``, ``username`` and
        ``password``; ``api_port`` (default 80) and ``enabled`` (default
        True) are optional.

        Returns:
            dict of {router_ip: success_bool}
        """
        enabled = [r for r in routers if r.get('enabled', True)]
        tasks = [
            self.add_address_list(
                router_ip=router['ip_address'],
                username=router['username'],
                password=router['password'],
                ip_address=ip_address,
                list_name=list_name,
                comment=comment,
                timeout_duration=timeout_duration,
                port=router.get('api_port', 80)
            )
            for router in enabled
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        return {
            router['ip_address']: self._as_success(result)
            for router, result in zip(enabled, results)
        }

    async def bulk_block_ips_on_all_routers(
        self,
        routers: List[Dict],
        ip_list: List[str],
        list_name: str = "ddos_blocked",
        comment_prefix: str = "IDS bulk-block",
        timeout_duration: str = "1h",
        concurrency: int = 10,
        progress_callback=None
    ) -> Dict[str, Dict[str, bool]]:
        """
        Optimised bulk block.

        Downloads the address-list ONCE per router, then adds only the IPs
        that are missing, processing at most ``concurrency`` IPs at a time.

        Args:
            concurrency: maximum number of IPs handled simultaneously;
                values below 1 are treated as 1.
            progress_callback: optional callable ``(done, total)``, sync or
                async, invoked every 50 processed IPs and once at the end.
                A failing callback is logged and does not affect results.

        Returns:
            {ip: {router_ip: success_bool}}; an empty dict when no router
            is enabled.
        """
        enabled_routers = [r for r in routers if r.get('enabled', True)]
        if not enabled_routers:
            return {}

        print(f"[BULK] Inizio blocco massivo: {len(ip_list)} IP su {len(enabled_routers)} router")

        # Snapshot every router's list in parallel; the helper never raises.
        snapshots = await asyncio.gather(*(
            self._get_existing_ips_set(
                router['ip_address'], router['username'], router['password'],
                list_name, router.get('api_port', 80)
            )
            for router in enabled_routers
        ))
        existing_cache = {}
        for router, existing_ips in zip(enabled_routers, snapshots):
            existing_cache[router['ip_address']] = existing_ips
            print(f"[BULK] Router {router['ip_address']}: {len(existing_ips)} IP già in lista")

        # An IP needs work as soon as at least one router is missing it.
        new_ips = [
            ip for ip in ip_list
            if any(ip not in existing_cache.get(router['ip_address'], set())
                   for router in enabled_routers)
        ]

        already_blocked = len(ip_list) - len(new_ips)
        print(f"[BULK] {already_blocked} IP già bloccati, {len(new_ips)} nuovi da bloccare")

        results = {}
        # Semaphore(0) would block forever and a negative value raises.
        semaphore = asyncio.Semaphore(max(1, concurrency))
        total_new = len(new_ips)
        blocked_count = 0

        async def report_progress(done: int) -> None:
            # Accept sync and async callbacks; never let one fail a block.
            try:
                outcome = progress_callback(done, total_new)
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception as callback_error:
                print(f"[WARN] progress_callback fallita: {callback_error}")

        async def block_single_ip(ip: str) -> Dict[str, bool]:
            nonlocal blocked_count
            async with semaphore:
                router_results = {}
                tasks = []
                r_ips = []
                for router in enabled_routers:
                    r_ip = router['ip_address']
                    if ip in existing_cache.get(r_ip, set()):
                        router_results[r_ip] = True
                        continue
                    tasks.append(self.add_address_list(
                        router_ip=r_ip,
                        username=router['username'],
                        password=router['password'],
                        ip_address=ip,
                        list_name=list_name,
                        comment=f"{comment_prefix} {ip}",
                        timeout_duration=timeout_duration,
                        port=router.get('api_port', 80),
                        skip_check=True
                    ))
                    r_ips.append(r_ip)

                if tasks:
                    task_results = await asyncio.gather(*tasks, return_exceptions=True)
                    for r_ip, result in zip(r_ips, task_results):
                        router_results[r_ip] = self._as_success(result)

                blocked_count += 1
                if progress_callback and (blocked_count % 50 == 0 or blocked_count == total_new):
                    await report_progress(blocked_count)

                return router_results

        batch_results = await asyncio.gather(
            *(block_single_ip(ip) for ip in new_ips),
            return_exceptions=True
        )

        for ip, result in zip(new_ips, batch_results):
            if isinstance(result, BaseException):
                results[ip] = {r['ip_address']: False for r in enabled_routers}
            else:
                results[ip] = result

        # IPs that needed no work were already present on every router.
        for ip in ip_list:
            if ip not in results:
                results[ip] = {r['ip_address']: True for r in enabled_routers}

        total_success = sum(1 for ip_results in results.values() if any(ip_results.values()))
        print(f"[BULK] Completato: {total_success}/{len(ip_list)} IP bloccati con successo")

        return results

    async def unblock_ip_on_all_routers(
        self,
        routers: List[Dict],
        ip_address: str,
        list_name: str = "ddos_blocked"
    ) -> Dict[str, bool]:
        """
        Unblock an IP on every enabled router in parallel.

        Returns:
            dict of {router_ip: success_bool}
        """
        enabled = [r for r in routers if r.get('enabled', True)]
        tasks = [
            self.remove_address_list(
                router_ip=router['ip_address'],
                username=router['username'],
                password=router['password'],
                ip_address=ip_address,
                list_name=list_name,
                port=router.get('api_port', 80)
            )
            for router in enabled
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        return {
            router['ip_address']: self._as_success(result)
            for router, result in zip(enabled, results)
        }

    async def close_all(self):
        """
        Close every cached HTTP client and empty the cache.

        A client that fails to close is logged and does not prevent the
        remaining clients from being closed.
        """
        clients = list(self.clients.values())
        self.clients.clear()
        for client in clients:
            try:
                await client.aclose()
            except Exception as e:
                print(f"[WARN] Chiusura client fallita: {e}")


class MikroTikSSHManager:
    """Fallback over SSH for routers where the REST API is not available."""

    def __init__(self):
        print("[WARN] SSH Manager è un fallback. Usa API REST per migliori performance.")

    async def add_address_list(self, *args, **kwargs) -> bool:
        """SSH fallback placeholder: not implemented, always returns False."""
        print("[WARN] SSH fallback non ancora implementato. Usa API REST.")
        return False


if __name__ == "__main__":
    async def test():
        """Manual smoke test against a router at 192.168.1.1."""
        manager = MikroTikManager()

        test_router = {
            'ip_address': '192.168.1.1',
            'username': 'admin',
            'password': 'password',
            'api_port': 80,
            'enabled': True
        }

        try:
            print("Testing connection...")
            connected = await manager.test_connection(
                test_router['ip_address'],
                test_router['username'],
                test_router['password'],
                port=test_router['api_port']
            )
            print(f"Connected: {connected}")

            if connected:
                print("\nTesting IP block...")
                result = await manager.add_address_list(
                    test_router['ip_address'],
                    test_router['username'],
                    test_router['password'],
                    ip_address='10.0.0.100',
                    list_name='ddos_test',
                    comment='Test IDS',
                    timeout_duration='10m',
                    port=test_router['api_port']
                )
                print(f"Block result: {result}")

                print("\nReading address list...")
                entries = await manager.get_address_list(
                    test_router['ip_address'],
                    test_router['username'],
                    test_router['password'],
                    list_name='ddos_test',
                    port=test_router['api_port']
                )
                print(f"Entries: {entries}")
        finally:
            # Release the HTTP clients even if one of the calls above raised.
            await manager.close_all()

    print("=== TEST MIKROTIK MANAGER ===\n")
    asyncio.run(test())