#!/usr/bin/env python3
"""Diagnostic script for the MySQL database used by the attacker-tracking tables.

The script runs three checks in order and logs every result to stdout:

1. open a connection and run ``SELECT 1``;
2. inspect the ``known_attackers`` and ``ip_list`` tables (columns, indexes,
   row counts, a few sample rows);
3. insert a test row into ``known_attackers`` and read it back.

Connection settings are read from the environment variables ``DB_USER``,
``DB_PASSWORD``, ``DB_HOST`` and ``DB_DATABASE``.
"""
import logging
import os
import sys
import traceback
from urllib.parse import quote_plus

import pandas as pd  # noqa: F401  (unused in the code visible here; kept as-is)
from sqlalchemy import create_engine, text

# Logging configuration: everything from DEBUG upwards goes to stdout.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)

# Database configuration (environment variables with fallbacks).
DB_USER = os.environ.get('DB_USER', 'root')
# NOTE(review): a real-looking password is hard-coded as the fallback value.
# It is kept so that existing invocations keep working, but it should be moved
# out of the source tree and rotated.
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')

# User and password are percent-encoded so that characters such as '@', '/',
# ':' or '#' cannot corrupt the URL. For the default values the resulting
# string is identical to the unescaped one.
CONN_STRING = (
    f'mysql+mysqlconnector://{quote_plus(DB_USER)}:{quote_plus(DB_PASSWORD)}'
    f'@{DB_HOST}/{DB_NAME}'
)


def test_connection():
    """Test the connection to the database.

    Creates an engine from ``CONN_STRING`` and runs ``SELECT 1`` on it.

    Returns:
        The SQLAlchemy engine when the probe query returns 1, otherwise
        ``None``. Any exception is logged (message plus traceback) and also
        results in ``None``.
    """
    try:
        logging.info("Test di connessione al database...")
        engine = create_engine(CONN_STRING)
        with engine.connect() as conn:
            result = conn.execute(text("SELECT 1")).fetchone()
            if result and result[0] == 1:
                logging.info("✅ Connessione al database riuscita!")
                return engine
            else:
                logging.error("❌ Test di connessione fallito!")
                return None
    except Exception as e:
        logging.error(f"❌ Errore di connessione: {e}")
        logging.error(traceback.format_exc())
        return None


def check_tables(engine):
    """Log an overview of the tables available in the database.

    Lists all tables, then for ``known_attackers`` logs its columns, the
    number of index rows, the row count and up to five sample rows; for
    ``ip_list`` logs its columns and row count. A missing table is reported
    as an error. Exceptions are logged and swallowed; nothing is returned.

    Args:
        engine: SQLAlchemy engine used to open the connection.
    """
    try:
        with engine.connect() as conn:
            tables = conn.execute(text("SHOW TABLES")).fetchall()
            table_names = [t[0] for t in tables]
            logging.info(f"📊 Tabelle disponibili: {table_names}")

            if 'known_attackers' in table_names:
                logging.info("✅ Tabella 'known_attackers' trovata")

                # Check the structure.
                columns = conn.execute(text("DESCRIBE known_attackers")).fetchall()
                column_names = [c[0] for c in columns]
                logging.info(f"📋 Colonne in known_attackers: {column_names}")

                # Check the indexes.
                indexes = conn.execute(text("SHOW INDEXES FROM known_attackers")).fetchall()
                logging.info(f"🔑 Numero di indici: {len(indexes)}")

                # Check the content.
                count = conn.execute(text("SELECT COUNT(*) FROM known_attackers")).fetchone()[0]
                logging.info(f"📝 Numero di record: {count}")

                if count > 0:
                    sample = conn.execute(text("SELECT * FROM known_attackers LIMIT 5")).fetchall()
                    for row in sample:
                        logging.info(f"📌 Esempio record: {row}")
                else:
                    logging.warning("⚠️ La tabella 'known_attackers' è vuota")
            else:
                logging.error("❌ Tabella 'known_attackers' NON trovata")

            if 'ip_list' in table_names:
                logging.info("✅ Tabella 'ip_list' trovata")

                # Check the structure.
                columns = conn.execute(text("DESCRIBE ip_list")).fetchall()
                column_names = [c[0] for c in columns]
                logging.info(f"📋 Colonne in ip_list: {column_names}")

                # Check the content.
                count = conn.execute(text("SELECT COUNT(*) FROM ip_list")).fetchone()[0]
                logging.info(f"📝 Numero di record: {count}")
            else:
                logging.error("❌ Tabella 'ip_list' NON trovata")

    except Exception as e:
        logging.error(f"❌ Errore nel controllo delle tabelle: {e}")
        logging.error(traceback.format_exc())


def test_insert(engine, ip_address="192.168.100.100"):
    """Test inserting a record into the ``known_attackers`` table.

    If a row for ``ip_address`` already exists nothing is modified. Otherwise
    a row with ``risk_level='TEST'`` is inserted, read back and committed.

    Args:
        engine: SQLAlchemy engine used to open the connection.
        ip_address: IP address to look up / insert.

    Returns:
        ``True`` when the row already existed or was inserted and found again;
        ``False`` when the row could not be read back or any error occurred
        (errors are logged together with their traceback).
    """
    lookup_query = text("SELECT * FROM known_attackers WHERE ip_address = :ip")
    insert_query = text(
        " INSERT INTO known_attackers (ip_address, risk_level, ports_used, attack_patterns, is_blocked)"
        " VALUES (:ip, 'TEST', '80', 'Test diagnostico', 0) "
    )
    params = {"ip": ip_address}

    try:
        # The context manager closes the connection on every exit path,
        # including a failure of conn.begin() itself.
        with engine.connect() as conn:
            trans = conn.begin()
            try:
                # First check whether the IP is already present.
                existing = conn.execute(lookup_query, params).fetchone()
                if existing:
                    logging.info(f"✅ L'IP {ip_address} esiste già nella tabella")
                    trans.rollback()  # nothing was modified
                    return True

                # Insert the record.
                logging.info(f"Inserimento IP di test {ip_address}...")
                inserted = conn.execute(insert_query, params)
                logging.info(f"📝 Righe inserite: {inserted.rowcount}")

                # Verify the insert by reading the row back in the same transaction.
                stored = conn.execute(lookup_query, params).fetchone()
                if stored:
                    logging.info(f"✅ Test inserimento riuscito: {stored}")
                    trans.commit()
                    return True

                logging.error("❌ Test inserimento fallito: record non trovato dopo l'inserimento")
                trans.rollback()
                return False
            except Exception as e:
                logging.error(f"❌ Errore durante il test di inserimento: {e}")
                logging.error(traceback.format_exc())
                trans.rollback()
                return False
    except Exception as e:
        logging.error(f"❌ Errore nel creare la connessione: {e}")
        logging.error(traceback.format_exc())
        return False


def main():
    """Run the full diagnosis: connection test, table check, insert test.

    Exits the process with status 1 when no connection can be established.
    """
    logging.info("🔎 Avvio diagnosi database")

    # Connection test.
    engine = test_connection()
    if engine is None:
        logging.error("❌ Impossibile continuare senza connessione")
        sys.exit(1)

    try:
        # Table check.
        check_tables(engine)

        # Insert test.
        test_insert(engine)
    finally:
        # Release the pooled connections held by the engine.
        engine.dispose()

    logging.info("✅ Diagnosi completata")


if __name__ == "__main__":
    main()