Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: 1c71ce6e-1a3e-4f53-bb5d-77cdd22b8ea3
136 lines
5.2 KiB
Python
136 lines
5.2 KiB
Python
import os
|
|
import paramiko
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
def load_ip_block(file_path):
|
|
"""
|
|
Carica gli IP bloccati dal file di blocco.
|
|
Formato per ogni riga: IP:count:last_seen_timestamp
|
|
"""
|
|
ip_set = set()
|
|
if not os.path.exists(file_path):
|
|
logging.error(f"Il file {file_path} non esiste.")
|
|
return ip_set
|
|
with open(file_path, 'r') as f:
|
|
for line in f:
|
|
parts = line.strip().split(':', 2) # Split solo sui primi due due punti
|
|
if len(parts) == 3:
|
|
ip, count, last_seen_str = parts
|
|
ip_set.add(ip)
|
|
else:
|
|
logging.warning(f"Linea non valida nel file di blocco: {line.strip()}")
|
|
return ip_set
|
|
|
|
def get_current_ddos_ia(ssh, list_name='ddos_ia'):
|
|
"""
|
|
Recupera gli IP attualmente presenti nella lista ddos_ia del router.
|
|
"""
|
|
command = f"/ip firewall address-list print where list={list_name}"
|
|
stdin, stdout, stderr = ssh.exec_command(command)
|
|
output = stdout.read().decode('utf-8')
|
|
error = stderr.read().decode('utf-8')
|
|
if error:
|
|
logging.error(f"Errore durante l'esecuzione del comando: {error}")
|
|
return set()
|
|
|
|
current_ips = set()
|
|
for line in output.splitlines():
|
|
# MikroTik format: *1 35.190.18.168 ddos_ia 480:00:00
|
|
parts = line.strip().split()
|
|
if len(parts) >= 3:
|
|
ip = parts[1]
|
|
current_ips.add(ip)
|
|
return current_ips
|
|
|
|
def add_ip_to_ddos_ia(ssh, ip, timeout='480:00:00', comment='DDoS Attacker', list_name='ddos_ia'):
|
|
"""
|
|
Aggiunge un IP alla lista ddos_ia del router.
|
|
"""
|
|
command = f"/ip firewall address-list add list={list_name} address={ip} timeout={timeout} comment=\"{comment}\""
|
|
logging.info(f"Aggiungo {ip} alla lista {list_name}...")
|
|
stdin, stdout, stderr = ssh.exec_command(command, timeout=10)
|
|
output = stdout.read().decode('utf-8')
|
|
error = stderr.read().decode('utf-8')
|
|
if output:
|
|
logging.debug(f"Output: {output}")
|
|
if error:
|
|
logging.error(f"Errore durante l'aggiunta di {ip}: {error}")
|
|
|
|
def remove_ip_from_ddos_ia(ssh, ip, list_name='ddos_ia'):
|
|
"""
|
|
Rimuove un IP dalla lista ddos_ia del router.
|
|
"""
|
|
command = f"/ip firewall address-list remove [find address={ip} list={list_name}]"
|
|
logging.info(f"Rimuovo {ip} dalla lista {list_name}...")
|
|
stdin, stdout, stderr = ssh.exec_command(command, timeout=10)
|
|
output = stdout.read().decode('utf-8')
|
|
error = stderr.read().decode('utf-8')
|
|
if output:
|
|
logging.debug(f"Output: {output}")
|
|
if error:
|
|
logging.error(f"Errore durante la rimozione di {ip}: {error}")
|
|
|
|
def run_ssh_commands(router_ip, username, password, port, ip_block_file, list_name='ddos_ia'):
|
|
"""
|
|
Sincronizza gli IP bloccati con la lista ddos_ia sul router MikroTik.
|
|
"""
|
|
try:
|
|
# Crea una connessione SSH
|
|
logging.info(f"Connettendo a {router_ip} sulla porta {port}...")
|
|
ssh = paramiko.SSHClient()
|
|
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
|
ssh.connect(router_ip, username=username, password=password, port=int(port))
|
|
logging.info("Connessione SSH riuscita.")
|
|
|
|
# Carica gli IP dal file di blocco
|
|
blocked_ips = load_ip_block(ip_block_file)
|
|
logging.info(f"IP bloccati da aggiungere: {len(blocked_ips)}")
|
|
|
|
# Ottieni gli IP attuali nella lista ddos_ia
|
|
current_ddos_ips = get_current_ddos_ia(ssh, list_name)
|
|
logging.info(f"IP attualmente nella lista {list_name}: {len(current_ddos_ips)}")
|
|
|
|
# Determina quali IP aggiungere e quali rimuovere
|
|
ips_to_add = blocked_ips - current_ddos_ips
|
|
ips_to_remove = current_ddos_ips - blocked_ips
|
|
logging.info(f"IP da aggiungere: {len(ips_to_add)}")
|
|
logging.info(f"IP da rimuovere: {len(ips_to_remove)}")
|
|
|
|
# Aggiungi gli IP mancanti
|
|
for ip in ips_to_add:
|
|
add_ip_to_ddos_ia(ssh, ip, list_name=list_name)
|
|
|
|
# Rimuovi gli IP non più presenti
|
|
for ip in ips_to_remove:
|
|
remove_ip_from_ddos_ia(ssh, ip, list_name=list_name)
|
|
|
|
# Chiudi la connessione SSH
|
|
ssh.close()
|
|
logging.info("Connessione SSH chiusa.")
|
|
logging.info("Aggiornamento della lista ddos_ia completato.")
|
|
|
|
except paramiko.SSHException as ssh_error:
|
|
logging.error(f"Errore SSH: {ssh_error}")
|
|
except FileNotFoundError as fnf_error:
|
|
logging.error(f"Errore file non trovato: {fnf_error}")
|
|
except Exception as e:
|
|
logging.error(f"Errore durante l'esecuzione del comando SSH: {e}")
|
|
|
|
def main():
|
|
# Credenziali SSH del router MikroTik
|
|
router_ip = '185.203.27.254' # Sostituisci con l'IP del tuo router MikroTik
|
|
port = 22 # Porta SSH del router
|
|
username = 'admin' # Username SSH
|
|
password = 'NP2T3-ap32' # Password SSH
|
|
|
|
# File di blocco degli IP
|
|
ip_block_file = 'ip_block.txt'
|
|
list_name = 'ddos_ia' # Nome della lista sul router MikroTik
|
|
|
|
# Esecuzione della sincronizzazione
|
|
run_ssh_commands(router_ip, username, password, port, ip_block_file, list_name)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|