Update `normalize_cidr` function in `parsers.py` to use the full CIDR notation as the IP address for uniqueness, addressing duplicate entry errors during Spamhaus IP sync and resolving the `operator does not exist: inet = text` error related to the `whitelist` table by ensuring proper IP type handling. Replit-Commit-Author: Agent Replit-Commit-Session-Id: 7a657272-55ba-4a79-9a2e-f1ed9bc7a528 Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: 478f21ca-de02-4a5b-9eec-f73a3e16d0f0 Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/449cf7c4-c97a-45ae-8234-e5c5b8d6a84f/7a657272-55ba-4a79-9a2e-f1ed9bc7a528/rDib6Pq
294 lines
8.6 KiB
Python
import re
|
|
import json
|
|
from typing import List, Dict, Set, Optional
|
|
from datetime import datetime
|
|
import ipaddress
|
|
|
|
|
|
class ListParser:
    """Base parser for public IP lists.

    Subclasses implement ``parse`` and return a set of
    ``(ip_address, cidr_range)`` tuples suitable for database insertion.
    """

    @staticmethod
    def validate_ip(ip_str: str) -> bool:
        """Return True if *ip_str* is a valid IP address or CIDR range."""
        try:
            # strict=False also accepts CIDRs with host bits set (e.g. 1.2.3.5/24).
            ipaddress.ip_network(ip_str, strict=False)
            return True
        except ValueError:
            return False

    @staticmethod
    def normalize_cidr(ip_str: str) -> tuple[str, Optional[str]]:
        """
        Normalize IP/CIDR to (ip_address, cidr_range).

        For CIDR ranges, the full canonical CIDR notation is used as
        ip_address to ensure uniqueness in the database.  Single
        addresses are canonicalized as well, so alternate spellings of
        the same address (e.g. '2001:DB8::0001' vs '2001:db8::1') can
        no longer create duplicate entries.

        Example: '1.2.3.5/24'    -> ('1.2.3.0/24', '1.2.3.0/24')
                 '1.2.3.4'       -> ('1.2.3.4', None)
                 '2001:DB8::1'   -> ('2001:db8::1', None)

        Invalid input is returned unchanged as (ip_str, None); callers
        are expected to have filtered with validate_ip() first.
        """
        try:
            network = ipaddress.ip_network(ip_str, strict=False)
            if '/' in ip_str:
                normalized_cidr = str(network)
                return (normalized_cidr, normalized_cidr)
            # Single address: emit the canonical textual form (no prefix
            # length) instead of echoing the raw input, so lookups and
            # uniqueness checks always see one spelling per address.
            return (str(network.network_address), None)
        except ValueError:
            return (ip_str, None)
|
|
|
|
|
|
class SpamhausParser(ListParser):
    """Parser for the Spamhaus DROP list."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Parse Spamhaus DROP content in either supported format:
          - NDJSON (new): {"cidr":"1.2.3.0/24","sblid":"SBL12345","rir":"apnic"}
          - Text (old):   1.2.3.0/24 ; SBL12345
        Returns a set of (ip_address, cidr_range) tuples.
        """
        results: Set[tuple[str, Optional[str]]] = set()

        for raw in content.strip().split('\n'):
            entry = raw.strip()

            # Comments and blank lines carry no data.
            if not entry or entry.startswith((';', '#')):
                continue

            # New NDJSON format: one JSON object per line.
            if entry.startswith('{'):
                try:
                    record = json.loads(entry)
                except json.JSONDecodeError:
                    pass  # malformed JSON -> fall through to legacy parsing
                else:
                    cidr = record.get('cidr')
                    if cidr and ListParser.validate_ip(cidr):
                        results.add(ListParser.normalize_cidr(cidr))
                    continue

            # Legacy text format: "CIDR ; SBLxxxx" -- keep only the CIDR part.
            candidate = entry.split(';')[0].strip()
            if candidate and ListParser.validate_ip(candidate):
                results.add(ListParser.normalize_cidr(candidate))

        return results
|
|
|
|
|
|
class TalosParser(ListParser):
    """Parser for the Talos Intelligence blacklist."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Parse Talos content: one plain IP or CIDR per line, e.g.
            1.2.3.4
            5.6.7.0/24
        Lines starting with '#' or '//' and blank lines are skipped.
        """
        stripped = (raw.strip() for raw in content.strip().split('\n'))
        return {
            ListParser.normalize_cidr(entry)
            for entry in stripped
            if entry
            and not entry.startswith(('#', '//'))
            and ListParser.validate_ip(entry)
        }
|
|
|
|
|
|
class AWSParser(ListParser):
    """Parser for the AWS ip-ranges.json feed."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Parse the AWS JSON format:
        {
            "prefixes": [
                {"ip_prefix": "1.2.3.0/24", "region": "us-east-1", "service": "EC2"}
            ]
        }
        IPv6 entries under "ipv6_prefixes" are handled too.
        Unparseable JSON yields an empty set.
        """
        found: Set[tuple[str, Optional[str]]] = set()
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            return found

        # (section key, per-entry prefix key) pairs for the v4 and v6 blocks.
        for section, field in (('prefixes', 'ip_prefix'),
                               ('ipv6_prefixes', 'ipv6_prefix')):
            for entry in data.get(section, []):
                candidate = entry.get(field)
                if candidate and ListParser.validate_ip(candidate):
                    found.add(ListParser.normalize_cidr(candidate))

        return found
|
|
|
|
|
|
class GCPParser(ListParser):
    """Parser for the Google Cloud IP ranges JSON feed."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Parse the GCP JSON format:
        {
            "prefixes": [
                {"ipv4Prefix": "1.2.3.0/24"},
                {"ipv6Prefix": "2001:db8::/32"}
            ]
        }
        Unparseable JSON yields an empty set.
        """
        found: Set[tuple[str, Optional[str]]] = set()
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            return found

        for entry in data.get('prefixes', []):
            # Each entry carries either an IPv4 or an IPv6 prefix key.
            for field in ('ipv4Prefix', 'ipv6Prefix'):
                candidate = entry.get(field)
                if candidate and ListParser.validate_ip(candidate):
                    found.add(ListParser.normalize_cidr(candidate))

        return found
|
|
|
|
|
|
class CloudflareParser(ListParser):
    """Parser for the Cloudflare IP list."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Parse Cloudflare content: one CIDR per line, e.g.
            1.2.3.0/24
            5.6.7.0/24
        Blank lines and '#' comments are skipped.
        """
        candidates = (raw.strip() for raw in content.strip().split('\n'))
        return {
            ListParser.normalize_cidr(entry)
            for entry in candidates
            if entry and not entry.startswith('#') and ListParser.validate_ip(entry)
        }
|
|
|
|
|
|
class IANAParser(ListParser):
    """Parser for IANA Root Servers."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Extract IPv4 addresses (XXX.XXX.XXX.XXX) from HTML or plain text.

        The regex over-matches (octets up to 999); validate_ip re-checks
        every candidate, so only real addresses survive.
        """
        dotted_quad = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
        return {
            ListParser.normalize_cidr(candidate)
            for candidate in dotted_quad.findall(content)
            if ListParser.validate_ip(candidate)
        }
|
|
|
|
|
|
class NTPPoolParser(ListParser):
    """Parser for NTP Pool servers."""

    @staticmethod
    def parse(content: str) -> Set[tuple[str, Optional[str]]]:
        """
        Parse NTP pool data, trying JSON first, then plain text.

        JSON may be a list of IP strings, or a list of objects carrying
        an 'ip' or 'address' key.  If the content is not valid JSON it
        is treated as one address per line.
        """
        collected: Set[tuple[str, Optional[str]]] = set()

        try:
            payload = json.loads(content)
        except json.JSONDecodeError:
            # Plain-text fallback: one address per line.
            for raw in content.strip().split('\n'):
                candidate = raw.strip()
                if candidate and ListParser.validate_ip(candidate):
                    collected.add(ListParser.normalize_cidr(candidate))
            return collected

        if isinstance(payload, list):
            for item in payload:
                if isinstance(item, str):
                    candidate = item
                elif isinstance(item, dict):
                    candidate = item.get('ip') or item.get('address')
                else:
                    continue
                if candidate and ListParser.validate_ip(candidate):
                    collected.add(ListParser.normalize_cidr(candidate))

        return collected
|
|
|
|
|
|
# Parser registry: maps a lowercase list-name keyword to its parser class.
# get_parser() matches these keys as substrings of the requested list name,
# in insertion order, so more specific keys should come before generic ones.
PARSERS: Dict[str, type[ListParser]] = {
    'spamhaus': SpamhausParser,
    'talos': TalosParser,
    'aws': AWSParser,
    'gcp': GCPParser,
    'cloudflare': CloudflareParser,
    'iana': IANAParser,
    'ntp': NTPPoolParser,
}
|
|
|
|
|
|
def get_parser(list_name: str) -> Optional[type[ListParser]]:
    """Get parser by list name (case-insensitive substring match).

    Falls back to TalosParser (plain-text parsing) when no registry key
    matches, so in practice the return value is never None.
    """
    needle = list_name.lower()
    matched = next(
        (parser for key, parser in PARSERS.items() if key in needle),
        None,
    )
    # Default fallback: try the plain text parser.
    return matched if matched is not None else TalosParser
|
|
|
|
|
|
def parse_list(list_name: str, content: str) -> Set[tuple[str, Optional[str]]]:
    """
    Parse list content using the parser matched to *list_name*.

    Returns a set of (ip_address, cidr_range) tuples, or an empty set
    when no parser is available.
    """
    parser_class = get_parser(list_name)
    if parser_class is None:
        return set()
    return parser_class().parse(content)
|