#!/usr/bin/env python3 """CrowdSec Cloudflare Worker Bouncer 도메인 관리 CLI""" import subprocess from datetime import datetime from difflib import unified_diff from pathlib import Path from typing import Annotated, Optional import requests import typer import yaml from rich.console import Console from rich.table import Table __version__ = "1.1.0" app = typer.Typer(help="CrowdSec Cloudflare Bouncer 도메인 관리") console = Console() # 설정 CONFIG_PATH = "/etc/crowdsec/bouncers/crowdsec-cloudflare-worker-bouncer.yaml" CONTAINER_NAME = "cs-cf-worker-bouncer" CROWDSEC_CONTAINER = "crowdsec" BACKUP_DIR = Path.home() / "cf-bouncer-manager" / "backups" LOG_FILE = Path.home() / "cf-bouncer-manager" / "history.log" REQUEST_TIMEOUT = 30 SUBPROCESS_TIMEOUT = 60 class BouncerError(Exception): """Bouncer 관련 에러""" pass def log_action(action: str, details: str = "") -> None: """변경 이력 로깅""" LOG_FILE.parent.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") with open(LOG_FILE, "a") as f: f.write(f"[{timestamp}] {action}: {details}\n") def run_incus(cmd: list[str], container: str = CONTAINER_NAME, capture: bool = True, timeout: int = SUBPROCESS_TIMEOUT) -> subprocess.CompletedProcess: """incus exec 명령 실행""" full_cmd = ["incus", "exec", container, "--"] + cmd try: return subprocess.run(full_cmd, capture_output=capture, text=True, timeout=timeout) except subprocess.TimeoutExpired: raise BouncerError(f"명령 실행 시간 초과: {' '.join(cmd)}") except Exception as e: raise BouncerError(f"명령 실행 실패: {e}") def get_config() -> dict: """bouncer 설정 파일 읽기""" result = run_incus(["cat", CONFIG_PATH]) if result.returncode != 0: raise BouncerError(f"설정 파일을 읽을 수 없습니다: {result.stderr}") try: return yaml.safe_load(result.stdout) except yaml.YAMLError as e: raise BouncerError(f"YAML 파싱 실패: {e}") def validate_config(config: dict) -> bool: """설정 유효성 검사""" required_keys = ["cloudflare_config", "crowdsec_config"] for key in required_keys: if key not in config: raise BouncerError(f"필수 설정 누락: {key}") cf_config = config.get("cloudflare_config", {}) if not cf_config.get("accounts"): raise BouncerError("Cloudflare 계정 설정이 없습니다") return True def backup_config(config: dict, reason: str = "manual") -> Path: """설정 백업""" BACKUP_DIR.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_file = BACKUP_DIR / f"config_{timestamp}_{reason}.yaml" yaml_content = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False) backup_file.write_text(yaml_content) # 오래된 백업 정리 (최근 20개만 유지) backups = sorted(BACKUP_DIR.glob("config_*.yaml"), key=lambda p: p.stat().st_mtime, reverse=True) for old_backup in backups[20:]: old_backup.unlink() return backup_file def save_config(config: dict, reason: str = "update") -> None: """bouncer 설정 파일 저장 (백업 후)""" validate_config(config) backup_file = backup_config(config, reason) console.print(f"[dim]백업 저장: {backup_file}[/dim]") yaml_content = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False) result = run_incus(["sh", "-c", f"cat > {CONFIG_PATH} << 'EOFCONFIG'\n{yaml_content}\nEOFCONFIG"]) if result.returncode != 0: raise BouncerError(f"설정 파일 저장 실패: {result.stderr}") log_action(reason, f"설정 파일 업데이트") def get_cf_token(config: dict) -> str: """설정에서 Cloudflare API 토큰 가져오기""" accounts = config.get("cloudflare_config", {}).get("accounts", []) if accounts: return accounts[0].get("token", "") return "" def get_zone_id(domain: str, token: str) -> Optional[str]: """Cloudflare API로 도메인의 zone_id 조회""" headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} try: resp = requests.get( f"https://api.cloudflare.com/client/v4/zones?name={domain}", headers=headers, timeout=REQUEST_TIMEOUT, ) if resp.status_code == 200: data = resp.json() if data.get("success") and data.get("result"): return data["result"][0]["id"] except requests.RequestException as e: raise BouncerError(f"Cloudflare API 요청 실패: {e}") return None def get_all_zones_from_cf(token: str) -> list[dict]: """Cloudflare 계정의 모든 zone 조회""" headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} all_zones = [] page = 1 try: while True: resp = requests.get( f"https://api.cloudflare.com/client/v4/zones?per_page=50&page={page}", headers=headers, timeout=REQUEST_TIMEOUT, ) if resp.status_code != 200: break data = resp.json() if not data.get("success"): break zones = data.get("result", []) if not zones: break all_zones.extend(zones) if len(zones) < 50: break page += 1 except requests.RequestException as e: raise BouncerError(f"Cloudflare API 요청 실패: {e}") return all_zones def get_zones(config: dict) -> list[dict]: """설정에서 zones 목록 가져오기""" accounts = config.get("cloudflare_config", {}).get("accounts", []) if accounts: return accounts[0].get("zones", []) return [] def extract_domain(route: str) -> str: """routes_to_protect에서 도메인 추출""" return route.strip("*").strip("/") def find_zone_by_domain(zones: list[dict], domain: str) -> tuple[int, dict] | tuple[None, None]: """도메인으로 zone 찾기""" for i, zone in enumerate(zones): routes = zone.get("routes_to_protect", []) if routes and domain in routes[0]: return i, zone return None, None def do_apply() -> bool: """bouncer 서비스 재시작""" result = run_incus(["systemctl", "restart", "crowdsec-cloudflare-worker-bouncer"]) if result.returncode != 0: run_incus(["pkill", "-f", "crowdsec-cloudflare-worker-bouncer"]) run_incus([ "sh", "-c", f"nohup crowdsec-cloudflare-worker-bouncer -c {CONFIG_PATH} > /var/log/bouncer.log 2>&1 &" ]) # 잠시 대기 후 프로세스 확인 import time time.sleep(2) result = run_incus(["pgrep", "-f", "crowdsec-cloudflare-worker-bouncer"]) return result.returncode == 0 def version_callback(value: bool): if value: console.print(f"cfb version {__version__}") raise typer.Exit() # ============ CLI Commands ============ @app.callback() def main( version: Annotated[Optional[bool], typer.Option("--version", "-v", callback=version_callback, is_eager=True, help="버전 표시")] = None, ): """CrowdSec Cloudflare Bouncer 도메인 관리 CLI""" pass @app.command("list") def list_domains(): """보호 중인 도메인 목록 표시""" try: config = get_config() zones = get_zones(config) table = Table(title="보호 중인 도메인") table.add_column("#", style="dim", width=3) table.add_column("도메인", style="cyan") table.add_column("Zone ID", style="dim") table.add_column("액션", style="green") table.add_column("Turnstile", style="yellow") table.add_column("모드", style="magenta") for i, zone in enumerate(zones, 1): routes = zone.get("routes_to_protect", []) domain = extract_domain(routes[0]) if routes else "N/A" zone_id = zone.get("zone_id", "N/A") action = zone.get("default_action", "N/A") turnstile_cfg = zone.get("turnstile", {}) turnstile = "활성" if turnstile_cfg.get("enabled") else "비활성" mode = turnstile_cfg.get("mode", "N/A") if turnstile_cfg.get("enabled") else "-" table.add_row(str(i), domain, zone_id[:12] + "...", action, turnstile, mode) console.print(table) console.print(f"\n총 [bold]{len(zones)}[/bold]개 도메인 보호 중") except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def show(domain: str = typer.Argument(..., help="조회할 도메인")): """특정 도메인 상세 정보""" try: config = get_config() zones = get_zones(config) idx, zone = find_zone_by_domain(zones, domain) if zone is None: console.print(f"[red]{domain}을 찾을 수 없습니다[/red]") raise typer.Exit(1) routes = zone.get("routes_to_protect", []) turnstile = zone.get("turnstile", {}) console.print(f"\n[bold cyan]{'='*60}[/bold cyan]") console.print(f"[bold cyan]{domain} 상세 정보[/bold cyan]") console.print(f"[bold cyan]{'='*60}[/bold cyan]\n") console.print(f"[bold]도메인:[/bold] {extract_domain(routes[0]) if routes else 'N/A'}") console.print(f"[bold]Zone ID:[/bold] {zone.get('zone_id', 'N/A')}") console.print(f"[bold]액션:[/bold] {zone.get('default_action', 'N/A')}") console.print(f"[bold]가능한 액션:[/bold] {', '.join(zone.get('actions', []))}") console.print(f"[bold]보호 경로:[/bold] {', '.join(routes)}") console.print(f"\n[bold yellow]Turnstile 설정[/bold yellow]") console.print(f" 활성화: {turnstile.get('enabled', False)}") console.print(f" 모드: {turnstile.get('mode', 'N/A')}") console.print(f" 키 자동 교체: {turnstile.get('rotate_secret_key', False)}") console.print(f" 교체 주기: {turnstile.get('rotate_secret_key_every', 'N/A')}") console.print() except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def add( domain: Annotated[str, typer.Argument(help="추가할 도메인 (예: example.com)")], action: Annotated[str, typer.Option(help="기본 액션 (captcha/ban)")] = "captcha", turnstile: Annotated[bool, typer.Option(help="Turnstile 활성화")] = True, mode: Annotated[str, typer.Option(help="Turnstile 모드 (managed/invisible/visible)")] = "managed", auto_apply: Annotated[bool, typer.Option("--auto-apply", "-a", help="추가 후 자동 적용")] = False, dry_run: Annotated[bool, typer.Option("--dry-run", help="실제 변경 없이 미리보기")] = False, ): """새 도메인 추가""" try: config = get_config() token = get_cf_token(config) if not token: console.print("[red]Cloudflare API 토큰을 찾을 수 없습니다[/red]") raise typer.Exit(1) zones = get_zones(config) idx, _ = find_zone_by_domain(zones, domain) if idx is not None: console.print(f"[yellow]{domain}은 이미 보호 목록에 있습니다[/yellow]") raise typer.Exit(1) console.print(f"[dim]Cloudflare에서 {domain}의 zone_id 조회 중...[/dim]") zone_id = get_zone_id(domain, token) if not zone_id: console.print(f"[red]{domain}의 zone_id를 찾을 수 없습니다.[/red]") console.print("[dim]'cfb available' 명령으로 추가 가능한 도메인을 확인하세요.[/dim]") raise typer.Exit(1) console.print(f"[green]zone_id 확인: {zone_id}[/green]") new_zone = { "zone_id": zone_id, "actions": [action], "default_action": action, "routes_to_protect": [f"*{domain}/*"], "turnstile": { "enabled": turnstile, "rotate_secret_key": True, "rotate_secret_key_every": "168h0m0s", "mode": mode, }, } if dry_run: console.print("\n[yellow]== DRY RUN ==[/yellow]") console.print(yaml.dump(new_zone, default_flow_style=False)) return config["cloudflare_config"]["accounts"][0]["zones"].append(new_zone) save_config(config, reason=f"add_{domain}") log_action("add", domain) console.print(f"[green]✓ {domain} 추가 완료[/green]") if auto_apply: console.print("[dim]설정 적용 중...[/dim]") if do_apply(): console.print("[green]✓ 적용 완료[/green]") else: console.print("[yellow]적용 실패 - 수동으로 확인하세요[/yellow]") else: console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]") except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def edit( domain: Annotated[str, typer.Argument(help="수정할 도메인")], action: Annotated[Optional[str], typer.Option(help="기본 액션 변경")] = None, turnstile: Annotated[Optional[bool], typer.Option(help="Turnstile 활성화/비활성화")] = None, mode: Annotated[Optional[str], typer.Option(help="Turnstile 모드 변경")] = None, auto_apply: Annotated[bool, typer.Option("--auto-apply", "-a", help="수정 후 자동 적용")] = False, ): """기존 도메인 설정 수정""" try: config = get_config() zones = get_zones(config) idx, zone = find_zone_by_domain(zones, domain) if zone is None: console.print(f"[red]{domain}을 찾을 수 없습니다[/red]") raise typer.Exit(1) changes = [] if action is not None: zone["default_action"] = action zone["actions"] = [action] changes.append(f"액션: {action}") if turnstile is not None: zone["turnstile"]["enabled"] = turnstile changes.append(f"Turnstile: {'활성' if turnstile else '비활성'}") if mode is not None: zone["turnstile"]["mode"] = mode changes.append(f"모드: {mode}") if not changes: console.print("[yellow]변경할 내용이 없습니다. --action, --turnstile, --mode 옵션을 사용하세요.[/yellow]") raise typer.Exit(0) config["cloudflare_config"]["accounts"][0]["zones"][idx] = zone save_config(config, reason=f"edit_{domain}") log_action("edit", f"{domain}: {', '.join(changes)}") console.print(f"[green]✓ {domain} 수정 완료[/green]") for change in changes: console.print(f" - {change}") if auto_apply: console.print("[dim]설정 적용 중...[/dim]") if do_apply(): console.print("[green]✓ 적용 완료[/green]") else: console.print("[yellow]적용 실패 - 수동으로 확인하세요[/yellow]") else: console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]") except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def remove( domain: Annotated[str, typer.Argument(help="제거할 도메인")], force: Annotated[bool, typer.Option("--force", "-f", help="확인 없이 제거")] = False, auto_apply: Annotated[bool, typer.Option("--auto-apply", "-a", help="제거 후 자동 적용")] = False, ): """도메인 제거""" try: config = get_config() zones = get_zones(config) idx, zone = find_zone_by_domain(zones, domain) if idx is None: console.print(f"[red]{domain}을 찾을 수 없습니다[/red]") raise typer.Exit(1) if not force and not typer.confirm(f"{domain}을 정말 제거하시겠습니까?"): console.print("[yellow]취소됨[/yellow]") raise typer.Exit(0) del config["cloudflare_config"]["accounts"][0]["zones"][idx] save_config(config, reason=f"remove_{domain}") log_action("remove", domain) console.print(f"[green]✓ {domain} 제거 완료[/green]") if auto_apply: console.print("[dim]설정 적용 중...[/dim]") if do_apply(): console.print("[green]✓ 적용 완료[/green]") else: console.print("[yellow]적용 실패 - 수동으로 확인하세요[/yellow]") else: console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]") except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def sync( auto_apply: Annotated[bool, typer.Option("--auto-apply", "-a", help="추가 후 자동 적용")] = False, dry_run: Annotated[bool, typer.Option("--dry-run", help="실제 변경 없이 미리보기")] = False, ): """Cloudflare의 모든 도메인을 보호 목록에 추가""" try: config = get_config() token = get_cf_token(config) if not token: console.print("[red]Cloudflare API 토큰을 찾을 수 없습니다[/red]") raise typer.Exit(1) console.print("[dim]Cloudflare에서 도메인 목록 조회 중...[/dim]") cf_zones = get_all_zones_from_cf(token) protected_zones = get_zones(config) protected_ids = {z.get("zone_id") for z in protected_zones} # 추가할 도메인 찾기 to_add = [] for cf_zone in cf_zones: if cf_zone.get("id") not in protected_ids and cf_zone.get("status") == "active": to_add.append(cf_zone) if not to_add: console.print("[green]모든 도메인이 이미 보호 목록에 있습니다[/green]") return console.print(f"\n[bold]추가할 도메인 ({len(to_add)}개):[/bold]") for z in to_add: console.print(f" - {z.get('name')}") if dry_run: console.print("\n[yellow]== DRY RUN - 실제 변경 없음 ==[/yellow]") return if not typer.confirm(f"\n{len(to_add)}개 도메인을 추가하시겠습니까?"): console.print("[yellow]취소됨[/yellow]") return added = [] for cf_zone in to_add: new_zone = { "zone_id": cf_zone.get("id"), "actions": ["captcha"], "default_action": "captcha", "routes_to_protect": [f"*{cf_zone.get('name')}/*"], "turnstile": { "enabled": True, "rotate_secret_key": True, "rotate_secret_key_every": "168h0m0s", "mode": "managed", }, } config["cloudflare_config"]["accounts"][0]["zones"].append(new_zone) added.append(cf_zone.get("name")) save_config(config, reason="sync") log_action("sync", f"Added {len(added)} domains: {', '.join(added)}") console.print(f"[green]✓ {len(added)}개 도메인 추가 완료[/green]") if auto_apply: console.print("[dim]설정 적용 중...[/dim]") if do_apply(): console.print("[green]✓ 적용 완료[/green]") else: console.print("[yellow]적용 실패 - 수동으로 확인하세요[/yellow]") else: console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]") except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def apply(): """설정 적용 (bouncer 서비스 재시작)""" console.print("[dim]bouncer 서비스 재시작 중...[/dim]") try: if do_apply(): console.print("[green]✓ 설정 적용 완료[/green]") result = run_incus(["pgrep", "-f", "crowdsec-cloudflare-worker-bouncer"]) if result.returncode == 0: console.print(f"[green]bouncer PID: {result.stdout.strip()}[/green]") log_action("apply", "Service restarted") else: console.print("[red]✗ bouncer 프로세스를 시작할 수 없습니다[/red]") raise typer.Exit(1) except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def status(): """bouncer 상태 확인""" try: result = run_incus(["pgrep", "-f", "crowdsec-cloudflare-worker-bouncer"]) if result.returncode == 0: pids = result.stdout.strip().split('\n') console.print(f"[green]✓ bouncer 실행 중 (PID: {', '.join(pids)})[/green]") else: console.print("[red]✗ bouncer가 실행되지 않음[/red]") result = subprocess.run( ["incus", "exec", CROWDSEC_CONTAINER, "--", "cscli", "bouncers", "list"], capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT, ) if result.returncode == 0: console.print("\n[bold]CrowdSec Bouncer 상태:[/bold]") console.print(result.stdout) config = get_config() zones = get_zones(config) console.print(f"\n[bold]보호 도메인 수:[/bold] {len(zones)}") except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def available(): """Cloudflare에서 추가 가능한 도메인 목록""" try: config = get_config() token = get_cf_token(config) if not token: console.print("[red]Cloudflare API 토큰을 찾을 수 없습니다[/red]") raise typer.Exit(1) console.print("[dim]Cloudflare에서 도메인 목록 조회 중...[/dim]") cf_zones = get_all_zones_from_cf(token) protected_zones = get_zones(config) protected_ids = {z.get("zone_id") for z in protected_zones} table = Table(title="Cloudflare 도메인 목록") table.add_column("도메인", style="cyan") table.add_column("Zone ID", style="dim") table.add_column("상태", style="green") table.add_column("보호", style="yellow") for zone in cf_zones: zone_id = zone.get("id", "") is_protected = "✓ 보호중" if zone_id in protected_ids else "-" status_style = "green" if zone.get("status") == "active" else "yellow" table.add_row( zone.get("name", "N/A"), zone_id[:12] + "...", f"[{status_style}]{zone.get('status', 'N/A')}[/{status_style}]", is_protected, ) console.print(table) console.print(f"\n총 [bold]{len(cf_zones)}[/bold]개 도메인, [bold]{len(protected_ids)}[/bold]개 보호 중") except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def logs( lines: Annotated[int, typer.Option("-n", "--lines", help="표시할 줄 수")] = 50, follow: Annotated[bool, typer.Option("-f", "--follow", help="실시간 로그 추적")] = False, ): """bouncer 로그 조회""" try: if follow: console.print("[dim]로그 추적 중... (Ctrl+C로 종료)[/dim]\n") subprocess.run( ["incus", "exec", CONTAINER_NAME, "--", "journalctl", "-u", "crowdsec-cloudflare-worker-bouncer", "-f", "--no-pager"], timeout=None, ) else: # journalctl 시도, 실패시 syslog에서 grep result = run_incus(["journalctl", "-u", "crowdsec-cloudflare-worker-bouncer", "-n", str(lines), "--no-pager"]) if result.returncode == 0 and result.stdout.strip(): console.print(result.stdout) else: result = run_incus(["sh", "-c", f"grep -i bouncer /var/log/syslog | tail -n {lines}"]) if result.returncode == 0 and result.stdout.strip(): console.print(result.stdout) else: console.print("[yellow]로그를 찾을 수 없습니다[/yellow]") except KeyboardInterrupt: console.print("\n[dim]로그 추적 종료[/dim]") except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def decisions( limit: Annotated[int, typer.Option("-n", "--limit", help="표시할 개수")] = 20, ): """CrowdSec 현재 차단 결정 조회""" try: result = run_incus( ["cscli", "decisions", "list", "-o", "raw", "--limit", str(limit)], container=CROWDSEC_CONTAINER, ) if result.returncode == 0: if result.stdout.strip(): console.print("[bold]현재 차단 결정:[/bold]\n") console.print(result.stdout) else: console.print("[green]현재 활성 차단 결정이 없습니다[/green]") else: console.print(f"[yellow]결정 조회 실패: {result.stderr}[/yellow]") except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def metrics(): """bouncer Prometheus 메트릭 조회""" try: config = get_config() prometheus_cfg = config.get("prometheus", {}) if not prometheus_cfg.get("enabled"): console.print("[yellow]Prometheus 메트릭이 비활성화되어 있습니다[/yellow]") return addr = prometheus_cfg.get("listen_addr", "127.0.0.1") port = prometheus_cfg.get("listen_port", "2112") result = run_incus(["curl", "-s", f"http://{addr}:{port}/metrics"]) if result.returncode == 0: # 주요 메트릭만 필터링 lines = result.stdout.split('\n') important_metrics = [l for l in lines if l and not l.startswith('#') and 'crowdsec' in l.lower()] if important_metrics: console.print("[bold]Bouncer 메트릭:[/bold]\n") for line in important_metrics[:20]: console.print(line) else: console.print("[dim]메트릭 수집 중... 잠시 후 다시 시도하세요[/dim]") else: console.print("[yellow]메트릭을 가져올 수 없습니다[/yellow]") except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def backup(): """현재 설정 백업""" try: config = get_config() backup_file = backup_config(config, reason="manual") log_action("backup", str(backup_file)) console.print(f"[green]✓ 백업 완료: {backup_file}[/green]") except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def restore( backup_file: Annotated[Optional[str], typer.Argument(help="복원할 백업 파일 경로")] = None, ): """백업에서 설정 복원""" try: if backup_file is None: backups = sorted(BACKUP_DIR.glob("config_*.yaml"), key=lambda p: p.stat().st_mtime, reverse=True) if not backups: console.print("[yellow]백업 파일이 없습니다[/yellow]") raise typer.Exit(1) table = Table(title="백업 파일 목록") table.add_column("#", style="dim", width=3) table.add_column("파일명", style="cyan") table.add_column("날짜", style="green") for i, b in enumerate(backups[:10], 1): mtime = datetime.fromtimestamp(b.stat().st_mtime).strftime("%Y-%m-%d %H:%M:%S") table.add_row(str(i), b.name, mtime) console.print(table) console.print("\n[dim]복원하려면: cfb restore <파일명>[/dim]") return backup_path = Path(backup_file) if not backup_path.exists(): backup_path = BACKUP_DIR / backup_file if not backup_path.exists(): console.print(f"[red]백업 파일을 찾을 수 없습니다: {backup_file}[/red]") raise typer.Exit(1) config = yaml.safe_load(backup_path.read_text()) validate_config(config) if not typer.confirm(f"{backup_path.name}에서 복원하시겠습니까?"): console.print("[yellow]취소됨[/yellow]") raise typer.Exit(0) current_config = get_config() backup_config(current_config, reason="before_restore") yaml_content = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False) result = run_incus(["sh", "-c", f"cat > {CONFIG_PATH} << 'EOFCONFIG'\n{yaml_content}\nEOFCONFIG"]) if result.returncode != 0: raise BouncerError(f"복원 실패: {result.stderr}") log_action("restore", backup_path.name) console.print(f"[green]✓ 복원 완료: {backup_path.name}[/green]") console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]") except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) except yaml.YAMLError as e: console.print(f"[red]백업 파일 파싱 실패: {e}[/red]") raise typer.Exit(1) @app.command() def diff( backup_file: Annotated[Optional[str], typer.Argument(help="비교할 백업 파일")] = None, ): """현재 설정과 백업 비교""" try: if backup_file is None: backups = sorted(BACKUP_DIR.glob("config_*.yaml"), key=lambda p: p.stat().st_mtime, reverse=True) if not backups: console.print("[yellow]백업 파일이 없습니다[/yellow]") raise typer.Exit(1) backup_path = backups[0] console.print(f"[dim]최근 백업과 비교: {backup_path.name}[/dim]\n") else: backup_path = Path(backup_file) if not backup_path.exists(): backup_path = BACKUP_DIR / backup_file if not backup_path.exists(): console.print(f"[red]백업 파일을 찾을 수 없습니다: {backup_file}[/red]") raise typer.Exit(1) config = get_config() current_yaml = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False) backup_yaml = backup_path.read_text() diff_lines = list(unified_diff( backup_yaml.splitlines(keepends=True), current_yaml.splitlines(keepends=True), fromfile=f"백업: {backup_path.name}", tofile="현재 설정", )) if diff_lines: for line in diff_lines: if line.startswith('+') and not line.startswith('+++'): console.print(f"[green]{line.rstrip()}[/green]") elif line.startswith('-') and not line.startswith('---'): console.print(f"[red]{line.rstrip()}[/red]") elif line.startswith('@@'): console.print(f"[cyan]{line.rstrip()}[/cyan]") else: console.print(line.rstrip()) else: console.print("[green]변경사항 없음[/green]") except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command("export") def export_config( output: Annotated[Optional[str], typer.Option("-o", "--output", help="출력 파일 경로")] = None, format: Annotated[str, typer.Option("-f", "--format", help="출력 형식 (yaml/json)")] = "yaml", ): """설정을 파일로 내보내기""" try: config = get_config() zones = get_zones(config) # 도메인 목록만 추출 export_data = { "domains": [ { "domain": extract_domain(z.get("routes_to_protect", [""])[0]), "zone_id": z.get("zone_id"), "action": z.get("default_action"), "turnstile_enabled": z.get("turnstile", {}).get("enabled"), "turnstile_mode": z.get("turnstile", {}).get("mode"), } for z in zones ], "exported_at": datetime.now().isoformat(), "total": len(zones), } if format == "json": import json content = json.dumps(export_data, indent=2, ensure_ascii=False) else: content = yaml.dump(export_data, default_flow_style=False, allow_unicode=True) if output: Path(output).write_text(content) console.print(f"[green]✓ 내보내기 완료: {output}[/green]") else: console.print(content) except BouncerError as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) @app.command() def history( lines: Annotated[int, typer.Option("-n", "--lines", help="표시할 줄 수")] = 20, ): """변경 이력 조회""" try: if not LOG_FILE.exists(): console.print("[yellow]변경 이력이 없습니다[/yellow]") return content = LOG_FILE.read_text() history_lines = content.strip().split('\n') console.print("[bold]변경 이력:[/bold]\n") for line in history_lines[-lines:]: console.print(line) except Exception as e: console.print(f"[red]에러: {e}[/red]") raise typer.Exit(1) if __name__ == "__main__": app()