Files
crowdsec-bunny-bouncer/bouncer.py
kappa da199bce8c Switch to Edge Script + Bunny Database architecture for unlimited IP blocking
Replace Shield Access List (5,000 IP limit) with Bunny Database (libSQL) +
Edge Script middleware to support CAPI community blocklists (tens of thousands
of IPs). Bouncer now uses CrowdSec streaming API for incremental sync with
periodic full resync every 6 hours.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 09:25:43 +09:00

326 lines
10 KiB
Python

#!/usr/bin/env python3
"""CrowdSec Bouncer for Bunny CDN — syncs ban decisions to Bunny Database via libSQL HTTP API."""
import logging
import os
import signal
import sys
import time
from pathlib import Path
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# ---------------------------------------------------------------------------
# Configuration — all values come from the environment. The four unguarded
# os.environ[...] lookups are required and raise KeyError at import if unset.
# ---------------------------------------------------------------------------
# CrowdSec Local API base URL (trailing slash stripped) and bouncer API key.
CROWDSEC_LAPI_URL = os.environ["CROWDSEC_LAPI_URL"].rstrip("/")
CROWDSEC_LAPI_KEY = os.environ["CROWDSEC_LAPI_KEY"]
# Bunny Database (libSQL HTTP API) endpoint and bearer token.
BUNNY_DB_URL = os.environ["BUNNY_DB_URL"].rstrip("/")
BUNNY_DB_TOKEN = os.environ["BUNNY_DB_TOKEN"]
# Seconds between incremental sync polls.
SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL", "60"))
# Include community (CAPI) decisions in addition to local ones.
INCLUDE_CAPI = os.environ.get("INCLUDE_CAPI", "true").lower() in ("true", "1", "yes")
# Seconds between full blocklist rebuilds.
FULL_RESYNC_INTERVAL = int(os.environ.get("FULL_RESYNC_INTERVAL", "21600"))  # 6 hours
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
# libSQL pipeline endpoint; BATCH_SIZE caps statements per pipeline request.
PIPELINE_URL = f"{BUNNY_DB_URL}/v2/pipeline"
BATCH_SIZE = 500
# Touched on success / removed on failure — presumably consumed by an
# external health probe (TODO confirm against deployment config).
HEALTHCHECK_FILE = Path("/tmp/bouncer-healthy")
# ---------------------------------------------------------------------------
# Logging — timestamped single-line format; level taken from the LOG_LEVEL
# environment setting.
# ---------------------------------------------------------------------------
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=LOG_LEVEL,
)
log = logging.getLogger("bouncer")
# ---------------------------------------------------------------------------
# Graceful shutdown
# ---------------------------------------------------------------------------
_running = True
def _shutdown(signum, _frame):
global _running
log.info("Received signal %s, shutting down…", signal.Signals(signum).name)
_running = False
signal.signal(signal.SIGTERM, _shutdown)
signal.signal(signal.SIGINT, _shutdown)
# ---------------------------------------------------------------------------
# HTTP sessions with retry
# ---------------------------------------------------------------------------
def _make_session(headers: dict, retries: int = 3) -> requests.Session:
    """Build a requests.Session with *headers* applied to every request and
    automatic retries mounted for both HTTP and HTTPS.

    Retries cover connection failures and 429/500/502/503/504 responses for
    GET and POST, with exponential backoff (factor 1).
    """
    retry_policy = Retry(
        total=retries,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "POST"],
    )
    session = requests.Session()
    session.headers.update(headers)
    for scheme in ("http://", "https://"):
        session.mount(scheme, HTTPAdapter(max_retries=retry_policy))
    return session
# Pre-configured sessions: CrowdSec LAPI authenticates via the X-Api-Key
# header; Bunny Database uses a Bearer token and JSON request bodies.
crowdsec_session = _make_session({"X-Api-Key": CROWDSEC_LAPI_KEY})
db_session = _make_session(
    {"Authorization": f"Bearer {BUNNY_DB_TOKEN}", "Content-Type": "application/json"}
)
# ---------------------------------------------------------------------------
# Bunny Database helpers (libSQL HTTP API)
# ---------------------------------------------------------------------------
def db_pipeline(statements: list[dict]) -> dict:
    """Execute *statements* against Bunny Database in one /v2/pipeline call.

    Each statement dict is wrapped as an "execute" request; a trailing
    "close" request ends the pipeline. Raises requests.HTTPError on a
    non-2xx response and RuntimeError if any per-statement result reports
    an error. Returns the decoded response body.
    """
    payload = {
        "requests": [{"type": "execute", "stmt": stmt} for stmt in statements]
        + [{"type": "close"}]
    }
    resp = db_session.post(PIPELINE_URL, json=payload, timeout=30)
    resp.raise_for_status()
    body = resp.json()
    # HTTP 200 does not imply per-statement success — scan each result.
    for result in body.get("results", []):
        if result.get("type") == "error":
            message = result.get("error", {}).get("message", result)
            raise RuntimeError(f"DB error: {message}")
    return body
def db_pipeline_batched(statements: list[dict]):
    """Send *statements* through db_pipeline() in chunks of BATCH_SIZE."""
    for start in range(0, len(statements), BATCH_SIZE):
        db_pipeline(statements[start : start + BATCH_SIZE])
def init_db():
    """Create the blocklist table if it doesn't exist."""
    ddl = (
        "CREATE TABLE IF NOT EXISTS blocklist ("
        " ip TEXT PRIMARY KEY,"
        " reason TEXT NOT NULL DEFAULT '',"
        " origin TEXT NOT NULL DEFAULT '',"
        " expires_at TEXT,"
        " created_at TEXT NOT NULL DEFAULT (datetime('now'))"
        ")"
    )
    db_pipeline([{"sql": ddl}])
    log.info("Database table 'blocklist' ready")
def db_count() -> int:
    """Return the number of rows in the blocklist table."""
    result = db_pipeline([{"sql": "SELECT COUNT(*) FROM blocklist"}])
    # libSQL returns each cell as {"type": ..., "value": ...}; the COUNT
    # value arrives as a string, so coerce it to int.
    first_row = result["results"][0]["response"]["result"]["rows"][0]
    return int(first_row[0]["value"])
# ---------------------------------------------------------------------------
# CrowdSec streaming helpers
# ---------------------------------------------------------------------------
def fetch_stream(startup: bool = False) -> dict:
    """Fetch decisions from CrowdSec LAPI streaming endpoint.

    With startup=True the LAPI returns the full active decision set;
    otherwise only the delta since the previous poll. When CAPI decisions
    are disabled, results are restricted to local origins (crowdsec/cscli).
    """
    params: dict = {"startup": "true" if startup else "false"}
    if not INCLUDE_CAPI:
        params["origins"] = "crowdsec,cscli"
    resp = crowdsec_session.get(
        f"{CROWDSEC_LAPI_URL}/v1/decisions/stream", params=params, timeout=30
    )
    resp.raise_for_status()
    return resp.json()
# ---------------------------------------------------------------------------
# Decision → SQL statement converters
# ---------------------------------------------------------------------------
def _normalize_ip(value: str) -> str:
"""Strip /32 suffix from single-host CIDR notation."""
return value[:-3] if value.endswith("/32") else value
def _insert_stmt(d: dict) -> dict:
    """Convert a CrowdSec decision to an INSERT OR REPLACE statement."""
    args = [
        {"type": "text", "value": _normalize_ip(d.get("value", ""))},
        {"type": "text", "value": d.get("scenario", "")},
        {"type": "text", "value": d.get("origin", "")},
    ]
    # Decisions without an "until" timestamp get a NULL expiry.
    expires = d.get("until")
    if expires:
        args.append({"type": "text", "value": expires})
    else:
        args.append({"type": "null"})
    return {
        "sql": (
            "INSERT OR REPLACE INTO blocklist (ip, reason, origin, expires_at) "
            "VALUES (?, ?, ?, ?)"
        ),
        "args": args,
    }
def _delete_stmt(d: dict) -> dict:
    """Convert a CrowdSec decision to a DELETE statement."""
    ip = _normalize_ip(d.get("value", ""))
    return {
        "sql": "DELETE FROM blocklist WHERE ip = ?",
        "args": [{"type": "text", "value": ip}],
    }
def _is_ip_ban(d: dict) -> bool:
return d.get("scope", "").lower() == "ip" and d.get("type") == "ban"
# ---------------------------------------------------------------------------
# Sync logic
# ---------------------------------------------------------------------------
def full_sync() -> int:
    """Full resync: fetch all decisions and rebuild the database.

    Queues a table wipe followed by an insert for every active single-IP
    ban, then ships the statements in batches. Returns the number of IPs
    written. NOTE(review): the wipe and the inserts may land in separate
    pipeline batches, so a mid-sync failure can leave the table partially
    populated until the next successful resync.
    """
    log.info("Starting full resync (startup=true)")
    decisions = fetch_stream(startup=True).get("new") or []
    statements: list[dict] = [{"sql": "DELETE FROM blocklist"}]
    statements.extend(_insert_stmt(d) for d in decisions if _is_ip_ban(d))
    db_pipeline_batched(statements)
    written = len(statements) - 1  # the leading DELETE is not an IP row
    log.info("Full resync complete: %d IPs written to blocklist", written)
    return written
def incremental_sync():
    """Incremental sync: fetch only new/deleted decisions since last poll."""
    data = fetch_stream(startup=False)
    additions = [d for d in (data.get("new") or []) if _is_ip_ban(d)]
    removals = [
        d for d in (data.get("deleted") or []) if d.get("scope", "").lower() == "ip"
    ]
    # Deletes are queued before inserts, so an IP appearing in both lists
    # ends up inserted (still blocked).
    stmts = [_delete_stmt(d) for d in removals] + [_insert_stmt(d) for d in additions]
    if stmts:
        db_pipeline_batched(stmts)
    if additions or removals:
        log.info("Incremental sync: +%d -%d IPs", len(additions), len(removals))
    else:
        log.debug("Incremental sync: no changes")
# ---------------------------------------------------------------------------
# Startup validation
# ---------------------------------------------------------------------------
def verify_connections():
    """Check connectivity to CrowdSec LAPI and Bunny Database at startup.

    Raises on any HTTP or database failure so the process exits early
    rather than looping against a misconfigured endpoint.
    """
    log.info("Verifying CrowdSec LAPI at %s", CROWDSEC_LAPI_URL)
    # A one-row decisions query is a cheap authenticated round-trip.
    resp = crowdsec_session.get(
        f"{CROWDSEC_LAPI_URL}/v1/decisions",
        params={"type": "ban", "limit": "1"},
        timeout=10,
    )
    resp.raise_for_status()
    log.info("CrowdSec LAPI: OK")
    log.info("Verifying Bunny Database at %s", BUNNY_DB_URL)
    # init_db() both proves DB connectivity and ensures the schema exists.
    init_db()
    log.info("Bunny Database: OK")
# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------
def main():
    """Run the bouncer: verify connectivity, then sync in a loop until a
    shutdown signal clears _running.

    The healthcheck file is touched after every successful cycle and
    removed on failure or shutdown.
    """
    log.info(
        "Starting CrowdSec Bunny Bouncer — db=%s interval=%ds include_capi=%s full_resync=%ds",
        BUNNY_DB_URL,
        SYNC_INTERVAL,
        INCLUDE_CAPI,
        FULL_RESYNC_INTERVAL,
    )
    verify_connections()
    HEALTHCHECK_FILE.touch()
    # Initial full sync. On failure, backdate last_full_sync so the very next
    # loop iteration retries the full sync (as the log message promises)
    # instead of serving an empty/stale blocklist for up to
    # FULL_RESYNC_INTERVAL seconds.
    try:
        count = full_sync()
        log.info("Blocklist initialized with %d IPs", count)
        last_full_sync = time.monotonic()
    except Exception:
        log.exception("Initial full sync failed, will retry")
        HEALTHCHECK_FILE.unlink(missing_ok=True)
        last_full_sync = time.monotonic() - FULL_RESYNC_INTERVAL
    while _running:
        try:
            if time.monotonic() - last_full_sync >= FULL_RESYNC_INTERVAL:
                full_sync()
                last_full_sync = time.monotonic()
            else:
                incremental_sync()
        except requests.RequestException:
            log.exception("Sync cycle failed, will retry next interval")
            HEALTHCHECK_FILE.unlink(missing_ok=True)
        except Exception:
            log.exception("Unexpected error in sync cycle")
            HEALTHCHECK_FILE.unlink(missing_ok=True)
        else:
            HEALTHCHECK_FILE.touch()
        # Sleep in one-second slices so a shutdown signal is honored promptly.
        for _ in range(SYNC_INTERVAL):
            if not _running:
                break
            time.sleep(1)
    HEALTHCHECK_FILE.unlink(missing_ok=True)
    log.info("Bouncer stopped")
if __name__ == "__main__":
    # Top-level boundary: translate interrupts and failures into exit codes.
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is a clean shutdown, not an error.
        log.info("Interrupted")
        sys.exit(0)
    except Exception:
        log.exception("Fatal error")
        sys.exit(1)