Fix critical bug in `mikrotik_manager.py` where the API port was not included in the base URL, leading to connection failures. Also add SSL support detection and a new script for testing MikroTik API connections. Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: intermediate_checkpoint Replit-Commit-Event-Id: 22d233cb-3add-46fa-b4e7-ead2de638008 Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/jFtLBWL
238 lines
7.7 KiB
Python
238 lines
7.7 KiB
Python
#!/usr/bin/env python3
"""
MikroTik API connection test script.

Checks the connection to every router configured in the database.
"""

import asyncio
import os
import sys
from dotenv import load_dotenv
import psycopg2
from mikrotik_manager import MikroTikManager

# Load environment variables (python-dotenv) before anything reads them;
# the PG* database settings below are fetched with os.getenv().
load_dotenv()
|
|
|
|
def get_routers_from_db():
    """Fetch every configured router from the ``routers`` table.

    Connection parameters are read from the ``PGHOST``, ``PGPORT``,
    ``PGDATABASE``, ``PGUSER`` and ``PGPASSWORD`` environment variables.

    Returns:
        A list of dicts, ordered by router name, with the keys ``id``,
        ``name``, ``ip_address``, ``api_port``, ``username``, ``password``
        and ``enabled``. If anything fails while talking to the database the
        error is printed and an empty list is returned.
    """
    columns = (
        'id', 'name', 'ip_address', 'api_port',
        'username', 'password', 'enabled',
    )

    try:
        conn = psycopg2.connect(
            host=os.getenv("PGHOST"),
            port=os.getenv("PGPORT"),
            database=os.getenv("PGDATABASE"),
            user=os.getenv("PGUSER"),
            password=os.getenv("PGPASSWORD"),
        )
        # The nested try/finally blocks release the cursor and the connection
        # even when the query or the fetch raises; previously both leaked on
        # any failure after connect().
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("""
                    SELECT
                        id, name, ip_address, api_port,
                        username, password, enabled
                    FROM routers
                    ORDER BY name
                """)
                rows = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

        # Index explicitly so a row with too few columns still raises (and is
        # reported below) instead of silently yielding a partial dict.
        return [
            {key: row[position] for position, key in enumerate(columns)}
            for row in rows
        ]

    except Exception as e:
        # Top-level boundary of a diagnostic script: report and carry on with
        # "no routers" rather than crash.
        print(f"❌ Errore connessione database: {e}")
        return []
|
|
|
|
|
|
async def test_router_connection(manager, router):
    """Run the connectivity check for one router and report on stdout.

    Prints the router's settings and then, unless the router is disabled,
    awaits ``manager.test_connection``. On success it reads the
    ``ddos_blocked`` address list through ``manager.get_address_list`` and
    prints up to five of its entries; on failure it prints troubleshooting
    hints.

    Args:
        manager: Object exposing awaitable ``test_connection`` and
            ``get_address_list`` methods.
        router: Mapping with the keys ``name``, ``ip_address``, ``api_port``,
            ``username``, ``password`` and ``enabled``.

    Returns:
        True when the connection test succeeded and the address list was
        read; False when the router is disabled, the connection failed, or
        an exception was raised during the test.
    """
    separator = '=' * 60
    print(f"\n{separator}")
    print(f"🔍 Test Router: {router['name']}")
    print(separator)
    print(f" IP: {router['ip_address']}")
    print(f" Porta: {router['api_port']}")
    print(f" Username: {router['username']}")
    enabled_label = '✅ Sì' if router['enabled'] else '❌ No'
    print(f" Enabled: {enabled_label}")

    # Disabled routers are reported but never contacted.
    if not router['enabled']:
        print(" ⚠️ Router disabilitato - skip test")
        return False

    print("\n 📡 Test connessione...")
    try:
        # Connection parameters shared by both manager calls.
        target = {
            'router_ip': router['ip_address'],
            'username': router['username'],
            'password': router['password'],
            'port': router['api_port'],
        }

        if not await manager.test_connection(**target):
            print(" ❌ Connessione FALLITA")
            print("\n 🔧 Suggerimenti:")
            print(" 1. Verifica che il router sia raggiungibile:")
            print(f" ping {router['ip_address']}")
            print(" 2. Verifica che il servizio API sia abilitato sul router:")
            print(" /ip service print (deve mostrare 'api' o 'api-ssl' enabled)")
            print(f" 3. Verifica firewall non blocchi porta {router['api_port']}")
            print(" 4. Verifica credenziali (username/password)")
            return False

        print(" ✅ Connessione OK!")

        # Read the address list to prove the API is usable, not just reachable.
        print(" 📋 Lettura address-list...")
        blocked_entries = await manager.get_address_list(
            list_name="ddos_blocked",
            **target,
        )
        print(f" ✅ Trovati {len(blocked_entries)} IP in lista 'ddos_blocked'")

        # Show at most the first five entries.
        if blocked_entries:
            print("\n 📌 Primi 5 IP bloccati:")
            for item in blocked_entries[:5]:
                address = item.get('address', 'N/A')
                note = item.get('comment', 'N/A')
                expiry = item.get('timeout', 'N/A')
                print(f" - {address} | {note} | timeout: {expiry}")

        return True

    except Exception as exc:
        print(f" ❌ Errore durante test: {exc}")
        print(f" 📋 Dettagli errore: {type(exc).__name__}")
        return False
|
|
|
|
|
|
async def test_block_unblock(manager, router, test_ip="1.2.3.4"):
    """Exercise a block/unblock round trip for ``test_ip`` on one router.

    Adds ``test_ip`` to the ``ids_test`` address list with a ``5m`` timeout,
    waits two seconds, then removes it again.

    Args:
        manager: Object exposing awaitable ``add_address_list`` and
            ``remove_address_list`` methods.
        router: Mapping with the keys ``ip_address``, ``api_port``,
            ``username`` and ``password``.
        test_ip: Address used for the round trip.

    Returns:
        True if the block succeeded (a failed unblock is only reported as a
        warning); False if the block failed or an exception was raised by
        either manager call.
    """
    print(f"\n 🧪 Test blocco/sblocco IP {test_ip}...")

    # Handle errors the same way test_router_connection does: report and
    # return False, so one misbehaving router cannot abort the whole run
    # before the summary and the manager cleanup.
    try:
        # Block step
        print(" Blocco IP...")
        blocked = await manager.add_address_list(
            router_ip=router['ip_address'],
            username=router['username'],
            password=router['password'],
            ip_address=test_ip,
            list_name="ids_test",
            comment="Test IDS API Fix",
            timeout_duration="5m",
            port=router['api_port'],
        )

        if not blocked:
            print(" ❌ Blocco IP fallito")
            return False

        print(" ✅ IP bloccato con successo!")

        # Wait 2 seconds before removing the entry again.
        await asyncio.sleep(2)

        # Unblock step
        print(" Sblocco IP...")
        unblocked = await manager.remove_address_list(
            router_ip=router['ip_address'],
            username=router['username'],
            password=router['password'],
            ip_address=test_ip,
            list_name="ids_test",
            port=router['api_port'],
        )

        if unblocked:
            print(" ✅ IP sbloccato con successo!")
        else:
            # The entry was added with a timeout, so a failed removal is
            # reported as a warning and the test still counts as passed.
            print(" ⚠️ Sblocco fallito (ma blocco OK)")
        return True

    except Exception as e:
        print(f" ❌ Errore durante test: {e}")
        print(f" 📋 Dettagli errore: {type(e).__name__}")
        return False
|
|
|
|
|
|
async def main():
    """Test every router stored in the database and print a summary.

    Loads the routers with ``get_routers_from_db``, runs
    ``test_router_connection`` on each one and, for routers that connected,
    ``test_block_unblock``. Afterwards a per-router summary is printed and
    the manager's connections are closed.

    Returns without testing anything (after printing a hint) when no routers
    are found. Otherwise terminates the process through ``sys.exit``: status
    0 when every router connected, 1 otherwise.
    """
    print("╔════════════════════════════════════════════════════════════╗")
    print("║ TEST CONNESSIONE MIKROTIK API REST ║")
    print("║ IDS v2.0.0 - Hybrid Detector ║")
    print("╚════════════════════════════════════════════════════════════╝")

    # Load routers from the database
    print("\n📊 Caricamento router dal database...")
    routers = get_routers_from_db()

    if not routers:
        print("❌ Nessun router trovato nel database!")
        print("\n💡 Aggiungi router da: https://ids.alfacom.it/routers")
        return

    print(f"✅ Trovati {len(routers)} router configurati\n")

    # Create the manager
    manager = MikroTikManager(timeout=10)

    # try/finally guarantees close_all() runs even if a test or the summary
    # raises; previously an exception skipped the cleanup entirely.
    try:
        # Test each router
        results = []
        for router in routers:
            result = await test_router_connection(manager, router)
            results.append({
                'name': router['name'],
                'ip': router['ip_address'],
                'connected': result,
            })

            # Only routers that connected get the block/unblock round trip.
            if result and router['enabled']:
                test_ok = await test_block_unblock(manager, router)
                results[-1]['block_test'] = test_ok

        # Summary
        print(f"\n{'='*60}")
        print("📊 RIEPILOGO TEST")
        print(f"{'='*60}\n")

        for r in results:
            conn_status = "✅ OK" if r['connected'] else "❌ FAIL"
            block_status = ""
            if 'block_test' in r:
                block_status = " | Blocco: " + ("✅ OK" if r['block_test'] else "❌ FAIL")
            print(f" {r['name']:20s} ({r['ip']:15s}): {conn_status}{block_status}")

        success_count = sum(1 for r in results if r['connected'])
        print(f"\n Totale: {success_count}/{len(results)} router connessi\n")
    finally:
        # Cleanup
        await manager.close_all()

    # Exit code: success only when every router connected.
    sys.exit(0 if success_count == len(results) else 1)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the async test suite and map interruptions and
    # unexpected failures to exit status 1.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C from the operator.
        print("\n\n⚠️ Test interrotto dall'utente")
        sys.exit(1)
    except Exception as exc:
        # Anything unexpected: show the message and the full traceback.
        import traceback

        print(f"\n\n❌ Errore critico: {exc}")
        traceback.print_exc()
        sys.exit(1)
|