#!/usr/bin/env python3 """CrowdSec Cloudflare Worker Bouncer 도메인 관리 API 서버""" import os from contextlib import asynccontextmanager from typing import Optional from fastapi import FastAPI, HTTPException, Query from pydantic import BaseModel, Field # cf_bouncer 모듈에서 핵심 함수들 import from cf_bouncer import ( __version__, SERVER_BOUNCER, SERVER_CROWDSEC, SSH_CONFIGS, BouncerError, get_config, get_zones, get_cf_token, get_zone_id, get_all_zones_from_cf, find_zone_by_domain, extract_domain, save_config, backup_config, do_apply, run_ssh, log_action, BACKUP_DIR, LOG_FILE, ) # ============ Pydantic Models ============ class DomainBase(BaseModel): domain: str action: str = "captcha" turnstile_enabled: bool = True turnstile_mode: str = "managed" class DomainCreate(DomainBase): pass class DomainUpdate(BaseModel): action: Optional[str] = None turnstile_enabled: Optional[bool] = None turnstile_mode: Optional[str] = None class DomainResponse(BaseModel): domain: str zone_id: str action: str turnstile_enabled: bool turnstile_mode: str routes: list[str] class StatusResponse(BaseModel): bouncer_running: bool bouncer_pids: list[str] protected_domains: int crowdsec_status: Optional[str] = None class MessageResponse(BaseModel): message: str success: bool = True class ErrorResponse(BaseModel): detail: str success: bool = False # ============ FastAPI App ============ @asynccontextmanager async def lifespan(app: FastAPI): # Startup: SSH 설정 확인 bouncer_host = SSH_CONFIGS[SERVER_BOUNCER].get("host") crowdsec_host = SSH_CONFIGS[SERVER_CROWDSEC].get("host") if not bouncer_host or not crowdsec_host: print("WARNING: SSH 설정이 완료되지 않았습니다.") print(f" CFB_BOUNCER_HOST: {bouncer_host or '(not set)'}") print(f" CFB_CROWDSEC_HOST: {crowdsec_host or '(not set)'}") else: print(f"SSH 설정 확인:") print(f" Bouncer: {bouncer_host}") print(f" CrowdSec: {crowdsec_host}") yield # Shutdown print("API 서버 종료") app = FastAPI( title="CrowdSec Cloudflare Bouncer Manager API", description="CrowdSec Cloudflare Worker Bouncer 도메인 관리 API", version=__version__, lifespan=lifespan, ) # ============ API Endpoints ============ @app.get("/", response_model=MessageResponse) async def root(): """API 상태 확인""" return MessageResponse(message=f"CrowdSec Cloudflare Bouncer Manager API v{__version__}") @app.get("/health") async def health(): """헬스 체크""" return {"status": "healthy", "version": __version__} @app.get("/domains", response_model=list[DomainResponse]) async def list_domains(): """보호 중인 도메인 목록""" try: config = get_config() zones = get_zones(config) result = [] for zone in zones: routes = zone.get("routes_to_protect", []) turnstile = zone.get("turnstile", {}) result.append(DomainResponse( domain=extract_domain(routes[0]) if routes else "N/A", zone_id=zone.get("zone_id", ""), action=zone.get("default_action", ""), turnstile_enabled=turnstile.get("enabled", False), turnstile_mode=turnstile.get("mode", ""), routes=routes, )) return result except BouncerError as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/domains/{domain}", response_model=DomainResponse) async def get_domain(domain: str): """특정 도메인 상세 정보""" try: config = get_config() zones = get_zones(config) idx, zone = find_zone_by_domain(zones, domain) if zone is None: raise HTTPException(status_code=404, detail=f"{domain}을 찾을 수 없습니다") routes = zone.get("routes_to_protect", []) turnstile = zone.get("turnstile", {}) return DomainResponse( domain=extract_domain(routes[0]) if routes else "N/A", zone_id=zone.get("zone_id", ""), action=zone.get("default_action", ""), turnstile_enabled=turnstile.get("enabled", False), turnstile_mode=turnstile.get("mode", ""), routes=routes, ) except BouncerError as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/domains", response_model=MessageResponse) async def add_domain(data: DomainCreate, auto_apply: bool = Query(False)): """새 도메인 추가""" try: config = get_config() token = get_cf_token(config) if not token: raise HTTPException(status_code=400, detail="Cloudflare API 토큰을 찾을 수 없습니다") zones = get_zones(config) idx, _ = find_zone_by_domain(zones, data.domain) if idx is not None: raise HTTPException(status_code=409, detail=f"{data.domain}은 이미 보호 목록에 있습니다") zone_id = get_zone_id(data.domain, token) if not zone_id: raise HTTPException(status_code=404, detail=f"{data.domain}의 zone_id를 찾을 수 없습니다") new_zone = { "zone_id": zone_id, "actions": [data.action], "default_action": data.action, "routes_to_protect": [f"*{data.domain}/*"], "turnstile": { "enabled": data.turnstile_enabled, "rotate_secret_key": True, "rotate_secret_key_every": "168h0m0s", "mode": data.turnstile_mode, }, } config["cloudflare_config"]["accounts"][0]["zones"].append(new_zone) save_config(config, reason=f"add_{data.domain}") log_action("add", data.domain) if auto_apply: do_apply() return MessageResponse(message=f"{data.domain} 추가 및 적용 완료") return MessageResponse(message=f"{data.domain} 추가 완료. 적용하려면 /apply 호출 필요") except HTTPException: raise except BouncerError as e: raise HTTPException(status_code=500, detail=str(e)) @app.put("/domains/{domain}", response_model=MessageResponse) async def update_domain(domain: str, data: DomainUpdate, auto_apply: bool = Query(False)): """도메인 설정 수정""" try: config = get_config() zones = get_zones(config) idx, zone = find_zone_by_domain(zones, domain) if zone is None: raise HTTPException(status_code=404, detail=f"{domain}을 찾을 수 없습니다") changes = [] if data.action is not None: zone["default_action"] = data.action zone["actions"] = [data.action] changes.append(f"action={data.action}") if data.turnstile_enabled is not None: zone["turnstile"]["enabled"] = data.turnstile_enabled changes.append(f"turnstile={'enabled' if data.turnstile_enabled else 'disabled'}") if data.turnstile_mode is not None: zone["turnstile"]["mode"] = data.turnstile_mode changes.append(f"mode={data.turnstile_mode}") if not changes: raise HTTPException(status_code=400, detail="변경할 내용이 없습니다") config["cloudflare_config"]["accounts"][0]["zones"][idx] = zone save_config(config, reason=f"edit_{domain}") log_action("edit", f"{domain}: {', '.join(changes)}") if auto_apply: do_apply() return MessageResponse(message=f"{domain} 수정 및 적용 완료: {', '.join(changes)}") return MessageResponse(message=f"{domain} 수정 완료: {', '.join(changes)}") except HTTPException: raise except BouncerError as e: raise HTTPException(status_code=500, detail=str(e)) @app.delete("/domains/{domain}", response_model=MessageResponse) async def remove_domain(domain: str, auto_apply: bool = Query(False)): """도메인 제거""" try: config = get_config() zones = get_zones(config) idx, zone = find_zone_by_domain(zones, domain) if idx is None: raise HTTPException(status_code=404, detail=f"{domain}을 찾을 수 없습니다") del config["cloudflare_config"]["accounts"][0]["zones"][idx] save_config(config, reason=f"remove_{domain}") log_action("remove", domain) if auto_apply: do_apply() return MessageResponse(message=f"{domain} 제거 및 적용 완료") return MessageResponse(message=f"{domain} 제거 완료") except HTTPException: raise except BouncerError as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/apply", response_model=MessageResponse) async def apply_config(): """설정 적용 (bouncer 서비스 재시작)""" try: if do_apply(): log_action("apply", "Service restarted via API") return MessageResponse(message="설정 적용 완료") else: raise HTTPException(status_code=500, detail="bouncer 프로세스를 시작할 수 없습니다") except BouncerError as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/status", response_model=StatusResponse) async def get_status(): """bouncer 상태 확인""" try: # Bouncer 프로세스 확인 result = run_ssh(["pgrep", "-f", "crowdsec-cloudflare-worker-bouncer"], server=SERVER_BOUNCER) bouncer_running = result.returncode == 0 pids = result.stdout.strip().split('\n') if bouncer_running else [] # CrowdSec bouncer 목록 cs_result = run_ssh(["cscli", "bouncers", "list"], server=SERVER_CROWDSEC) cs_status = cs_result.stdout if cs_result.returncode == 0 else None # 보호 도메인 수 config = get_config() zones = get_zones(config) return StatusResponse( bouncer_running=bouncer_running, bouncer_pids=pids, protected_domains=len(zones), crowdsec_status=cs_status, ) except BouncerError as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/available") async def available_domains(): """Cloudflare에서 추가 가능한 도메인 목록""" try: config = get_config() token = get_cf_token(config) if not token: raise HTTPException(status_code=400, detail="Cloudflare API 토큰을 찾을 수 없습니다") cf_zones = get_all_zones_from_cf(token) protected_zones = get_zones(config) protected_ids = {z.get("zone_id") for z in protected_zones} result = [] for zone in cf_zones: zone_id = zone.get("id", "") result.append({ "domain": zone.get("name"), "zone_id": zone_id, "status": zone.get("status"), "protected": zone_id in protected_ids, }) return { "total": len(cf_zones), "protected": len(protected_ids), "zones": result, } except BouncerError as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/decisions") async def get_decisions(limit: int = Query(20, ge=1, le=100)): """CrowdSec 현재 차단 결정 조회""" try: result = run_ssh( ["cscli", "decisions", "list", "-o", "json", "--limit", str(limit)], server=SERVER_CROWDSEC, ) if result.returncode == 0: import json try: decisions = json.loads(result.stdout) if result.stdout.strip() else [] return {"decisions": decisions} except json.JSONDecodeError: return {"decisions": [], "raw": result.stdout} else: raise HTTPException(status_code=500, detail=f"결정 조회 실패: {result.stderr}") except BouncerError as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/sync", response_model=MessageResponse) async def sync_domains(auto_apply: bool = Query(False)): """Cloudflare의 모든 도메인을 보호 목록에 추가""" try: config = get_config() token = get_cf_token(config) if not token: raise HTTPException(status_code=400, detail="Cloudflare API 토큰을 찾을 수 없습니다") cf_zones = get_all_zones_from_cf(token) protected_zones = get_zones(config) protected_ids = {z.get("zone_id") for z in protected_zones} # 추가할 도메인 찾기 to_add = [z for z in cf_zones if z.get("id") not in protected_ids and z.get("status") == "active"] if not to_add: return MessageResponse(message="모든 도메인이 이미 보호 목록에 있습니다") added = [] for cf_zone in to_add: new_zone = { "zone_id": cf_zone.get("id"), "actions": ["captcha"], "default_action": "captcha", "routes_to_protect": [f"*{cf_zone.get('name')}/*"], "turnstile": { "enabled": True, "rotate_secret_key": True, "rotate_secret_key_every": "168h0m0s", "mode": "managed", }, } config["cloudflare_config"]["accounts"][0]["zones"].append(new_zone) added.append(cf_zone.get("name")) save_config(config, reason="sync") log_action("sync", f"Added {len(added)} domains: {', '.join(added)}") if auto_apply: do_apply() return MessageResponse(message=f"{len(added)}개 도메인 추가 및 적용 완료: {', '.join(added)}") return MessageResponse(message=f"{len(added)}개 도메인 추가 완료: {', '.join(added)}") except BouncerError as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/backup", response_model=MessageResponse) async def create_backup(): """현재 설정 백업""" try: config = get_config() backup_file = backup_config(config, reason="api_backup") log_action("backup", str(backup_file)) return MessageResponse(message=f"백업 완료: {backup_file.name}") except BouncerError as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/history") async def get_history(lines: int = Query(20, ge=1, le=100)): """변경 이력 조회""" try: if not LOG_FILE.exists(): return {"history": []} content = LOG_FILE.read_text() history_lines = content.strip().split('\n') return {"history": history_lines[-lines:]} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # ============ Main ============ if __name__ == "__main__": import uvicorn port = int(os.environ.get("CFB_API_PORT", "8000")) host = os.environ.get("CFB_API_HOST", "0.0.0.0") uvicorn.run(app, host=host, port=port)