ids.alfacom.it/python_ml/list_fetcher/parsers.py
marco370 77874c83bf Add functionality to manage and sync public blacklists and whitelists
Integrates external public IP lists for enhanced threat detection and whitelisting capabilities, including API endpoints, database schema changes, and a new fetching service.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: b1366669-0ccd-493e-9e06-4e4168e2fa3b
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/QKzTQQy
2025-11-26 09:21:43 +00:00

281 lines
8.0 KiB
Python

import re
import json
from typing import List, Dict, Set, Optional
from datetime import datetime
import ipaddress
class ListParser:
    """Base parser for public IP lists.

    Holds the validation and normalization helpers shared by the
    format-specific parsers.
    """

    @staticmethod
    def validate_ip(ip_str: str) -> bool:
        """
        Validate an IP address or CIDR range.

        Host bits are tolerated in a CIDR ('1.2.3.4/24' is accepted) because
        the network is built with strict=False.

        Returns:
            True when ip_str is a string that parses as an IPv4/IPv6 address
            or network, False otherwise.
        """
        # ipaddress.ip_network() also accepts integers and packed bytes; those
        # are never legitimate list entries and would break the string
        # handling in normalize_cidr(), so reject every non-string up front.
        if not isinstance(ip_str, str):
            return False
        try:
            ipaddress.ip_network(ip_str, strict=False)
        except ValueError:
            return False
        return True

    @staticmethod
    def normalize_cidr(ip_str: str) -> tuple[str, Optional[str]]:
        """
        Normalize IP/CIDR to (ip_address, cidr_range).

        Example: '1.2.3.0/24'   -> ('1.2.3.0', '1.2.3.0/24')
                 '1.2.3.4'      -> ('1.2.3.4', None)
                 '2001:0DB8::1' -> ('2001:db8::1', None)

        Single addresses are returned in canonical form so that different
        spellings of the same IPv6 address collapse to one entry when the
        results are collected in a set.

        Input that cannot be parsed is returned unchanged as (ip_str, None).
        """
        if not isinstance(ip_str, str):
            return (ip_str, None)
        try:
            network = ipaddress.ip_network(ip_str, strict=False)
        except ValueError:
            return (ip_str, None)
        address = str(network.network_address)
        if '/' in ip_str:
            return (address, str(network))
        # Without an explicit prefix the parsed network is a single host, so
        # its network address is the address itself, canonically formatted.
        return (address, None)
class SpamhausParser(ListParser):
    """Parser for the Spamhaus DROP list."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Extract entries from Spamhaus DROP text.

        Expected layout:
            ; Comment lines start with semicolon
            1.2.3.0/24 ; SBL12345

        Lines that are blank or begin with ';' or '#' are ignored. For the
        remaining lines only the text before the first ';' is considered.

        Returns:
            Set of (ip_address, cidr_range) tuples for every valid entry.
        """
        found = set()
        for raw in content.strip().split('\n'):
            entry = raw.strip()
            if entry == '' or entry.startswith((';', '#')):
                continue
            # Drop the trailing "; SBLxxxx" annotation, keep the network part.
            candidate = entry.partition(';')[0].strip()
            if candidate and ListParser.validate_ip(candidate):
                found.add(ListParser.normalize_cidr(candidate))
        return found
class TalosParser(ListParser):
    """Parser for the Talos Intelligence blacklist."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Extract entries from a plain list with one address or CIDR per line:
            1.2.3.4
            5.6.7.0/24

        Blank lines and lines starting with '#' or '//' are ignored, as are
        lines that do not validate as an IP address or network.

        Returns:
            Set of (ip_address, cidr_range) tuples for every valid entry.
        """
        stripped_lines = (raw.strip() for raw in content.strip().split('\n'))
        return {
            ListParser.normalize_cidr(entry)
            for entry in stripped_lines
            if entry
            and not entry.startswith(('#', '//'))
            and ListParser.validate_ip(entry)
        }
class AWSParser(ListParser):
    """Parser for AWS IP ranges JSON."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Parse AWS JSON format:
        {
            "prefixes": [
                {"ip_prefix": "1.2.3.0/24", "region": "us-east-1", "service": "EC2"}
            ],
            "ipv6_prefixes": [
                {"ipv6_prefix": "2001:db8::/32"}
            ]
        }

        Parsing is best-effort: malformed JSON, or JSON whose structure does
        not match the layout above, contributes no entries instead of raising.

        Returns:
            Set of (ip_address, cidr_range) tuples for every valid prefix.
        """
        ips = set()
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            # An unparseable feed yields an empty result rather than an error.
            return ips

        # Valid JSON is not necessarily an object (e.g. "[]" or "null").
        if not isinstance(data, dict):
            return ips

        # (list key, field holding the CIDR) for the IPv4 and IPv6 sections.
        sections = (
            ('prefixes', 'ip_prefix'),
            ('ipv6_prefixes', 'ipv6_prefix'),
        )
        for list_key, field in sections:
            entries = data.get(list_key)
            if not isinstance(entries, list):
                continue
            for entry in entries:
                if not isinstance(entry, dict):
                    continue
                value = entry.get(field)
                # Only strings are accepted: other JSON types (numbers,
                # booleans) cannot be normalized as text.
                if isinstance(value, str) and value and ListParser.validate_ip(value):
                    ips.add(ListParser.normalize_cidr(value))
        return ips
class GCPParser(ListParser):
    """Parser for Google Cloud IP ranges JSON."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Parse GCP JSON format:
        {
            "prefixes": [
                {"ipv4Prefix": "1.2.3.0/24"},
                {"ipv6Prefix": "2001:db8::/32"}
            ]
        }

        Parsing is best-effort: malformed JSON, or JSON whose structure does
        not match the layout above, contributes no entries instead of raising.

        Returns:
            Set of (ip_address, cidr_range) tuples for every valid prefix.
        """
        ips = set()
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            # An unparseable feed yields an empty result rather than an error.
            return ips

        # Valid JSON is not necessarily an object (e.g. "[]" or "null").
        if not isinstance(data, dict):
            return ips

        entries = data.get('prefixes')
        if not isinstance(entries, list):
            return ips

        for entry in entries:
            if not isinstance(entry, dict):
                continue
            # Each entry may carry an IPv4 prefix, an IPv6 prefix, or both.
            for field in ('ipv4Prefix', 'ipv6Prefix'):
                value = entry.get(field)
                # Only strings are accepted: other JSON types (numbers,
                # booleans) cannot be normalized as text.
                if isinstance(value, str) and value and ListParser.validate_ip(value):
                    ips.add(ListParser.normalize_cidr(value))
        return ips
class CloudflareParser(ListParser):
    """Parser for the Cloudflare IP list."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Extract entries from a plain CIDR list, one network per line:
            1.2.3.0/24
            5.6.7.0/24

        Blank lines, lines starting with '#', and lines that do not validate
        as an IP address or network are ignored.

        Returns:
            Set of (ip_address, cidr_range) tuples for every valid entry.
        """
        networks = set()
        for raw_line in content.strip().split('\n'):
            candidate = raw_line.strip()
            is_data_line = bool(candidate) and not candidate.startswith('#')
            if is_data_line and ListParser.validate_ip(candidate):
                networks.add(ListParser.normalize_cidr(candidate))
        return networks
class IANAParser(ListParser):
    """Parser for IANA Root Servers."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Pull IPv4 addresses out of free-form HTML/text.

        Every dotted-quad token (XXX.XXX.XXX.XXX) found in the content is a
        candidate; only candidates that validate as an IP address are kept.

        Returns:
            Set of (ip_address, cidr_range) tuples for every valid address.
        """
        dotted_quads = re.finditer(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', content)
        return {
            ListParser.normalize_cidr(hit.group(0))
            for hit in dotted_quads
            if ListParser.validate_ip(hit.group(0))
        }
class NTPPoolParser(ListParser):
    """Parser for NTP Pool servers."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Parse NTP pool content given either as JSON or as a plain IP list.

        JSON is tried first and must be a list whose items are either address
        strings or objects carrying the address under 'ip' or 'address'.
        When the content is not valid JSON it is treated as plain text with
        one address per line.

        Returns:
            Set of (ip_address, cidr_range) tuples for every valid entry.
        """
        ips = set()
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            # Fallback to plain text parsing
            for line in content.strip().split('\n'):
                line = line.strip()
                if line and ListParser.validate_ip(line):
                    ips.add(ListParser.normalize_cidr(line))
            return ips

        if not isinstance(data, list):
            return ips

        for item in data:
            if isinstance(item, dict):
                candidate = item.get('ip') or item.get('address')
            else:
                candidate = item
            # Only strings are accepted: a numeric or boolean JSON value
            # cannot be normalized as text.
            if isinstance(candidate, str) and candidate and ListParser.validate_ip(candidate):
                ips.add(ListParser.normalize_cidr(candidate))
        return ips
# Parser registry: maps a keyword to the parser class for that list format.
# get_parser() returns the first entry whose keyword occurs as a substring of
# the lower-cased list name, so the order of the entries decides which parser
# wins when several keywords match.
PARSERS: Dict[str, type[ListParser]] = {
    'spamhaus': SpamhausParser,
    'talos': TalosParser,
    'aws': AWSParser,
    'gcp': GCPParser,
    'cloudflare': CloudflareParser,
    'iana': IANAParser,
    'ntp': NTPPoolParser,
}
def get_parser(list_name: str) -> Optional[type[ListParser]]:
    """
    Pick the parser class for a list name.

    The name is lower-cased and compared against the PARSERS keywords in
    registry order; the first keyword contained in the name wins. When no
    keyword matches, the plain-text TalosParser is returned as the default.
    """
    needle = list_name.lower()
    matching = (candidate for keyword, candidate in PARSERS.items() if keyword in needle)
    return next(matching, TalosParser)
def parse_list(list_name: str, content: str) -> Set[tuple[str, Optional[str]]]:
    """
    Parse list content with the parser selected for list_name.

    Returns:
        Set of (ip_address, cidr_range) tuples, or an empty set when no
        parser is available.
    """
    parser_class = get_parser(list_name)
    if not parser_class:
        return set()
    return parser_class().parse(content)