import asyncio
import os
import sys
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple

import httpx
import psycopg2
from psycopg2.extras import execute_values

# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from list_fetcher.parsers import parse_list


class ListFetcher:
    """Fetches and synchronizes public IP lists.

    Downloads blacklist/whitelist feeds over HTTP, parses them, and diffs
    the result against the database so that new IPs are inserted, vanished
    IPs are deactivated, and still-present IPs are refreshed.
    """

    def __init__(self, database_url: str):
        """
        Args:
            database_url: psycopg2-compatible connection string.
        """
        self.database_url = database_url
        self.timeout = 30.0        # per-request HTTP timeout (seconds)
        self.max_retries = 3       # download attempts before giving up

    def get_db_connection(self):
        """Create a new database connection (caller is responsible for closing)."""
        return psycopg2.connect(self.database_url)

    async def fetch_url(self, url: str) -> Optional[str]:
        """Download content from URL with retry logic.

        Retries up to ``self.max_retries`` times with exponential backoff
        (1s, 2s, 4s, ...).

        Raises:
            Exception: if all attempts fail; the original error is chained.
        """
        async with httpx.AsyncClient(timeout=self.timeout, follow_redirects=True) as client:
            for attempt in range(self.max_retries):
                try:
                    response = await client.get(url)
                    response.raise_for_status()
                    return response.text
                except httpx.HTTPError as e:
                    if attempt == self.max_retries - 1:
                        raise Exception(
                            f"HTTP error after {self.max_retries} attempts: {e}"
                        ) from e
                except Exception as e:
                    if attempt == self.max_retries - 1:
                        raise Exception(f"Download failed: {e}") from e
                # Only reached after a caught failure: back off before retrying.
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
        return None  # unreachable (loop always returns or raises); keeps Optional contract explicit

    def get_enabled_lists(self) -> List[Dict]:
        """Get all enabled public lists from database as row dicts."""
        conn = self.get_db_connection()
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT id, name, type, url, fetch_interval_minutes
                    FROM public_lists
                    WHERE enabled = true
                    ORDER BY type, name
                """)
                # description is None when the statement returned no result set
                if cur.description is None:
                    return []
                columns = [desc[0] for desc in cur.description]
                return [dict(zip(columns, row)) for row in cur.fetchall()]
        finally:
            conn.close()

    def get_existing_ips(self, list_id: str, list_type: str) -> Set[str]:
        """Get existing active IPs for a list from database.

        Args:
            list_id: public_lists.id value.
            list_type: 'blacklist' reads public_blacklist_ips; anything else
                reads the whitelist table (which uses `active` instead of
                `is_active`).
        """
        conn = self.get_db_connection()
        try:
            with conn.cursor() as cur:
                if list_type == 'blacklist':
                    cur.execute("""
                        SELECT ip_address FROM public_blacklist_ips
                        WHERE list_id = %s AND is_active = true
                    """, (list_id,))
                else:  # whitelist
                    cur.execute("""
                        SELECT ip_address FROM whitelist
                        WHERE list_id = %s AND active = true
                    """, (list_id,))
                return {row[0] for row in cur.fetchall()}
        finally:
            conn.close()

    def sync_blacklist_ips(self, list_id: str, new_ips: Set[Tuple[str, Optional[str]]]):
        """Sync blacklist IPs: add new, mark inactive old ones.

        Args:
            list_id: public_lists.id value.
            new_ips: set of (ip_address, cidr_or_None) tuples from the parser.

        Returns:
            (added, deactivated, updated) counts.
        """
        conn = self.get_db_connection()
        try:
            with conn.cursor() as cur:
                # Diff the freshly fetched set against what is active in the DB
                existing = self.get_existing_ips(list_id, 'blacklist')
                new_ip_addresses = {ip for ip, _ in new_ips}

                to_add = new_ip_addresses - existing
                to_deactivate = existing - new_ip_addresses
                to_update = existing & new_ip_addresses

                # Mark IPs no longer present in the feed as inactive
                if to_deactivate:
                    cur.execute("""
                        UPDATE public_blacklist_ips
                        SET is_active = false
                        WHERE list_id = %s AND ip_address = ANY(%s)
                    """, (list_id, list(to_deactivate)))

                # Refresh last_seen for IPs that are still in the feed
                if to_update:
                    cur.execute("""
                        UPDATE public_blacklist_ips
                        SET last_seen = NOW()
                        WHERE list_id = %s AND ip_address = ANY(%s)
                    """, (list_id, list(to_update)))

                # Insert new IPs with INET/CIDR support
                if to_add:
                    values = []
                    for ip, cidr in new_ips:
                        if ip in to_add:
                            # Single addresses get a /32 so cidr_inet is always populated
                            cidr_inet = cidr if cidr else f"{ip}/32"
                            values.append((ip, cidr, ip, cidr_inet, list_id))
                    # ON CONFLICT handles rows that exist but were inactive
                    # (inactive rows are absent from `existing`, so they land in to_add)
                    execute_values(cur, """
                        INSERT INTO public_blacklist_ips
                            (ip_address, cidr_range, ip_inet, cidr_inet, list_id)
                        VALUES %s
                        ON CONFLICT (ip_address, list_id) DO UPDATE SET
                            is_active = true,
                            last_seen = NOW(),
                            ip_inet = EXCLUDED.ip_inet,
                            cidr_inet = EXCLUDED.cidr_inet
                    """, values)

                # Update list stats
                cur.execute("""
                    UPDATE public_lists
                    SET total_ips = %s, active_ips = %s, last_success = NOW()
                    WHERE id = %s
                """, (len(new_ip_addresses), len(new_ip_addresses), list_id))

                conn.commit()
                return len(to_add), len(to_deactivate), len(to_update)
        except Exception:
            conn.rollback()
            raise  # bare raise preserves the original traceback
        finally:
            conn.close()

    def sync_whitelist_ips(self, list_id: str, list_name: str,
                           new_ips: Set[Tuple[str, Optional[str]]]):
        """Sync whitelist IPs: add new, deactivate old ones.

        Args:
            list_id: public_lists.id value.
            list_name: used to derive the `source` tag and import comment.
            new_ips: set of (ip_address, cidr_or_None) tuples from the parser.

        Returns:
            (added, deactivated, updated) counts.
        """
        conn = self.get_db_connection()
        try:
            with conn.cursor() as cur:
                # Diff the freshly fetched set against what is active in the DB
                existing = self.get_existing_ips(list_id, 'whitelist')
                new_ip_addresses = {ip for ip, _ in new_ips}

                to_add = new_ip_addresses - existing
                to_deactivate = existing - new_ip_addresses
                to_update = existing & new_ip_addresses

                # Derive a provider tag from the list name for the `source` column
                source = 'other'
                list_lower = list_name.lower()
                if 'aws' in list_lower:
                    source = 'aws'
                elif 'gcp' in list_lower or 'google' in list_lower:
                    source = 'gcp'
                elif 'cloudflare' in list_lower:
                    source = 'cloudflare'
                elif 'iana' in list_lower:
                    source = 'iana'
                elif 'ntp' in list_lower:
                    source = 'ntp'

                # Mark IPs no longer present in the feed as inactive
                # NOTE(review): unlike the blacklist path there is no last_seen
                # refresh for `to_update` — presumably the whitelist table has no
                # such column; confirm against the schema.
                if to_deactivate:
                    cur.execute("""
                        UPDATE whitelist
                        SET active = false
                        WHERE list_id = %s AND ip_address = ANY(%s)
                    """, (list_id, list(to_deactivate)))

                # Insert new IPs with INET support for CIDR matching
                if to_add:
                    values = []
                    for ip, cidr in new_ips:
                        if ip in to_add:
                            comment = f"Auto-imported from {list_name}"
                            if cidr:
                                comment += f" (CIDR: {cidr})"
                            # ip_inet carries the CIDR when present so range
                            # membership checks work; otherwise the bare IP
                            ip_inet = cidr if cidr else ip
                            values.append((ip, ip_inet, comment, source, list_id))
                    execute_values(cur, """
                        INSERT INTO whitelist
                            (ip_address, ip_inet, comment, source, list_id)
                        VALUES %s
                        ON CONFLICT (ip_address) DO UPDATE SET
                            active = true,
                            ip_inet = EXCLUDED.ip_inet,
                            source = EXCLUDED.source,
                            list_id = EXCLUDED.list_id
                    """, values)

                # Update list stats
                cur.execute("""
                    UPDATE public_lists
                    SET total_ips = %s, active_ips = %s, last_success = NOW()
                    WHERE id = %s
                """, (len(new_ip_addresses), len(new_ip_addresses), list_id))

                conn.commit()
                return len(to_add), len(to_deactivate), len(to_update)
        except Exception:
            conn.rollback()
            raise  # bare raise preserves the original traceback
        finally:
            conn.close()

    async def fetch_and_sync_list(self, list_config: Dict) -> Dict:
        """Fetch and sync a single list.

        Never raises: failures are captured in the returned dict and the
        list's error counter is incremented in the database.

        Returns:
            dict with keys list_id, list_name, success, added, removed,
            updated, error.
        """
        list_id = list_config['id']
        list_name = list_config['name']
        list_type = list_config['type']
        url = list_config['url']

        result = {
            'list_id': list_id,
            'list_name': list_name,
            'success': False,
            'added': 0,
            'removed': 0,
            'updated': 0,
            'error': None
        }

        conn = self.get_db_connection()
        try:
            # Record the fetch attempt regardless of outcome
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE public_lists SET last_fetch = NOW() WHERE id = %s
                """, (list_id,))
                conn.commit()

            # Download content
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Downloading {list_name} from {url}...")
            content = await self.fetch_url(url)

            if not content:
                raise Exception("Empty response from server")

            # Parse IPs
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Parsing {list_name}...")
            ips = parse_list(list_name, content)

            if not ips:
                raise Exception("No valid IPs found in list")

            print(f"[{datetime.now().strftime('%H:%M:%S')}] Found {len(ips)} IPs, syncing to database...")

            # Sync to database (the sync methods manage their own connections)
            if list_type == 'blacklist':
                added, removed, updated = self.sync_blacklist_ips(list_id, ips)
            else:
                added, removed, updated = self.sync_whitelist_ips(list_id, list_name, ips)

            result.update({
                'success': True,
                'added': added,
                'removed': removed,
                'updated': updated
            })

            print(f"[{datetime.now().strftime('%H:%M:%S')}] ✓ {list_name}: +{added} -{removed} ~{updated}")

            # Reset error count on success
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE public_lists
                    SET error_count = 0, last_error = NULL
                    WHERE id = %s
                """, (list_id,))
                conn.commit()

        except Exception as e:
            error_msg = str(e)
            result['error'] = error_msg
            print(f"[{datetime.now().strftime('%H:%M:%S')}] ✗ {list_name}: {error_msg}")

            # Clear any failed transaction state before reusing the connection
            conn.rollback()

            # Increment error count (last_error is truncated to the column size)
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE public_lists
                    SET error_count = error_count + 1, last_error = %s
                    WHERE id = %s
                """, (error_msg[:500], list_id))
                conn.commit()
        finally:
            conn.close()

        return result

    async def fetch_all_lists(self) -> List[Dict]:
        """Fetch and sync all enabled lists concurrently.

        Returns:
            Per-list result dicts (unexpected exceptions are excluded).
        """
        print(f"\n{'='*60}")
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] PUBLIC LISTS SYNC")
        print(f"{'='*60}\n")

        # Get enabled lists
        lists = self.get_enabled_lists()
        if not lists:
            print("No enabled lists found")
            return []

        print(f"Found {len(lists)} enabled lists\n")

        # Fetch all lists in parallel; return_exceptions keeps one failing
        # task from cancelling the rest
        tasks = [self.fetch_and_sync_list(list_config) for list_config in lists]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Summary
        print(f"\n{'='*60}")
        print("SYNC SUMMARY")
        print(f"{'='*60}")

        success_count = sum(1 for r in results if isinstance(r, dict) and r.get('success'))
        error_count = len(results) - success_count
        total_added = sum(r.get('added', 0) for r in results if isinstance(r, dict))
        total_removed = sum(r.get('removed', 0) for r in results if isinstance(r, dict))

        print(f"Success: {success_count}/{len(results)}")
        print(f"Errors: {error_count}/{len(results)}")
        print(f"Total IPs Added: {total_added}")
        print(f"Total IPs Removed: {total_removed}")
        print(f"{'='*60}\n")

        return [r for r in results if isinstance(r, dict)]


async def main():
    """Main entry point for list fetcher.

    Returns:
        Process exit code: 0 on success, 1 on fatal error or missing config.
    """
    database_url = os.getenv('DATABASE_URL')
    if not database_url:
        print("ERROR: DATABASE_URL environment variable not set")
        return 1

    fetcher = ListFetcher(database_url)

    try:
        # Fetch and sync all lists
        await fetcher.fetch_all_lists()

        # Run merge logic to sync detections with blacklist/whitelist priority
        print("\n" + "="*60)
        print("RUNNING MERGE LOGIC")
        print("="*60 + "\n")

        # Import merge logic lazily (avoid circular imports)
        from pathlib import Path
        merge_logic_path = Path(__file__).parent.parent
        sys.path.insert(0, str(merge_logic_path))
        from merge_logic import MergeLogic

        merge = MergeLogic(database_url)
        stats = merge.sync_public_blacklist_detections()

        print("\nMerge Logic Stats:")
        print(f"  Created detections: {stats['created']}")
        print(f"  Cleaned invalid detections: {stats['cleaned']}")
        print(f"  Skipped (whitelisted): {stats['skipped_whitelisted']}")
        print("="*60 + "\n")

        return 0

    except Exception as e:
        print(f"FATAL ERROR: {e}")
        import traceback
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)