#!/usr/bin/env python3
"""
MikroTik API connection test script.

Checks the connection to every router configured in the database, reads the
'ddos_blocked' address-list and runs a block/unblock round trip on each
router that could be reached.
"""

import asyncio
import os
import sys

from dotenv import load_dotenv
import psycopg2

from mikrotik_manager import MikroTikManager

# Load environment variables (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD).
load_dotenv()

# Horizontal rule printed around section headers.
SEPARATOR = "=" * 60

# Inner width of the start-up banner box.
BANNER_WIDTH = 62

# Column order of the SELECT in get_routers_from_db(); also the dict keys
# every other function in this script reads.
ROUTER_COLUMNS = (
    'id',
    'name',
    'ip_address',
    'api_port',
    'username',
    'password',
    'enabled',
)


def get_routers_from_db() -> list:
    """Fetch the configured routers from the database.

    Connection parameters are read from the PG* environment variables.

    Returns:
        A list of dicts keyed by ROUTER_COLUMNS, ordered by router name.
        An empty list is returned (after printing the error) when the
        database cannot be reached or the query fails.
    """
    try:
        conn = psycopg2.connect(
            host=os.getenv("PGHOST"),
            port=os.getenv("PGPORT"),
            database=os.getenv("PGDATABASE"),
            user=os.getenv("PGUSER"),
            password=os.getenv("PGPASSWORD"),
        )
        try:
            # The cursor context manager closes the cursor; the connection
            # itself is closed in ``finally`` so a failing query cannot
            # leak it.
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT id, name, ip_address, api_port, username, password, enabled
                    FROM routers
                    ORDER BY name
                """)
                rows = cursor.fetchall()
        finally:
            conn.close()
    except Exception as e:
        # Best effort: the caller treats an empty list as "nothing to test".
        print(f"❌ Errore connessione database: {e}")
        return []

    return [dict(zip(ROUTER_COLUMNS, row)) for row in rows]


async def test_router_connection(manager, router) -> bool:
    """Test the API connection to a single router.

    Prints the router details, then (for enabled routers) checks the
    connection and reads the 'ddos_blocked' address-list, showing its first
    five entries.

    Args:
        manager: MikroTikManager used to talk to the router.
        router: Router dict as produced by get_routers_from_db().

    Returns:
        True when the connection test and the address-list read succeeded;
        False when the router is disabled, the connection failed or any
        exception was raised.
    """
    print(f"\n{SEPARATOR}")
    print(f"🔍 Test Router: {router['name']}")
    print(SEPARATOR)
    print(f" IP: {router['ip_address']}")
    print(f" Porta: {router['api_port']}")
    print(f" Username: {router['username']}")
    print(f" Enabled: {'✅ Sì' if router['enabled'] else '❌ No'}")

    if not router['enabled']:
        print(" ⚠️ Router disabilitato - skip test")
        return False

    # Connection test
    print("\n 📡 Test connessione...")
    try:
        connected = await manager.test_connection(
            router_ip=router['ip_address'],
            username=router['username'],
            password=router['password'],
            port=router['api_port'],
        )

        if not connected:
            print(" ❌ Connessione FALLITA")
            print("\n 🔧 Suggerimenti:")
            print(" 1. Verifica che il router sia raggiungibile:")
            print(f" ping {router['ip_address']}")
            print(" 2. Verifica che il servizio API sia abilitato sul router:")
            print(" /ip service print (deve mostrare 'api' o 'api-ssl' enabled)")
            print(f" 3. Verifica firewall non blocchi porta {router['api_port']}")
            print(" 4. Verifica credenziali (username/password)")
            return False

        print(" ✅ Connessione OK!")

        # Address-list read test
        print(" 📋 Lettura address-list...")
        entries = await manager.get_address_list(
            router_ip=router['ip_address'],
            username=router['username'],
            password=router['password'],
            list_name="ddos_blocked",
            port=router['api_port'],
        )
        print(f" ✅ Trovati {len(entries)} IP in lista 'ddos_blocked'")

        # Show the first 5 blocked IPs
        if entries:
            print("\n 📌 Primi 5 IP bloccati:")
            for entry in entries[:5]:
                ip = entry.get('address', 'N/A')
                comment = entry.get('comment', 'N/A')
                timeout = entry.get('timeout', 'N/A')
                print(f" - {ip} | {comment} | timeout: {timeout}")

        return True

    except Exception as e:
        print(f" ❌ Errore durante test: {e}")
        print(f" 📋 Dettagli errore: {type(e).__name__}")
        return False


async def test_block_unblock(manager, router, test_ip="1.2.3.4") -> bool:
    """Test blocking and then unblocking an IP on a router.

    The IP is added to the 'ids_test' address-list with a 5 minute timeout
    and removed again two seconds later.

    Args:
        manager: MikroTikManager used to talk to the router.
        router: Router dict as produced by get_routers_from_db().
        test_ip: Address used for the round trip.

    Returns:
        True when the block succeeded (even if the unblock did not);
        False when the block failed or an exception was raised.
    """
    print(f"\n 🧪 Test blocco/sblocco IP {test_ip}...")

    try:
        # Block test
        print(" Blocco IP...")
        blocked = await manager.add_address_list(
            router_ip=router['ip_address'],
            username=router['username'],
            password=router['password'],
            ip_address=test_ip,
            list_name="ids_test",
            comment="Test IDS API Fix",
            timeout_duration="5m",
            port=router['api_port'],
        )

        if not blocked:
            print(" ❌ Blocco IP fallito")
            return False

        print(" ✅ IP bloccato con successo!")

        # Wait 2 seconds before removing the entry again.
        await asyncio.sleep(2)

        # Unblock test
        print(" Sblocco IP...")
        unblocked = await manager.remove_address_list(
            router_ip=router['ip_address'],
            username=router['username'],
            password=router['password'],
            ip_address=test_ip,
            list_name="ids_test",
            port=router['api_port'],
        )

        if unblocked:
            print(" ✅ IP sbloccato con successo!")
        else:
            # A failed unblock is only a warning: the block itself worked.
            print(" ⚠️ Sblocco fallito (ma blocco OK)")
        return True

    except Exception as e:
        # Same policy as test_router_connection(): report the failure for
        # this router and let the run continue with the remaining ones.
        print(f" ❌ Errore durante test blocco/sblocco: {e}")
        print(f" 📋 Dettagli errore: {type(e).__name__}")
        return False


def _print_banner(lines) -> None:
    """Print *lines* centred inside a double-line box.

    Border and padding are derived from BANNER_WIDTH so the box stays
    aligned whatever the text length.
    """
    print("╔" + "═" * BANNER_WIDTH + "╗")
    for line in lines:
        print("║" + line.center(BANNER_WIDTH) + "║")
    print("╚" + "═" * BANNER_WIDTH + "╝")


def _print_summary(results) -> int:
    """Print the per-router summary and return how many routers connected."""
    print(f"\n{SEPARATOR}")
    print("📊 RIEPILOGO TEST")
    print(f"{SEPARATOR}\n")

    for r in results:
        conn_status = "✅ OK" if r['connected'] else "❌ FAIL"
        block_status = ""
        if 'block_test' in r:
            block_status = " | Blocco: " + ("✅ OK" if r['block_test'] else "❌ FAIL")
        print(f" {r['name']:20s} ({r['ip']:15s}): {conn_status}{block_status}")

    success_count = sum(1 for r in results if r['connected'])
    print(f"\n Totale: {success_count}/{len(results)} router connessi\n")
    return success_count


async def main():
    """Run the connection and block/unblock tests for every router.

    Returns without testing when the database yields no routers; otherwise
    terminates the process through ``sys.exit``: status 0 when every router
    connected, 1 otherwise.
    """
    _print_banner([
        "TEST CONNESSIONE MIKROTIK API REST",
        "IDS v2.0.0 - Hybrid Detector",
    ])

    # Load routers from the database
    print("\n📊 Caricamento router dal database...")
    routers = get_routers_from_db()

    if not routers:
        print("❌ Nessun router trovato nel database!")
        print("\n💡 Aggiungi router da: https://ids.alfacom.it/routers")
        return

    print(f"✅ Trovati {len(routers)} router configurati\n")

    manager = MikroTikManager(timeout=10)
    try:
        # Test every router
        results = []
        for router in routers:
            connected = await test_router_connection(manager, router)
            outcome = {
                'name': router['name'],
                'ip': router['ip_address'],
                'connected': connected,
            }
            results.append(outcome)

            # Only routers that answered get the block/unblock round trip.
            if connected and router['enabled']:
                outcome['block_test'] = await test_block_unblock(manager, router)

        success_count = _print_summary(results)
    finally:
        # Release the router connections even when a test raised.
        await manager.close_all()

    # NOTE: disabled routers are reported as not connected, so the exit
    # status is 1 whenever at least one router is disabled.
    sys.exit(0 if success_count == len(results) else 1)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n\n⚠️ Test interrotto dall'utente")
        sys.exit(1)
    except Exception as e:
        print(f"\n\n❌ Errore critico: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)