#!/usr/bin/env python3 """ CrowdSec → BunnyCDN Bouncer Sync CrowdSec LAPI에서 ban 결정을 가져와 bloom filter로 변환 후 BunnyCDN Edge Script 코드에 임베딩하여 배포. Usage: python3 bouncer.py # 스트림 모드 (delta) python3 bouncer.py --startup # 전체 동기화 python3 bouncer.py --dry-run # 변경 없이 출력만 Environment: CROWDSEC_LAPI_URL CrowdSec LAPI URL (default: http://10.253.100.240:8080) CROWDSEC_BOUNCER_KEY Bouncer API key BUNNY_API_KEY BunnyCDN account API key BUNNY_SCRIPT_ID Edge Script ID (default: 64811) """ import argparse import base64 import hashlib import json import logging import math import os import re import struct import sys import time from ipaddress import IPv4Network, ip_address from typing import Optional import requests logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) log = logging.getLogger("bouncer") # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- CROWDSEC_LAPI_URL = os.environ.get("CROWDSEC_LAPI_URL", "http://10.253.100.240:8080") CROWDSEC_BOUNCER_KEY = os.environ.get("CROWDSEC_BOUNCER_KEY", "") BUNNY_API_KEY = os.environ.get("BUNNY_API_KEY", "") BUNNY_SCRIPT_ID = int(os.environ.get("BUNNY_SCRIPT_ID", "64811")) # Bloom filter parameters BLOOM_FP_RATE = 0.001 # 0.1% false positive rate BLOOM_MIN_ITEMS = 100 # minimum expected items (avoid tiny filters) # State file for stream cursor STATE_FILE = os.environ.get("STATE_FILE", "/var/lib/crowdsec-bouncer/state.json") # --------------------------------------------------------------------------- # Bloom filter (FNV-1a — must match edge script) # --------------------------------------------------------------------------- def fnv1a32(data: bytes) -> int: h = 0x811C9DC5 for b in data: h ^= b h = (h * 0x01000193) & 0xFFFFFFFF return h class BloomFilter: def __init__(self, expected_items: int, fp_rate: float = 0.001): n = max(expected_items, BLOOM_MIN_ITEMS) self.m = max(self._optimal_m(n, fp_rate), 64) # bits self.k = max(self._optimal_k(self.m, n), 1) # hash functions byte_count = (self.m + 7) // 8 self.bits = bytearray(byte_count) @staticmethod def _optimal_m(n: int, p: float) -> int: return int(-n * math.log(p) / (math.log(2) ** 2)) @staticmethod def _optimal_k(m: int, n: int) -> int: return max(int((m / n) * math.log(2)), 1) def add(self, item: str) -> None: ip_bytes = item.encode("utf-8") ip_bytes_ff = ip_bytes + b"\xff" h1 = fnv1a32(ip_bytes) h2 = fnv1a32(ip_bytes_ff) for i in range(self.k): pos = (h1 + i * h2) % self.m self.bits[pos >> 3] |= 1 << (pos & 7) def to_base64(self) -> str: # Header: 4 bytes m (little-endian), 4 bytes k (little-endian) header = struct.pack(" str: return hashlib.md5(self.bits).hexdigest()[:16] # --------------------------------------------------------------------------- # CIDR expansion # --------------------------------------------------------------------------- def expand_cidr(value: str) -> list[str]: """Expand a CIDR range to individual IPs (max /16).""" try: net = IPv4Network(value, strict=False) if net.prefixlen < 16: log.warning("Skipping too-large range: %s", value) return [] if net.prefixlen == 32: return [str(net.network_address)] return [str(ip) for ip in net.hosts()] except ValueError: # Not a CIDR, treat as single IP try: ip_address(value) return [value] except ValueError: log.warning("Invalid IP/CIDR: %s", value) return [] # --------------------------------------------------------------------------- # CrowdSec LAPI client # --------------------------------------------------------------------------- def load_state() -> Optional[str]: """Load last stream cursor timestamp.""" try: with open(STATE_FILE) as f: data = json.load(f) return data.get("last_pull") except (FileNotFoundError, json.JSONDecodeError): return None def save_state(timestamp: str) -> None: """Save stream cursor timestamp.""" os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True) with open(STATE_FILE, "w") as f: json.dump({"last_pull": timestamp}, f) def fetch_decisions(startup: bool = False) -> tuple[list[str], list[str]]: """Fetch decisions from CrowdSec LAPI stream endpoint. Returns (new_ips, deleted_ips). """ url = f"{CROWDSEC_LAPI_URL}/v1/decisions/stream" params = {"startup": "true"} if startup else {} if not startup: last_pull = load_state() if last_pull: # For non-startup, we only care about changes since last pull pass headers = {"X-Api-Key": CROWDSEC_BOUNCER_KEY} try: resp = requests.get(url, headers=headers, params=params, timeout=30) resp.raise_for_status() except requests.RequestException as e: log.error("Failed to fetch decisions: %s", e) return [], [] data = resp.json() new_ips = [] deleted_ips = [] for decision in data.get("new") or []: value = decision.get("value", "") if decision.get("type", "").lower() == "ban": new_ips.extend(expand_cidr(value)) for decision in data.get("deleted") or []: value = decision.get("value", "") deleted_ips.extend(expand_cidr(value)) save_state(time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())) return new_ips, deleted_ips def fetch_all_active_decisions() -> list[str]: """Fetch all currently active ban decisions.""" url = f"{CROWDSEC_LAPI_URL}/v1/decisions" headers = {"X-Api-Key": CROWDSEC_BOUNCER_KEY} try: resp = requests.get(url, headers=headers, params={"type": "ban"}, timeout=30) resp.raise_for_status() except requests.RequestException as e: log.error("Failed to fetch all decisions: %s", e) return [] decisions = resp.json() or [] ips = [] for d in decisions: value = d.get("value", "") ips.extend(expand_cidr(value)) return ips # --------------------------------------------------------------------------- # IP state management # --------------------------------------------------------------------------- class IPState: """Manages the set of currently blocked IPs.""" def __init__(self, state_file: str): self._file = state_file.replace("state.json", "blocked_ips.json") self.ips: set[str] = set() self._load() def _load(self) -> None: try: with open(self._file) as f: data = json.load(f) self.ips = set(data.get("ips", [])) except (FileNotFoundError, json.JSONDecodeError): self.ips = set() def save(self) -> None: os.makedirs(os.path.dirname(self._file), exist_ok=True) with open(self._file, "w") as f: json.dump({"ips": sorted(self.ips)}, f) def apply_delta(self, new_ips: list[str], deleted_ips: list[str]) -> bool: """Apply changes and return True if the set changed.""" before = len(self.ips) self.ips.update(new_ips) self.ips -= set(deleted_ips) after = len(self.ips) changed = before != after or bool(new_ips) or bool(deleted_ips) if changed: self.save() return changed def full_sync(self, all_ips: list[str]) -> bool: """Full replacement. Returns True if changed.""" new_set = set(all_ips) if new_set == self.ips: return False self.ips = new_set self.save() return True def build_bloom(self) -> BloomFilter: bf = BloomFilter(len(self.ips)) for ip in self.ips: bf.add(ip) return bf # --------------------------------------------------------------------------- # BunnyCDN Edge Script updater # --------------------------------------------------------------------------- BLOOM_PATTERN = re.compile( r'(const BLOOM_B64\s*=\s*")([^"]*)(";)', re.DOTALL, ) VERSION_PATTERN = re.compile( r'(const BLOOM_VERSION\s*=\s*")([^"]*)(";)', ) def get_current_script() -> Optional[str]: """Get current Edge Script source code.""" url = f"https://api.bunny.net/compute/script/{BUNNY_SCRIPT_ID}/code" headers = {"AccessKey": BUNNY_API_KEY} try: resp = requests.get(url, headers=headers, timeout=30) resp.raise_for_status() return resp.json().get("Code", "") except requests.RequestException as e: log.error("Failed to get script: %s", e) return None def update_script(code: str) -> bool: """Update Edge Script source code and publish.""" url = f"https://api.bunny.net/compute/script/{BUNNY_SCRIPT_ID}/code" headers = { "AccessKey": BUNNY_API_KEY, "Content-Type": "application/json", } try: resp = requests.post(url, headers=headers, json={"Code": code}, timeout=60) resp.raise_for_status() except requests.RequestException as e: log.error("Failed to update script: %s", e) return False # Publish pub_url = f"https://api.bunny.net/compute/script/{BUNNY_SCRIPT_ID}/publish" try: resp = requests.post(pub_url, headers=headers, json={}, timeout=60) resp.raise_for_status() log.info("Script published successfully") return True except requests.RequestException as e: log.error("Failed to publish script: %s", e) return False def embed_bloom_in_script(code: str, bloom: BloomFilter) -> Optional[str]: """Replace BLOOM_B64 and BLOOM_VERSION in script code.""" b64 = bloom.to_base64() version = bloom.version_hash() new_code = BLOOM_PATTERN.sub(rf'\g<1>{b64}\g<3>', code) if new_code == code: log.error("Could not find BLOOM_B64 in script") return None new_code = VERSION_PATTERN.sub(rf'\g<1>{version}\g<3>', new_code) return new_code # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main() -> int: parser = argparse.ArgumentParser(description="CrowdSec → BunnyCDN Bouncer") parser.add_argument("--startup", action="store_true", help="Full sync (startup mode)") parser.add_argument("--dry-run", action="store_true", help="Don't deploy changes") parser.add_argument("--verbose", "-v", action="store_true") args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) if not CROWDSEC_BOUNCER_KEY: log.error("CROWDSEC_BOUNCER_KEY not set") return 1 if not BUNNY_API_KEY: log.error("BUNNY_API_KEY not set") return 1 state = IPState(STATE_FILE) log.info("Current blocked IPs: %d", len(state.ips)) if args.startup: log.info("Full sync mode (startup)") new_ips, _ = fetch_decisions(startup=True) # startup=true returns all active decisions in "new" changed = state.full_sync(new_ips) else: log.info("Delta sync mode") new_ips, deleted_ips = fetch_decisions(startup=False) if new_ips: log.info("New bans: %d IPs", len(new_ips)) if deleted_ips: log.info("Removed bans: %d IPs", len(deleted_ips)) changed = state.apply_delta(new_ips, deleted_ips) log.info("Total blocked IPs after sync: %d", len(state.ips)) if not changed: log.info("No changes — skipping deploy") return 0 bloom = state.build_bloom() b64 = bloom.to_base64() version = bloom.version_hash() log.info("Bloom filter: m=%d, k=%d, size=%d bytes, version=%s", bloom.m, bloom.k, len(b64), version) if args.dry_run: log.info("[DRY RUN] Would update script with %d IPs", len(state.ips)) log.info("[DRY RUN] Bloom b64 length: %d chars", len(b64)) return 0 code = get_current_script() if code is None: return 1 new_code = embed_bloom_in_script(code, bloom) if new_code is None: return 1 # Check if bloom actually changed if new_code == code: log.info("Bloom filter unchanged — skipping deploy") return 0 if update_script(new_code): log.info("Successfully deployed bloom filter with %d IPs", len(state.ips)) return 0 return 1 if __name__ == "__main__": sys.exit(main())