""" MikroTik Manager - Gestione router tramite API REST Più veloce e affidabile di SSH per 10+ router """ import httpx import asyncio from typing import List, Dict, Optional from datetime import datetime import hashlib import base64 class MikroTikManager: """ Gestisce comunicazione con router MikroTik tramite API REST Supporta operazioni parallele su multipli router """ def __init__(self, timeout: int = 10): self.timeout = timeout self.clients = {} # Cache di client HTTP per router def _get_client(self, router_ip: str, username: str, password: str, port: int = 8728) -> httpx.AsyncClient: """Ottiene o crea client HTTP per un router""" key = f"{router_ip}:{port}" if key not in self.clients: # API REST MikroTik usa porta HTTP/HTTPS (default 80/443) # Per semplicità useremo richieste HTTP dirette auth = base64.b64encode(f"{username}:{password}".encode()).decode() headers = { "Authorization": f"Basic {auth}", "Content-Type": "application/json" } self.clients[key] = httpx.AsyncClient( base_url=f"http://{router_ip}", headers=headers, timeout=self.timeout ) return self.clients[key] async def test_connection(self, router_ip: str, username: str, password: str, port: int = 8728) -> bool: """Testa connessione a un router""" try: client = self._get_client(router_ip, username, password, port) # Prova a leggere system identity response = await client.get("/rest/system/identity") return response.status_code == 200 except Exception as e: print(f"[ERROR] Connessione a {router_ip} fallita: {e}") return False async def add_address_list( self, router_ip: str, username: str, password: str, ip_address: str, list_name: str = "ddos_blocked", comment: str = "", timeout_duration: str = "1h", port: int = 8728 ) -> bool: """ Aggiunge IP alla address-list del router timeout_duration: es. "1h", "30m", "1d" """ try: client = self._get_client(router_ip, username, password, port) # Controlla se IP già esiste response = await client.get("/rest/ip/firewall/address-list") if response.status_code == 200: existing = response.json() for entry in existing: if entry.get('address') == ip_address and entry.get('list') == list_name: print(f"[INFO] IP {ip_address} già in lista {list_name} su {router_ip}") return True # Aggiungi nuovo IP data = { "list": list_name, "address": ip_address, "comment": comment or f"IDS block {datetime.now().isoformat()}", "timeout": timeout_duration } response = await client.post("/rest/ip/firewall/address-list/add", json=data) if response.status_code == 201 or response.status_code == 200: print(f"[SUCCESS] IP {ip_address} aggiunto a {list_name} su {router_ip} (timeout: {timeout_duration})") return True else: print(f"[ERROR] Errore aggiunta IP {ip_address} su {router_ip}: {response.status_code} - {response.text}") return False except Exception as e: print(f"[ERROR] Eccezione aggiunta IP {ip_address} su {router_ip}: {e}") return False async def remove_address_list( self, router_ip: str, username: str, password: str, ip_address: str, list_name: str = "ddos_blocked", port: int = 8728 ) -> bool: """Rimuove IP dalla address-list del router""" try: client = self._get_client(router_ip, username, password, port) # Trova ID dell'entry response = await client.get("/rest/ip/firewall/address-list") if response.status_code != 200: return False entries = response.json() for entry in entries: if entry.get('address') == ip_address and entry.get('list') == list_name: entry_id = entry.get('.id') # Rimuovi entry response = await client.delete(f"/rest/ip/firewall/address-list/{entry_id}") if response.status_code == 200: print(f"[SUCCESS] IP {ip_address} rimosso da {list_name} su {router_ip}") return True print(f"[INFO] IP {ip_address} non trovato in {list_name} su {router_ip}") return False except Exception as e: print(f"[ERROR] Eccezione rimozione IP {ip_address} da {router_ip}: {e}") return False async def get_address_list( self, router_ip: str, username: str, password: str, list_name: Optional[str] = None, port: int = 8728 ) -> List[Dict]: """Ottiene address-list da router""" try: client = self._get_client(router_ip, username, password, port) response = await client.get("/rest/ip/firewall/address-list") if response.status_code == 200: entries = response.json() if list_name: entries = [e for e in entries if e.get('list') == list_name] return entries return [] except Exception as e: print(f"[ERROR] Eccezione lettura address-list da {router_ip}: {e}") return [] async def block_ip_on_all_routers( self, routers: List[Dict], ip_address: str, list_name: str = "ddos_blocked", comment: str = "", timeout_duration: str = "1h" ) -> Dict[str, bool]: """ Blocca IP su tutti i router in parallelo routers: lista di dict con {ip_address, username, password, api_port} Returns: dict con {router_ip: success_bool} """ tasks = [] router_ips = [] for router in routers: if not router.get('enabled', True): continue task = self.add_address_list( router_ip=router['ip_address'], username=router['username'], password=router['password'], ip_address=ip_address, list_name=list_name, comment=comment, timeout_duration=timeout_duration, port=router.get('api_port', 8728) ) tasks.append(task) router_ips.append(router['ip_address']) # Esegui in parallelo results = await asyncio.gather(*tasks, return_exceptions=True) # Combina risultati return { router_ip: result if not isinstance(result, Exception) else False for router_ip, result in zip(router_ips, results) } async def unblock_ip_on_all_routers( self, routers: List[Dict], ip_address: str, list_name: str = "ddos_blocked" ) -> Dict[str, bool]: """Sblocca IP da tutti i router in parallelo""" tasks = [] router_ips = [] for router in routers: if not router.get('enabled', True): continue task = self.remove_address_list( router_ip=router['ip_address'], username=router['username'], password=router['password'], ip_address=ip_address, list_name=list_name, port=router.get('api_port', 8728) ) tasks.append(task) router_ips.append(router['ip_address']) results = await asyncio.gather(*tasks, return_exceptions=True) return { router_ip: result if not isinstance(result, Exception) else False for router_ip, result in zip(router_ips, results) } async def close_all(self): """Chiude tutti i client HTTP""" for client in self.clients.values(): await client.aclose() self.clients.clear() # Fallback SSH per router che non supportano API REST class MikroTikSSHManager: """Fallback usando SSH se API REST non disponibile""" def __init__(self): print("[WARN] SSH Manager è un fallback. Usa API REST per migliori performance.") async def add_address_list(self, *args, **kwargs) -> bool: """Implementazione SSH fallback (da implementare se necessario)""" print("[WARN] SSH fallback non ancora implementato. Usa API REST.") return False if __name__ == "__main__": # Test MikroTik Manager async def test(): manager = MikroTikManager() # Test router demo (sostituire con dati reali) test_router = { 'ip_address': '192.168.1.1', 'username': 'admin', 'password': 'password', 'api_port': 8728, 'enabled': True } # Test connessione print("Testing connection...") connected = await manager.test_connection( test_router['ip_address'], test_router['username'], test_router['password'] ) print(f"Connected: {connected}") # Test blocco IP if connected: print("\nTesting IP block...") result = await manager.add_address_list( test_router['ip_address'], test_router['username'], test_router['password'], ip_address='10.0.0.100', list_name='ddos_test', comment='Test IDS', timeout_duration='10m' ) print(f"Block result: {result}") # Leggi lista print("\nReading address list...") entries = await manager.get_address_list( test_router['ip_address'], test_router['username'], test_router['password'], list_name='ddos_test' ) print(f"Entries: {entries}") await manager.close_all() # Esegui test print("=== TEST MIKROTIK MANAGER ===\n") asyncio.run(test())