ids.alfacom.it/extracted_idf/diagnose_db.py
marco370 0bfe3258b5 Saved progress at the end of the loop
Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
2025-11-11 09:15:10 +00:00

174 lines
6.5 KiB
Python

#!/usr/bin/env python3
import logging
import os
import sys
import traceback
import urllib.parse

import pandas as pd
from sqlalchemy import create_engine, text
# Logging setup: every record from DEBUG upwards goes to stdout, laid out as
# "timestamp - level - message".
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    level=logging.DEBUG,
    format=_LOG_FORMAT,
    handlers=[_stdout_handler],
)
# Database configuration.
# Each setting is read from the environment and falls back to the literal
# default next to it when the variable is not set.
DB_USER = os.environ.get('DB_USER', 'root')
# NOTE(security): the fallback below is a credential committed to the
# repository. It is kept so existing invocations keep working, but the
# password should be supplied through DB_PASSWORD and this value rotated.
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'Hdgtejskjjc0-')
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_NAME = os.environ.get('DB_DATABASE', 'LOG_MIKROTIK')


def build_conn_string(user, password, host, database):
    """Return the SQLAlchemy URL for the ``mysql+mysqlconnector`` driver.

    The user name and password are percent-encoded (nothing is treated as
    safe) so that characters with a meaning inside a URL, such as ``@``,
    ``:``, ``/`` or a space, cannot corrupt the connection string.

    Args:
        user: database user name.
        password: database password, in clear text.
        host: database host, inserted as given.
        database: schema name, inserted as given.

    Returns:
        The connection URL as a ``str``.
    """
    safe_user = urllib.parse.quote(user, safe='')
    safe_password = urllib.parse.quote(password, safe='')
    return f'mysql+mysqlconnector://{safe_user}:{safe_password}@{host}/{database}'


CONN_STRING = build_conn_string(DB_USER, DB_PASSWORD, DB_HOST, DB_NAME)
def test_connection():
    """Check that the database can be reached.

    Builds an engine from ``CONN_STRING``, opens a connection and runs
    ``SELECT 1``.

    Returns:
        The engine when the probe query yields 1; ``None`` when the probe
        yields anything else or when any exception is raised (the exception
        and its traceback are logged).
    """
    try:
        logging.info("Test di connessione al database...")
        db_engine = create_engine(CONN_STRING)
        with db_engine.connect() as connection:
            probe = connection.execute(text("SELECT 1")).fetchone()
            # Guard clause: a missing row or an unexpected value is a failure.
            if not probe or probe[0] != 1:
                logging.error("❌ Test di connessione fallito!")
                return None
            logging.info("✅ Connessione al database riuscita!")
            return db_engine
    except Exception as exc:
        logging.error(f"❌ Errore di connessione: {exc}")
        logging.error(traceback.format_exc())
        return None
def check_tables(engine):
    """Log the tables in the database and inspect two of them.

    Runs ``SHOW TABLES`` and logs the names. For ``known_attackers`` it then
    logs the column names, the number of rows returned by ``SHOW INDEXES``,
    the row count and up to five sample rows (or a warning when the table is
    empty). For ``ip_list`` it logs the column names and the row count. A
    table that is not present is reported as an error.

    Args:
        engine: object whose ``connect()`` returns a connection usable as a
            context manager.

    Returns:
        None. Any exception is logged together with its traceback and is not
        re-raised.
    """
    try:
        with engine.connect() as conn:
            listing = conn.execute(text("SHOW TABLES")).fetchall()
            table_names = [entry[0] for entry in listing]
            logging.info(f"📊 Tabelle disponibili: {table_names}")

            # --- known_attackers -------------------------------------------
            if 'known_attackers' not in table_names:
                logging.error("❌ Tabella 'known_attackers' NON trovata")
            else:
                logging.info("✅ Tabella 'known_attackers' trovata")

                # Structure
                described = conn.execute(text("DESCRIBE known_attackers")).fetchall()
                column_names = [field[0] for field in described]
                logging.info(f"📋 Colonne in known_attackers: {column_names}")

                # Indexes
                index_rows = conn.execute(text("SHOW INDEXES FROM known_attackers")).fetchall()
                logging.info(f"🔑 Numero di indici: {len(index_rows)}")

                # Content
                count_row = conn.execute(text("SELECT COUNT(*) FROM known_attackers")).fetchone()
                count = count_row[0]
                logging.info(f"📝 Numero di record: {count}")

                if count > 0:
                    examples = conn.execute(text("SELECT * FROM known_attackers LIMIT 5")).fetchall()
                    for row in examples:
                        logging.info(f"📌 Esempio record: {row}")
                else:
                    logging.warning("⚠️ La tabella 'known_attackers' è vuota")

            # --- ip_list ---------------------------------------------------
            if 'ip_list' not in table_names:
                logging.error("❌ Tabella 'ip_list' NON trovata")
            else:
                logging.info("✅ Tabella 'ip_list' trovata")

                # Structure
                described = conn.execute(text("DESCRIBE ip_list")).fetchall()
                column_names = [field[0] for field in described]
                logging.info(f"📋 Colonne in ip_list: {column_names}")

                # Content
                count_row = conn.execute(text("SELECT COUNT(*) FROM ip_list")).fetchone()
                count = count_row[0]
                logging.info(f"📝 Numero di record: {count}")
    except Exception as exc:
        logging.error(f"❌ Errore nel controllo delle tabelle: {exc}")
        logging.error(traceback.format_exc())
def test_insert(engine, ip_address="192.168.100.100"):
    """Check that a row can be inserted into ``known_attackers``.

    If a row for ``ip_address`` already exists nothing is written and the
    test counts as passed. Otherwise a row marked with risk level ``'TEST'``
    is inserted, read back and committed.

    NOTE: on success the inserted row is committed and is not removed
    afterwards, so it stays in the table.

    Args:
        engine: object whose ``connect()`` returns a connection usable as a
            context manager.
        ip_address: address used for the probe row.

    Returns:
        True when the row already existed or was inserted and read back;
        False on any failure (the error and its traceback are logged).
    """
    params = {"ip": ip_address}
    try:
        # The context manager closes the connection on every exit path,
        # including a failure of conn.begin() itself.
        with engine.connect() as conn:
            trans = conn.begin()
            try:
                # Look for an existing row first.
                lookup = text("SELECT * FROM known_attackers WHERE ip_address = :ip")
                if conn.execute(lookup, params).fetchone():
                    logging.info(f"✅ L'IP {ip_address} esiste già nella tabella")
                    trans.rollback()  # nothing was modified
                    return True

                # Insert the probe row.
                logging.info(f"Inserimento IP di test {ip_address}...")
                insert_query = text("""
                    INSERT INTO known_attackers
                    (ip_address, risk_level, ports_used, attack_patterns, is_blocked)
                    VALUES (:ip, 'TEST', '80', 'Test diagnostico', 0)
                """)
                inserted = conn.execute(insert_query, params)
                logging.info(f"📝 Righe inserite: {inserted.rowcount}")

                # Read it back inside the same transaction.
                stored = conn.execute(lookup, params).fetchone()
                if not stored:
                    logging.error("❌ Test inserimento fallito: record non trovato dopo l'inserimento")
                    trans.rollback()
                    return False

                logging.info(f"✅ Test inserimento riuscito: {stored}")
                trans.commit()
                return True
            except Exception as e:
                logging.error(f"❌ Errore durante il test di inserimento: {e}")
                logging.error(traceback.format_exc())
                # A failing rollback must not be reported by the outer handler
                # as a connection-creation error; closing the connection
                # discards the transaction anyway.
                try:
                    trans.rollback()
                except Exception as rollback_error:
                    logging.error(f"❌ Errore durante il test di inserimento: {rollback_error}")
                return False
    except Exception as e:
        logging.error(f"❌ Errore nel creare la connessione: {e}")
        logging.error(traceback.format_exc())
        return False
def main():
    """Run the database diagnosis from start to finish.

    Steps: connection test, table inspection, insert test.

    Exits with status 1 when no connection can be established or when the
    insert test reports failure; returns normally otherwise.
    """
    logging.info("🔎 Avvio diagnosi database")

    # Connection test: nothing else can run without an engine.
    engine = test_connection()
    if engine is None:
        logging.error("❌ Impossibile continuare senza connessione")
        sys.exit(1)

    try:
        # Table inspection only logs its findings; it returns nothing.
        check_tables(engine)

        # Insert test: its boolean result decides the final verdict.
        insert_ok = test_insert(engine)
    finally:
        # Release the engine's pooled connections before leaving.
        engine.dispose()

    if not insert_ok:
        # Do not report success (or exit 0) when the insert test failed.
        logging.error("❌ Diagnosi completata con errori: test di inserimento fallito")
        sys.exit(1)

    logging.info("✅ Diagnosi completata")
# Script entry point: run the diagnosis only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()