"""Parsers that turn public IP list feeds into sets of (address, CIDR) tuples."""

import ipaddress
import json
import re
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Set


class ListParser:
    """Base parser for public IP lists.

    Provides validation/normalisation helpers shared by every concrete
    parser. Subclasses implement ``parse``.
    """

    @staticmethod
    def validate_ip(ip_str: str) -> bool:
        """Return True if *ip_str* is a valid IPv4/IPv6 address or CIDR range.

        Non-string values are rejected outright: ``ipaddress.ip_network``
        also accepts ints, bytes and tuples, which would otherwise let a
        numeric JSON value pass validation and then fail in
        ``normalize_cidr``.
        """
        if not isinstance(ip_str, str):
            return False
        try:
            ipaddress.ip_network(ip_str, strict=False)
        except ValueError:
            return False
        return True

    @staticmethod
    def normalize_cidr(ip_str: str) -> tuple[str, Optional[str]]:
        """Normalize an IP or CIDR string to ``(ip_address, cidr_range)``.

        Example:
            '1.2.3.0/24' -> ('1.2.3.0', '1.2.3.0/24')
            '1.2.3.4'    -> ('1.2.3.4', None)

        Host bits in a CIDR are masked off ('1.2.3.4/24' yields the
        '1.2.3.0/24' network). Bare addresses are returned in canonical
        form so equivalent spellings (e.g. upper-case or zero-padded IPv6)
        collapse to a single entry. Input that cannot be parsed is returned
        unchanged with ``None`` as the range.
        """
        try:
            network = ipaddress.ip_network(ip_str, strict=False)
        except ValueError:
            return (ip_str, None)

        address = str(network.network_address)
        if '/' in ip_str:
            return (address, str(network))
        return (address, None)

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Parse *content* into a set of ``(ip_address, cidr_range)`` tuples.

        Raises:
            NotImplementedError: always; concrete parsers override this.
        """
        raise NotImplementedError("ListParser subclasses must implement parse()")

    @staticmethod
    def _collect(candidates: Iterable[object]) -> Set[tuple[str, Optional[str]]]:
        """Validate each candidate and return the normalized set of survivors."""
        return {
            ListParser.normalize_cidr(candidate)
            for candidate in candidates
            if ListParser.validate_ip(candidate)
        }

    @staticmethod
    def _parse_lines(
        content: str,
        comment_prefixes: tuple[str, ...] = ('#',),
        inline_comment: Optional[str] = None,
    ) -> Set[tuple[str, Optional[str]]]:
        """Parse a one-entry-per-line text list.

        Args:
            content: Raw text of the list.
            comment_prefixes: Lines starting with any of these are skipped.
            inline_comment: When given, everything from the first occurrence
                of this marker to the end of the line is discarded.
        """
        candidates = []
        for raw_line in content.splitlines():
            line = raw_line.strip()
            if not line or line.startswith(comment_prefixes):
                continue
            if inline_comment is not None:
                line = line.split(inline_comment, 1)[0].strip()
            candidates.append(line)
        return ListParser._collect(candidates)

    @staticmethod
    def _load_json_object(content: str) -> dict:
        """Decode *content* as a JSON object; return ``{}`` for anything else.

        Parsing is best-effort: malformed JSON and valid JSON whose top
        level is not an object both yield an empty dict instead of raising.
        """
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            return {}
        return data if isinstance(data, dict) else {}

    @staticmethod
    def _dict_entries(data: dict, key: str) -> list:
        """Return the dict items of the list stored at ``data[key]``.

        A missing key, a non-list value (e.g. ``null``) or non-dict items
        are tolerated and simply contribute nothing.
        """
        entries = data.get(key)
        if not isinstance(entries, list):
            return []
        return [entry for entry in entries if isinstance(entry, dict)]


class SpamhausParser(ListParser):
    """Parser for Spamhaus DROP list"""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Parse the Spamhaus DROP format.

        Format:
            ; Comment lines start with semicolon
            1.2.3.0/24 ; SBL12345

        Lines starting with ';' or '#' are skipped; on data lines the text
        after the first ';' is treated as a trailing comment.
        """
        return ListParser._parse_lines(
            content,
            comment_prefixes=(';', '#'),
            inline_comment=';',
        )


class TalosParser(ListParser):
    """Parser for Talos Intelligence blacklist"""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Parse the Talos format (plain IP list).

        Format:
            1.2.3.4
            5.6.7.0/24

        Lines starting with '#' or '//' are skipped.
        """
        return ListParser._parse_lines(content, comment_prefixes=('#', '//'))


class AWSParser(ListParser):
    """Parser for AWS IP ranges JSON"""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Parse the AWS JSON format.

        Format:
            {
              "prefixes": [
                {"ip_prefix": "1.2.3.0/24", "region": "us-east-1", "service": "EC2"}
              ]
            }

        IPv4 ranges are read from ``prefixes[*].ip_prefix`` and IPv6 ranges
        from the optional ``ipv6_prefixes[*].ipv6_prefix``. Malformed or
        unexpectedly shaped JSON yields an empty set.
        """
        data = ListParser._load_json_object(content)
        candidates = [
            entry.get('ip_prefix')
            for entry in ListParser._dict_entries(data, 'prefixes')
        ]
        candidates.extend(
            entry.get('ipv6_prefix')
            for entry in ListParser._dict_entries(data, 'ipv6_prefixes')
        )
        return ListParser._collect(candidates)


class GCPParser(ListParser):
    """Parser for Google Cloud IP ranges JSON"""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Parse the GCP JSON format.

        Format:
            {
              "prefixes": [
                {"ipv4Prefix": "1.2.3.0/24"},
                {"ipv6Prefix": "2001:db8::/32"}
              ]
            }

        Each entry may carry ``ipv4Prefix``, ``ipv6Prefix`` or both.
        Malformed or unexpectedly shaped JSON yields an empty set.
        """
        data = ListParser._load_json_object(content)
        candidates = []
        for entry in ListParser._dict_entries(data, 'prefixes'):
            candidates.append(entry.get('ipv4Prefix'))
            candidates.append(entry.get('ipv6Prefix'))
        return ListParser._collect(candidates)


class CloudflareParser(ListParser):
    """Parser for Cloudflare IP list"""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Parse the Cloudflare format (plain CIDR list).

        Format:
            1.2.3.0/24
            5.6.7.0/24

        Empty lines and lines starting with '#' are skipped.
        """
        return ListParser._parse_lines(content, comment_prefixes=('#',))


class IANAParser(ListParser):
    """Parser for IANA Root Servers"""

    # Dotted-quad candidates; octet ranges are checked later by validate_ip.
    _IPV4_PATTERN = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Extract IPv4 addresses from IANA root server HTML/text.

        Every ``XXX.XXX.XXX.XXX``-shaped token in *content* is considered;
        only those that are valid addresses are kept.
        """
        return ListParser._collect(IANAParser._IPV4_PATTERN.findall(content))


class NTPPoolParser(ListParser):
    """Parser for NTP Pool servers"""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Parse an NTP pool list given as JSON or as plain text.

        JSON is tried first. A JSON list may contain address strings and/or
        objects with an ``ip`` or ``address`` key; valid JSON of any other
        top-level type yields an empty set. If *content* is not JSON it is
        parsed as one address per line.
        """
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            # Not JSON: fall back to plain text, one entry per line.
            return ListParser._parse_lines(content, comment_prefixes=())

        if not isinstance(data, list):
            return set()

        candidates = []
        for item in data:
            if isinstance(item, str):
                candidates.append(item)
            elif isinstance(item, dict):
                candidates.append(item.get('ip') or item.get('address'))
        return ListParser._collect(candidates)


# Parser registry. Keys are matched as substrings of the list name, in
# insertion order (see get_parser).
PARSERS: Dict[str, type[ListParser]] = {
    'spamhaus': SpamhausParser,
    'talos': TalosParser,
    'aws': AWSParser,
    'gcp': GCPParser,
    'cloudflare': CloudflareParser,
    'iana': IANAParser,
    'ntp': NTPPoolParser,
}


def get_parser(list_name: str) -> Optional[type[ListParser]]:
    """Return the parser class for *list_name*.

    The first registry key (in ``PARSERS`` insertion order) that occurs as
    a substring of the lower-cased name wins. When no key matches, the
    plain-text ``TalosParser`` is returned as the default.
    """
    list_name_lower = list_name.lower()
    for key, parser in PARSERS.items():
        if key in list_name_lower:
            return parser

    # Default fallback: try plain text parser
    return TalosParser


def parse_list(list_name: str, content: str) -> Set[tuple[str, Optional[str]]]:
    """Parse *content* with the parser selected for *list_name*.

    Returns:
        Set of ``(ip_address, cidr_range)`` tuples; ``cidr_range`` is
        ``None`` for single addresses.
    """
    parser_class = get_parser(list_name)
    if parser_class:
        # Instantiate so registry entries with instance-level parse() work too.
        return parser_class().parse(content)
    return set()