ids.alfacom.it/python_ml/list_fetcher/fetcher.py
marco370 77874c83bf Add functionality to manage and sync public blacklists and whitelists
Integrates external public IP lists for enhanced threat detection and whitelisting capabilities, including API endpoints, database schema changes, and a new fetching service.

2025-11-26 09:21:43 +00:00

import asyncio
import httpx
from datetime import datetime
from typing import Dict, List, Set, Tuple, Optional
import psycopg2
from psycopg2.extras import execute_values
import os
import sys

# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from list_fetcher.parsers import parse_list


class ListFetcher:
    """Fetches and synchronizes public IP lists"""

    def __init__(self, database_url: str):
        self.database_url = database_url
        self.timeout = 30.0
        self.max_retries = 3

    def get_db_connection(self):
        """Create database connection"""
        return psycopg2.connect(self.database_url)

    async def fetch_url(self, url: str) -> Optional[str]:
        """Download content from URL with retry logic"""
        async with httpx.AsyncClient(timeout=self.timeout, follow_redirects=True) as client:
            for attempt in range(self.max_retries):
                try:
                    response = await client.get(url)
                    response.raise_for_status()
                    return response.text
                except httpx.HTTPError as e:
                    if attempt == self.max_retries - 1:
                        raise Exception(f"HTTP error after {self.max_retries} attempts: {e}")
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff
                except Exception as e:
                    if attempt == self.max_retries - 1:
                        raise Exception(f"Download failed: {e}")
                    await asyncio.sleep(2 ** attempt)
        return None
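
    # Illustrative usage sketch (comment only, not executed): fetch_url() must be
    # awaited from an event loop and retries with exponential backoff, sleeping
    # 1s, 2s, ... between attempts (with the default max_retries=3 the final
    # failure is raised after the third attempt). For example:
    #
    #   fetcher = ListFetcher(os.environ["DATABASE_URL"])
    #   text = asyncio.run(fetcher.fetch_url("https://example.com/blocklist.txt"))
    #
    # The URL above is a placeholder; real URLs come from the public_lists table.
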
    def get_enabled_lists(self) -> List[Dict]:
        """Get all enabled public lists from database"""
        conn = self.get_db_connection()
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT id, name, type, url, fetch_interval_minutes
                    FROM public_lists
                    WHERE enabled = true
                    ORDER BY type, name
                """)
                if cur.description is None:
                    return []
                columns = [desc[0] for desc in cur.description]
                return [dict(zip(columns, row)) for row in cur.fetchall()]
        finally:
            conn.close()
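
    # The query above returns plain dicts keyed by column name, one per enabled
    # list, e.g. (illustrative values only, not taken from a real database):
    #
    #   {'id': '…', 'name': 'Example Blocklist', 'type': 'blacklist',
    #    'url': 'https://example.com/blocklist.txt', 'fetch_interval_minutes': 60}
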
    def get_existing_ips(self, list_id: str, list_type: str) -> Set[str]:
        """Get existing IPs for a list from database"""
        conn = self.get_db_connection()
        try:
            with conn.cursor() as cur:
                if list_type == 'blacklist':
                    cur.execute("""
                        SELECT ip_address
                        FROM public_blacklist_ips
                        WHERE list_id = %s AND is_active = true
                    """, (list_id,))
                else:  # whitelist
                    cur.execute("""
                        SELECT ip_address
                        FROM whitelist
                        WHERE list_id = %s AND active = true
                    """, (list_id,))
                return {row[0] for row in cur.fetchall()}
        finally:
            conn.close()

    def sync_blacklist_ips(self, list_id: str, new_ips: Set[Tuple[str, Optional[str]]]):
        """Sync blacklist IPs: add new, mark inactive old ones"""
        conn = self.get_db_connection()
        try:
            with conn.cursor() as cur:
                # Get existing IPs
                existing = self.get_existing_ips(list_id, 'blacklist')
                new_ip_addresses = {ip for ip, _ in new_ips}

                # Calculate diff
                to_add = new_ip_addresses - existing
                to_deactivate = existing - new_ip_addresses
                to_update = existing & new_ip_addresses

                # Mark old IPs as inactive
                if to_deactivate:
                    cur.execute("""
                        UPDATE public_blacklist_ips
                        SET is_active = false
                        WHERE list_id = %s AND ip_address = ANY(%s)
                    """, (list_id, list(to_deactivate)))

                # Update last_seen for existing active IPs
                if to_update:
                    cur.execute("""
                        UPDATE public_blacklist_ips
                        SET last_seen = NOW()
                        WHERE list_id = %s AND ip_address = ANY(%s)
                    """, (list_id, list(to_update)))

                # Add new IPs
                if to_add:
                    values = []
                    for ip, cidr in new_ips:
                        if ip in to_add:
                            values.append((ip, cidr, list_id))
                    execute_values(cur, """
                        INSERT INTO public_blacklist_ips (ip_address, cidr_range, list_id)
                        VALUES %s
                        ON CONFLICT (ip_address, list_id) DO UPDATE
                        SET is_active = true, last_seen = NOW()
                    """, values)

                # Update list stats
                cur.execute("""
                    UPDATE public_lists
                    SET total_ips = %s,
                        active_ips = %s,
                        last_success = NOW()
                    WHERE id = %s
                """, (len(new_ip_addresses), len(new_ip_addresses), list_id))

            conn.commit()
            return len(to_add), len(to_deactivate), len(to_update)
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()
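
    # Set-diff example for the sync above (illustrative): if the database holds
    # {'198.51.100.1', '203.0.113.5'} for a list and the fresh download yields
    # {'203.0.113.5', '192.0.2.9'}, then to_add = {'192.0.2.9'},
    # to_deactivate = {'198.51.100.1'} and to_update = {'203.0.113.5'}, so the
    # method returns (1, 1, 1).
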
    def sync_whitelist_ips(self, list_id: str, list_name: str, new_ips: Set[Tuple[str, Optional[str]]]):
        """Sync whitelist IPs: add new, deactivate old ones"""
        conn = self.get_db_connection()
        try:
            with conn.cursor() as cur:
                # Get existing IPs
                existing = self.get_existing_ips(list_id, 'whitelist')
                new_ip_addresses = {ip for ip, _ in new_ips}

                # Calculate diff
                to_add = new_ip_addresses - existing
                to_deactivate = existing - new_ip_addresses
                to_update = existing & new_ip_addresses

                # Determine source name from list name
                source = 'other'
                list_lower = list_name.lower()
                if 'aws' in list_lower:
                    source = 'aws'
                elif 'gcp' in list_lower or 'google' in list_lower:
                    source = 'gcp'
                elif 'cloudflare' in list_lower:
                    source = 'cloudflare'
                elif 'iana' in list_lower:
                    source = 'iana'
                elif 'ntp' in list_lower:
                    source = 'ntp'

                # Mark old IPs as inactive
                if to_deactivate:
                    cur.execute("""
                        UPDATE whitelist
                        SET active = false
                        WHERE list_id = %s AND ip_address = ANY(%s)
                    """, (list_id, list(to_deactivate)))

                # Add new IPs
                if to_add:
                    values = []
                    for ip, cidr in new_ips:
                        if ip in to_add:
                            comment = f"Auto-imported from {list_name}"
                            if cidr:
                                comment += f" (CIDR: {cidr})"
                            values.append((ip, comment, source, list_id))
                    execute_values(cur, """
                        INSERT INTO whitelist (ip_address, comment, source, list_id)
                        VALUES %s
                        ON CONFLICT (ip_address) DO UPDATE
                        SET active = true, source = EXCLUDED.source, list_id = EXCLUDED.list_id
                    """, values)

                # Update list stats
                cur.execute("""
                    UPDATE public_lists
                    SET total_ips = %s,
                        active_ips = %s,
                        last_success = NOW()
                    WHERE id = %s
                """, (len(new_ip_addresses), len(new_ip_addresses), list_id))

            conn.commit()
            return len(to_add), len(to_deactivate), len(to_update)
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()
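
    # Example of a tuple built above (illustrative values only): a row such as
    # ('192.0.2.0', 'Auto-imported from Example Cloud Ranges (CIDR: 192.0.2.0/24)',
    #  'other', list_id) is inserted, and the ON CONFLICT clause re-activates the
    # entry and re-points it at this list if the IP already exists in whitelist.
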
    async def fetch_and_sync_list(self, list_config: Dict) -> Dict:
        """Fetch and sync a single list"""
        list_id = list_config['id']
        list_name = list_config['name']
        list_type = list_config['type']
        url = list_config['url']

        result = {
            'list_id': list_id,
            'list_name': list_name,
            'success': False,
            'added': 0,
            'removed': 0,
            'updated': 0,
            'error': None
        }

        conn = self.get_db_connection()
        try:
            # Update last_fetch timestamp
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE public_lists
                    SET last_fetch = NOW()
                    WHERE id = %s
                """, (list_id,))
            conn.commit()

            # Download content
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Downloading {list_name} from {url}...")
            content = await self.fetch_url(url)
            if not content:
                raise Exception("Empty response from server")

            # Parse IPs
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Parsing {list_name}...")
            ips = parse_list(list_name, content)
            if not ips:
                raise Exception("No valid IPs found in list")

            print(f"[{datetime.now().strftime('%H:%M:%S')}] Found {len(ips)} IPs, syncing to database...")

            # Sync to database
            if list_type == 'blacklist':
                added, removed, updated = self.sync_blacklist_ips(list_id, ips)
            else:
                added, removed, updated = self.sync_whitelist_ips(list_id, list_name, ips)

            result.update({
                'success': True,
                'added': added,
                'removed': removed,
                'updated': updated
            })
            print(f"[{datetime.now().strftime('%H:%M:%S')}] ✓ {list_name}: +{added} -{removed} ~{updated}")

            # Reset error count on success
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE public_lists
                    SET error_count = 0, last_error = NULL
                    WHERE id = %s
                """, (list_id,))
            conn.commit()
        except Exception as e:
            error_msg = str(e)
            result['error'] = error_msg
            print(f"[{datetime.now().strftime('%H:%M:%S')}] ✗ {list_name}: {error_msg}")

            # Increment error count
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE public_lists
                    SET error_count = error_count + 1,
                        last_error = %s
                    WHERE id = %s
                """, (error_msg[:500], list_id))
            conn.commit()
        finally:
            conn.close()

        return result
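
    # Note on error handling: download, parse and sync failures in
    # fetch_and_sync_list() are caught and recorded both in the returned dict
    # ('success': False, 'error': ...) and in public_lists.error_count /
    # last_error, so one broken feed does not abort the other lists gathered in
    # fetch_all_lists() (which also passes return_exceptions=True as a further
    # safety net).
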
    async def fetch_all_lists(self) -> List[Dict]:
        """Fetch and sync all enabled lists"""
        print(f"\n{'='*60}")
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] PUBLIC LISTS SYNC")
        print(f"{'='*60}\n")

        # Get enabled lists
        lists = self.get_enabled_lists()
        if not lists:
            print("No enabled lists found")
            return []

        print(f"Found {len(lists)} enabled lists\n")

        # Fetch all lists in parallel
        tasks = [self.fetch_and_sync_list(list_config) for list_config in lists]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Summary
        print(f"\n{'='*60}")
        print("SYNC SUMMARY")
        print(f"{'='*60}")
        success_count = sum(1 for r in results if isinstance(r, dict) and r.get('success'))
        error_count = len(results) - success_count
        total_added = sum(r.get('added', 0) for r in results if isinstance(r, dict))
        total_removed = sum(r.get('removed', 0) for r in results if isinstance(r, dict))
        print(f"Success: {success_count}/{len(results)}")
        print(f"Errors: {error_count}/{len(results)}")
        print(f"Total IPs Added: {total_added}")
        print(f"Total IPs Removed: {total_removed}")
        print(f"{'='*60}\n")

        return [r for r in results if isinstance(r, dict)]
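

# Columns of public_lists touched by this module, reconstructed from the queries
# above (a sketch; the authoritative schema lives in the application's database
# migrations, which are not part of this file):
#   id, name, type ('blacklist' | 'whitelist'), url, fetch_interval_minutes,
#   enabled, total_ips, active_ips, last_fetch, last_success, error_count,
#   last_error
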

async def main():
    """Main entry point for list fetcher"""
    database_url = os.getenv('DATABASE_URL')
    if not database_url:
        print("ERROR: DATABASE_URL environment variable not set")
        return 1

    fetcher = ListFetcher(database_url)
    try:
        # Fetch and sync all lists
        await fetcher.fetch_all_lists()

        # Run merge logic to sync detections with blacklist/whitelist priority
        print("\n" + "="*60)
        print("RUNNING MERGE LOGIC")
        print("="*60 + "\n")

        # Import merge logic (avoid circular imports)
        from pathlib import Path
        merge_logic_path = Path(__file__).parent.parent
        sys.path.insert(0, str(merge_logic_path))
        from merge_logic import MergeLogic

        merge = MergeLogic(database_url)
        stats = merge.sync_public_blacklist_detections()
        print("\nMerge Logic Stats:")
        print(f"  Created detections: {stats['created']}")
        print(f"  Cleaned invalid detections: {stats['cleaned']}")
        print(f"  Skipped (whitelisted): {stats['skipped_whitelisted']}")
        print("="*60 + "\n")
        return 0
    except Exception as e:
        print(f"FATAL ERROR: {e}")
        import traceback
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
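

# Typical invocation sketch (assumption: the surrounding project runs this file
# directly as a script; the actual scheduler, e.g. cron or a worker loop, is
# defined elsewhere and not in this file):
#
#   DATABASE_URL=postgresql://user:pass@host:5432/dbname python list_fetcher/fetcher.py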