ids.alfacom.it/python_ml/list_fetcher/parsers.py
marco370 0a269a9032 Update list fetching to correctly parse Google IP ranges
Add 'google' as an alias for GCPParser in `python_ml/list_fetcher/parsers.py` to resolve issues with parsing Google Cloud and Google global IP lists.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 771e5bf9-f7cd-42b4-9abb-d79a800368ae
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/C6BdLIt
2026-01-02 16:16:15 +00:00

295 lines
8.6 KiB
Python

import re
import json
from typing import List, Dict, Set, Optional
from datetime import datetime
import ipaddress
class ListParser:
    """Base parser for public IP lists.

    Holds the validation and normalization helpers shared by the
    list-specific parsers in this module.
    """

    @staticmethod
    def validate_ip(ip_str: str) -> bool:
        """Return True if *ip_str* is a textual IP address or CIDR range.

        Both IPv4 and IPv6 are accepted, and CIDR ranges may have host bits
        set (``strict=False``). Anything that is not a ``str`` is rejected.
        """
        # ipaddress.ip_network() also accepts ints and packed bytes. A JSON
        # feed carrying a number would therefore pass validation and then
        # break normalize_cidr(), which relies on string operations.
        if not isinstance(ip_str, str):
            return False
        try:
            ipaddress.ip_network(ip_str, strict=False)
        except ValueError:
            return False
        return True

    @staticmethod
    def normalize_cidr(ip_str: str) -> tuple[str, Optional[str]]:
        """Normalize an IP or CIDR string to ``(ip_address, cidr_range)``.

        For CIDR ranges the canonical CIDR notation is used for both
        elements so that the first element stays unique per range. Single
        addresses are returned in their canonical textual form with
        ``None`` as the range.

        Examples:
            '1.2.3.0/24'  -> ('1.2.3.0/24', '1.2.3.0/24')
            '1.2.3.4/24'  -> ('1.2.3.0/24', '1.2.3.0/24')
            '1.2.3.4'     -> ('1.2.3.4', None)
            '2001:DB8::1' -> ('2001:db8::1', None)

        Input that cannot be parsed is returned unchanged as
        ``(ip_str, None)``; callers are expected to call validate_ip() first.
        """
        # Non-string input cannot be inspected for '/', so treat it like any
        # other unparseable value instead of raising TypeError.
        if not isinstance(ip_str, str):
            return (ip_str, None)
        try:
            network = ipaddress.ip_network(ip_str, strict=False)
        except ValueError:
            return (ip_str, None)
        if '/' in ip_str:
            canonical = str(network)
            return (canonical, canonical)
        # A bare address parses as a single-host network; its network address
        # is the address itself, rendered canonically (e.g. lower-case,
        # compressed IPv6) so equal hosts compare equal.
        return (str(network.network_address), None)
class SpamhausParser(ListParser):
    """Parser for the Spamhaus DROP list."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Extract the networks from a Spamhaus DROP download.

        Two line formats are understood:
        - NDJSON (new): {"cidr":"1.2.3.0/24","sblid":"SBL12345","rir":"apnic"}
        - Text (old):   1.2.3.0/24 ; SBL12345

        Blank lines and lines starting with ';' or '#' are ignored.
        Returns a set of (ip_address, cidr_range) tuples.
        """
        found = set()
        for raw in content.strip().split('\n'):
            entry = raw.strip()
            if not entry or entry.startswith((';', '#')):
                continue

            if entry.startswith('{'):
                # New NDJSON format: one JSON object per line.
                try:
                    record = json.loads(entry)
                except json.JSONDecodeError:
                    # Not valid JSON after all; fall through to the text format.
                    pass
                else:
                    cidr = record.get('cidr')
                    if cidr and ListParser.validate_ip(cidr):
                        found.add(ListParser.normalize_cidr(cidr))
                    continue

            # Old text format: the network is everything before the first ';'.
            candidate = entry.partition(';')[0].strip()
            if candidate and ListParser.validate_ip(candidate):
                found.add(ListParser.normalize_cidr(candidate))
        return found
class TalosParser(ListParser):
    """Parser for the Talos Intelligence blacklist."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Extract entries from a plain list with one IP or CIDR per line.

        Example input:
            1.2.3.4
            5.6.7.0/24

        Blank lines and lines starting with '#' or '//' are ignored, as are
        lines that do not validate as an IP or CIDR.
        Returns a set of (ip_address, cidr_range) tuples.
        """
        stripped_lines = (raw.strip() for raw in content.strip().split('\n'))
        return {
            ListParser.normalize_cidr(entry)
            for entry in stripped_lines
            if entry
            and not entry.startswith(('#', '//'))
            and ListParser.validate_ip(entry)
        }
class AWSParser(ListParser):
    """Parser for AWS IP ranges JSON."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Extract IPv4 and IPv6 prefixes from the AWS JSON document.

        Expected format:
            {
              "prefixes": [
                {"ip_prefix": "1.2.3.0/24", "region": "us-east-1", "service": "EC2"}
              ],
              "ipv6_prefixes": [
                {"ipv6_prefix": "2001:db8::/32"}
              ]
            }

        Parsing is best-effort: content that is not valid JSON, or whose
        structure does not match the format above, contributes nothing
        instead of raising. Returns a set of (ip_address, cidr_range) tuples.
        """
        ips = set()
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            return ips

        # Valid JSON is not necessarily an object (e.g. "[]" or "null").
        if not isinstance(data, dict):
            return ips

        # (top-level list key, per-entry field holding the CIDR)
        sections = (
            ('prefixes', 'ip_prefix'),
            ('ipv6_prefixes', 'ipv6_prefix'),
        )
        for list_key, field in sections:
            entries = data.get(list_key)
            if not isinstance(entries, list):
                continue
            for entry in entries:
                if not isinstance(entry, dict):
                    continue
                value = entry.get(field)
                # Only strings can be normalized; skip numbers, nulls, etc.
                if isinstance(value, str) and ListParser.validate_ip(value):
                    ips.add(ListParser.normalize_cidr(value))
        return ips
class GCPParser(ListParser):
    """Parser for Google Cloud IP ranges JSON."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Extract IPv4 and IPv6 prefixes from the Google JSON document.

        Expected format:
            {
              "prefixes": [
                {"ipv4Prefix": "1.2.3.0/24"},
                {"ipv6Prefix": "2001:db8::/32"}
              ]
            }

        Parsing is best-effort: content that is not valid JSON, or whose
        structure does not match the format above, contributes nothing
        instead of raising. Returns a set of (ip_address, cidr_range) tuples.
        """
        ips = set()
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            return ips

        # Valid JSON is not necessarily an object (e.g. "[]" or "null").
        if not isinstance(data, dict):
            return ips

        entries = data.get('prefixes')
        if not isinstance(entries, list):
            return ips

        for entry in entries:
            if not isinstance(entry, dict):
                continue
            # An entry may carry an IPv4 prefix, an IPv6 prefix, or both.
            for field in ('ipv4Prefix', 'ipv6Prefix'):
                value = entry.get(field)
                # Only strings can be normalized; skip numbers, nulls, etc.
                if isinstance(value, str) and ListParser.validate_ip(value):
                    ips.add(ListParser.normalize_cidr(value))
        return ips
class CloudflareParser(ListParser):
    """Parser for the Cloudflare IP list."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Extract networks from a plain list with one CIDR per line.

        Example input:
            1.2.3.0/24
            5.6.7.0/24

        Blank lines, lines starting with '#', and lines that do not validate
        are ignored. Returns a set of (ip_address, cidr_range) tuples.
        """
        networks = set()
        for raw_line in content.strip().split('\n'):
            text = raw_line.strip()
            is_blank_or_comment = not text or text.startswith('#')
            if is_blank_or_comment or not ListParser.validate_ip(text):
                continue
            networks.add(ListParser.normalize_cidr(text))
        return networks
class IANAParser(ListParser):
    """Parser for IANA root servers."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Collect every IPv4 address found anywhere in HTML or text content.

        Candidates are dotted quads (XXX.XXX.XXX.XXX) located by regex; each
        one is kept only if it also validates as an IP address.
        Returns a set of (ip_address, cidr_range) tuples.
        """
        # Four groups of 1-3 digits separated by dots; range checking is left
        # to ListParser.validate_ip.
        dotted_quad = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
        return {
            ListParser.normalize_cidr(candidate)
            for candidate in dotted_quad.findall(content)
            if ListParser.validate_ip(candidate)
        }
class NTPPoolParser(ListParser):
    """Parser for NTP Pool servers."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """Extract addresses from NTP pool content in JSON or plain text.

        JSON is attempted first. A JSON list may contain address strings or
        objects carrying the address under 'ip' or 'address'; JSON that is
        not a list yields nothing. Content that is not JSON is read as plain
        text with one address per line.
        Returns a set of (ip_address, cidr_range) tuples.
        """
        try:
            payload = json.loads(content)
        except json.JSONDecodeError:
            # Not JSON: treat the content as one address per line.
            stripped_lines = (raw.strip() for raw in content.strip().split('\n'))
            return {
                ListParser.normalize_cidr(entry)
                for entry in stripped_lines
                if entry and ListParser.validate_ip(entry)
            }

        servers = set()
        if not isinstance(payload, list):
            return servers

        for element in payload:
            if isinstance(element, str):
                if ListParser.validate_ip(element):
                    servers.add(ListParser.normalize_cidr(element))
            elif isinstance(element, dict):
                address = element.get('ip') or element.get('address')
                if address and ListParser.validate_ip(address):
                    servers.add(ListParser.normalize_cidr(address))
        return servers
# Parser registry: maps a keyword to the parser class used for that list.
# get_parser() walks these entries in insertion order and returns the parser
# of the first key that occurs as a substring of the lower-cased list name,
# so the order of entries decides which parser wins when several keys match.
PARSERS: Dict[str, type[ListParser]] = {
    'spamhaus': SpamhausParser,
    'talos': TalosParser,
    'aws': AWSParser,
    'gcp': GCPParser,
    # Alias: list names containing 'google' use the same parser as 'gcp'.
    'google': GCPParser,
    'cloudflare': CloudflareParser,
    'iana': IANAParser,
    'ntp': NTPPoolParser,
}
def get_parser(list_name: str) -> Optional[type[ListParser]]:
    """Pick the parser class for a list name (case-insensitive).

    The first PARSERS key, in registry order, that occurs as a substring of
    the lower-cased name selects the parser. When no key matches, the
    plain-text TalosParser is returned as the default.
    """
    lowered = list_name.lower()
    matching = (parser for key, parser in PARSERS.items() if key in lowered)
    # Fall back to the plain-text parser when no registry key matches.
    return next(matching, TalosParser)
def parse_list(list_name: str, content: str) -> Set[tuple[str, Optional[str]]]:
    """Parse *content* with the parser selected for *list_name*.

    Returns a set of (ip_address, cidr_range) tuples, or an empty set when
    no parser is available for the list.
    """
    parser_class = get_parser(list_name)
    if not parser_class:
        return set()
    return parser_class().parse(content)