From da199bce8c00f57e6f6fd86eeb41ffded594840d Mon Sep 17 00:00:00 2001 From: kappa Date: Fri, 13 Feb 2026 09:25:43 +0900 Subject: [PATCH] Switch to Edge Script + Bunny Database architecture for unlimited IP blocking Replace Shield Access List (5,000 IP limit) with Bunny Database (libSQL) + Edge Script middleware to support CAPI community blocklists (tens of thousands of IPs). Bouncer now uses CrowdSec streaming API for incremental sync with periodic full resync every 6 hours. Co-Authored-By: Claude Opus 4.6 --- .env.example | 14 ++- .gitignore | 2 + bouncer.py | 268 +++++++++++++++++++++++++++---------------- edge-script/index.ts | 91 +++++++++++++++ setup.py | 84 ++++++++++++++ 5 files changed, 357 insertions(+), 102 deletions(-) create mode 100644 edge-script/index.ts create mode 100644 setup.py diff --git a/.env.example b/.env.example index 3dcbaa1..94ec782 100644 --- a/.env.example +++ b/.env.example @@ -2,12 +2,16 @@ CROWDSEC_LAPI_URL=http://crowdsec:8080 CROWDSEC_LAPI_KEY=your_bouncer_api_key_here -# Bunny CDN Shield API -BUNNY_API_KEY=your_bunny_api_key_here -BUNNY_SHIELD_ZONE_ID=12345 -BUNNY_ACCESS_LIST_ID=12345 +# Bunny Database (libSQL) +BUNNY_DB_URL=https://your-database-id.lite.bunnydb.net +BUNNY_DB_TOKEN=your_database_access_token_here # Optional settings SYNC_INTERVAL=60 -MAX_IPS=1000 +INCLUDE_CAPI=true +FULL_RESYNC_INTERVAL=21600 LOG_LEVEL=INFO + +# For setup.py only (Edge Script deployment) +BUNNY_API_KEY=your_bunny_api_key_here +BUNNY_SCRIPT_ID=your_edge_script_id_here diff --git a/.gitignore b/.gitignore index cff5543..a448b22 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ .env __pycache__/ *.pyc +node_modules/ +edge-script/dist/ diff --git a/bouncer.py b/bouncer.py index 49cd257..b35eee9 100644 --- a/bouncer.py +++ b/bouncer.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -"""CrowdSec Bouncer for Bunny CDN — syncs ban decisions to Shield Access Lists.""" +"""CrowdSec Bouncer for Bunny CDN — syncs ban decisions to Bunny Database via libSQL HTTP API.""" import logging import os @@ -18,15 +18,16 @@ from urllib3.util.retry import Retry CROWDSEC_LAPI_URL = os.environ["CROWDSEC_LAPI_URL"].rstrip("/") CROWDSEC_LAPI_KEY = os.environ["CROWDSEC_LAPI_KEY"] -BUNNY_API_KEY = os.environ["BUNNY_API_KEY"] -BUNNY_SHIELD_ZONE_ID = os.environ["BUNNY_SHIELD_ZONE_ID"] -BUNNY_ACCESS_LIST_ID = os.environ["BUNNY_ACCESS_LIST_ID"] +BUNNY_DB_URL = os.environ["BUNNY_DB_URL"].rstrip("/") +BUNNY_DB_TOKEN = os.environ["BUNNY_DB_TOKEN"] SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL", "60")) -MAX_IPS = int(os.environ.get("MAX_IPS", "1000")) +INCLUDE_CAPI = os.environ.get("INCLUDE_CAPI", "true").lower() in ("true", "1", "yes") +FULL_RESYNC_INTERVAL = int(os.environ.get("FULL_RESYNC_INTERVAL", "21600")) # 6 hours LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper() -BUNNY_API_BASE = "https://api.bunny.net" +PIPELINE_URL = f"{BUNNY_DB_URL}/v2/pipeline" +BATCH_SIZE = 500 HEALTHCHECK_FILE = Path("/tmp/bouncer-healthy") # --------------------------------------------------------------------------- @@ -69,7 +70,7 @@ def _make_session(headers: dict, retries: int = 3) -> requests.Session: total=retries, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], - allowed_methods=["GET", "POST", "PATCH"], + allowed_methods=["GET", "POST"], ) ) s.mount("http://", adapter) @@ -78,83 +79,166 @@ def _make_session(headers: dict, retries: int = 3) -> requests.Session: crowdsec_session = _make_session({"X-Api-Key": CROWDSEC_LAPI_KEY}) -bunny_session = _make_session( - {"AccessKey": BUNNY_API_KEY, "Content-Type": "application/json"} +db_session = _make_session( + {"Authorization": f"Bearer {BUNNY_DB_TOKEN}", "Content-Type": "application/json"} ) # --------------------------------------------------------------------------- -# State -# --------------------------------------------------------------------------- - -_last_pushed: set[str] = set() - -# --------------------------------------------------------------------------- -# CrowdSec helpers +# Bunny Database helpers (libSQL HTTP API) # --------------------------------------------------------------------------- -def fetch_banned_ips() -> list[str]: - """Return locally-detected banned IPs from CrowdSec LAPI, capped at MAX_IPS.""" - url = f"{CROWDSEC_LAPI_URL}/v1/decisions" - resp = crowdsec_session.get(url, params={"type": "ban"}, timeout=10) +def db_pipeline(statements: list[dict]) -> dict: + """Send a batch of statements to Bunny Database via /v2/pipeline.""" + reqs = [{"type": "execute", "stmt": s} for s in statements] + reqs.append({"type": "close"}) + resp = db_session.post(PIPELINE_URL, json={"requests": reqs}, timeout=30) resp.raise_for_status() data = resp.json() - if data is None: - return [] + for r in data.get("results", []): + if r.get("type") == "error": + raise RuntimeError(f"DB error: {r.get('error', {}).get('message', r)}") + return data - # Only keep locally-detected decisions (exclude CAPI community blocklists) - seen: set[str] = set() - unique: list[str] = [] - for d in data: - if d.get("origin") not in ("crowdsec", "cscli"): - continue - ip = d.get("value", "") - if ip and ip not in seen: - seen.add(ip) - unique.append(ip) - if len(unique) > MAX_IPS: - log.warning( - "CrowdSec has %d locally-detected banned IPs, truncating to %d", - len(unique), - MAX_IPS, - ) - unique = unique[:MAX_IPS] +def db_pipeline_batched(statements: list[dict]): + """Send statements in batches of BATCH_SIZE.""" + for i in range(0, len(statements), BATCH_SIZE): + batch = statements[i : i + BATCH_SIZE] + db_pipeline(batch) - return unique + +def init_db(): + """Create the blocklist table if it doesn't exist.""" + db_pipeline([ + { + "sql": ( + "CREATE TABLE IF NOT EXISTS blocklist (" + " ip TEXT PRIMARY KEY," + " reason TEXT NOT NULL DEFAULT ''," + " origin TEXT NOT NULL DEFAULT ''," + " expires_at TEXT," + " created_at TEXT NOT NULL DEFAULT (datetime('now'))" + ")" + ) + } + ]) + log.info("Database table 'blocklist' ready") + + +def db_count() -> int: + """Return the number of rows in the blocklist table.""" + data = db_pipeline([{"sql": "SELECT COUNT(*) FROM blocklist"}]) + row = data["results"][0]["response"]["result"]["rows"][0] + return int(row[0]["value"]) # --------------------------------------------------------------------------- -# Bunny CDN Shield Access Lists helpers +# CrowdSec streaming helpers # --------------------------------------------------------------------------- -def _access_list_url() -> str: - return f"{BUNNY_API_BASE}/shield/shield-zone/{BUNNY_SHIELD_ZONE_ID}/access-lists/{BUNNY_ACCESS_LIST_ID}" +def fetch_stream(startup: bool = False) -> dict: + """Fetch decisions from CrowdSec LAPI streaming endpoint.""" + url = f"{CROWDSEC_LAPI_URL}/v1/decisions/stream" + params: dict = {"startup": "true" if startup else "false"} + if not INCLUDE_CAPI: + params["origins"] = "crowdsec,cscli" + resp = crowdsec_session.get(url, params=params, timeout=30) + resp.raise_for_status() + return resp.json() -def sync_access_list(banned_ips: list[str]): - """Update the Shield Access List with the current set of banned IPs.""" - global _last_pushed +# --------------------------------------------------------------------------- +# Decision → SQL statement converters +# --------------------------------------------------------------------------- - ip_set = set(banned_ips) - if ip_set == _last_pushed: - log.debug("No changes, skipping sync") - return - content = "\n".join(banned_ips) if banned_ips else "" +def _normalize_ip(value: str) -> str: + """Strip /32 suffix from single-host CIDR notation.""" + return value[:-3] if value.endswith("/32") else value - resp = bunny_session.patch( - _access_list_url(), - json={"content": content}, - timeout=30, - ) - if not resp.ok: - log.error("Bunny API %s: %s", resp.status_code, resp.text) - resp.raise_for_status() - _last_pushed = ip_set - log.info("Synced %d IPs to Shield Access List", len(banned_ips)) +def _insert_stmt(d: dict) -> dict: + """Convert a CrowdSec decision to an INSERT OR REPLACE statement.""" + until = d.get("until") + return { + "sql": ( + "INSERT OR REPLACE INTO blocklist (ip, reason, origin, expires_at) " + "VALUES (?, ?, ?, ?)" + ), + "args": [ + {"type": "text", "value": _normalize_ip(d.get("value", ""))}, + {"type": "text", "value": d.get("scenario", "")}, + {"type": "text", "value": d.get("origin", "")}, + {"type": "text", "value": until} if until else {"type": "null"}, + ], + } + + +def _delete_stmt(d: dict) -> dict: + """Convert a CrowdSec decision to a DELETE statement.""" + return { + "sql": "DELETE FROM blocklist WHERE ip = ?", + "args": [{"type": "text", "value": _normalize_ip(d.get("value", ""))}], + } + + +def _is_ip_ban(d: dict) -> bool: + return d.get("scope", "").lower() == "ip" and d.get("type") == "ban" + + +# --------------------------------------------------------------------------- +# Sync logic +# --------------------------------------------------------------------------- + + +def full_sync() -> int: + """Full resync: fetch all decisions and rebuild the database.""" + log.info("Starting full resync (startup=true)") + data = fetch_stream(startup=True) + + new_decisions = data.get("new") or [] + + stmts: list[dict] = [{"sql": "DELETE FROM blocklist"}] + for d in new_decisions: + if _is_ip_ban(d): + stmts.append(_insert_stmt(d)) + + db_pipeline_batched(stmts) + + count = len(stmts) - 1 # exclude the DELETE statement + log.info("Full resync complete: %d IPs written to blocklist", count) + return count + + +def incremental_sync(): + """Incremental sync: fetch only new/deleted decisions since last poll.""" + data = fetch_stream(startup=False) + + new_decisions = data.get("new") or [] + deleted_decisions = data.get("deleted") or [] + + stmts: list[dict] = [] + + for d in deleted_decisions: + if d.get("scope", "").lower() == "ip": + stmts.append(_delete_stmt(d)) + + for d in new_decisions: + if _is_ip_ban(d): + stmts.append(_insert_stmt(d)) + + if stmts: + db_pipeline_batched(stmts) + + added = sum(1 for d in new_decisions if _is_ip_ban(d)) + removed = sum(1 for d in deleted_decisions if d.get("scope", "").lower() == "ip") + + if added or removed: + log.info("Incremental sync: +%d -%d IPs", added, removed) + else: + log.debug("Incremental sync: no changes") # --------------------------------------------------------------------------- @@ -163,41 +247,19 @@ def sync_access_list(banned_ips: list[str]): def verify_connections(): - """Check connectivity to CrowdSec LAPI and Bunny Shield API at startup.""" + """Check connectivity to CrowdSec LAPI and Bunny Database at startup.""" log.info("Verifying CrowdSec LAPI at %s …", CROWDSEC_LAPI_URL) resp = crowdsec_session.get( - f"{CROWDSEC_LAPI_URL}/v1/decisions", params={"type": "ban"}, timeout=10 + f"{CROWDSEC_LAPI_URL}/v1/decisions", + params={"type": "ban", "limit": "1"}, + timeout=10, ) resp.raise_for_status() log.info("CrowdSec LAPI: OK") - log.info("Verifying Bunny Shield zone %s …", BUNNY_SHIELD_ZONE_ID) - resp = bunny_session.get( - f"{BUNNY_API_BASE}/shield/shield-zone/{BUNNY_SHIELD_ZONE_ID}/access-lists", - timeout=15, - ) - resp.raise_for_status() - data = resp.json() - - found = False - for cl in data.get("customLists", []): - if str(cl.get("listId")) == BUNNY_ACCESS_LIST_ID: - found = True - log.info( - "Access List '%s': enabled=%s, action=%s, entries=%s/%s", - cl.get("name"), - cl.get("isEnabled"), - cl.get("action"), - data.get("customEntryCount"), - data.get("customEntryLimit"), - ) - break - - if not found: - log.error("Access List ID %s not found in Shield zone", BUNNY_ACCESS_LIST_ID) - sys.exit(1) - - log.info("Bunny Shield: OK") + log.info("Verifying Bunny Database at %s …", BUNNY_DB_URL) + init_db() + log.info("Bunny Database: OK") # --------------------------------------------------------------------------- @@ -207,21 +269,33 @@ def verify_connections(): def main(): log.info( - "Starting CrowdSec Bunny Bouncer — shield_zone=%s access_list=%s interval=%ds max_ips=%d", - BUNNY_SHIELD_ZONE_ID, - BUNNY_ACCESS_LIST_ID, + "Starting CrowdSec Bunny Bouncer — db=%s interval=%ds include_capi=%s full_resync=%ds", + BUNNY_DB_URL, SYNC_INTERVAL, - MAX_IPS, + INCLUDE_CAPI, + FULL_RESYNC_INTERVAL, ) verify_connections() HEALTHCHECK_FILE.touch() + # Initial full sync + try: + count = full_sync() + log.info("Blocklist initialized with %d IPs", count) + except Exception: + log.exception("Initial full sync failed, will retry") + HEALTHCHECK_FILE.unlink(missing_ok=True) + + last_full_sync = time.monotonic() + while _running: try: - banned = fetch_banned_ips() - log.debug("Fetched %d banned IPs", len(banned)) - sync_access_list(banned) + if time.monotonic() - last_full_sync >= FULL_RESYNC_INTERVAL: + full_sync() + last_full_sync = time.monotonic() + else: + incremental_sync() except requests.RequestException: log.exception("Sync cycle failed, will retry next interval") HEALTHCHECK_FILE.unlink(missing_ok=True) diff --git a/edge-script/index.ts b/edge-script/index.ts new file mode 100644 index 0000000..c0ac6f9 --- /dev/null +++ b/edge-script/index.ts @@ -0,0 +1,91 @@ +import { createClient } from "@libsql/client/web"; +import * as BunnySDK from "@bunny.net/edgescript-sdk"; +import process from "node:process"; + +// --------------------------------------------------------------------------- +// Database client +// --------------------------------------------------------------------------- + +const db = createClient({ + url: process.env.DB_URL!, + authToken: process.env.DB_TOKEN!, +}); + +// --------------------------------------------------------------------------- +// In-memory cache (per edge node, 60s TTL) +// --------------------------------------------------------------------------- + +interface CacheEntry { + blocked: boolean; + expiresAt: number; +} + +const cache = new Map(); +const CACHE_TTL_MS = 60_000; +const CACHE_MAX_SIZE = 50_000; + +function cacheGet(ip: string): boolean | null { + const entry = cache.get(ip); + if (!entry) return null; + if (entry.expiresAt <= Date.now()) { + cache.delete(ip); + return null; + } + return entry.blocked; +} + +function cacheSet(ip: string, blocked: boolean): void { + if (cache.size >= CACHE_MAX_SIZE) { + // Evict expired entries first + const now = Date.now(); + for (const [key, val] of cache) { + if (val.expiresAt <= now) cache.delete(key); + } + // If still too large, clear entirely + if (cache.size >= CACHE_MAX_SIZE) cache.clear(); + } + cache.set(ip, { blocked, expiresAt: Date.now() + CACHE_TTL_MS }); +} + +// --------------------------------------------------------------------------- +// Blocklist lookup +// --------------------------------------------------------------------------- + +async function isBlocked(ip: string): Promise { + const cached = cacheGet(ip); + if (cached !== null) return cached; + + try { + const result = await db.execute({ + sql: "SELECT 1 FROM blocklist WHERE ip = ? AND (expires_at IS NULL OR expires_at > datetime('now'))", + args: [ip], + }); + const blocked = result.rows.length > 0; + cacheSet(ip, blocked); + return blocked; + } catch (err) { + // Fail-open: on DB error, allow the request through + console.error("Blocklist lookup failed, allowing request:", err); + return false; + } +} + +// --------------------------------------------------------------------------- +// Middleware +// --------------------------------------------------------------------------- + +BunnySDK.net.http + .servePullZone() + .onOriginRequest(async (ctx) => { + const ip = ctx.request.headers.get("X-Real-Ip"); + + if (!ip) { + return ctx.request; + } + + if (await isBlocked(ip)) { + return new Response("Forbidden", { status: 403 }); + } + + return ctx.request; + }); diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..7823a0a --- /dev/null +++ b/setup.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +"""One-time setup: deploy the Edge Script to Bunny CDN and publish a release.""" + +import os +import sys +from pathlib import Path + +import requests + +BUNNY_API_BASE = "https://api.bunny.net" +BUNNY_API_KEY = os.environ.get("BUNNY_API_KEY", "") +BUNNY_SCRIPT_ID = os.environ.get("BUNNY_SCRIPT_ID", "") + +EDGE_SCRIPT_PATH = Path(__file__).parent / "edge-script" / "index.ts" + + +def api(method: str, path: str, **kwargs) -> requests.Response: + url = f"{BUNNY_API_BASE}{path}" + headers = kwargs.pop("headers", {}) + headers["AccessKey"] = BUNNY_API_KEY + resp = requests.request(method, url, headers=headers, timeout=30, **kwargs) + if not resp.ok: + print(f"ERROR {resp.status_code}: {resp.text}", file=sys.stderr) + resp.raise_for_status() + return resp + + +def upload_code(script_id: str, code: str): + """Upload edge script code via POST /compute/script/{id}/code.""" + print(f"Uploading code to script {script_id} …") + api("POST", f"/compute/script/{script_id}/code", json={"Code": code}) + print("Code uploaded.") + + +def publish(script_id: str): + """Publish the current code as a release via POST /compute/script/{id}/publish.""" + print(f"Publishing script {script_id} …") + api("POST", f"/compute/script/{script_id}/publish", json={"Note": "CrowdSec bouncer middleware"}) + print("Published.") + + +def add_variable(script_id: str, name: str, value: str): + """Add an environment variable via POST /compute/script/{id}/variables.""" + print(f"Setting variable {name} …") + api( + "POST", + f"/compute/script/{script_id}/variables", + json={"Name": name, "DefaultValue": value, "Required": True}, + ) + print(f"Variable {name} set.") + + +def main(): + if not BUNNY_API_KEY: + print("ERROR: BUNNY_API_KEY is required", file=sys.stderr) + sys.exit(1) + if not BUNNY_SCRIPT_ID: + print("ERROR: BUNNY_SCRIPT_ID is required", file=sys.stderr) + sys.exit(1) + if not EDGE_SCRIPT_PATH.exists(): + print(f"ERROR: {EDGE_SCRIPT_PATH} not found", file=sys.stderr) + sys.exit(1) + + code = EDGE_SCRIPT_PATH.read_text() + + print("=== CrowdSec Bunny Bouncer — Edge Script Deployment ===") + print(f"Script ID: {BUNNY_SCRIPT_ID}") + print(f"Code size: {len(code)} bytes") + print() + + # Step 1: Upload code + upload_code(BUNNY_SCRIPT_ID, code) + + # Step 2: Publish release + publish(BUNNY_SCRIPT_ID) + + print() + print("Done! Make sure you have:") + print(" 1. Connected your Bunny Database to this Edge Script (DB_URL, DB_TOKEN secrets)") + print(" 2. Attached the Edge Script to your Pull Zone") + + +if __name__ == "__main__": + main()