Replace Shield Access List (5,000 IP limit) with Bunny Database (libSQL) + Edge Script middleware to support CAPI community blocklists (tens of thousands of IPs). Bouncer now uses CrowdSec streaming API for incremental sync with periodic full resync every 6 hours. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
326 lines
10 KiB
Python
326 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""CrowdSec Bouncer for Bunny CDN — syncs ban decisions to Bunny Database via libSQL HTTP API."""
|
|
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
|
|
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Required settings — a missing variable raises KeyError at import time,
# which is intentional: the bouncer cannot operate without them.
CROWDSEC_LAPI_URL = os.environ["CROWDSEC_LAPI_URL"].rstrip("/")  # trailing "/" stripped so f-string URL joins stay clean
CROWDSEC_LAPI_KEY = os.environ["CROWDSEC_LAPI_KEY"]
BUNNY_DB_URL = os.environ["BUNNY_DB_URL"].rstrip("/")
BUNNY_DB_TOKEN = os.environ["BUNNY_DB_TOKEN"]

# Optional settings with defaults.
SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL", "60"))  # seconds between incremental polls
INCLUDE_CAPI = os.environ.get("INCLUDE_CAPI", "true").lower() in ("true", "1", "yes")  # when false, fetch_stream filters to local origins only
FULL_RESYNC_INTERVAL = int(os.environ.get("FULL_RESYNC_INTERVAL", "21600"))  # seconds between full rebuilds (default 6 hours)
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()

# Derived constants.
PIPELINE_URL = f"{BUNNY_DB_URL}/v2/pipeline"  # libSQL HTTP pipeline endpoint
BATCH_SIZE = 500  # max statements per pipeline request
HEALTHCHECK_FILE = Path("/tmp/bouncer-healthy")  # touched when healthy, removed on failure/shutdown
|
|
|
|
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

# Root-logger configuration; verbosity is driven by the LOG_LEVEL env var.
logging.basicConfig(
    level=LOG_LEVEL,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("bouncer")
|
|
|
|
# ---------------------------------------------------------------------------
# Graceful shutdown
# ---------------------------------------------------------------------------

# Cooperative run flag: polled by the main loop, cleared by the signal handler.
_running = True


def _shutdown(signum, _frame):
    """Signal handler: clear the run flag so the main loop drains and exits."""
    global _running
    log.info("Received signal %s, shutting down…", signal.Signals(signum).name)
    _running = False


# Handle both container stop (SIGTERM) and interactive Ctrl-C (SIGINT).
signal.signal(signal.SIGTERM, _shutdown)
signal.signal(signal.SIGINT, _shutdown)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTTP sessions with retry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_session(headers: dict, retries: int = 3) -> requests.Session:
    """Return a requests.Session carrying *headers* with retry-on-failure mounted.

    Retries idempotent-safe GET/POST calls on transient HTTP errors
    (429 and common 5xx) with exponential backoff.
    """
    retry_policy = Retry(
        total=retries,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "POST"],
    )
    # One shared adapter for both schemes so they use a common connection pool.
    adapter = HTTPAdapter(max_retries=retry_policy)

    session = requests.Session()
    session.headers.update(headers)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session
|
|
|
|
|
|
# Pre-configured HTTP clients: CrowdSec LAPI authenticates with an API key
# header; Bunny Database uses a Bearer token and JSON request bodies.
crowdsec_session = _make_session({"X-Api-Key": CROWDSEC_LAPI_KEY})
db_session = _make_session(
    {"Authorization": f"Bearer {BUNNY_DB_TOKEN}", "Content-Type": "application/json"}
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bunny Database helpers (libSQL HTTP API)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def db_pipeline(statements: list[dict]) -> dict:
    """Execute *statements* against Bunny Database via the /v2/pipeline endpoint.

    A trailing "close" request is appended so the server releases the stream.
    Raises RuntimeError if any statement in the batch reports an error,
    and requests.HTTPError on a non-2xx transport response.
    """
    payload = {
        "requests": [{"type": "execute", "stmt": s} for s in statements]
        + [{"type": "close"}]
    }
    resp = db_session.post(PIPELINE_URL, json=payload, timeout=30)
    resp.raise_for_status()

    data = resp.json()
    # Per-statement errors come back inside a 200 response; surface them.
    for result in data.get("results", []):
        if result.get("type") == "error":
            raise RuntimeError(f"DB error: {result.get('error', {}).get('message', result)}")
    return data
|
|
|
|
|
|
def db_pipeline_batched(statements: list[dict]):
    """Run *statements* as successive pipelines of at most BATCH_SIZE each.

    NOTE: batches are separate HTTP requests, so the whole set is not atomic.
    """
    for start in range(0, len(statements), BATCH_SIZE):
        chunk = statements[start : start + BATCH_SIZE]
        db_pipeline(chunk)
|
|
|
|
|
|
def init_db():
    """Create the blocklist table if it doesn't exist.

    Schema: one row per banned IP, with the triggering scenario, decision
    origin, optional expiry timestamp, and an insertion timestamp.
    """
    ddl = (
        "CREATE TABLE IF NOT EXISTS blocklist ("
        " ip TEXT PRIMARY KEY,"
        " reason TEXT NOT NULL DEFAULT '',"
        " origin TEXT NOT NULL DEFAULT '',"
        " expires_at TEXT,"
        " created_at TEXT NOT NULL DEFAULT (datetime('now'))"
        ")"
    )
    db_pipeline([{"sql": ddl}])
    log.info("Database table 'blocklist' ready")
|
|
|
|
|
|
def db_count() -> int:
    """Return the number of rows in the blocklist table."""
    result = db_pipeline([{"sql": "SELECT COUNT(*) FROM blocklist"}])
    # libSQL wraps the scalar in nested result/rows/value envelopes.
    first_row = result["results"][0]["response"]["result"]["rows"][0]
    return int(first_row[0]["value"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CrowdSec streaming helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def fetch_stream(startup: bool = False) -> dict:
    """Fetch decisions from the CrowdSec LAPI streaming endpoint.

    With startup=True the LAPI returns the complete current decision set;
    otherwise only the delta (new/deleted) since the previous poll.
    When INCLUDE_CAPI is disabled, results are filtered to local origins.
    """
    params: dict = {"startup": str(startup).lower()}
    if not INCLUDE_CAPI:
        params["origins"] = "crowdsec,cscli"

    resp = crowdsec_session.get(
        f"{CROWDSEC_LAPI_URL}/v1/decisions/stream", params=params, timeout=30
    )
    resp.raise_for_status()
    return resp.json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Decision → SQL statement converters
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _normalize_ip(value: str) -> str:
|
|
"""Strip /32 suffix from single-host CIDR notation."""
|
|
return value[:-3] if value.endswith("/32") else value
|
|
|
|
|
|
def _insert_stmt(d: dict) -> dict:
    """Build an INSERT OR REPLACE statement for one CrowdSec ban decision.

    The expiry argument is a typed-null when the decision has no "until".
    """
    expires = d.get("until")
    args = [
        {"type": "text", "value": _normalize_ip(d.get("value", ""))},
        {"type": "text", "value": d.get("scenario", "")},
        {"type": "text", "value": d.get("origin", "")},
        {"type": "null"} if not expires else {"type": "text", "value": expires},
    ]
    return {
        "sql": (
            "INSERT OR REPLACE INTO blocklist (ip, reason, origin, expires_at) "
            "VALUES (?, ?, ?, ?)"
        ),
        "args": args,
    }
|
|
|
|
|
|
def _delete_stmt(d: dict) -> dict:
    """Build a DELETE statement removing one decision's IP from the blocklist."""
    ip_arg = {"type": "text", "value": _normalize_ip(d.get("value", ""))}
    return {"sql": "DELETE FROM blocklist WHERE ip = ?", "args": [ip_arg]}
|
|
|
|
|
|
def _is_ip_ban(d: dict) -> bool:
|
|
return d.get("scope", "").lower() == "ip" and d.get("type") == "ban"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sync logic
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def full_sync() -> int:
    """Full resync: wipe the blocklist and rebuild it from a startup stream.

    Returns the number of IPs written. Note: the wipe and the inserts are
    batched pipelines, so the table is briefly incomplete during the rebuild.
    """
    log.info("Starting full resync (startup=true)")
    decisions = fetch_stream(startup=True).get("new") or []

    # Lead with a table wipe, then re-insert every current IP ban.
    statements: list[dict] = [{"sql": "DELETE FROM blocklist"}]
    statements.extend(_insert_stmt(d) for d in decisions if _is_ip_ban(d))

    db_pipeline_batched(statements)

    inserted = len(statements) - 1  # first statement is the wipe, not an insert
    log.info("Full resync complete: %d IPs written to blocklist", inserted)
    return inserted
|
|
|
|
|
|
def incremental_sync():
    """Incremental sync: apply only new/deleted decisions since the last poll."""
    data = fetch_stream(startup=False)
    new_decisions = data.get("new") or []
    deleted_decisions = data.get("deleted") or []

    # Deletes apply to any IP-scoped decision; inserts only to IP bans.
    deletes = [
        _delete_stmt(d)
        for d in deleted_decisions
        if d.get("scope", "").lower() == "ip"
    ]
    inserts = [_insert_stmt(d) for d in new_decisions if _is_ip_ban(d)]

    if deletes or inserts:
        # Deletions first, then insertions, mirroring the LAPI delta order.
        db_pipeline_batched(deletes + inserts)
        log.info("Incremental sync: +%d -%d IPs", len(inserts), len(deletes))
    else:
        log.debug("Incremental sync: no changes")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Startup validation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def verify_connections():
    """Check connectivity to CrowdSec LAPI and Bunny Database at startup.

    Raises on any failure so the process exits before entering the sync loop.
    """
    log.info("Verifying CrowdSec LAPI at %s …", CROWDSEC_LAPI_URL)
    # Cheap authenticated probe: ask for at most one ban decision.
    probe = crowdsec_session.get(
        f"{CROWDSEC_LAPI_URL}/v1/decisions",
        params={"type": "ban", "limit": "1"},
        timeout=10,
    )
    probe.raise_for_status()
    log.info("CrowdSec LAPI: OK")

    log.info("Verifying Bunny Database at %s …", BUNNY_DB_URL)
    init_db()  # CREATE TABLE IF NOT EXISTS doubles as the connectivity check
    log.info("Bunny Database: OK")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main loop
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def main():
    """Run the bouncer: verify connectivity, seed the blocklist, then poll.

    The loop alternates incremental syncs every SYNC_INTERVAL seconds with a
    full resync every FULL_RESYNC_INTERVAL seconds. HEALTHCHECK_FILE is
    touched after each successful cycle and removed on failure/shutdown so a
    container healthcheck can observe liveness.
    """
    log.info(
        "Starting CrowdSec Bunny Bouncer — db=%s interval=%ds include_capi=%s full_resync=%ds",
        BUNNY_DB_URL,
        SYNC_INTERVAL,
        INCLUDE_CAPI,
        FULL_RESYNC_INTERVAL,
    )

    verify_connections()
    HEALTHCHECK_FILE.touch()

    # Initial full sync. BUG FIX: the timestamp is only advanced on success.
    # Previously it was set unconditionally, so a failed initial sync left the
    # bouncer issuing incremental (startup=false) polls against an empty
    # database for up to FULL_RESYNC_INTERVAL. Starting "expired" makes the
    # very next cycle retry the full sync instead.
    last_full_sync = time.monotonic() - FULL_RESYNC_INTERVAL
    try:
        count = full_sync()
        log.info("Blocklist initialized with %d IPs", count)
        last_full_sync = time.monotonic()
    except Exception:
        log.exception("Initial full sync failed, will retry")
        HEALTHCHECK_FILE.unlink(missing_ok=True)

    while _running:
        try:
            if time.monotonic() - last_full_sync >= FULL_RESYNC_INTERVAL:
                full_sync()
                last_full_sync = time.monotonic()
            else:
                incremental_sync()
        except requests.RequestException:
            # Transient network/API failure: keep running, drop the health marker.
            log.exception("Sync cycle failed, will retry next interval")
            HEALTHCHECK_FILE.unlink(missing_ok=True)
        except Exception:
            log.exception("Unexpected error in sync cycle")
            HEALTHCHECK_FILE.unlink(missing_ok=True)
        else:
            HEALTHCHECK_FILE.touch()

        # Sleep in 1-second steps so SIGTERM/SIGINT can interrupt promptly.
        for _ in range(SYNC_INTERVAL):
            if not _running:
                break
            time.sleep(1)

    HEALTHCHECK_FILE.unlink(missing_ok=True)
    log.info("Bouncer stopped")
|
|
|
|
|
|
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C delivered outside the installed handler path: exit cleanly.
        log.info("Interrupted")
        sys.exit(0)
    except Exception:
        # Top-level boundary: log the traceback and exit nonzero for the supervisor.
        log.exception("Fatal error")
        sys.exit(1)
|