#!/usr/bin/env python3
"""CrowdSec Bouncer for Bunny CDN — syncs ban decisions to Shield Access Lists."""

import logging
import os
import signal
import sys
import time
from pathlib import Path
from typing import Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Required settings: a missing variable raises KeyError at import time.
CROWDSEC_LAPI_URL = os.environ["CROWDSEC_LAPI_URL"].rstrip("/")
CROWDSEC_LAPI_KEY = os.environ["CROWDSEC_LAPI_KEY"]
BUNNY_API_KEY = os.environ["BUNNY_API_KEY"]
BUNNY_SHIELD_ZONE_ID = os.environ["BUNNY_SHIELD_ZONE_ID"]
BUNNY_ACCESS_LIST_ID = os.environ["BUNNY_ACCESS_LIST_ID"]

# Optional settings with defaults.
SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL", "60"))  # seconds between cycles
MAX_IPS = int(os.environ.get("MAX_IPS", "1000"))  # cap on IPs pushed per sync
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()

BUNNY_API_BASE = "https://api.bunny.net"
# Exists only while the most recent sync cycle succeeded.
HEALTHCHECK_FILE = Path("/tmp/bouncer-healthy")

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

logging.basicConfig(
    level=LOG_LEVEL,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("bouncer")

# ---------------------------------------------------------------------------
# Graceful shutdown
# ---------------------------------------------------------------------------

# Cleared by the signal handler; the main loop polls it to know when to stop.
_running = True


def _shutdown(signum, _frame):
    """Signal handler: log the received signal and ask the main loop to stop."""
    global _running
    log.info("Received signal %s, shutting down…", signal.Signals(signum).name)
    _running = False


signal.signal(signal.SIGTERM, _shutdown)
signal.signal(signal.SIGINT, _shutdown)

# ---------------------------------------------------------------------------
# HTTP sessions with retry
# ---------------------------------------------------------------------------


def _make_session(headers: dict, retries: int = 3) -> requests.Session:
    """Build a requests session with default headers and automatic retries.

    Args:
        headers: Headers sent with every request made through the session.
        retries: Maximum number of retries per request.

    Returns:
        A session whose http:// and https:// adapters retry GET, POST and
        PATCH requests on 429/500/502/503/504 responses with a backoff
        factor of 1.
    """
    s = requests.Session()
    s.headers.update(headers)
    adapter = HTTPAdapter(
        max_retries=Retry(
            total=retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST", "PATCH"],
        )
    )
    s.mount("http://", adapter)
    s.mount("https://", adapter)
    return s


crowdsec_session = _make_session({"X-Api-Key": CROWDSEC_LAPI_KEY})
bunny_session = _make_session(
    {"AccessKey": BUNNY_API_KEY, "Content-Type": "application/json"}
)

# ---------------------------------------------------------------------------
# State
# ---------------------------------------------------------------------------

# Set of IPs most recently pushed to Bunny, or None when nothing has been
# pushed yet by this process. Starting at None (rather than an empty set)
# forces the first cycle to sync even when the ban list is empty, so entries
# left in the access list by a previous run are cleared.
_last_pushed: Optional[set[str]] = None

# ---------------------------------------------------------------------------
# CrowdSec helpers
# ---------------------------------------------------------------------------


def fetch_banned_ips() -> list[str]:
    """Return locally-detected banned IPs from CrowdSec LAPI, capped at MAX_IPS.

    Returns:
        De-duplicated decision values in the order the LAPI returned them,
        truncated to the first MAX_IPS entries. Empty when the LAPI response
        body is JSON null.

    Raises:
        requests.RequestException: If the request fails or returns an error
            status.
    """
    url = f"{CROWDSEC_LAPI_URL}/v1/decisions"
    resp = crowdsec_session.get(url, params={"type": "ban"}, timeout=10)
    resp.raise_for_status()

    data = resp.json()
    if data is None:
        return []

    # Only keep locally-detected decisions (exclude CAPI community blocklists)
    seen: set[str] = set()
    unique: list[str] = []
    for d in data:
        if d.get("origin") not in ("crowdsec", "cscli"):
            continue
        ip = d.get("value", "")
        if ip and ip not in seen:
            seen.add(ip)
            unique.append(ip)

    if len(unique) > MAX_IPS:
        log.warning(
            "CrowdSec has %d locally-detected banned IPs, truncating to %d",
            len(unique),
            MAX_IPS,
        )
        unique = unique[:MAX_IPS]

    return unique


# ---------------------------------------------------------------------------
# Bunny CDN Shield Access Lists helpers
# ---------------------------------------------------------------------------


def _access_list_url() -> str:
    """Return the Bunny API URL of the configured Shield Access List."""
    return f"{BUNNY_API_BASE}/shield/shield-zone/{BUNNY_SHIELD_ZONE_ID}/access-lists/{BUNNY_ACCESS_LIST_ID}"


def sync_access_list(banned_ips: list[str]) -> None:
    """Update the Shield Access List with the current set of banned IPs.

    The PATCH request is skipped when the set of IPs equals the set pushed by
    the previous successful sync. The first call in a process always pushes,
    including an empty list.

    Args:
        banned_ips: IPs to publish, one per line of the list content.

    Raises:
        requests.RequestException: If the request fails or Bunny returns an
            error status. The remembered state is left untouched so the next
            cycle retries.
    """
    global _last_pushed

    ip_set = set(banned_ips)
    if _last_pushed is not None and ip_set == _last_pushed:
        log.debug("No changes, skipping sync")
        return

    # Joining an empty list yields "", which clears the access list.
    content = "\n".join(banned_ips)
    resp = bunny_session.patch(
        _access_list_url(),
        json={"content": content},
        timeout=30,
    )
    if not resp.ok:
        # Log the response body before raising; the exception alone omits it.
        log.error("Bunny API %s: %s", resp.status_code, resp.text)
    resp.raise_for_status()

    _last_pushed = ip_set
    log.info("Synced %d IPs to Shield Access List", len(banned_ips))


# ---------------------------------------------------------------------------
# Startup validation
# ---------------------------------------------------------------------------


def verify_connections() -> None:
    """Check connectivity to CrowdSec LAPI and Bunny Shield API at startup.

    Raises:
        requests.RequestException: If either API request fails or returns an
            error status.
        SystemExit: With code 1 if BUNNY_ACCESS_LIST_ID is not among the
            custom lists reported for the Shield zone.
    """
    log.info("Verifying CrowdSec LAPI at %s …", CROWDSEC_LAPI_URL)
    resp = crowdsec_session.get(
        f"{CROWDSEC_LAPI_URL}/v1/decisions", params={"type": "ban"}, timeout=10
    )
    resp.raise_for_status()
    log.info("CrowdSec LAPI: OK")

    log.info("Verifying Bunny Shield zone %s …", BUNNY_SHIELD_ZONE_ID)
    resp = bunny_session.get(
        f"{BUNNY_API_BASE}/shield/shield-zone/{BUNNY_SHIELD_ZONE_ID}/access-lists",
        timeout=15,
    )
    resp.raise_for_status()
    data = resp.json()

    found = False
    for cl in data.get("customLists", []):
        # The configured ID is a string from the environment, so compare as str.
        if str(cl.get("listId")) == BUNNY_ACCESS_LIST_ID:
            found = True
            log.info(
                "Access List '%s': enabled=%s, action=%s, entries=%s/%s",
                cl.get("name"),
                cl.get("isEnabled"),
                cl.get("action"),
                data.get("customEntryCount"),
                data.get("customEntryLimit"),
            )
            break

    if not found:
        log.error("Access List ID %s not found in Shield zone", BUNNY_ACCESS_LIST_ID)
        sys.exit(1)

    log.info("Bunny Shield: OK")


# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------


def main() -> None:
    """Validate connectivity, then sync bans every SYNC_INTERVAL seconds.

    The healthcheck file is touched after each successful cycle and removed
    after a failed one and on shutdown. The loop ends once a shutdown signal
    clears the running flag.
    """
    log.info(
        "Starting CrowdSec Bunny Bouncer — shield_zone=%s access_list=%s interval=%ds max_ips=%d",
        BUNNY_SHIELD_ZONE_ID,
        BUNNY_ACCESS_LIST_ID,
        SYNC_INTERVAL,
        MAX_IPS,
    )

    verify_connections()
    HEALTHCHECK_FILE.touch()

    while _running:
        try:
            banned = fetch_banned_ips()
            log.debug("Fetched %d banned IPs", len(banned))
            sync_access_list(banned)
        except requests.RequestException:
            log.exception("Sync cycle failed, will retry next interval")
            HEALTHCHECK_FILE.unlink(missing_ok=True)
        except Exception:
            # Keep the daemon alive on unexpected errors; mark it unhealthy.
            log.exception("Unexpected error in sync cycle")
            HEALTHCHECK_FILE.unlink(missing_ok=True)
        else:
            HEALTHCHECK_FILE.touch()

        # Sleep in one-second steps so a shutdown signal is honoured promptly.
        for _ in range(SYNC_INTERVAL):
            if not _running:
                break
            time.sleep(1)

    HEALTHCHECK_FILE.unlink(missing_ok=True)
    log.info("Bouncer stopped")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        log.info("Interrupted")
        sys.exit(0)
    except Exception:
        log.exception("Fatal error")
        sys.exit(1)