Files
cf-bouncer-manager/cf_bouncer.py
kaffa 6a26c0c4e4 Add REST API server, Docker support, and CI pipeline
- Add FastAPI-based REST API server (api_server.py)
- Add Dockerfile and docker-compose.yaml for containerized deployment
- Add Gitea Actions CI workflow for building and pushing images
- Refactor CLI to support dual-server SSH (bouncer + crowdsec)
- Update dependencies with FastAPI and uvicorn
- Update CLAUDE.md and README.md with full documentation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 11:51:03 +09:00

991 lines
36 KiB
Python
Executable File

#!/usr/bin/env python3
"""CrowdSec Cloudflare Worker Bouncer 도메인 관리 CLI"""
import os
import shlex
import subprocess
from datetime import datetime
from difflib import unified_diff
from pathlib import Path
from typing import Annotated, Optional
import requests
import typer
import yaml
from rich.console import Console
from rich.table import Table
__version__ = "1.3.0"
app = typer.Typer(help="CrowdSec Cloudflare Bouncer 도메인 관리")
console = Console()
# 설정
CONFIG_PATH = "/etc/crowdsec/bouncers/crowdsec-cloudflare-worker-bouncer.yaml"
BACKUP_DIR = Path.home() / "cf-bouncer-manager" / "backups"
LOG_FILE = Path.home() / "cf-bouncer-manager" / "history.log"
REQUEST_TIMEOUT = 30
SUBPROCESS_TIMEOUT = 60
# 서버 식별자 (논리적 이름)
SERVER_BOUNCER = "bouncer"
SERVER_CROWDSEC = "crowdsec"
# SSH 설정 (환경변수) - 각 서버별 설정
# Bouncer 서버 (crowdsec-cloudflare-worker-bouncer가 실행되는 서버)
SSH_BOUNCER_HOST = os.environ.get("CFB_BOUNCER_HOST") # user@hostname 또는 hostname
SSH_BOUNCER_PORT = os.environ.get("CFB_BOUNCER_PORT", "22")
SSH_BOUNCER_KEY = os.environ.get("CFB_BOUNCER_KEY") # SSH 키 경로 (선택)
SSH_BOUNCER_USER = os.environ.get("CFB_BOUNCER_USER", "root") # SSH 사용자
# CrowdSec 서버 (CrowdSec 엔진이 실행되는 서버)
SSH_CROWDSEC_HOST = os.environ.get("CFB_CROWDSEC_HOST")
SSH_CROWDSEC_PORT = os.environ.get("CFB_CROWDSEC_PORT", "22")
SSH_CROWDSEC_KEY = os.environ.get("CFB_CROWDSEC_KEY")
SSH_CROWDSEC_USER = os.environ.get("CFB_CROWDSEC_USER", "root")
# SSH 설정 매핑
SSH_CONFIGS = {
SERVER_BOUNCER: {
"host": SSH_BOUNCER_HOST,
"port": SSH_BOUNCER_PORT,
"key": SSH_BOUNCER_KEY,
"user": SSH_BOUNCER_USER,
},
SERVER_CROWDSEC: {
"host": SSH_CROWDSEC_HOST,
"port": SSH_CROWDSEC_PORT,
"key": SSH_CROWDSEC_KEY,
"user": SSH_CROWDSEC_USER,
},
}
class BouncerError(Exception):
"""Bouncer 관련 에러"""
pass
def log_action(action: str, details: str = "") -> None:
"""변경 이력 로깅"""
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(LOG_FILE, "a") as f:
f.write(f"[{timestamp}] {action}: {details}\n")
def get_ssh_config(server: str) -> dict:
"""서버별 SSH 설정 반환"""
config = SSH_CONFIGS.get(server)
if not config or not config.get("host"):
raise BouncerError(f"SSH 설정이 없습니다: {server}\n환경변수 CFB_{server.upper()}_HOST를 설정하세요.")
return config
def run_ssh(cmd: list[str], server: str = SERVER_BOUNCER, capture: bool = True, timeout: int = SUBPROCESS_TIMEOUT) -> subprocess.CompletedProcess:
"""SSH를 통해 원격 서버에서 명령 실행
Args:
cmd: 실행할 명령어 리스트
server: 서버 식별자 (SERVER_BOUNCER 또는 SERVER_CROWDSEC)
capture: 출력 캡처 여부
timeout: 타임아웃 (초)
Returns:
subprocess.CompletedProcess 결과
"""
config = get_ssh_config(server)
# SSH 호스트 구성 (user@host 형태 또는 host만)
host = config["host"]
if "@" not in host:
host = f"{config['user']}@{host}"
# SSH 명령어 구성
ssh_cmd = [
"ssh",
"-o", "BatchMode=yes",
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-o", "LogLevel=ERROR",
"-o", "ConnectTimeout=10",
"-p", config["port"],
]
if config.get("key"):
ssh_cmd.extend(["-i", config["key"]])
# 원격 명령어 구성
remote_cmd = " ".join(shlex.quote(c) for c in cmd)
ssh_cmd.extend([host, remote_cmd])
try:
return subprocess.run(ssh_cmd, capture_output=capture, text=True, timeout=timeout)
except subprocess.TimeoutExpired:
raise BouncerError(f"명령 실행 시간 초과 ({server}): {' '.join(cmd)}")
except Exception as e:
raise BouncerError(f"SSH 명령 실행 실패 ({server}): {e}")
def get_config() -> dict:
"""bouncer 설정 파일 읽기"""
result = run_ssh(["cat", CONFIG_PATH], server=SERVER_BOUNCER)
if result.returncode != 0:
raise BouncerError(f"설정 파일을 읽을 수 없습니다: {result.stderr}")
try:
return yaml.safe_load(result.stdout)
except yaml.YAMLError as e:
raise BouncerError(f"YAML 파싱 실패: {e}")
def validate_config(config: dict) -> bool:
"""설정 유효성 검사"""
required_keys = ["cloudflare_config", "crowdsec_config"]
for key in required_keys:
if key not in config:
raise BouncerError(f"필수 설정 누락: {key}")
cf_config = config.get("cloudflare_config", {})
if not cf_config.get("accounts"):
raise BouncerError("Cloudflare 계정 설정이 없습니다")
return True
def backup_config(config: dict, reason: str = "manual") -> Path:
"""설정 백업"""
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = BACKUP_DIR / f"config_{timestamp}_{reason}.yaml"
yaml_content = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False)
backup_file.write_text(yaml_content)
# 오래된 백업 정리 (최근 20개만 유지)
backups = sorted(BACKUP_DIR.glob("config_*.yaml"), key=lambda p: p.stat().st_mtime, reverse=True)
for old_backup in backups[20:]:
old_backup.unlink()
return backup_file
def save_config(config: dict, reason: str = "update") -> None:
"""bouncer 설정 파일 저장 (백업 후)"""
validate_config(config)
backup_file = backup_config(config, reason)
console.print(f"[dim]백업 저장: {backup_file}[/dim]")
yaml_content = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False)
result = run_ssh(["sh", "-c", f"cat > {CONFIG_PATH} << 'EOFCONFIG'\n{yaml_content}\nEOFCONFIG"], server=SERVER_BOUNCER)
if result.returncode != 0:
raise BouncerError(f"설정 파일 저장 실패: {result.stderr}")
log_action(reason, f"설정 파일 업데이트")
def get_cf_token(config: dict) -> str:
"""설정에서 Cloudflare API 토큰 가져오기"""
accounts = config.get("cloudflare_config", {}).get("accounts", [])
if accounts:
return accounts[0].get("token", "")
return ""
def get_zone_id(domain: str, token: str) -> Optional[str]:
"""Cloudflare API로 도메인의 zone_id 조회"""
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
try:
resp = requests.get(
f"https://api.cloudflare.com/client/v4/zones?name={domain}",
headers=headers,
timeout=REQUEST_TIMEOUT,
)
if resp.status_code == 200:
data = resp.json()
if data.get("success") and data.get("result"):
return data["result"][0]["id"]
except requests.RequestException as e:
raise BouncerError(f"Cloudflare API 요청 실패: {e}")
return None
def get_all_zones_from_cf(token: str) -> list[dict]:
"""Cloudflare 계정의 모든 zone 조회"""
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
all_zones = []
page = 1
try:
while True:
resp = requests.get(
f"https://api.cloudflare.com/client/v4/zones?per_page=50&page={page}",
headers=headers,
timeout=REQUEST_TIMEOUT,
)
if resp.status_code != 200:
break
data = resp.json()
if not data.get("success"):
break
zones = data.get("result", [])
if not zones:
break
all_zones.extend(zones)
if len(zones) < 50:
break
page += 1
except requests.RequestException as e:
raise BouncerError(f"Cloudflare API 요청 실패: {e}")
return all_zones
def get_zones(config: dict) -> list[dict]:
"""설정에서 zones 목록 가져오기"""
accounts = config.get("cloudflare_config", {}).get("accounts", [])
if accounts:
return accounts[0].get("zones", [])
return []
def extract_domain(route: str) -> str:
"""routes_to_protect에서 도메인 추출"""
return route.strip("*").strip("/")
def find_zone_by_domain(zones: list[dict], domain: str) -> tuple[int, dict] | tuple[None, None]:
"""도메인으로 zone 찾기"""
for i, zone in enumerate(zones):
routes = zone.get("routes_to_protect", [])
if routes and domain in routes[0]:
return i, zone
return None, None
def do_apply() -> bool:
"""bouncer 서비스 재시작"""
result = run_ssh(["systemctl", "restart", "crowdsec-cloudflare-worker-bouncer"], server=SERVER_BOUNCER)
if result.returncode != 0:
run_ssh(["pkill", "-f", "crowdsec-cloudflare-worker-bouncer"], server=SERVER_BOUNCER)
run_ssh([
"sh", "-c",
f"nohup crowdsec-cloudflare-worker-bouncer -c {CONFIG_PATH} > /var/log/bouncer.log 2>&1 &"
], server=SERVER_BOUNCER)
# 잠시 대기 후 프로세스 확인
import time
time.sleep(2)
result = run_ssh(["pgrep", "-f", "crowdsec-cloudflare-worker-bouncer"], server=SERVER_BOUNCER)
return result.returncode == 0
def version_callback(value: bool):
if value:
console.print(f"cfb version {__version__}")
raise typer.Exit()
# ============ CLI Commands ============
@app.callback()
def main(
version: Annotated[Optional[bool], typer.Option("--version", "-v", callback=version_callback, is_eager=True, help="버전 표시")] = None,
):
"""CrowdSec Cloudflare Bouncer 도메인 관리 CLI"""
pass
@app.command("list")
def list_domains():
"""보호 중인 도메인 목록 표시"""
try:
config = get_config()
zones = get_zones(config)
table = Table(title="보호 중인 도메인")
table.add_column("#", style="dim", width=3)
table.add_column("도메인", style="cyan")
table.add_column("Zone ID", style="dim")
table.add_column("액션", style="green")
table.add_column("Turnstile", style="yellow")
table.add_column("모드", style="magenta")
for i, zone in enumerate(zones, 1):
routes = zone.get("routes_to_protect", [])
domain = extract_domain(routes[0]) if routes else "N/A"
zone_id = zone.get("zone_id", "N/A")
action = zone.get("default_action", "N/A")
turnstile_cfg = zone.get("turnstile", {})
turnstile = "활성" if turnstile_cfg.get("enabled") else "비활성"
mode = turnstile_cfg.get("mode", "N/A") if turnstile_cfg.get("enabled") else "-"
table.add_row(str(i), domain, zone_id[:12] + "...", action, turnstile, mode)
console.print(table)
console.print(f"\n총 [bold]{len(zones)}[/bold]개 도메인 보호 중")
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def show(domain: str = typer.Argument(..., help="조회할 도메인")):
"""특정 도메인 상세 정보"""
try:
config = get_config()
zones = get_zones(config)
idx, zone = find_zone_by_domain(zones, domain)
if zone is None:
console.print(f"[red]{domain}을 찾을 수 없습니다[/red]")
raise typer.Exit(1)
routes = zone.get("routes_to_protect", [])
turnstile = zone.get("turnstile", {})
console.print(f"\n[bold cyan]{'='*60}[/bold cyan]")
console.print(f"[bold cyan]{domain} 상세 정보[/bold cyan]")
console.print(f"[bold cyan]{'='*60}[/bold cyan]\n")
console.print(f"[bold]도메인:[/bold] {extract_domain(routes[0]) if routes else 'N/A'}")
console.print(f"[bold]Zone ID:[/bold] {zone.get('zone_id', 'N/A')}")
console.print(f"[bold]액션:[/bold] {zone.get('default_action', 'N/A')}")
console.print(f"[bold]가능한 액션:[/bold] {', '.join(zone.get('actions', []))}")
console.print(f"[bold]보호 경로:[/bold] {', '.join(routes)}")
console.print(f"\n[bold yellow]Turnstile 설정[/bold yellow]")
console.print(f" 활성화: {turnstile.get('enabled', False)}")
console.print(f" 모드: {turnstile.get('mode', 'N/A')}")
console.print(f" 키 자동 교체: {turnstile.get('rotate_secret_key', False)}")
console.print(f" 교체 주기: {turnstile.get('rotate_secret_key_every', 'N/A')}")
console.print()
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def add(
domain: Annotated[str, typer.Argument(help="추가할 도메인 (예: example.com)")],
action: Annotated[str, typer.Option(help="기본 액션 (captcha/ban)")] = "captcha",
turnstile: Annotated[bool, typer.Option(help="Turnstile 활성화")] = True,
mode: Annotated[str, typer.Option(help="Turnstile 모드 (managed/invisible/visible)")] = "managed",
auto_apply: Annotated[bool, typer.Option("--auto-apply", "-a", help="추가 후 자동 적용")] = False,
dry_run: Annotated[bool, typer.Option("--dry-run", help="실제 변경 없이 미리보기")] = False,
):
"""새 도메인 추가"""
try:
config = get_config()
token = get_cf_token(config)
if not token:
console.print("[red]Cloudflare API 토큰을 찾을 수 없습니다[/red]")
raise typer.Exit(1)
zones = get_zones(config)
idx, _ = find_zone_by_domain(zones, domain)
if idx is not None:
console.print(f"[yellow]{domain}은 이미 보호 목록에 있습니다[/yellow]")
raise typer.Exit(1)
console.print(f"[dim]Cloudflare에서 {domain}의 zone_id 조회 중...[/dim]")
zone_id = get_zone_id(domain, token)
if not zone_id:
console.print(f"[red]{domain}의 zone_id를 찾을 수 없습니다.[/red]")
console.print("[dim]'cfb available' 명령으로 추가 가능한 도메인을 확인하세요.[/dim]")
raise typer.Exit(1)
console.print(f"[green]zone_id 확인: {zone_id}[/green]")
new_zone = {
"zone_id": zone_id,
"actions": [action],
"default_action": action,
"routes_to_protect": [f"*{domain}/*"],
"turnstile": {
"enabled": turnstile,
"rotate_secret_key": True,
"rotate_secret_key_every": "168h0m0s",
"mode": mode,
},
}
if dry_run:
console.print("\n[yellow]== DRY RUN ==[/yellow]")
console.print(yaml.dump(new_zone, default_flow_style=False))
return
config["cloudflare_config"]["accounts"][0]["zones"].append(new_zone)
save_config(config, reason=f"add_{domain}")
log_action("add", domain)
console.print(f"[green]✓ {domain} 추가 완료[/green]")
if auto_apply:
console.print("[dim]설정 적용 중...[/dim]")
if do_apply():
console.print("[green]✓ 적용 완료[/green]")
else:
console.print("[yellow]적용 실패 - 수동으로 확인하세요[/yellow]")
else:
console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]")
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def edit(
domain: Annotated[str, typer.Argument(help="수정할 도메인")],
action: Annotated[Optional[str], typer.Option(help="기본 액션 변경")] = None,
turnstile: Annotated[Optional[bool], typer.Option(help="Turnstile 활성화/비활성화")] = None,
mode: Annotated[Optional[str], typer.Option(help="Turnstile 모드 변경")] = None,
auto_apply: Annotated[bool, typer.Option("--auto-apply", "-a", help="수정 후 자동 적용")] = False,
):
"""기존 도메인 설정 수정"""
try:
config = get_config()
zones = get_zones(config)
idx, zone = find_zone_by_domain(zones, domain)
if zone is None:
console.print(f"[red]{domain}을 찾을 수 없습니다[/red]")
raise typer.Exit(1)
changes = []
if action is not None:
zone["default_action"] = action
zone["actions"] = [action]
changes.append(f"액션: {action}")
if turnstile is not None:
zone["turnstile"]["enabled"] = turnstile
changes.append(f"Turnstile: {'활성' if turnstile else '비활성'}")
if mode is not None:
zone["turnstile"]["mode"] = mode
changes.append(f"모드: {mode}")
if not changes:
console.print("[yellow]변경할 내용이 없습니다. --action, --turnstile, --mode 옵션을 사용하세요.[/yellow]")
raise typer.Exit(0)
config["cloudflare_config"]["accounts"][0]["zones"][idx] = zone
save_config(config, reason=f"edit_{domain}")
log_action("edit", f"{domain}: {', '.join(changes)}")
console.print(f"[green]✓ {domain} 수정 완료[/green]")
for change in changes:
console.print(f" - {change}")
if auto_apply:
console.print("[dim]설정 적용 중...[/dim]")
if do_apply():
console.print("[green]✓ 적용 완료[/green]")
else:
console.print("[yellow]적용 실패 - 수동으로 확인하세요[/yellow]")
else:
console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]")
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def remove(
domain: Annotated[str, typer.Argument(help="제거할 도메인")],
force: Annotated[bool, typer.Option("--force", "-f", help="확인 없이 제거")] = False,
auto_apply: Annotated[bool, typer.Option("--auto-apply", "-a", help="제거 후 자동 적용")] = False,
):
"""도메인 제거"""
try:
config = get_config()
zones = get_zones(config)
idx, zone = find_zone_by_domain(zones, domain)
if idx is None:
console.print(f"[red]{domain}을 찾을 수 없습니다[/red]")
raise typer.Exit(1)
if not force and not typer.confirm(f"{domain}을 정말 제거하시겠습니까?"):
console.print("[yellow]취소됨[/yellow]")
raise typer.Exit(0)
del config["cloudflare_config"]["accounts"][0]["zones"][idx]
save_config(config, reason=f"remove_{domain}")
log_action("remove", domain)
console.print(f"[green]✓ {domain} 제거 완료[/green]")
if auto_apply:
console.print("[dim]설정 적용 중...[/dim]")
if do_apply():
console.print("[green]✓ 적용 완료[/green]")
else:
console.print("[yellow]적용 실패 - 수동으로 확인하세요[/yellow]")
else:
console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]")
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def sync(
auto_apply: Annotated[bool, typer.Option("--auto-apply", "-a", help="추가 후 자동 적용")] = False,
dry_run: Annotated[bool, typer.Option("--dry-run", help="실제 변경 없이 미리보기")] = False,
):
"""Cloudflare의 모든 도메인을 보호 목록에 추가"""
try:
config = get_config()
token = get_cf_token(config)
if not token:
console.print("[red]Cloudflare API 토큰을 찾을 수 없습니다[/red]")
raise typer.Exit(1)
console.print("[dim]Cloudflare에서 도메인 목록 조회 중...[/dim]")
cf_zones = get_all_zones_from_cf(token)
protected_zones = get_zones(config)
protected_ids = {z.get("zone_id") for z in protected_zones}
# 추가할 도메인 찾기
to_add = []
for cf_zone in cf_zones:
if cf_zone.get("id") not in protected_ids and cf_zone.get("status") == "active":
to_add.append(cf_zone)
if not to_add:
console.print("[green]모든 도메인이 이미 보호 목록에 있습니다[/green]")
return
console.print(f"\n[bold]추가할 도메인 ({len(to_add)}개):[/bold]")
for z in to_add:
console.print(f" - {z.get('name')}")
if dry_run:
console.print("\n[yellow]== DRY RUN - 실제 변경 없음 ==[/yellow]")
return
if not typer.confirm(f"\n{len(to_add)}개 도메인을 추가하시겠습니까?"):
console.print("[yellow]취소됨[/yellow]")
return
added = []
for cf_zone in to_add:
new_zone = {
"zone_id": cf_zone.get("id"),
"actions": ["captcha"],
"default_action": "captcha",
"routes_to_protect": [f"*{cf_zone.get('name')}/*"],
"turnstile": {
"enabled": True,
"rotate_secret_key": True,
"rotate_secret_key_every": "168h0m0s",
"mode": "managed",
},
}
config["cloudflare_config"]["accounts"][0]["zones"].append(new_zone)
added.append(cf_zone.get("name"))
save_config(config, reason="sync")
log_action("sync", f"Added {len(added)} domains: {', '.join(added)}")
console.print(f"[green]✓ {len(added)}개 도메인 추가 완료[/green]")
if auto_apply:
console.print("[dim]설정 적용 중...[/dim]")
if do_apply():
console.print("[green]✓ 적용 완료[/green]")
else:
console.print("[yellow]적용 실패 - 수동으로 확인하세요[/yellow]")
else:
console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]")
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def apply():
"""설정 적용 (bouncer 서비스 재시작)"""
console.print("[dim]bouncer 서비스 재시작 중...[/dim]")
try:
if do_apply():
console.print("[green]✓ 설정 적용 완료[/green]")
result = run_ssh(["pgrep", "-f", "crowdsec-cloudflare-worker-bouncer"], server=SERVER_BOUNCER)
if result.returncode == 0:
console.print(f"[green]bouncer PID: {result.stdout.strip()}[/green]")
log_action("apply", "Service restarted")
else:
console.print("[red]✗ bouncer 프로세스를 시작할 수 없습니다[/red]")
raise typer.Exit(1)
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def status():
"""bouncer 상태 확인"""
try:
result = run_ssh(["pgrep", "-f", "crowdsec-cloudflare-worker-bouncer"], server=SERVER_BOUNCER)
if result.returncode == 0:
pids = result.stdout.strip().split('\n')
console.print(f"[green]✓ bouncer 실행 중 (PID: {', '.join(pids)})[/green]")
else:
console.print("[red]✗ bouncer가 실행되지 않음[/red]")
result = run_ssh(["cscli", "bouncers", "list"], server=SERVER_CROWDSEC)
if result.returncode == 0:
console.print("\n[bold]CrowdSec Bouncer 상태:[/bold]")
console.print(result.stdout)
config = get_config()
zones = get_zones(config)
console.print(f"\n[bold]보호 도메인 수:[/bold] {len(zones)}")
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def available():
"""Cloudflare에서 추가 가능한 도메인 목록"""
try:
config = get_config()
token = get_cf_token(config)
if not token:
console.print("[red]Cloudflare API 토큰을 찾을 수 없습니다[/red]")
raise typer.Exit(1)
console.print("[dim]Cloudflare에서 도메인 목록 조회 중...[/dim]")
cf_zones = get_all_zones_from_cf(token)
protected_zones = get_zones(config)
protected_ids = {z.get("zone_id") for z in protected_zones}
table = Table(title="Cloudflare 도메인 목록")
table.add_column("도메인", style="cyan")
table.add_column("Zone ID", style="dim")
table.add_column("상태", style="green")
table.add_column("보호", style="yellow")
for zone in cf_zones:
zone_id = zone.get("id", "")
is_protected = "✓ 보호중" if zone_id in protected_ids else "-"
status_style = "green" if zone.get("status") == "active" else "yellow"
table.add_row(
zone.get("name", "N/A"),
zone_id[:12] + "...",
f"[{status_style}]{zone.get('status', 'N/A')}[/{status_style}]",
is_protected,
)
console.print(table)
console.print(f"\n총 [bold]{len(cf_zones)}[/bold]개 도메인, [bold]{len(protected_ids)}[/bold]개 보호 중")
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def logs(
lines: Annotated[int, typer.Option("-n", "--lines", help="표시할 줄 수")] = 50,
follow: Annotated[bool, typer.Option("-f", "--follow", help="실시간 로그 추적")] = False,
):
"""bouncer 로그 조회"""
try:
config = get_ssh_config(SERVER_BOUNCER)
host = config["host"]
if "@" not in host:
host = f"{config['user']}@{host}"
if follow:
console.print("[dim]로그 추적 중... (Ctrl+C로 종료)[/dim]\n")
ssh_cmd = [
"ssh",
"-o", "BatchMode=yes",
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-o", "LogLevel=ERROR",
"-p", config["port"],
]
if config.get("key"):
ssh_cmd.extend(["-i", config["key"]])
ssh_cmd.extend([host, "journalctl -u crowdsec-cloudflare-worker-bouncer -f --no-pager"])
subprocess.run(ssh_cmd, timeout=None)
else:
# journalctl 시도, 실패시 syslog에서 grep
result = run_ssh(["journalctl", "-u", "crowdsec-cloudflare-worker-bouncer", "-n", str(lines), "--no-pager"], server=SERVER_BOUNCER)
if result.returncode == 0 and result.stdout.strip():
console.print(result.stdout)
else:
result = run_ssh(["sh", "-c", f"grep -i bouncer /var/log/syslog | tail -n {lines}"], server=SERVER_BOUNCER)
if result.returncode == 0 and result.stdout.strip():
console.print(result.stdout)
else:
console.print("[yellow]로그를 찾을 수 없습니다[/yellow]")
except KeyboardInterrupt:
console.print("\n[dim]로그 추적 종료[/dim]")
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def decisions(
limit: Annotated[int, typer.Option("-n", "--limit", help="표시할 개수")] = 20,
):
"""CrowdSec 현재 차단 결정 조회"""
try:
result = run_ssh(
["cscli", "decisions", "list", "-o", "raw", "--limit", str(limit)],
server=SERVER_CROWDSEC,
)
if result.returncode == 0:
if result.stdout.strip():
console.print("[bold]현재 차단 결정:[/bold]\n")
console.print(result.stdout)
else:
console.print("[green]현재 활성 차단 결정이 없습니다[/green]")
else:
console.print(f"[yellow]결정 조회 실패: {result.stderr}[/yellow]")
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def metrics():
"""bouncer Prometheus 메트릭 조회"""
try:
config = get_config()
prometheus_cfg = config.get("prometheus", {})
if not prometheus_cfg.get("enabled"):
console.print("[yellow]Prometheus 메트릭이 비활성화되어 있습니다[/yellow]")
return
addr = prometheus_cfg.get("listen_addr", "127.0.0.1")
port = prometheus_cfg.get("listen_port", "2112")
result = run_ssh(["curl", "-s", f"http://{addr}:{port}/metrics"], server=SERVER_BOUNCER)
if result.returncode == 0:
# 주요 메트릭만 필터링
lines = result.stdout.split('\n')
important_metrics = [l for l in lines if l and not l.startswith('#') and 'crowdsec' in l.lower()]
if important_metrics:
console.print("[bold]Bouncer 메트릭:[/bold]\n")
for line in important_metrics[:20]:
console.print(line)
else:
console.print("[dim]메트릭 수집 중... 잠시 후 다시 시도하세요[/dim]")
else:
console.print("[yellow]메트릭을 가져올 수 없습니다[/yellow]")
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def backup():
"""현재 설정 백업"""
try:
config = get_config()
backup_file = backup_config(config, reason="manual")
log_action("backup", str(backup_file))
console.print(f"[green]✓ 백업 완료: {backup_file}[/green]")
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def restore(
backup_file: Annotated[Optional[str], typer.Argument(help="복원할 백업 파일 경로")] = None,
):
"""백업에서 설정 복원"""
try:
if backup_file is None:
backups = sorted(BACKUP_DIR.glob("config_*.yaml"), key=lambda p: p.stat().st_mtime, reverse=True)
if not backups:
console.print("[yellow]백업 파일이 없습니다[/yellow]")
raise typer.Exit(1)
table = Table(title="백업 파일 목록")
table.add_column("#", style="dim", width=3)
table.add_column("파일명", style="cyan")
table.add_column("날짜", style="green")
for i, b in enumerate(backups[:10], 1):
mtime = datetime.fromtimestamp(b.stat().st_mtime).strftime("%Y-%m-%d %H:%M:%S")
table.add_row(str(i), b.name, mtime)
console.print(table)
console.print("\n[dim]복원하려면: cfb restore <파일명>[/dim]")
return
backup_path = Path(backup_file)
if not backup_path.exists():
backup_path = BACKUP_DIR / backup_file
if not backup_path.exists():
console.print(f"[red]백업 파일을 찾을 수 없습니다: {backup_file}[/red]")
raise typer.Exit(1)
config = yaml.safe_load(backup_path.read_text())
validate_config(config)
if not typer.confirm(f"{backup_path.name}에서 복원하시겠습니까?"):
console.print("[yellow]취소됨[/yellow]")
raise typer.Exit(0)
current_config = get_config()
backup_config(current_config, reason="before_restore")
yaml_content = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False)
result = run_ssh(["sh", "-c", f"cat > {CONFIG_PATH} << 'EOFCONFIG'\n{yaml_content}\nEOFCONFIG"], server=SERVER_BOUNCER)
if result.returncode != 0:
raise BouncerError(f"복원 실패: {result.stderr}")
log_action("restore", backup_path.name)
console.print(f"[green]✓ 복원 완료: {backup_path.name}[/green]")
console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]")
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
except yaml.YAMLError as e:
console.print(f"[red]백업 파일 파싱 실패: {e}[/red]")
raise typer.Exit(1)
@app.command()
def diff(
backup_file: Annotated[Optional[str], typer.Argument(help="비교할 백업 파일")] = None,
):
"""현재 설정과 백업 비교"""
try:
if backup_file is None:
backups = sorted(BACKUP_DIR.glob("config_*.yaml"), key=lambda p: p.stat().st_mtime, reverse=True)
if not backups:
console.print("[yellow]백업 파일이 없습니다[/yellow]")
raise typer.Exit(1)
backup_path = backups[0]
console.print(f"[dim]최근 백업과 비교: {backup_path.name}[/dim]\n")
else:
backup_path = Path(backup_file)
if not backup_path.exists():
backup_path = BACKUP_DIR / backup_file
if not backup_path.exists():
console.print(f"[red]백업 파일을 찾을 수 없습니다: {backup_file}[/red]")
raise typer.Exit(1)
config = get_config()
current_yaml = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False)
backup_yaml = backup_path.read_text()
diff_lines = list(unified_diff(
backup_yaml.splitlines(keepends=True),
current_yaml.splitlines(keepends=True),
fromfile=f"백업: {backup_path.name}",
tofile="현재 설정",
))
if diff_lines:
for line in diff_lines:
if line.startswith('+') and not line.startswith('+++'):
console.print(f"[green]{line.rstrip()}[/green]")
elif line.startswith('-') and not line.startswith('---'):
console.print(f"[red]{line.rstrip()}[/red]")
elif line.startswith('@@'):
console.print(f"[cyan]{line.rstrip()}[/cyan]")
else:
console.print(line.rstrip())
else:
console.print("[green]변경사항 없음[/green]")
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command("export")
def export_config(
output: Annotated[Optional[str], typer.Option("-o", "--output", help="출력 파일 경로")] = None,
format: Annotated[str, typer.Option("-f", "--format", help="출력 형식 (yaml/json)")] = "yaml",
):
"""설정을 파일로 내보내기"""
try:
config = get_config()
zones = get_zones(config)
# 도메인 목록만 추출
export_data = {
"domains": [
{
"domain": extract_domain(z.get("routes_to_protect", [""])[0]),
"zone_id": z.get("zone_id"),
"action": z.get("default_action"),
"turnstile_enabled": z.get("turnstile", {}).get("enabled"),
"turnstile_mode": z.get("turnstile", {}).get("mode"),
}
for z in zones
],
"exported_at": datetime.now().isoformat(),
"total": len(zones),
}
if format == "json":
import json
content = json.dumps(export_data, indent=2, ensure_ascii=False)
else:
content = yaml.dump(export_data, default_flow_style=False, allow_unicode=True)
if output:
Path(output).write_text(content)
console.print(f"[green]✓ 내보내기 완료: {output}[/green]")
else:
console.print(content)
except BouncerError as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
@app.command()
def history(
lines: Annotated[int, typer.Option("-n", "--lines", help="표시할 줄 수")] = 20,
):
"""변경 이력 조회"""
try:
if not LOG_FILE.exists():
console.print("[yellow]변경 이력이 없습니다[/yellow]")
return
content = LOG_FILE.read_text()
history_lines = content.strip().split('\n')
console.print("[bold]변경 이력:[/bold]\n")
for line in history_lines[-lines:]:
console.print(line)
except Exception as e:
console.print(f"[red]에러: {e}[/red]")
raise typer.Exit(1)
if __name__ == "__main__":
app()