Files
cf-bouncer-manager/api_server.py
kaffa 6a26c0c4e4 Add REST API server, Docker support, and CI pipeline
- Add FastAPI-based REST API server (api_server.py)
- Add Dockerfile and docker-compose.yaml for containerized deployment
- Add Gitea Actions CI workflow for building and pushing images
- Refactor CLI to support dual-server SSH (bouncer + crowdsec)
- Update dependencies with FastAPI and uvicorn
- Update CLAUDE.md and README.md with full documentation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 11:51:03 +09:00

475 lines
15 KiB
Python

#!/usr/bin/env python3
"""CrowdSec Cloudflare Worker Bouncer 도메인 관리 API 서버"""
import os
from contextlib import asynccontextmanager
from typing import Optional
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel, Field
# cf_bouncer 모듈에서 핵심 함수들 import
from cf_bouncer import (
__version__,
SERVER_BOUNCER,
SERVER_CROWDSEC,
SSH_CONFIGS,
BouncerError,
get_config,
get_zones,
get_cf_token,
get_zone_id,
get_all_zones_from_cf,
find_zone_by_domain,
extract_domain,
save_config,
backup_config,
do_apply,
run_ssh,
log_action,
BACKUP_DIR,
LOG_FILE,
)
# ============ Pydantic Models ============
# Common request fields describing one protected domain.
# `#` comments are used instead of a class docstring so the generated
# JSON schema is not given a new description.
class DomainBase(BaseModel):
    # Domain name as supplied by the API client.
    domain: str
    # Remediation stored as the zone's default action.
    action: str = Field(default="captcha")
    # Whether Turnstile is switched on for the zone.
    turnstile_enabled: bool = Field(default=True)
    # Turnstile widget mode stored in the zone configuration.
    turnstile_mode: str = Field(default="managed")
# Request body accepted by POST /domains.
# Adds no fields of its own; everything is inherited from DomainBase.
class DomainCreate(DomainBase):
    pass
# Request body accepted by PUT /domains/{domain}.
# Every field is optional; a field left as None means "do not change".
class DomainUpdate(BaseModel):
    # New default action, or None to keep the current one.
    action: Optional[str] = Field(default=None)
    # New Turnstile on/off state, or None to keep the current one.
    turnstile_enabled: Optional[bool] = Field(default=None)
    # New Turnstile mode, or None to keep the current one.
    turnstile_mode: Optional[str] = Field(default=None)
# Response shape describing one protected zone, as returned by the
# /domains listing and detail endpoints.
class DomainResponse(BaseModel):
    # Domain derived from the zone's first protected route ("N/A" if none).
    domain: str
    # Zone identifier stored in the configuration ("" if missing).
    zone_id: str
    # The zone's default action ("" if missing).
    action: str
    # Turnstile enabled flag (False if the zone has no turnstile section).
    turnstile_enabled: bool
    # Turnstile mode ("" if missing).
    turnstile_mode: str
    # All route patterns listed under the zone's routes_to_protect.
    routes: list[str]
# Response shape for GET /status.
class StatusResponse(BaseModel):
    # True when the process lookup on the bouncer host exited with 0.
    bouncer_running: bool
    # PIDs reported by that lookup; empty when nothing is running.
    bouncer_pids: list[str]
    # Number of zones currently present in the configuration.
    protected_domains: int
    # Raw output of the bouncer listing, or None when that command failed.
    crowdsec_status: Optional[str] = Field(default=None)
# Generic success envelope returned by the mutating endpoints.
class MessageResponse(BaseModel):
    # Human-readable outcome text.
    message: str
    # Success flag; defaults to True.
    success: bool = Field(default=True)
# Error envelope mirroring MessageResponse.
# NOTE(review): no endpoint visible in this file references this model.
class ErrorResponse(BaseModel):
    # Error description.
    detail: str
    # Success flag; defaults to False.
    success: bool = Field(default=False)
# ============ FastAPI App ============
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Print the SSH host configuration on startup and a notice on shutdown.

    The hosts are only reported, never validated: a missing host produces a
    warning but does not stop the server from starting.
    """
    # Startup: look up the configured host for each SSH target.
    bouncer_host = SSH_CONFIGS[SERVER_BOUNCER].get("host")
    crowdsec_host = SSH_CONFIGS[SERVER_CROWDSEC].get("host")

    if bouncer_host and crowdsec_host:
        startup_report = [
            "SSH 설정 확인:",
            f" Bouncer: {bouncer_host}",
            f" CrowdSec: {crowdsec_host}",
        ]
    else:
        # At least one host is unset; show which one(s).
        startup_report = [
            "WARNING: SSH 설정이 완료되지 않았습니다.",
            f" CFB_BOUNCER_HOST: {bouncer_host or '(not set)'}",
            f" CFB_CROWDSEC_HOST: {crowdsec_host or '(not set)'}",
        ]
    for report_line in startup_report:
        print(report_line)

    yield

    # Shutdown
    print("API 서버 종료")
# The ASGI application object; every route below is registered on it.
app = FastAPI(
    # Startup/shutdown hook that reports the SSH configuration.
    lifespan=lifespan,
    # Metadata shown in the generated OpenAPI document.
    version=__version__,
    title="CrowdSec Cloudflare Bouncer Manager API",
    description="CrowdSec Cloudflare Worker Bouncer 도메인 관리 API",
)
# ============ API Endpoints ============
@app.get("/", response_model=MessageResponse)
async def root():
    """API 상태 확인"""
    # Landing endpoint: returns a banner containing the package version.
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.
    banner = f"CrowdSec Cloudflare Bouncer Manager API v{__version__}"
    return MessageResponse(message=banner)
@app.get("/health")
async def health():
    """헬스 체크"""
    # Health probe: always answers "healthy" together with the version.
    # It does not contact the bouncer or CrowdSec hosts.
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.
    return dict(status="healthy", version=__version__)
@app.get("/domains", response_model=list[DomainResponse])
async def list_domains():
    """보호 중인 도메인 목록"""
    # Lists every zone found in the configuration as a DomainResponse.
    # A BouncerError from the config helpers is reported as HTTP 500.
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.

    def _to_response(entry):
        # Convert one raw zone dict into the public response model.
        protected_routes = entry.get("routes_to_protect", [])
        turnstile_cfg = entry.get("turnstile", {})
        if protected_routes:
            # The display name comes from the first protected route.
            label = extract_domain(protected_routes[0])
        else:
            label = "N/A"
        return DomainResponse(
            domain=label,
            zone_id=entry.get("zone_id", ""),
            action=entry.get("default_action", ""),
            turnstile_enabled=turnstile_cfg.get("enabled", False),
            turnstile_mode=turnstile_cfg.get("mode", ""),
            routes=protected_routes,
        )

    try:
        current_config = get_config()
        return [_to_response(entry) for entry in get_zones(current_config)]
    except BouncerError as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/domains/{domain}", response_model=DomainResponse)
async def get_domain(domain: str):
    """특정 도메인 상세 정보"""
    # Returns the configuration of a single protected domain.
    # Responds 404 when the domain is not in the configuration and 500
    # when a config helper raises BouncerError.
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.
    try:
        current_config = get_config()
        _, match = find_zone_by_domain(get_zones(current_config), domain)
        if match is None:
            raise HTTPException(status_code=404, detail=f"{domain}을 찾을 수 없습니다")

        protected_routes = match.get("routes_to_protect", [])
        turnstile_cfg = match.get("turnstile", {})
        if protected_routes:
            # The display name comes from the first protected route.
            label = extract_domain(protected_routes[0])
        else:
            label = "N/A"
        fields = {
            "domain": label,
            "zone_id": match.get("zone_id", ""),
            "action": match.get("default_action", ""),
            "turnstile_enabled": turnstile_cfg.get("enabled", False),
            "turnstile_mode": turnstile_cfg.get("mode", ""),
            "routes": protected_routes,
        }
        return DomainResponse(**fields)
    except BouncerError as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/domains", response_model=MessageResponse)
async def add_domain(data: DomainCreate, auto_apply: bool = Query(False)):
    """새 도메인 추가"""
    # Adds a new zone entry for `data.domain` to the first account in the
    # configuration, saves it, and optionally applies it.
    #
    # Responses:
    #   400 - no Cloudflare API token in the configuration
    #   409 - the domain is already protected
    #   404 - no zone_id could be resolved for the domain
    #   500 - a helper raised BouncerError, or auto_apply was requested and
    #         do_apply() reported failure (the saved change is kept)
    #
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.
    try:
        config = get_config()
        token = get_cf_token(config)
        if not token:
            raise HTTPException(status_code=400, detail="Cloudflare API 토큰을 찾을 수 없습니다")

        zones = get_zones(config)
        idx, _ = find_zone_by_domain(zones, data.domain)
        if idx is not None:
            raise HTTPException(status_code=409, detail=f"{data.domain}은 이미 보호 목록에 있습니다")

        zone_id = get_zone_id(data.domain, token)
        if not zone_id:
            raise HTTPException(status_code=404, detail=f"{data.domain}의 zone_id를 찾을 수 없습니다")

        new_zone = {
            "zone_id": zone_id,
            "actions": [data.action],
            "default_action": data.action,
            "routes_to_protect": [f"*{data.domain}/*"],
            "turnstile": {
                "enabled": data.turnstile_enabled,
                "rotate_secret_key": True,
                "rotate_secret_key_every": "168h0m0s",
                "mode": data.turnstile_mode,
            },
        }
        config["cloudflare_config"]["accounts"][0]["zones"].append(new_zone)
        save_config(config, reason=f"add_{data.domain}")
        log_action("add", data.domain)

        if not auto_apply:
            return MessageResponse(message=f"{data.domain} 추가 완료. 적용하려면 /apply 호출 필요")

        # do_apply() signals failure through a falsy return value (the
        # /apply endpoint treats it that way), so do not claim success
        # unless it actually succeeded.
        if not do_apply():
            raise HTTPException(
                status_code=500,
                detail=f"{data.domain} 추가는 저장되었지만 적용 실패: bouncer 프로세스를 시작할 수 없습니다",
            )
        return MessageResponse(message=f"{data.domain} 추가 및 적용 완료")
    except HTTPException:
        raise
    except BouncerError as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.put("/domains/{domain}", response_model=MessageResponse)
async def update_domain(domain: str, data: DomainUpdate, auto_apply: bool = Query(False)):
    """도메인 설정 수정"""
    # Updates the action and/or Turnstile settings of an existing zone,
    # saves the configuration, and optionally applies it.
    #
    # Responses:
    #   404 - the domain is not in the configuration
    #   400 - the request body contained no field to change
    #   500 - a helper raised BouncerError, or auto_apply was requested and
    #         do_apply() reported failure (the saved change is kept)
    #
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.
    try:
        config = get_config()
        zones = get_zones(config)
        idx, zone = find_zone_by_domain(zones, domain)
        if zone is None:
            raise HTTPException(status_code=404, detail=f"{domain}을 찾을 수 없습니다")

        changes = []
        if data.action is not None:
            zone["default_action"] = data.action
            zone["actions"] = [data.action]
            changes.append(f"action={data.action}")
        if data.turnstile_enabled is not None:
            # setdefault: a zone may lack a "turnstile" section (the read
            # endpoints already tolerate that), so create it on demand
            # instead of failing with KeyError.
            zone.setdefault("turnstile", {})["enabled"] = data.turnstile_enabled
            changes.append(f"turnstile={'enabled' if data.turnstile_enabled else 'disabled'}")
        if data.turnstile_mode is not None:
            zone.setdefault("turnstile", {})["mode"] = data.turnstile_mode
            changes.append(f"mode={data.turnstile_mode}")

        if not changes:
            raise HTTPException(status_code=400, detail="변경할 내용이 없습니다")

        summary = ', '.join(changes)
        config["cloudflare_config"]["accounts"][0]["zones"][idx] = zone
        save_config(config, reason=f"edit_{domain}")
        log_action("edit", f"{domain}: {summary}")

        if not auto_apply:
            return MessageResponse(message=f"{domain} 수정 완료: {summary}")

        # do_apply() signals failure through a falsy return value (the
        # /apply endpoint treats it that way), so do not claim success
        # unless it actually succeeded.
        if not do_apply():
            raise HTTPException(
                status_code=500,
                detail=f"{domain} 수정은 저장되었지만 적용 실패: bouncer 프로세스를 시작할 수 없습니다",
            )
        return MessageResponse(message=f"{domain} 수정 및 적용 완료: {summary}")
    except HTTPException:
        raise
    except BouncerError as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.delete("/domains/{domain}", response_model=MessageResponse)
async def remove_domain(domain: str, auto_apply: bool = Query(False)):
    """도메인 제거"""
    # Removes a zone from the first account in the configuration, saves
    # it, and optionally applies it.
    #
    # Responses:
    #   404 - the domain is not in the configuration
    #   500 - a helper raised BouncerError, or auto_apply was requested and
    #         do_apply() reported failure (the saved change is kept)
    #
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.
    try:
        config = get_config()
        zones = get_zones(config)
        idx, _ = find_zone_by_domain(zones, domain)
        if idx is None:
            raise HTTPException(status_code=404, detail=f"{domain}을 찾을 수 없습니다")

        del config["cloudflare_config"]["accounts"][0]["zones"][idx]
        save_config(config, reason=f"remove_{domain}")
        log_action("remove", domain)

        if not auto_apply:
            return MessageResponse(message=f"{domain} 제거 완료")

        # do_apply() signals failure through a falsy return value (the
        # /apply endpoint treats it that way), so do not claim success
        # unless it actually succeeded.
        if not do_apply():
            raise HTTPException(
                status_code=500,
                detail=f"{domain} 제거는 저장되었지만 적용 실패: bouncer 프로세스를 시작할 수 없습니다",
            )
        return MessageResponse(message=f"{domain} 제거 및 적용 완료")
    except HTTPException:
        raise
    except BouncerError as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/apply", response_model=MessageResponse)
async def apply_config():
    """설정 적용 (bouncer 서비스 재시작)"""
    # Applies the saved configuration via do_apply().
    # Responds 500 when do_apply() returns a falsy value or raises
    # BouncerError; the action is logged only on success.
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.
    try:
        applied = do_apply()
        if not applied:
            raise HTTPException(status_code=500, detail="bouncer 프로세스를 시작할 수 없습니다")
        log_action("apply", "Service restarted via API")
        return MessageResponse(message="설정 적용 완료")
    except BouncerError as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/status", response_model=StatusResponse)
async def get_status():
    """bouncer 상태 확인"""
    # Reports whether the bouncer process is running, the CrowdSec
    # bouncer listing, and how many zones are configured.
    # A BouncerError from any helper is reported as HTTP 500.
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.
    try:
        # Look for the bouncer process on the bouncer host.
        proc_lookup = run_ssh(
            ["pgrep", "-f", "crowdsec-cloudflare-worker-bouncer"],
            server=SERVER_BOUNCER,
        )
        is_running = proc_lookup.returncode == 0
        if is_running:
            pid_list = proc_lookup.stdout.strip().split('\n')
        else:
            pid_list = []

        # Bouncer listing from the CrowdSec host; None if the command failed.
        listing = run_ssh(["cscli", "bouncers", "list"], server=SERVER_CROWDSEC)
        if listing.returncode == 0:
            listing_text = listing.stdout
        else:
            listing_text = None

        # Count of zones in the current configuration.
        zone_count = len(get_zones(get_config()))

        return StatusResponse(
            bouncer_running=is_running,
            bouncer_pids=pid_list,
            protected_domains=zone_count,
            crowdsec_status=listing_text,
        )
    except BouncerError as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/available")
async def available_domains():
    """Cloudflare에서 추가 가능한 도메인 목록"""
    # Lists every zone returned by Cloudflare, flagging those whose id is
    # already present in the configuration.
    # Responds 400 when no API token is configured and 500 on BouncerError.
    # "protected" in the summary counts distinct zone ids found in the
    # configuration, not matches among the Cloudflare zones.
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.
    try:
        config = get_config()
        token = get_cf_token(config)
        if not token:
            raise HTTPException(status_code=400, detail="Cloudflare API 토큰을 찾을 수 없습니다")

        cf_zones = get_all_zones_from_cf(token)
        protected_ids = set()
        for configured in get_zones(config):
            protected_ids.add(configured.get("zone_id"))

        def _summarize(cf_zone):
            # One output row per Cloudflare zone.
            cf_id = cf_zone.get("id", "")
            return {
                "domain": cf_zone.get("name"),
                "zone_id": cf_id,
                "status": cf_zone.get("status"),
                "protected": cf_id in protected_ids,
            }

        rows = [_summarize(cf_zone) for cf_zone in cf_zones]
        return {
            "total": len(cf_zones),
            "protected": len(protected_ids),
            "zones": rows,
        }
    except BouncerError as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/decisions")
async def get_decisions(limit: int = Query(20, ge=1, le=100)):
    """CrowdSec 현재 차단 결정 조회"""
    # Fetches the current decisions from the CrowdSec host as JSON.
    #
    # Returns {"decisions": [...]}; when the output cannot be parsed as
    # JSON, returns an empty list plus the unparsed text under "raw".
    # Responds 500 when the remote command exits non-zero or a helper
    # raises BouncerError.
    #
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.
    import json

    try:
        result = run_ssh(
            ["cscli", "decisions", "list", "-o", "json", "--limit", str(limit)],
            server=SERVER_CROWDSEC,
        )
        if result.returncode != 0:
            raise HTTPException(status_code=500, detail=f"결정 조회 실패: {result.stderr}")

        raw_output = result.stdout
        if not raw_output.strip():
            return {"decisions": []}
        try:
            decisions = json.loads(raw_output)
        except json.JSONDecodeError:
            return {"decisions": [], "raw": raw_output}
        # cscli prints the JSON literal `null` when nothing is active;
        # normalise that to an empty list so clients always get a list.
        if decisions is None:
            decisions = []
        return {"decisions": decisions}
    except BouncerError as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/sync", response_model=MessageResponse)
async def sync_domains(auto_apply: bool = Query(False)):
    """Cloudflare의 모든 도메인을 보호 목록에 추가"""
    # Adds every active Cloudflare zone that is not yet in the
    # configuration, using the default captcha/managed settings, then
    # saves and optionally applies the result.
    #
    # Responses:
    #   400 - no Cloudflare API token in the configuration
    #   500 - a helper raised BouncerError, or auto_apply was requested and
    #         do_apply() reported failure (the saved change is kept)
    #
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.
    try:
        config = get_config()
        token = get_cf_token(config)
        if not token:
            raise HTTPException(status_code=400, detail="Cloudflare API 토큰을 찾을 수 없습니다")

        cf_zones = get_all_zones_from_cf(token)
        protected_zones = get_zones(config)
        protected_ids = {z.get("zone_id") for z in protected_zones}

        # Only active zones that are not already protected are added.
        to_add = [
            z for z in cf_zones
            if z.get("id") not in protected_ids and z.get("status") == "active"
        ]
        if not to_add:
            return MessageResponse(message="모든 도메인이 이미 보호 목록에 있습니다")

        added = []
        for cf_zone in to_add:
            new_zone = {
                "zone_id": cf_zone.get("id"),
                "actions": ["captcha"],
                "default_action": "captcha",
                "routes_to_protect": [f"*{cf_zone.get('name')}/*"],
                "turnstile": {
                    "enabled": True,
                    "rotate_secret_key": True,
                    "rotate_secret_key_every": "168h0m0s",
                    "mode": "managed",
                },
            }
            config["cloudflare_config"]["accounts"][0]["zones"].append(new_zone)
            added.append(cf_zone.get("name"))

        added_names = ', '.join(added)
        save_config(config, reason="sync")
        log_action("sync", f"Added {len(added)} domains: {added_names}")

        if not auto_apply:
            return MessageResponse(message=f"{len(added)}개 도메인 추가 완료: {added_names}")

        # do_apply() signals failure through a falsy return value (the
        # /apply endpoint treats it that way), so do not claim success
        # unless it actually succeeded.
        if not do_apply():
            raise HTTPException(
                status_code=500,
                detail=f"{len(added)}개 도메인 추가는 저장되었지만 적용 실패: bouncer 프로세스를 시작할 수 없습니다",
            )
        return MessageResponse(message=f"{len(added)}개 도메인 추가 및 적용 완료: {added_names}")
    except BouncerError as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/backup", response_model=MessageResponse)
async def create_backup():
    """현재 설정 백업"""
    # Backs up the current configuration and logs the backup path.
    # The response message contains only the backup's file name.
    # A BouncerError from any helper is reported as HTTP 500.
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.
    try:
        current_config = get_config()
        snapshot = backup_config(current_config, reason="api_backup")
        log_action("backup", str(snapshot))
        return MessageResponse(message=f"백업 완료: {snapshot.name}")
    except BouncerError as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/history")
async def get_history(lines: int = Query(20, ge=1, le=100)):
    """변경 이력 조회"""
    # Returns the last `lines` entries of the action log.
    #
    # A missing log file and an empty (or whitespace-only) log file both
    # yield {"history": []}. Any exception while reading is reported as
    # HTTP 500.
    #
    # NOTE: the docstring doubles as the OpenAPI description, so its text
    # is left as-is.
    try:
        if not LOG_FILE.exists():
            return {"history": []}

        content = LOG_FILE.read_text().strip()
        if not content:
            # "".split('\n') would give [""], i.e. one bogus empty entry.
            return {"history": []}

        history_lines = content.split('\n')
        return {"history": history_lines[-lines:]}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============ Main ============
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn.
    # Bind address and port come from CFB_API_HOST / CFB_API_PORT and
    # default to 0.0.0.0:8000.
    import uvicorn

    uvicorn.run(
        app,
        host=os.environ.get("CFB_API_HOST", "0.0.0.0"),
        port=int(os.environ.get("CFB_API_PORT", "8000")),
    )