Switch to Edge Script + Bunny Database architecture for unlimited IP blocking

Replace Shield Access List (5,000 IP limit) with Bunny Database (libSQL) +
Edge Script middleware to support CAPI community blocklists (tens of thousands
of IPs). Bouncer now uses CrowdSec streaming API for incremental sync with
periodic full resync every 6 hours.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
kappa
2026-02-13 09:25:43 +09:00
parent d1b870227e
commit da199bce8c
5 changed files with 357 additions and 102 deletions

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3
"""CrowdSec Bouncer for Bunny CDN — syncs ban decisions to Shield Access Lists."""
"""CrowdSec Bouncer for Bunny CDN — syncs ban decisions to Bunny Database via libSQL HTTP API."""
import logging
import os
@@ -18,15 +18,16 @@ from urllib3.util.retry import Retry
CROWDSEC_LAPI_URL = os.environ["CROWDSEC_LAPI_URL"].rstrip("/")
CROWDSEC_LAPI_KEY = os.environ["CROWDSEC_LAPI_KEY"]
BUNNY_API_KEY = os.environ["BUNNY_API_KEY"]
BUNNY_SHIELD_ZONE_ID = os.environ["BUNNY_SHIELD_ZONE_ID"]
BUNNY_ACCESS_LIST_ID = os.environ["BUNNY_ACCESS_LIST_ID"]
BUNNY_DB_URL = os.environ["BUNNY_DB_URL"].rstrip("/")
BUNNY_DB_TOKEN = os.environ["BUNNY_DB_TOKEN"]
SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL", "60"))
MAX_IPS = int(os.environ.get("MAX_IPS", "1000"))
INCLUDE_CAPI = os.environ.get("INCLUDE_CAPI", "true").lower() in ("true", "1", "yes")
FULL_RESYNC_INTERVAL = int(os.environ.get("FULL_RESYNC_INTERVAL", "21600")) # 6 hours
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
BUNNY_API_BASE = "https://api.bunny.net"
PIPELINE_URL = f"{BUNNY_DB_URL}/v2/pipeline"
BATCH_SIZE = 500
HEALTHCHECK_FILE = Path("/tmp/bouncer-healthy")
# ---------------------------------------------------------------------------
@@ -69,7 +70,7 @@ def _make_session(headers: dict, retries: int = 3) -> requests.Session:
total=retries,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST", "PATCH"],
allowed_methods=["GET", "POST"],
)
)
s.mount("http://", adapter)
@@ -78,83 +79,166 @@ def _make_session(headers: dict, retries: int = 3) -> requests.Session:
crowdsec_session = _make_session({"X-Api-Key": CROWDSEC_LAPI_KEY})
bunny_session = _make_session(
{"AccessKey": BUNNY_API_KEY, "Content-Type": "application/json"}
db_session = _make_session(
{"Authorization": f"Bearer {BUNNY_DB_TOKEN}", "Content-Type": "application/json"}
)
# ---------------------------------------------------------------------------
# State
# ---------------------------------------------------------------------------
_last_pushed: set[str] = set()
# ---------------------------------------------------------------------------
# CrowdSec helpers
# Bunny Database helpers (libSQL HTTP API)
# ---------------------------------------------------------------------------
def fetch_banned_ips() -> list[str]:
"""Return locally-detected banned IPs from CrowdSec LAPI, capped at MAX_IPS."""
url = f"{CROWDSEC_LAPI_URL}/v1/decisions"
resp = crowdsec_session.get(url, params={"type": "ban"}, timeout=10)
def db_pipeline(statements: list[dict]) -> dict:
"""Send a batch of statements to Bunny Database via /v2/pipeline."""
reqs = [{"type": "execute", "stmt": s} for s in statements]
reqs.append({"type": "close"})
resp = db_session.post(PIPELINE_URL, json={"requests": reqs}, timeout=30)
resp.raise_for_status()
data = resp.json()
if data is None:
return []
for r in data.get("results", []):
if r.get("type") == "error":
raise RuntimeError(f"DB error: {r.get('error', {}).get('message', r)}")
return data
# Only keep locally-detected decisions (exclude CAPI community blocklists)
seen: set[str] = set()
unique: list[str] = []
for d in data:
if d.get("origin") not in ("crowdsec", "cscli"):
continue
ip = d.get("value", "")
if ip and ip not in seen:
seen.add(ip)
unique.append(ip)
if len(unique) > MAX_IPS:
log.warning(
"CrowdSec has %d locally-detected banned IPs, truncating to %d",
len(unique),
MAX_IPS,
)
unique = unique[:MAX_IPS]
def db_pipeline_batched(statements: list[dict]):
"""Send statements in batches of BATCH_SIZE."""
for i in range(0, len(statements), BATCH_SIZE):
batch = statements[i : i + BATCH_SIZE]
db_pipeline(batch)
return unique
def init_db():
"""Create the blocklist table if it doesn't exist."""
db_pipeline([
{
"sql": (
"CREATE TABLE IF NOT EXISTS blocklist ("
" ip TEXT PRIMARY KEY,"
" reason TEXT NOT NULL DEFAULT '',"
" origin TEXT NOT NULL DEFAULT '',"
" expires_at TEXT,"
" created_at TEXT NOT NULL DEFAULT (datetime('now'))"
")"
)
}
])
log.info("Database table 'blocklist' ready")
def db_count() -> int:
"""Return the number of rows in the blocklist table."""
data = db_pipeline([{"sql": "SELECT COUNT(*) FROM blocklist"}])
row = data["results"][0]["response"]["result"]["rows"][0]
return int(row[0]["value"])
# ---------------------------------------------------------------------------
# Bunny CDN Shield Access Lists helpers
# CrowdSec streaming helpers
# ---------------------------------------------------------------------------
def _access_list_url() -> str:
return f"{BUNNY_API_BASE}/shield/shield-zone/{BUNNY_SHIELD_ZONE_ID}/access-lists/{BUNNY_ACCESS_LIST_ID}"
def fetch_stream(startup: bool = False) -> dict:
"""Fetch decisions from CrowdSec LAPI streaming endpoint."""
url = f"{CROWDSEC_LAPI_URL}/v1/decisions/stream"
params: dict = {"startup": "true" if startup else "false"}
if not INCLUDE_CAPI:
params["origins"] = "crowdsec,cscli"
resp = crowdsec_session.get(url, params=params, timeout=30)
resp.raise_for_status()
return resp.json()
def sync_access_list(banned_ips: list[str]):
"""Update the Shield Access List with the current set of banned IPs."""
global _last_pushed
# ---------------------------------------------------------------------------
# Decision → SQL statement converters
# ---------------------------------------------------------------------------
ip_set = set(banned_ips)
if ip_set == _last_pushed:
log.debug("No changes, skipping sync")
return
content = "\n".join(banned_ips) if banned_ips else ""
def _normalize_ip(value: str) -> str:
"""Strip /32 suffix from single-host CIDR notation."""
return value[:-3] if value.endswith("/32") else value
resp = bunny_session.patch(
_access_list_url(),
json={"content": content},
timeout=30,
)
if not resp.ok:
log.error("Bunny API %s: %s", resp.status_code, resp.text)
resp.raise_for_status()
_last_pushed = ip_set
log.info("Synced %d IPs to Shield Access List", len(banned_ips))
def _insert_stmt(d: dict) -> dict:
"""Convert a CrowdSec decision to an INSERT OR REPLACE statement."""
until = d.get("until")
return {
"sql": (
"INSERT OR REPLACE INTO blocklist (ip, reason, origin, expires_at) "
"VALUES (?, ?, ?, ?)"
),
"args": [
{"type": "text", "value": _normalize_ip(d.get("value", ""))},
{"type": "text", "value": d.get("scenario", "")},
{"type": "text", "value": d.get("origin", "")},
{"type": "text", "value": until} if until else {"type": "null"},
],
}
def _delete_stmt(d: dict) -> dict:
"""Convert a CrowdSec decision to a DELETE statement."""
return {
"sql": "DELETE FROM blocklist WHERE ip = ?",
"args": [{"type": "text", "value": _normalize_ip(d.get("value", ""))}],
}
def _is_ip_ban(d: dict) -> bool:
return d.get("scope", "").lower() == "ip" and d.get("type") == "ban"
# ---------------------------------------------------------------------------
# Sync logic
# ---------------------------------------------------------------------------
def full_sync() -> int:
"""Full resync: fetch all decisions and rebuild the database."""
log.info("Starting full resync (startup=true)")
data = fetch_stream(startup=True)
new_decisions = data.get("new") or []
stmts: list[dict] = [{"sql": "DELETE FROM blocklist"}]
for d in new_decisions:
if _is_ip_ban(d):
stmts.append(_insert_stmt(d))
db_pipeline_batched(stmts)
count = len(stmts) - 1 # exclude the DELETE statement
log.info("Full resync complete: %d IPs written to blocklist", count)
return count
def incremental_sync():
"""Incremental sync: fetch only new/deleted decisions since last poll."""
data = fetch_stream(startup=False)
new_decisions = data.get("new") or []
deleted_decisions = data.get("deleted") or []
stmts: list[dict] = []
for d in deleted_decisions:
if d.get("scope", "").lower() == "ip":
stmts.append(_delete_stmt(d))
for d in new_decisions:
if _is_ip_ban(d):
stmts.append(_insert_stmt(d))
if stmts:
db_pipeline_batched(stmts)
added = sum(1 for d in new_decisions if _is_ip_ban(d))
removed = sum(1 for d in deleted_decisions if d.get("scope", "").lower() == "ip")
if added or removed:
log.info("Incremental sync: +%d -%d IPs", added, removed)
else:
log.debug("Incremental sync: no changes")
# ---------------------------------------------------------------------------
@@ -163,41 +247,19 @@ def sync_access_list(banned_ips: list[str]):
def verify_connections():
"""Check connectivity to CrowdSec LAPI and Bunny Shield API at startup."""
"""Check connectivity to CrowdSec LAPI and Bunny Database at startup."""
log.info("Verifying CrowdSec LAPI at %s", CROWDSEC_LAPI_URL)
resp = crowdsec_session.get(
f"{CROWDSEC_LAPI_URL}/v1/decisions", params={"type": "ban"}, timeout=10
f"{CROWDSEC_LAPI_URL}/v1/decisions",
params={"type": "ban", "limit": "1"},
timeout=10,
)
resp.raise_for_status()
log.info("CrowdSec LAPI: OK")
log.info("Verifying Bunny Shield zone %s", BUNNY_SHIELD_ZONE_ID)
resp = bunny_session.get(
f"{BUNNY_API_BASE}/shield/shield-zone/{BUNNY_SHIELD_ZONE_ID}/access-lists",
timeout=15,
)
resp.raise_for_status()
data = resp.json()
found = False
for cl in data.get("customLists", []):
if str(cl.get("listId")) == BUNNY_ACCESS_LIST_ID:
found = True
log.info(
"Access List '%s': enabled=%s, action=%s, entries=%s/%s",
cl.get("name"),
cl.get("isEnabled"),
cl.get("action"),
data.get("customEntryCount"),
data.get("customEntryLimit"),
)
break
if not found:
log.error("Access List ID %s not found in Shield zone", BUNNY_ACCESS_LIST_ID)
sys.exit(1)
log.info("Bunny Shield: OK")
log.info("Verifying Bunny Database at %s", BUNNY_DB_URL)
init_db()
log.info("Bunny Database: OK")
# ---------------------------------------------------------------------------
@@ -207,21 +269,33 @@ def verify_connections():
def main():
log.info(
"Starting CrowdSec Bunny Bouncer — shield_zone=%s access_list=%s interval=%ds max_ips=%d",
BUNNY_SHIELD_ZONE_ID,
BUNNY_ACCESS_LIST_ID,
"Starting CrowdSec Bunny Bouncer — db=%s interval=%ds include_capi=%s full_resync=%ds",
BUNNY_DB_URL,
SYNC_INTERVAL,
MAX_IPS,
INCLUDE_CAPI,
FULL_RESYNC_INTERVAL,
)
verify_connections()
HEALTHCHECK_FILE.touch()
# Initial full sync
try:
count = full_sync()
log.info("Blocklist initialized with %d IPs", count)
except Exception:
log.exception("Initial full sync failed, will retry")
HEALTHCHECK_FILE.unlink(missing_ok=True)
last_full_sync = time.monotonic()
while _running:
try:
banned = fetch_banned_ips()
log.debug("Fetched %d banned IPs", len(banned))
sync_access_list(banned)
if time.monotonic() - last_full_sync >= FULL_RESYNC_INTERVAL:
full_sync()
last_full_sync = time.monotonic()
else:
incremental_sync()
except requests.RequestException:
log.exception("Sync cycle failed, will retry next interval")
HEALTHCHECK_FILE.unlink(missing_ok=True)