#!/usr/bin/env python3
"""
Seed default public lists into database
Run after migration 006 to populate initial lists
"""

import argparse
import asyncio
import os
import sys
import traceback

import psycopg2

# Add parent directory to path so that `list_fetcher` (imported below) can be
# found. abspath() is applied first because a relative __file__ such as
# "script.py" would otherwise collapse to '' after two dirname() calls.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from list_fetcher.fetcher import ListFetcher  # noqa: E402

# Rows inserted into the `public_lists` table by seed_lists(). Each entry
# supplies exactly the columns named in the INSERT statement there.
DEFAULT_LISTS = [
    # Blacklists
    {
        'name': 'Spamhaus DROP',
        'type': 'blacklist',
        'url': 'https://www.spamhaus.org/drop/drop.txt',
        'enabled': True,
        'fetch_interval_minutes': 10
    },
    {
        'name': 'Talos Intelligence IP Blacklist',
        'type': 'blacklist',
        'url': 'https://talosintelligence.com/documents/ip-blacklist',
        'enabled': False,  # Disabled by default - verify URL first
        'fetch_interval_minutes': 10
    },
    # Whitelists
    {
        'name': 'AWS IP Ranges',
        'type': 'whitelist',
        'url': 'https://ip-ranges.amazonaws.com/ip-ranges.json',
        'enabled': True,
        'fetch_interval_minutes': 10
    },
    {
        'name': 'Google Cloud IP Ranges',
        'type': 'whitelist',
        'url': 'https://www.gstatic.com/ipranges/cloud.json',
        'enabled': True,
        'fetch_interval_minutes': 10
    },
    {
        'name': 'Cloudflare IPv4',
        'type': 'whitelist',
        'url': 'https://www.cloudflare.com/ips-v4',
        'enabled': True,
        'fetch_interval_minutes': 10
    },
    {
        'name': 'IANA Root Servers',
        'type': 'whitelist',
        'url': 'https://www.iana.org/domains/root/servers',
        'enabled': True,
        'fetch_interval_minutes': 10
    },
    {
        'name': 'NTP Pool Servers',
        'type': 'whitelist',
        'url': 'https://www.ntppool.org/zone/@',
        'enabled': False,  # Disabled by default - zone parameter needed
        'fetch_interval_minutes': 10
    }
]


def seed_lists(database_url: str, dry_run: bool = False) -> int:
    """Insert default lists into database.

    Args:
        database_url: Connection string handed to ``psycopg2.connect``.
        dry_run: When true, only print what would be inserted; no rows are
            written and nothing is committed.

    Returns:
        0 on success (including a dry run); 1 when an error occurred or the
        user declined the confirmation prompt. Always an int, so the caller
        can forward it to ``sys.exit`` unchanged.

    Raises:
        Whatever ``psycopg2.connect`` raises when the connection itself
        cannot be opened; errors after that point are caught and reported.
    """
    conn = psycopg2.connect(database_url)

    try:
        with conn.cursor() as cur:
            # Check if lists already exist
            cur.execute("SELECT COUNT(*) FROM public_lists")
            count_row = cur.fetchone()
            existing_count = count_row[0] if count_row else 0

            if existing_count > 0 and not dry_run:
                print(f"⚠️ Warning: {existing_count} lists already exist in database")
                response = input("Continue and add default lists? (y/n): ")
                # strip() so an answer like "y " is not treated as a refusal.
                if response.strip().lower() != 'y':
                    print("Aborted")
                    # Explicit non-zero code: nothing was seeded. A bare
                    # `return` would hand None to sys.exit(), i.e. exit 0.
                    return 1

            print(f"\n{'='*60}")
            print("SEEDING DEFAULT PUBLIC LISTS")
            print(f"{'='*60}\n")

            for list_config in DEFAULT_LISTS:
                if dry_run:
                    status = "✓ ENABLED" if list_config['enabled'] else "○ DISABLED"
                    print(f"{status} {list_config['type'].upper()}: {list_config['name']}")
                    print(f" URL: {list_config['url']}")
                    print()
                    continue

                cur.execute("""
                    INSERT INTO public_lists (name, type, url, enabled, fetch_interval_minutes)
                    VALUES (%s, %s, %s, %s, %s)
                    RETURNING id, name
                """, (
                    list_config['name'],
                    list_config['type'],
                    list_config['url'],
                    list_config['enabled'],
                    list_config['fetch_interval_minutes']
                ))

                inserted = cur.fetchone()
                if inserted:
                    list_id, list_name = inserted
                    status = "✓" if list_config['enabled'] else "○"
                    print(f"{status} Added: {list_name} (ID: {list_id})")

        if not dry_run:
            conn.commit()
            print(f"\n✓ Successfully seeded {len(DEFAULT_LISTS)} lists")
            print(f"{'='*60}\n")
        else:
            print(f"\n{'='*60}")
            print(f"DRY RUN: Would seed {len(DEFAULT_LISTS)} lists")
            print(f"{'='*60}\n")

    except Exception as e:
        # Top-level boundary for the script: undo any partial inserts, report
        # the failure, and convert it into a non-zero exit code.
        conn.rollback()
        print(f"✗ Error: {e}")
        traceback.print_exc()
        return 1
    finally:
        conn.close()

    return 0


async def sync_lists(database_url: str) -> None:
    """Run initial sync of all enabled lists.

    Builds a ``ListFetcher`` for ``database_url`` and awaits its
    ``fetch_all_lists()`` coroutine.
    """
    print("\nRunning initial sync of enabled lists...\n")
    fetcher = ListFetcher(database_url)
    await fetcher.fetch_all_lists()


def main() -> int:
    """Parse CLI flags, seed the default lists, and optionally sync them.

    Reads the connection string from the ``DATABASE_URL`` environment
    variable.

    Returns:
        Process exit code: 0 on success, 1 if ``DATABASE_URL`` is unset, or
        the non-zero code returned by ``seed_lists``.
    """
    parser = argparse.ArgumentParser(description='Seed default public lists')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be added without inserting')
    parser.add_argument('--sync', action='store_true',
                        help='Run initial sync after seeding')
    args = parser.parse_args()

    database_url = os.getenv('DATABASE_URL')
    if not database_url:
        print("ERROR: DATABASE_URL environment variable not set")
        return 1

    # Seed lists
    exit_code = seed_lists(database_url, dry_run=args.dry_run)
    if exit_code != 0:
        return exit_code

    # Optionally sync (skipped for a dry run, since nothing was inserted)
    if args.sync and not args.dry_run:
        asyncio.run(sync_lists(database_url))

    return 0


if __name__ == "__main__":
    sys.exit(main())