Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
174 lines
6.5 KiB
Python
174 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
|
import sys
|
|
import os
|
|
import logging
|
|
import pandas as pd
|
|
from sqlalchemy import create_engine, text
|
|
import traceback
|
|
|
|
# Configurazione del logging
|
|
logging.basicConfig(
|
|
level=logging.DEBUG,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.StreamHandler(sys.stdout)
|
|
]
|
|
)
|
|
|
|
# Configurazione del database
|
|
DB_USER = os.environ.get('DB_USER', 'root')
|
|
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
|
|
DB_HOST = os.environ.get('DB_HOST', 'localhost')
|
|
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')
|
|
CONN_STRING = f'mysql+mysqlconnector://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}'
|
|
|
|
def test_connection():
|
|
"""
|
|
Testa la connessione al database
|
|
"""
|
|
try:
|
|
logging.info("Test di connessione al database...")
|
|
engine = create_engine(CONN_STRING)
|
|
with engine.connect() as conn:
|
|
result = conn.execute(text("SELECT 1")).fetchone()
|
|
if result and result[0] == 1:
|
|
logging.info("✅ Connessione al database riuscita!")
|
|
return engine
|
|
else:
|
|
logging.error("❌ Test di connessione fallito!")
|
|
return None
|
|
except Exception as e:
|
|
logging.error(f"❌ Errore di connessione: {e}")
|
|
logging.error(traceback.format_exc())
|
|
return None
|
|
|
|
def check_tables(engine):
|
|
"""
|
|
Controlla le tabelle disponibili nel database
|
|
"""
|
|
try:
|
|
with engine.connect() as conn:
|
|
tables = conn.execute(text("SHOW TABLES")).fetchall()
|
|
table_names = [t[0] for t in tables]
|
|
logging.info(f"📊 Tabelle disponibili: {table_names}")
|
|
|
|
if 'known_attackers' in table_names:
|
|
logging.info("✅ Tabella 'known_attackers' trovata")
|
|
|
|
# Controlla struttura
|
|
columns = conn.execute(text("DESCRIBE known_attackers")).fetchall()
|
|
column_names = [c[0] for c in columns]
|
|
logging.info(f"📋 Colonne in known_attackers: {column_names}")
|
|
|
|
# Controlla indici
|
|
indexes = conn.execute(text("SHOW INDEXES FROM known_attackers")).fetchall()
|
|
logging.info(f"🔑 Numero di indici: {len(indexes)}")
|
|
|
|
# Controlla contenuto
|
|
count = conn.execute(text("SELECT COUNT(*) FROM known_attackers")).fetchone()[0]
|
|
logging.info(f"📝 Numero di record: {count}")
|
|
|
|
if count > 0:
|
|
sample = conn.execute(text("SELECT * FROM known_attackers LIMIT 5")).fetchall()
|
|
for row in sample:
|
|
logging.info(f"📌 Esempio record: {row}")
|
|
else:
|
|
logging.warning("⚠️ La tabella 'known_attackers' è vuota")
|
|
else:
|
|
logging.error("❌ Tabella 'known_attackers' NON trovata")
|
|
|
|
if 'ip_list' in table_names:
|
|
logging.info("✅ Tabella 'ip_list' trovata")
|
|
|
|
# Controlla struttura
|
|
columns = conn.execute(text("DESCRIBE ip_list")).fetchall()
|
|
column_names = [c[0] for c in columns]
|
|
logging.info(f"📋 Colonne in ip_list: {column_names}")
|
|
|
|
# Controlla contenuto
|
|
count = conn.execute(text("SELECT COUNT(*) FROM ip_list")).fetchone()[0]
|
|
logging.info(f"📝 Numero di record: {count}")
|
|
else:
|
|
logging.error("❌ Tabella 'ip_list' NON trovata")
|
|
|
|
except Exception as e:
|
|
logging.error(f"❌ Errore nel controllo delle tabelle: {e}")
|
|
logging.error(traceback.format_exc())
|
|
|
|
def test_insert(engine, ip_address="192.168.100.100"):
|
|
"""
|
|
Testa l'inserimento di un record nella tabella known_attackers
|
|
"""
|
|
try:
|
|
conn = engine.connect()
|
|
trans = conn.begin()
|
|
|
|
try:
|
|
# Prima controlla se l'IP esiste già
|
|
check_query = text("SELECT * FROM known_attackers WHERE ip_address = :ip")
|
|
result = conn.execute(check_query, {"ip": ip_address}).fetchone()
|
|
|
|
if result:
|
|
logging.info(f"✅ L'IP {ip_address} esiste già nella tabella")
|
|
trans.rollback() # Non modifichiamo nulla
|
|
return True
|
|
|
|
# Inserisci il record
|
|
logging.info(f"Inserimento IP di test {ip_address}...")
|
|
insert_query = text("""
|
|
INSERT INTO known_attackers
|
|
(ip_address, risk_level, ports_used, attack_patterns, is_blocked)
|
|
VALUES (:ip, 'TEST', '80', 'Test diagnostico', 0)
|
|
""")
|
|
|
|
result = conn.execute(insert_query, {"ip": ip_address})
|
|
logging.info(f"📝 Righe inserite: {result.rowcount}")
|
|
|
|
# Verifica l'inserimento
|
|
check_query = text("SELECT * FROM known_attackers WHERE ip_address = :ip")
|
|
result = conn.execute(check_query, {"ip": ip_address}).fetchone()
|
|
|
|
if result:
|
|
logging.info(f"✅ Test inserimento riuscito: {result}")
|
|
trans.commit()
|
|
return True
|
|
else:
|
|
logging.error("❌ Test inserimento fallito: record non trovato dopo l'inserimento")
|
|
trans.rollback()
|
|
return False
|
|
|
|
except Exception as e:
|
|
logging.error(f"❌ Errore durante il test di inserimento: {e}")
|
|
logging.error(traceback.format_exc())
|
|
trans.rollback()
|
|
return False
|
|
finally:
|
|
conn.close()
|
|
|
|
except Exception as e:
|
|
logging.error(f"❌ Errore nel creare la connessione: {e}")
|
|
logging.error(traceback.format_exc())
|
|
return False
|
|
|
|
def main():
|
|
"""
|
|
Funzione principale
|
|
"""
|
|
logging.info("🔎 Avvio diagnosi database")
|
|
|
|
# Test di connessione
|
|
engine = test_connection()
|
|
if not engine:
|
|
logging.error("❌ Impossibile continuare senza connessione")
|
|
sys.exit(1)
|
|
|
|
# Controllo delle tabelle
|
|
check_tables(engine)
|
|
|
|
# Test di inserimento
|
|
test_insert(engine)
|
|
|
|
logging.info("✅ Diagnosi completata")
|
|
|
|
if __name__ == "__main__":
|
|
main() |