ids.alfacom.it/python_ml/test_mikrotik_connection.py
marco370 fffc53d0a6 Improve error reporting and add a simple connection test script
Adds enhanced error logging with traceback to the main connection test script and introduces a new, simplified script for step-by-step MikroTik connection testing.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: e1e6bdd5-fda7-4085-ad95-6f07f4b68b3c
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/jFtLBWL
2025-11-25 18:00:33 +00:00

241 lines
7.8 KiB
Python

#!/usr/bin/env python3
"""
MikroTik API connection test script.
Checks the connection to every router configured in the database.
"""
import asyncio
import os
import sys
from dotenv import load_dotenv
import psycopg2
from mikrotik_manager import MikroTikManager
# Load environment variables (the PG* settings read via os.getenv below)
load_dotenv()
def get_routers_from_db():
    """Fetch the configured routers from the PostgreSQL database.

    Connection settings are read from the PGHOST, PGPORT, PGDATABASE,
    PGUSER and PGPASSWORD environment variables.

    Returns:
        A list of dicts, one per row of the ``routers`` table ordered by
        name, with the keys ``id``, ``name``, ``ip_address``, ``api_port``,
        ``username``, ``password`` and ``enabled``. An empty list is
        returned (and the error printed) if anything goes wrong.
    """
    # Key names in the same order as the columns selected below.
    columns = (
        'id', 'name', 'ip_address', 'api_port',
        'username', 'password', 'enabled',
    )
    query = (
        "SELECT id, name, ip_address, api_port, "
        "username, password, enabled "
        "FROM routers "
        "ORDER BY name"
    )
    try:
        conn = psycopg2.connect(
            host=os.getenv("PGHOST"),
            port=os.getenv("PGPORT"),
            database=os.getenv("PGDATABASE"),
            user=os.getenv("PGUSER"),
            password=os.getenv("PGPASSWORD"),
        )
        try:
            # The cursor context manager closes the cursor on exit.
            with conn.cursor() as cursor:
                cursor.execute(query)
                rows = cursor.fetchall()
        finally:
            # Always release the connection, even when the query fails;
            # previously it was leaked on any error after connect().
            conn.close()
    except Exception as e:
        # Script-level boundary: report the problem and let the caller
        # treat it as "no routers available".
        print(f"❌ Errore connessione database: {e}")
        return []
    return [dict(zip(columns, row)) for row in rows]
async def test_router_connection(manager, router):
    """Test the connection to a single router and print a report.

    Args:
        manager: object exposing the awaitable ``test_connection`` and
            ``get_address_list`` methods used below.
        router: dict with the keys ``name``, ``ip_address``, ``api_port``,
            ``username``, ``password`` and ``enabled``.

    Returns:
        True when the connection succeeds and the 'ddos_blocked' address
        list could be read; False when the router is disabled, the
        connection fails, or any exception is raised during the test.
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print(f"🔍 Test Router: {router['name']}")
    print(rule)
    print(f" IP: {router['ip_address']}")
    print(f" Porta: {router['api_port']}")
    print(f" Username: {router['username']}")
    enabled_label = '✅ Sì' if router['enabled'] else '❌ No'
    print(f" Enabled: {enabled_label}")

    # Disabled routers are reported but never contacted.
    if not router['enabled']:
        print(" ⚠️ Router disabilitato - skip test")
        return False

    # Connection test
    print("\n 📡 Test connessione...")
    try:
        # Keyword arguments shared by both manager calls.
        endpoint = {
            'router_ip': router['ip_address'],
            'username': router['username'],
            'password': router['password'],
            'port': router['api_port'],
        }

        if not await manager.test_connection(**endpoint):
            print(" ❌ Connessione FALLITA")
            print("\n 🔧 Suggerimenti:")
            hints = (
                " 1. Verifica che il router sia raggiungibile:",
                f" ping {router['ip_address']}",
                " 2. Verifica che il servizio API sia abilitato sul router:",
                " /ip service print (deve mostrare 'api' o 'api-ssl' enabled)",
                f" 3. Verifica firewall non blocchi porta {router['api_port']}",
                " 4. Verifica credenziali (username/password)",
            )
            for hint in hints:
                print(hint)
            return False

        print(" ✅ Connessione OK!")

        # Read the address list
        print(" 📋 Lettura address-list...")
        entries = await manager.get_address_list(
            list_name="ddos_blocked",
            **endpoint,
        )
        print(f" ✅ Trovati {len(entries)} IP in lista 'ddos_blocked'")

        # Show the first 5 entries
        if entries:
            print("\n 📌 Primi 5 IP bloccati:")
            for item in entries[:5]:
                address = item.get('address', 'N/A')
                note = item.get('comment', 'N/A')
                expires = item.get('timeout', 'N/A')
                print(f" - {address} | {note} | timeout: {expires}")
        return True
    except Exception as exc:
        # Any failure is reported with its type and stack trace and
        # counted as a failed test rather than aborting the run.
        print(f" ❌ Errore durante test: {exc}")
        print(f" 📋 Tipo errore: {type(exc).__name__}")
        import traceback
        print(" 📋 Stack trace:")
        traceback.print_exc()
        return False
async def test_block_unblock(manager, router, test_ip="1.2.3.4"):
    """Test adding and then removing an IP on the router's 'ids_test' list.

    Args:
        manager: object exposing the awaitable ``add_address_list`` and
            ``remove_address_list`` methods used below.
        router: dict with the keys ``ip_address``, ``api_port``,
            ``username`` and ``password``.
        test_ip: address used for the block/unblock round trip.

    Returns:
        True when the block step succeeds (even if the following unblock
        fails), False when the block step fails.
    """
    print(f"\n 🧪 Test blocco/sblocco IP {test_ip}...")

    # Keyword arguments common to the add and remove calls.
    target = {
        'router_ip': router['ip_address'],
        'username': router['username'],
        'password': router['password'],
        'ip_address': test_ip,
        'list_name': "ids_test",
        'port': router['api_port'],
    }

    # Block step
    print(" Blocco IP...")
    was_blocked = await manager.add_address_list(
        comment="Test IDS API Fix",
        timeout_duration="5m",
        **target,
    )
    if not was_blocked:
        print(" ❌ Blocco IP fallito")
        return False

    print(" ✅ IP bloccato con successo!")

    # Wait 2 seconds before removing the entry again
    await asyncio.sleep(2)

    # Unblock step
    print(" Sblocco IP...")
    was_unblocked = await manager.remove_address_list(**target)
    if was_unblocked:
        print(" ✅ IP sbloccato con successo!")
    else:
        print(" ⚠️ Sblocco fallito (ma blocco OK)")
    # A failed unblock is only a warning: the result reflects the block step.
    return True
async def main():
    """Run the connection test against every router in the database.

    Loads the routers with ``get_routers_from_db``, runs
    ``test_router_connection`` on each one and, for routers that connected,
    ``test_block_unblock``; then prints a summary.

    Returns without exiting when no routers are found. Otherwise exits the
    process via ``sys.exit`` with status 0 if every router connected and 1
    if at least one did not.
    """
    print("╔════════════════════════════════════════════════════════════╗")
    print("║ TEST CONNESSIONE MIKROTIK API REST ║")
    print("║ IDS v2.0.0 - Hybrid Detector ║")
    print("╚════════════════════════════════════════════════════════════╝")

    # Load the routers from the database
    print("\n📊 Caricamento router dal database...")
    routers = get_routers_from_db()
    if not routers:
        print("❌ Nessun router trovato nel database!")
        print("\n💡 Aggiungi router da: https://ids.alfacom.it/routers")
        return
    print(f"✅ Trovati {len(routers)} router configurati\n")

    # Create the manager
    manager = MikroTikManager(timeout=10)
    results = []
    try:
        # Test each router
        for router in routers:
            connected = await test_router_connection(manager, router)
            outcome = {
                'name': router['name'],
                'ip': router['ip_address'],
                'connected': connected,
            }
            results.append(outcome)

            # If connected, also run the block/unblock test
            if connected and router['enabled']:
                outcome['block_test'] = await test_block_unblock(manager, router)

        # Summary
        print(f"\n{'='*60}")
        print("📊 RIEPILOGO TEST")
        print(f"{'='*60}\n")
        for r in results:
            conn_status = "✅ OK" if r['connected'] else "❌ FAIL"
            block_status = ""
            if 'block_test' in r:
                block_status = " | Blocco: " + ("✅ OK" if r['block_test'] else "❌ FAIL")
            print(f" {r['name']:20s} ({r['ip']:15s}): {conn_status}{block_status}")

        success_count = sum(1 for r in results if r['connected'])
        print(f"\n Totale: {success_count}/{len(results)} router connessi\n")
    finally:
        # Cleanup runs even when a test raises (test_block_unblock does not
        # catch exceptions), so the manager is never left unclosed.
        await manager.close_all()

    # Exit code: success only if every router connected
    sys.exit(0 if success_count == len(results) else 1)
if __name__ == "__main__":
    # Script entry point: run the async test suite and translate an
    # interruption or an unexpected failure into exit status 1.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C from the user
        print("\n\n⚠️ Test interrotto dall'utente")
        sys.exit(1)
    except Exception as fatal:
        # Anything main() did not handle: report it with a stack trace.
        import traceback
        print(f"\n\n❌ Errore critico: {fatal}")
        traceback.print_exc()
        sys.exit(1)