#!/usr/bin/env python3 """CrowdSec Bouncer for Bunny CDN — syncs ban decisions to Bunny Database via libSQL HTTP API.""" import logging import os import signal import sys import time from pathlib import Path import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- CROWDSEC_LAPI_URL = os.environ["CROWDSEC_LAPI_URL"].rstrip("/") CROWDSEC_LAPI_KEY = os.environ["CROWDSEC_LAPI_KEY"] BUNNY_DB_URL = os.environ["BUNNY_DB_URL"].rstrip("/") BUNNY_DB_TOKEN = os.environ["BUNNY_DB_TOKEN"] SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL", "60")) INCLUDE_CAPI = os.environ.get("INCLUDE_CAPI", "true").lower() in ("true", "1", "yes") FULL_RESYNC_INTERVAL = int(os.environ.get("FULL_RESYNC_INTERVAL", "21600")) # 6 hours LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper() PIPELINE_URL = f"{BUNNY_DB_URL}/v2/pipeline" BATCH_SIZE = 500 HEALTHCHECK_FILE = Path("/tmp/bouncer-healthy") # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- logging.basicConfig( level=LOG_LEVEL, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) log = logging.getLogger("bouncer") # --------------------------------------------------------------------------- # Graceful shutdown # --------------------------------------------------------------------------- _running = True def _shutdown(signum, _frame): global _running log.info("Received signal %s, shutting down…", signal.Signals(signum).name) _running = False signal.signal(signal.SIGTERM, _shutdown) signal.signal(signal.SIGINT, _shutdown) # --------------------------------------------------------------------------- # HTTP sessions with retry # --------------------------------------------------------------------------- def _make_session(headers: dict, retries: int = 3) -> requests.Session: s = requests.Session() s.headers.update(headers) adapter = HTTPAdapter( max_retries=Retry( total=retries, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["GET", "POST"], ) ) s.mount("http://", adapter) s.mount("https://", adapter) return s crowdsec_session = _make_session({"X-Api-Key": CROWDSEC_LAPI_KEY}) db_session = _make_session( {"Authorization": f"Bearer {BUNNY_DB_TOKEN}", "Content-Type": "application/json"} ) # --------------------------------------------------------------------------- # Bunny Database helpers (libSQL HTTP API) # --------------------------------------------------------------------------- def db_pipeline(statements: list[dict]) -> dict: """Send a batch of statements to Bunny Database via /v2/pipeline.""" reqs = [{"type": "execute", "stmt": s} for s in statements] reqs.append({"type": "close"}) resp = db_session.post(PIPELINE_URL, json={"requests": reqs}, timeout=30) resp.raise_for_status() data = resp.json() for r in data.get("results", []): if r.get("type") == "error": raise RuntimeError(f"DB error: {r.get('error', {}).get('message', r)}") return data def db_pipeline_batched(statements: list[dict]): """Send statements in batches of BATCH_SIZE.""" for i in range(0, len(statements), BATCH_SIZE): batch = statements[i : i + BATCH_SIZE] db_pipeline(batch) def init_db(): """Create the blocklist table if it doesn't exist.""" db_pipeline([ { "sql": ( "CREATE TABLE IF NOT EXISTS blocklist (" " ip TEXT PRIMARY KEY," " reason TEXT NOT NULL DEFAULT ''," " origin TEXT NOT NULL DEFAULT ''," " expires_at TEXT," " created_at TEXT NOT NULL DEFAULT (datetime('now'))" ")" ) } ]) log.info("Database table 'blocklist' ready") def db_count() -> int: """Return the number of rows in the blocklist table.""" data = db_pipeline([{"sql": "SELECT COUNT(*) FROM blocklist"}]) row = data["results"][0]["response"]["result"]["rows"][0] return int(row[0]["value"]) # --------------------------------------------------------------------------- # CrowdSec streaming helpers # --------------------------------------------------------------------------- def fetch_stream(startup: bool = False) -> dict: """Fetch decisions from CrowdSec LAPI streaming endpoint.""" url = f"{CROWDSEC_LAPI_URL}/v1/decisions/stream" params: dict = {"startup": "true" if startup else "false"} if not INCLUDE_CAPI: params["origins"] = "crowdsec,cscli" resp = crowdsec_session.get(url, params=params, timeout=30) resp.raise_for_status() return resp.json() # --------------------------------------------------------------------------- # Decision → SQL statement converters # --------------------------------------------------------------------------- def _normalize_ip(value: str) -> str: """Strip /32 suffix from single-host CIDR notation.""" return value[:-3] if value.endswith("/32") else value def _insert_stmt(d: dict) -> dict: """Convert a CrowdSec decision to an INSERT OR REPLACE statement.""" until = d.get("until") return { "sql": ( "INSERT OR REPLACE INTO blocklist (ip, reason, origin, expires_at) " "VALUES (?, ?, ?, ?)" ), "args": [ {"type": "text", "value": _normalize_ip(d.get("value", ""))}, {"type": "text", "value": d.get("scenario", "")}, {"type": "text", "value": d.get("origin", "")}, {"type": "text", "value": until} if until else {"type": "null"}, ], } def _delete_stmt(d: dict) -> dict: """Convert a CrowdSec decision to a DELETE statement.""" return { "sql": "DELETE FROM blocklist WHERE ip = ?", "args": [{"type": "text", "value": _normalize_ip(d.get("value", ""))}], } def _is_ip_ban(d: dict) -> bool: return d.get("scope", "").lower() == "ip" and d.get("type") == "ban" # --------------------------------------------------------------------------- # Sync logic # --------------------------------------------------------------------------- def full_sync() -> int: """Full resync: fetch all decisions and rebuild the database.""" log.info("Starting full resync (startup=true)") data = fetch_stream(startup=True) new_decisions = data.get("new") or [] stmts: list[dict] = [{"sql": "DELETE FROM blocklist"}] for d in new_decisions: if _is_ip_ban(d): stmts.append(_insert_stmt(d)) db_pipeline_batched(stmts) count = len(stmts) - 1 # exclude the DELETE statement log.info("Full resync complete: %d IPs written to blocklist", count) return count def incremental_sync(): """Incremental sync: fetch only new/deleted decisions since last poll.""" data = fetch_stream(startup=False) new_decisions = data.get("new") or [] deleted_decisions = data.get("deleted") or [] stmts: list[dict] = [] for d in deleted_decisions: if d.get("scope", "").lower() == "ip": stmts.append(_delete_stmt(d)) for d in new_decisions: if _is_ip_ban(d): stmts.append(_insert_stmt(d)) if stmts: db_pipeline_batched(stmts) added = sum(1 for d in new_decisions if _is_ip_ban(d)) removed = sum(1 for d in deleted_decisions if d.get("scope", "").lower() == "ip") if added or removed: log.info("Incremental sync: +%d -%d IPs", added, removed) else: log.debug("Incremental sync: no changes") # --------------------------------------------------------------------------- # Startup validation # --------------------------------------------------------------------------- def verify_connections(): """Check connectivity to CrowdSec LAPI and Bunny Database at startup.""" log.info("Verifying CrowdSec LAPI at %s …", CROWDSEC_LAPI_URL) resp = crowdsec_session.get( f"{CROWDSEC_LAPI_URL}/v1/decisions", params={"type": "ban", "limit": "1"}, timeout=10, ) resp.raise_for_status() log.info("CrowdSec LAPI: OK") log.info("Verifying Bunny Database at %s …", BUNNY_DB_URL) init_db() log.info("Bunny Database: OK") # --------------------------------------------------------------------------- # Main loop # --------------------------------------------------------------------------- def main(): log.info( "Starting CrowdSec Bunny Bouncer — db=%s interval=%ds include_capi=%s full_resync=%ds", BUNNY_DB_URL, SYNC_INTERVAL, INCLUDE_CAPI, FULL_RESYNC_INTERVAL, ) verify_connections() HEALTHCHECK_FILE.touch() # Initial full sync try: count = full_sync() log.info("Blocklist initialized with %d IPs", count) except Exception: log.exception("Initial full sync failed, will retry") HEALTHCHECK_FILE.unlink(missing_ok=True) last_full_sync = time.monotonic() while _running: try: if time.monotonic() - last_full_sync >= FULL_RESYNC_INTERVAL: full_sync() last_full_sync = time.monotonic() else: incremental_sync() except requests.RequestException: log.exception("Sync cycle failed, will retry next interval") HEALTHCHECK_FILE.unlink(missing_ok=True) except Exception: log.exception("Unexpected error in sync cycle") HEALTHCHECK_FILE.unlink(missing_ok=True) else: HEALTHCHECK_FILE.touch() for _ in range(SYNC_INTERVAL): if not _running: break time.sleep(1) HEALTHCHECK_FILE.unlink(missing_ok=True) log.info("Bouncer stopped") if __name__ == "__main__": try: main() except KeyboardInterrupt: log.info("Interrupted") sys.exit(0) except Exception: log.exception("Fatal error") sys.exit(1)