- Add FastAPI-based REST API server (api_server.py) - Add Dockerfile and docker-compose.yaml for containerized deployment - Add Gitea Actions CI workflow for building and pushing images - Refactor CLI to support dual-server SSH (bouncer + crowdsec) - Update dependencies with FastAPI and uvicorn - Update CLAUDE.md and README.md with full documentation Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
991 lines
36 KiB
Python
Executable File
991 lines
36 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""CrowdSec Cloudflare Worker Bouncer 도메인 관리 CLI"""
|
|
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
from datetime import datetime
|
|
from difflib import unified_diff
|
|
from pathlib import Path
|
|
from typing import Annotated, Optional
|
|
|
|
import requests
|
|
import typer
|
|
import yaml
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
|
|
__version__ = "1.3.0"
|
|
|
|
app = typer.Typer(help="CrowdSec Cloudflare Bouncer 도메인 관리")
|
|
console = Console()
|
|
|
|
# 설정
|
|
CONFIG_PATH = "/etc/crowdsec/bouncers/crowdsec-cloudflare-worker-bouncer.yaml"
|
|
BACKUP_DIR = Path.home() / "cf-bouncer-manager" / "backups"
|
|
LOG_FILE = Path.home() / "cf-bouncer-manager" / "history.log"
|
|
REQUEST_TIMEOUT = 30
|
|
SUBPROCESS_TIMEOUT = 60
|
|
|
|
# 서버 식별자 (논리적 이름)
|
|
SERVER_BOUNCER = "bouncer"
|
|
SERVER_CROWDSEC = "crowdsec"
|
|
|
|
# SSH 설정 (환경변수) - 각 서버별 설정
|
|
# Bouncer 서버 (crowdsec-cloudflare-worker-bouncer가 실행되는 서버)
|
|
SSH_BOUNCER_HOST = os.environ.get("CFB_BOUNCER_HOST") # user@hostname 또는 hostname
|
|
SSH_BOUNCER_PORT = os.environ.get("CFB_BOUNCER_PORT", "22")
|
|
SSH_BOUNCER_KEY = os.environ.get("CFB_BOUNCER_KEY") # SSH 키 경로 (선택)
|
|
SSH_BOUNCER_USER = os.environ.get("CFB_BOUNCER_USER", "root") # SSH 사용자
|
|
|
|
# CrowdSec 서버 (CrowdSec 엔진이 실행되는 서버)
|
|
SSH_CROWDSEC_HOST = os.environ.get("CFB_CROWDSEC_HOST")
|
|
SSH_CROWDSEC_PORT = os.environ.get("CFB_CROWDSEC_PORT", "22")
|
|
SSH_CROWDSEC_KEY = os.environ.get("CFB_CROWDSEC_KEY")
|
|
SSH_CROWDSEC_USER = os.environ.get("CFB_CROWDSEC_USER", "root")
|
|
|
|
# SSH 설정 매핑
|
|
SSH_CONFIGS = {
|
|
SERVER_BOUNCER: {
|
|
"host": SSH_BOUNCER_HOST,
|
|
"port": SSH_BOUNCER_PORT,
|
|
"key": SSH_BOUNCER_KEY,
|
|
"user": SSH_BOUNCER_USER,
|
|
},
|
|
SERVER_CROWDSEC: {
|
|
"host": SSH_CROWDSEC_HOST,
|
|
"port": SSH_CROWDSEC_PORT,
|
|
"key": SSH_CROWDSEC_KEY,
|
|
"user": SSH_CROWDSEC_USER,
|
|
},
|
|
}
|
|
|
|
|
|
class BouncerError(Exception):
|
|
"""Bouncer 관련 에러"""
|
|
pass
|
|
|
|
|
|
def log_action(action: str, details: str = "") -> None:
|
|
"""변경 이력 로깅"""
|
|
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
with open(LOG_FILE, "a") as f:
|
|
f.write(f"[{timestamp}] {action}: {details}\n")
|
|
|
|
|
|
def get_ssh_config(server: str) -> dict:
|
|
"""서버별 SSH 설정 반환"""
|
|
config = SSH_CONFIGS.get(server)
|
|
if not config or not config.get("host"):
|
|
raise BouncerError(f"SSH 설정이 없습니다: {server}\n환경변수 CFB_{server.upper()}_HOST를 설정하세요.")
|
|
return config
|
|
|
|
|
|
def run_ssh(cmd: list[str], server: str = SERVER_BOUNCER, capture: bool = True, timeout: int = SUBPROCESS_TIMEOUT) -> subprocess.CompletedProcess:
|
|
"""SSH를 통해 원격 서버에서 명령 실행
|
|
|
|
Args:
|
|
cmd: 실행할 명령어 리스트
|
|
server: 서버 식별자 (SERVER_BOUNCER 또는 SERVER_CROWDSEC)
|
|
capture: 출력 캡처 여부
|
|
timeout: 타임아웃 (초)
|
|
|
|
Returns:
|
|
subprocess.CompletedProcess 결과
|
|
"""
|
|
config = get_ssh_config(server)
|
|
|
|
# SSH 호스트 구성 (user@host 형태 또는 host만)
|
|
host = config["host"]
|
|
if "@" not in host:
|
|
host = f"{config['user']}@{host}"
|
|
|
|
# SSH 명령어 구성
|
|
ssh_cmd = [
|
|
"ssh",
|
|
"-o", "BatchMode=yes",
|
|
"-o", "StrictHostKeyChecking=no",
|
|
"-o", "UserKnownHostsFile=/dev/null",
|
|
"-o", "LogLevel=ERROR",
|
|
"-o", "ConnectTimeout=10",
|
|
"-p", config["port"],
|
|
]
|
|
|
|
if config.get("key"):
|
|
ssh_cmd.extend(["-i", config["key"]])
|
|
|
|
# 원격 명령어 구성
|
|
remote_cmd = " ".join(shlex.quote(c) for c in cmd)
|
|
ssh_cmd.extend([host, remote_cmd])
|
|
|
|
try:
|
|
return subprocess.run(ssh_cmd, capture_output=capture, text=True, timeout=timeout)
|
|
except subprocess.TimeoutExpired:
|
|
raise BouncerError(f"명령 실행 시간 초과 ({server}): {' '.join(cmd)}")
|
|
except Exception as e:
|
|
raise BouncerError(f"SSH 명령 실행 실패 ({server}): {e}")
|
|
|
|
|
|
def get_config() -> dict:
|
|
"""bouncer 설정 파일 읽기"""
|
|
result = run_ssh(["cat", CONFIG_PATH], server=SERVER_BOUNCER)
|
|
if result.returncode != 0:
|
|
raise BouncerError(f"설정 파일을 읽을 수 없습니다: {result.stderr}")
|
|
try:
|
|
return yaml.safe_load(result.stdout)
|
|
except yaml.YAMLError as e:
|
|
raise BouncerError(f"YAML 파싱 실패: {e}")
|
|
|
|
|
|
def validate_config(config: dict) -> bool:
|
|
"""설정 유효성 검사"""
|
|
required_keys = ["cloudflare_config", "crowdsec_config"]
|
|
for key in required_keys:
|
|
if key not in config:
|
|
raise BouncerError(f"필수 설정 누락: {key}")
|
|
|
|
cf_config = config.get("cloudflare_config", {})
|
|
if not cf_config.get("accounts"):
|
|
raise BouncerError("Cloudflare 계정 설정이 없습니다")
|
|
|
|
return True
|
|
|
|
|
|
def backup_config(config: dict, reason: str = "manual") -> Path:
|
|
"""설정 백업"""
|
|
BACKUP_DIR.mkdir(parents=True, exist_ok=True)
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
backup_file = BACKUP_DIR / f"config_{timestamp}_{reason}.yaml"
|
|
|
|
yaml_content = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False)
|
|
backup_file.write_text(yaml_content)
|
|
|
|
# 오래된 백업 정리 (최근 20개만 유지)
|
|
backups = sorted(BACKUP_DIR.glob("config_*.yaml"), key=lambda p: p.stat().st_mtime, reverse=True)
|
|
for old_backup in backups[20:]:
|
|
old_backup.unlink()
|
|
|
|
return backup_file
|
|
|
|
|
|
def save_config(config: dict, reason: str = "update") -> None:
|
|
"""bouncer 설정 파일 저장 (백업 후)"""
|
|
validate_config(config)
|
|
backup_file = backup_config(config, reason)
|
|
console.print(f"[dim]백업 저장: {backup_file}[/dim]")
|
|
|
|
yaml_content = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False)
|
|
result = run_ssh(["sh", "-c", f"cat > {CONFIG_PATH} << 'EOFCONFIG'\n{yaml_content}\nEOFCONFIG"], server=SERVER_BOUNCER)
|
|
if result.returncode != 0:
|
|
raise BouncerError(f"설정 파일 저장 실패: {result.stderr}")
|
|
|
|
log_action(reason, f"설정 파일 업데이트")
|
|
|
|
|
|
def get_cf_token(config: dict) -> str:
|
|
"""설정에서 Cloudflare API 토큰 가져오기"""
|
|
accounts = config.get("cloudflare_config", {}).get("accounts", [])
|
|
if accounts:
|
|
return accounts[0].get("token", "")
|
|
return ""
|
|
|
|
|
|
def get_zone_id(domain: str, token: str) -> Optional[str]:
|
|
"""Cloudflare API로 도메인의 zone_id 조회"""
|
|
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
try:
|
|
resp = requests.get(
|
|
f"https://api.cloudflare.com/client/v4/zones?name={domain}",
|
|
headers=headers,
|
|
timeout=REQUEST_TIMEOUT,
|
|
)
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
if data.get("success") and data.get("result"):
|
|
return data["result"][0]["id"]
|
|
except requests.RequestException as e:
|
|
raise BouncerError(f"Cloudflare API 요청 실패: {e}")
|
|
return None
|
|
|
|
|
|
def get_all_zones_from_cf(token: str) -> list[dict]:
|
|
"""Cloudflare 계정의 모든 zone 조회"""
|
|
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
all_zones = []
|
|
page = 1
|
|
|
|
try:
|
|
while True:
|
|
resp = requests.get(
|
|
f"https://api.cloudflare.com/client/v4/zones?per_page=50&page={page}",
|
|
headers=headers,
|
|
timeout=REQUEST_TIMEOUT,
|
|
)
|
|
if resp.status_code != 200:
|
|
break
|
|
data = resp.json()
|
|
if not data.get("success"):
|
|
break
|
|
zones = data.get("result", [])
|
|
if not zones:
|
|
break
|
|
all_zones.extend(zones)
|
|
if len(zones) < 50:
|
|
break
|
|
page += 1
|
|
except requests.RequestException as e:
|
|
raise BouncerError(f"Cloudflare API 요청 실패: {e}")
|
|
|
|
return all_zones
|
|
|
|
|
|
def get_zones(config: dict) -> list[dict]:
|
|
"""설정에서 zones 목록 가져오기"""
|
|
accounts = config.get("cloudflare_config", {}).get("accounts", [])
|
|
if accounts:
|
|
return accounts[0].get("zones", [])
|
|
return []
|
|
|
|
|
|
def extract_domain(route: str) -> str:
|
|
"""routes_to_protect에서 도메인 추출"""
|
|
return route.strip("*").strip("/")
|
|
|
|
|
|
def find_zone_by_domain(zones: list[dict], domain: str) -> tuple[int, dict] | tuple[None, None]:
|
|
"""도메인으로 zone 찾기"""
|
|
for i, zone in enumerate(zones):
|
|
routes = zone.get("routes_to_protect", [])
|
|
if routes and domain in routes[0]:
|
|
return i, zone
|
|
return None, None
|
|
|
|
|
|
def do_apply() -> bool:
|
|
"""bouncer 서비스 재시작"""
|
|
result = run_ssh(["systemctl", "restart", "crowdsec-cloudflare-worker-bouncer"], server=SERVER_BOUNCER)
|
|
|
|
if result.returncode != 0:
|
|
run_ssh(["pkill", "-f", "crowdsec-cloudflare-worker-bouncer"], server=SERVER_BOUNCER)
|
|
run_ssh([
|
|
"sh", "-c",
|
|
f"nohup crowdsec-cloudflare-worker-bouncer -c {CONFIG_PATH} > /var/log/bouncer.log 2>&1 &"
|
|
], server=SERVER_BOUNCER)
|
|
|
|
# 잠시 대기 후 프로세스 확인
|
|
import time
|
|
time.sleep(2)
|
|
|
|
result = run_ssh(["pgrep", "-f", "crowdsec-cloudflare-worker-bouncer"], server=SERVER_BOUNCER)
|
|
return result.returncode == 0
|
|
|
|
|
|
def version_callback(value: bool):
|
|
if value:
|
|
console.print(f"cfb version {__version__}")
|
|
raise typer.Exit()
|
|
|
|
|
|
# ============ CLI Commands ============
|
|
|
|
@app.callback()
|
|
def main(
|
|
version: Annotated[Optional[bool], typer.Option("--version", "-v", callback=version_callback, is_eager=True, help="버전 표시")] = None,
|
|
):
|
|
"""CrowdSec Cloudflare Bouncer 도메인 관리 CLI"""
|
|
pass
|
|
|
|
|
|
@app.command("list")
|
|
def list_domains():
|
|
"""보호 중인 도메인 목록 표시"""
|
|
try:
|
|
config = get_config()
|
|
zones = get_zones(config)
|
|
|
|
table = Table(title="보호 중인 도메인")
|
|
table.add_column("#", style="dim", width=3)
|
|
table.add_column("도메인", style="cyan")
|
|
table.add_column("Zone ID", style="dim")
|
|
table.add_column("액션", style="green")
|
|
table.add_column("Turnstile", style="yellow")
|
|
table.add_column("모드", style="magenta")
|
|
|
|
for i, zone in enumerate(zones, 1):
|
|
routes = zone.get("routes_to_protect", [])
|
|
domain = extract_domain(routes[0]) if routes else "N/A"
|
|
zone_id = zone.get("zone_id", "N/A")
|
|
action = zone.get("default_action", "N/A")
|
|
turnstile_cfg = zone.get("turnstile", {})
|
|
turnstile = "활성" if turnstile_cfg.get("enabled") else "비활성"
|
|
mode = turnstile_cfg.get("mode", "N/A") if turnstile_cfg.get("enabled") else "-"
|
|
table.add_row(str(i), domain, zone_id[:12] + "...", action, turnstile, mode)
|
|
|
|
console.print(table)
|
|
console.print(f"\n총 [bold]{len(zones)}[/bold]개 도메인 보호 중")
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def show(domain: str = typer.Argument(..., help="조회할 도메인")):
|
|
"""특정 도메인 상세 정보"""
|
|
try:
|
|
config = get_config()
|
|
zones = get_zones(config)
|
|
idx, zone = find_zone_by_domain(zones, domain)
|
|
|
|
if zone is None:
|
|
console.print(f"[red]{domain}을 찾을 수 없습니다[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
routes = zone.get("routes_to_protect", [])
|
|
turnstile = zone.get("turnstile", {})
|
|
|
|
console.print(f"\n[bold cyan]{'='*60}[/bold cyan]")
|
|
console.print(f"[bold cyan]{domain} 상세 정보[/bold cyan]")
|
|
console.print(f"[bold cyan]{'='*60}[/bold cyan]\n")
|
|
|
|
console.print(f"[bold]도메인:[/bold] {extract_domain(routes[0]) if routes else 'N/A'}")
|
|
console.print(f"[bold]Zone ID:[/bold] {zone.get('zone_id', 'N/A')}")
|
|
console.print(f"[bold]액션:[/bold] {zone.get('default_action', 'N/A')}")
|
|
console.print(f"[bold]가능한 액션:[/bold] {', '.join(zone.get('actions', []))}")
|
|
console.print(f"[bold]보호 경로:[/bold] {', '.join(routes)}")
|
|
|
|
console.print(f"\n[bold yellow]Turnstile 설정[/bold yellow]")
|
|
console.print(f" 활성화: {turnstile.get('enabled', False)}")
|
|
console.print(f" 모드: {turnstile.get('mode', 'N/A')}")
|
|
console.print(f" 키 자동 교체: {turnstile.get('rotate_secret_key', False)}")
|
|
console.print(f" 교체 주기: {turnstile.get('rotate_secret_key_every', 'N/A')}")
|
|
console.print()
|
|
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def add(
|
|
domain: Annotated[str, typer.Argument(help="추가할 도메인 (예: example.com)")],
|
|
action: Annotated[str, typer.Option(help="기본 액션 (captcha/ban)")] = "captcha",
|
|
turnstile: Annotated[bool, typer.Option(help="Turnstile 활성화")] = True,
|
|
mode: Annotated[str, typer.Option(help="Turnstile 모드 (managed/invisible/visible)")] = "managed",
|
|
auto_apply: Annotated[bool, typer.Option("--auto-apply", "-a", help="추가 후 자동 적용")] = False,
|
|
dry_run: Annotated[bool, typer.Option("--dry-run", help="실제 변경 없이 미리보기")] = False,
|
|
):
|
|
"""새 도메인 추가"""
|
|
try:
|
|
config = get_config()
|
|
token = get_cf_token(config)
|
|
|
|
if not token:
|
|
console.print("[red]Cloudflare API 토큰을 찾을 수 없습니다[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
zones = get_zones(config)
|
|
idx, _ = find_zone_by_domain(zones, domain)
|
|
if idx is not None:
|
|
console.print(f"[yellow]{domain}은 이미 보호 목록에 있습니다[/yellow]")
|
|
raise typer.Exit(1)
|
|
|
|
console.print(f"[dim]Cloudflare에서 {domain}의 zone_id 조회 중...[/dim]")
|
|
zone_id = get_zone_id(domain, token)
|
|
|
|
if not zone_id:
|
|
console.print(f"[red]{domain}의 zone_id를 찾을 수 없습니다.[/red]")
|
|
console.print("[dim]'cfb available' 명령으로 추가 가능한 도메인을 확인하세요.[/dim]")
|
|
raise typer.Exit(1)
|
|
|
|
console.print(f"[green]zone_id 확인: {zone_id}[/green]")
|
|
|
|
new_zone = {
|
|
"zone_id": zone_id,
|
|
"actions": [action],
|
|
"default_action": action,
|
|
"routes_to_protect": [f"*{domain}/*"],
|
|
"turnstile": {
|
|
"enabled": turnstile,
|
|
"rotate_secret_key": True,
|
|
"rotate_secret_key_every": "168h0m0s",
|
|
"mode": mode,
|
|
},
|
|
}
|
|
|
|
if dry_run:
|
|
console.print("\n[yellow]== DRY RUN ==[/yellow]")
|
|
console.print(yaml.dump(new_zone, default_flow_style=False))
|
|
return
|
|
|
|
config["cloudflare_config"]["accounts"][0]["zones"].append(new_zone)
|
|
save_config(config, reason=f"add_{domain}")
|
|
log_action("add", domain)
|
|
|
|
console.print(f"[green]✓ {domain} 추가 완료[/green]")
|
|
|
|
if auto_apply:
|
|
console.print("[dim]설정 적용 중...[/dim]")
|
|
if do_apply():
|
|
console.print("[green]✓ 적용 완료[/green]")
|
|
else:
|
|
console.print("[yellow]적용 실패 - 수동으로 확인하세요[/yellow]")
|
|
else:
|
|
console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]")
|
|
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def edit(
|
|
domain: Annotated[str, typer.Argument(help="수정할 도메인")],
|
|
action: Annotated[Optional[str], typer.Option(help="기본 액션 변경")] = None,
|
|
turnstile: Annotated[Optional[bool], typer.Option(help="Turnstile 활성화/비활성화")] = None,
|
|
mode: Annotated[Optional[str], typer.Option(help="Turnstile 모드 변경")] = None,
|
|
auto_apply: Annotated[bool, typer.Option("--auto-apply", "-a", help="수정 후 자동 적용")] = False,
|
|
):
|
|
"""기존 도메인 설정 수정"""
|
|
try:
|
|
config = get_config()
|
|
zones = get_zones(config)
|
|
idx, zone = find_zone_by_domain(zones, domain)
|
|
|
|
if zone is None:
|
|
console.print(f"[red]{domain}을 찾을 수 없습니다[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
changes = []
|
|
if action is not None:
|
|
zone["default_action"] = action
|
|
zone["actions"] = [action]
|
|
changes.append(f"액션: {action}")
|
|
|
|
if turnstile is not None:
|
|
zone["turnstile"]["enabled"] = turnstile
|
|
changes.append(f"Turnstile: {'활성' if turnstile else '비활성'}")
|
|
|
|
if mode is not None:
|
|
zone["turnstile"]["mode"] = mode
|
|
changes.append(f"모드: {mode}")
|
|
|
|
if not changes:
|
|
console.print("[yellow]변경할 내용이 없습니다. --action, --turnstile, --mode 옵션을 사용하세요.[/yellow]")
|
|
raise typer.Exit(0)
|
|
|
|
config["cloudflare_config"]["accounts"][0]["zones"][idx] = zone
|
|
save_config(config, reason=f"edit_{domain}")
|
|
log_action("edit", f"{domain}: {', '.join(changes)}")
|
|
|
|
console.print(f"[green]✓ {domain} 수정 완료[/green]")
|
|
for change in changes:
|
|
console.print(f" - {change}")
|
|
|
|
if auto_apply:
|
|
console.print("[dim]설정 적용 중...[/dim]")
|
|
if do_apply():
|
|
console.print("[green]✓ 적용 완료[/green]")
|
|
else:
|
|
console.print("[yellow]적용 실패 - 수동으로 확인하세요[/yellow]")
|
|
else:
|
|
console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]")
|
|
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def remove(
|
|
domain: Annotated[str, typer.Argument(help="제거할 도메인")],
|
|
force: Annotated[bool, typer.Option("--force", "-f", help="확인 없이 제거")] = False,
|
|
auto_apply: Annotated[bool, typer.Option("--auto-apply", "-a", help="제거 후 자동 적용")] = False,
|
|
):
|
|
"""도메인 제거"""
|
|
try:
|
|
config = get_config()
|
|
zones = get_zones(config)
|
|
idx, zone = find_zone_by_domain(zones, domain)
|
|
|
|
if idx is None:
|
|
console.print(f"[red]{domain}을 찾을 수 없습니다[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
if not force and not typer.confirm(f"{domain}을 정말 제거하시겠습니까?"):
|
|
console.print("[yellow]취소됨[/yellow]")
|
|
raise typer.Exit(0)
|
|
|
|
del config["cloudflare_config"]["accounts"][0]["zones"][idx]
|
|
save_config(config, reason=f"remove_{domain}")
|
|
log_action("remove", domain)
|
|
|
|
console.print(f"[green]✓ {domain} 제거 완료[/green]")
|
|
|
|
if auto_apply:
|
|
console.print("[dim]설정 적용 중...[/dim]")
|
|
if do_apply():
|
|
console.print("[green]✓ 적용 완료[/green]")
|
|
else:
|
|
console.print("[yellow]적용 실패 - 수동으로 확인하세요[/yellow]")
|
|
else:
|
|
console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]")
|
|
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def sync(
|
|
auto_apply: Annotated[bool, typer.Option("--auto-apply", "-a", help="추가 후 자동 적용")] = False,
|
|
dry_run: Annotated[bool, typer.Option("--dry-run", help="실제 변경 없이 미리보기")] = False,
|
|
):
|
|
"""Cloudflare의 모든 도메인을 보호 목록에 추가"""
|
|
try:
|
|
config = get_config()
|
|
token = get_cf_token(config)
|
|
|
|
if not token:
|
|
console.print("[red]Cloudflare API 토큰을 찾을 수 없습니다[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
console.print("[dim]Cloudflare에서 도메인 목록 조회 중...[/dim]")
|
|
cf_zones = get_all_zones_from_cf(token)
|
|
protected_zones = get_zones(config)
|
|
protected_ids = {z.get("zone_id") for z in protected_zones}
|
|
|
|
# 추가할 도메인 찾기
|
|
to_add = []
|
|
for cf_zone in cf_zones:
|
|
if cf_zone.get("id") not in protected_ids and cf_zone.get("status") == "active":
|
|
to_add.append(cf_zone)
|
|
|
|
if not to_add:
|
|
console.print("[green]모든 도메인이 이미 보호 목록에 있습니다[/green]")
|
|
return
|
|
|
|
console.print(f"\n[bold]추가할 도메인 ({len(to_add)}개):[/bold]")
|
|
for z in to_add:
|
|
console.print(f" - {z.get('name')}")
|
|
|
|
if dry_run:
|
|
console.print("\n[yellow]== DRY RUN - 실제 변경 없음 ==[/yellow]")
|
|
return
|
|
|
|
if not typer.confirm(f"\n{len(to_add)}개 도메인을 추가하시겠습니까?"):
|
|
console.print("[yellow]취소됨[/yellow]")
|
|
return
|
|
|
|
added = []
|
|
for cf_zone in to_add:
|
|
new_zone = {
|
|
"zone_id": cf_zone.get("id"),
|
|
"actions": ["captcha"],
|
|
"default_action": "captcha",
|
|
"routes_to_protect": [f"*{cf_zone.get('name')}/*"],
|
|
"turnstile": {
|
|
"enabled": True,
|
|
"rotate_secret_key": True,
|
|
"rotate_secret_key_every": "168h0m0s",
|
|
"mode": "managed",
|
|
},
|
|
}
|
|
config["cloudflare_config"]["accounts"][0]["zones"].append(new_zone)
|
|
added.append(cf_zone.get("name"))
|
|
|
|
save_config(config, reason="sync")
|
|
log_action("sync", f"Added {len(added)} domains: {', '.join(added)}")
|
|
|
|
console.print(f"[green]✓ {len(added)}개 도메인 추가 완료[/green]")
|
|
|
|
if auto_apply:
|
|
console.print("[dim]설정 적용 중...[/dim]")
|
|
if do_apply():
|
|
console.print("[green]✓ 적용 완료[/green]")
|
|
else:
|
|
console.print("[yellow]적용 실패 - 수동으로 확인하세요[/yellow]")
|
|
else:
|
|
console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]")
|
|
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def apply():
|
|
"""설정 적용 (bouncer 서비스 재시작)"""
|
|
console.print("[dim]bouncer 서비스 재시작 중...[/dim]")
|
|
|
|
try:
|
|
if do_apply():
|
|
console.print("[green]✓ 설정 적용 완료[/green]")
|
|
result = run_ssh(["pgrep", "-f", "crowdsec-cloudflare-worker-bouncer"], server=SERVER_BOUNCER)
|
|
if result.returncode == 0:
|
|
console.print(f"[green]bouncer PID: {result.stdout.strip()}[/green]")
|
|
log_action("apply", "Service restarted")
|
|
else:
|
|
console.print("[red]✗ bouncer 프로세스를 시작할 수 없습니다[/red]")
|
|
raise typer.Exit(1)
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def status():
|
|
"""bouncer 상태 확인"""
|
|
try:
|
|
result = run_ssh(["pgrep", "-f", "crowdsec-cloudflare-worker-bouncer"], server=SERVER_BOUNCER)
|
|
if result.returncode == 0:
|
|
pids = result.stdout.strip().split('\n')
|
|
console.print(f"[green]✓ bouncer 실행 중 (PID: {', '.join(pids)})[/green]")
|
|
else:
|
|
console.print("[red]✗ bouncer가 실행되지 않음[/red]")
|
|
|
|
result = run_ssh(["cscli", "bouncers", "list"], server=SERVER_CROWDSEC)
|
|
if result.returncode == 0:
|
|
console.print("\n[bold]CrowdSec Bouncer 상태:[/bold]")
|
|
console.print(result.stdout)
|
|
|
|
config = get_config()
|
|
zones = get_zones(config)
|
|
console.print(f"\n[bold]보호 도메인 수:[/bold] {len(zones)}")
|
|
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def available():
|
|
"""Cloudflare에서 추가 가능한 도메인 목록"""
|
|
try:
|
|
config = get_config()
|
|
token = get_cf_token(config)
|
|
|
|
if not token:
|
|
console.print("[red]Cloudflare API 토큰을 찾을 수 없습니다[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
console.print("[dim]Cloudflare에서 도메인 목록 조회 중...[/dim]")
|
|
cf_zones = get_all_zones_from_cf(token)
|
|
protected_zones = get_zones(config)
|
|
protected_ids = {z.get("zone_id") for z in protected_zones}
|
|
|
|
table = Table(title="Cloudflare 도메인 목록")
|
|
table.add_column("도메인", style="cyan")
|
|
table.add_column("Zone ID", style="dim")
|
|
table.add_column("상태", style="green")
|
|
table.add_column("보호", style="yellow")
|
|
|
|
for zone in cf_zones:
|
|
zone_id = zone.get("id", "")
|
|
is_protected = "✓ 보호중" if zone_id in protected_ids else "-"
|
|
status_style = "green" if zone.get("status") == "active" else "yellow"
|
|
table.add_row(
|
|
zone.get("name", "N/A"),
|
|
zone_id[:12] + "...",
|
|
f"[{status_style}]{zone.get('status', 'N/A')}[/{status_style}]",
|
|
is_protected,
|
|
)
|
|
|
|
console.print(table)
|
|
console.print(f"\n총 [bold]{len(cf_zones)}[/bold]개 도메인, [bold]{len(protected_ids)}[/bold]개 보호 중")
|
|
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def logs(
|
|
lines: Annotated[int, typer.Option("-n", "--lines", help="표시할 줄 수")] = 50,
|
|
follow: Annotated[bool, typer.Option("-f", "--follow", help="실시간 로그 추적")] = False,
|
|
):
|
|
"""bouncer 로그 조회"""
|
|
try:
|
|
config = get_ssh_config(SERVER_BOUNCER)
|
|
host = config["host"]
|
|
if "@" not in host:
|
|
host = f"{config['user']}@{host}"
|
|
|
|
if follow:
|
|
console.print("[dim]로그 추적 중... (Ctrl+C로 종료)[/dim]\n")
|
|
ssh_cmd = [
|
|
"ssh",
|
|
"-o", "BatchMode=yes",
|
|
"-o", "StrictHostKeyChecking=no",
|
|
"-o", "UserKnownHostsFile=/dev/null",
|
|
"-o", "LogLevel=ERROR",
|
|
"-p", config["port"],
|
|
]
|
|
if config.get("key"):
|
|
ssh_cmd.extend(["-i", config["key"]])
|
|
ssh_cmd.extend([host, "journalctl -u crowdsec-cloudflare-worker-bouncer -f --no-pager"])
|
|
subprocess.run(ssh_cmd, timeout=None)
|
|
else:
|
|
# journalctl 시도, 실패시 syslog에서 grep
|
|
result = run_ssh(["journalctl", "-u", "crowdsec-cloudflare-worker-bouncer", "-n", str(lines), "--no-pager"], server=SERVER_BOUNCER)
|
|
if result.returncode == 0 and result.stdout.strip():
|
|
console.print(result.stdout)
|
|
else:
|
|
result = run_ssh(["sh", "-c", f"grep -i bouncer /var/log/syslog | tail -n {lines}"], server=SERVER_BOUNCER)
|
|
if result.returncode == 0 and result.stdout.strip():
|
|
console.print(result.stdout)
|
|
else:
|
|
console.print("[yellow]로그를 찾을 수 없습니다[/yellow]")
|
|
except KeyboardInterrupt:
|
|
console.print("\n[dim]로그 추적 종료[/dim]")
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def decisions(
|
|
limit: Annotated[int, typer.Option("-n", "--limit", help="표시할 개수")] = 20,
|
|
):
|
|
"""CrowdSec 현재 차단 결정 조회"""
|
|
try:
|
|
result = run_ssh(
|
|
["cscli", "decisions", "list", "-o", "raw", "--limit", str(limit)],
|
|
server=SERVER_CROWDSEC,
|
|
)
|
|
if result.returncode == 0:
|
|
if result.stdout.strip():
|
|
console.print("[bold]현재 차단 결정:[/bold]\n")
|
|
console.print(result.stdout)
|
|
else:
|
|
console.print("[green]현재 활성 차단 결정이 없습니다[/green]")
|
|
else:
|
|
console.print(f"[yellow]결정 조회 실패: {result.stderr}[/yellow]")
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def metrics():
|
|
"""bouncer Prometheus 메트릭 조회"""
|
|
try:
|
|
config = get_config()
|
|
prometheus_cfg = config.get("prometheus", {})
|
|
|
|
if not prometheus_cfg.get("enabled"):
|
|
console.print("[yellow]Prometheus 메트릭이 비활성화되어 있습니다[/yellow]")
|
|
return
|
|
|
|
addr = prometheus_cfg.get("listen_addr", "127.0.0.1")
|
|
port = prometheus_cfg.get("listen_port", "2112")
|
|
|
|
result = run_ssh(["curl", "-s", f"http://{addr}:{port}/metrics"], server=SERVER_BOUNCER)
|
|
if result.returncode == 0:
|
|
# 주요 메트릭만 필터링
|
|
lines = result.stdout.split('\n')
|
|
important_metrics = [l for l in lines if l and not l.startswith('#') and 'crowdsec' in l.lower()]
|
|
|
|
if important_metrics:
|
|
console.print("[bold]Bouncer 메트릭:[/bold]\n")
|
|
for line in important_metrics[:20]:
|
|
console.print(line)
|
|
else:
|
|
console.print("[dim]메트릭 수집 중... 잠시 후 다시 시도하세요[/dim]")
|
|
else:
|
|
console.print("[yellow]메트릭을 가져올 수 없습니다[/yellow]")
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def backup():
|
|
"""현재 설정 백업"""
|
|
try:
|
|
config = get_config()
|
|
backup_file = backup_config(config, reason="manual")
|
|
log_action("backup", str(backup_file))
|
|
console.print(f"[green]✓ 백업 완료: {backup_file}[/green]")
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def restore(
|
|
backup_file: Annotated[Optional[str], typer.Argument(help="복원할 백업 파일 경로")] = None,
|
|
):
|
|
"""백업에서 설정 복원"""
|
|
try:
|
|
if backup_file is None:
|
|
backups = sorted(BACKUP_DIR.glob("config_*.yaml"), key=lambda p: p.stat().st_mtime, reverse=True)
|
|
if not backups:
|
|
console.print("[yellow]백업 파일이 없습니다[/yellow]")
|
|
raise typer.Exit(1)
|
|
|
|
table = Table(title="백업 파일 목록")
|
|
table.add_column("#", style="dim", width=3)
|
|
table.add_column("파일명", style="cyan")
|
|
table.add_column("날짜", style="green")
|
|
|
|
for i, b in enumerate(backups[:10], 1):
|
|
mtime = datetime.fromtimestamp(b.stat().st_mtime).strftime("%Y-%m-%d %H:%M:%S")
|
|
table.add_row(str(i), b.name, mtime)
|
|
|
|
console.print(table)
|
|
console.print("\n[dim]복원하려면: cfb restore <파일명>[/dim]")
|
|
return
|
|
|
|
backup_path = Path(backup_file)
|
|
if not backup_path.exists():
|
|
backup_path = BACKUP_DIR / backup_file
|
|
if not backup_path.exists():
|
|
console.print(f"[red]백업 파일을 찾을 수 없습니다: {backup_file}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
config = yaml.safe_load(backup_path.read_text())
|
|
validate_config(config)
|
|
|
|
if not typer.confirm(f"{backup_path.name}에서 복원하시겠습니까?"):
|
|
console.print("[yellow]취소됨[/yellow]")
|
|
raise typer.Exit(0)
|
|
|
|
current_config = get_config()
|
|
backup_config(current_config, reason="before_restore")
|
|
|
|
yaml_content = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False)
|
|
result = run_ssh(["sh", "-c", f"cat > {CONFIG_PATH} << 'EOFCONFIG'\n{yaml_content}\nEOFCONFIG"], server=SERVER_BOUNCER)
|
|
if result.returncode != 0:
|
|
raise BouncerError(f"복원 실패: {result.stderr}")
|
|
|
|
log_action("restore", backup_path.name)
|
|
console.print(f"[green]✓ 복원 완료: {backup_path.name}[/green]")
|
|
console.print("[yellow]적용하려면 'cfb apply' 명령을 실행하세요[/yellow]")
|
|
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
except yaml.YAMLError as e:
|
|
console.print(f"[red]백업 파일 파싱 실패: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def diff(
|
|
backup_file: Annotated[Optional[str], typer.Argument(help="비교할 백업 파일")] = None,
|
|
):
|
|
"""현재 설정과 백업 비교"""
|
|
try:
|
|
if backup_file is None:
|
|
backups = sorted(BACKUP_DIR.glob("config_*.yaml"), key=lambda p: p.stat().st_mtime, reverse=True)
|
|
if not backups:
|
|
console.print("[yellow]백업 파일이 없습니다[/yellow]")
|
|
raise typer.Exit(1)
|
|
backup_path = backups[0]
|
|
console.print(f"[dim]최근 백업과 비교: {backup_path.name}[/dim]\n")
|
|
else:
|
|
backup_path = Path(backup_file)
|
|
if not backup_path.exists():
|
|
backup_path = BACKUP_DIR / backup_file
|
|
if not backup_path.exists():
|
|
console.print(f"[red]백업 파일을 찾을 수 없습니다: {backup_file}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
config = get_config()
|
|
current_yaml = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False)
|
|
backup_yaml = backup_path.read_text()
|
|
|
|
diff_lines = list(unified_diff(
|
|
backup_yaml.splitlines(keepends=True),
|
|
current_yaml.splitlines(keepends=True),
|
|
fromfile=f"백업: {backup_path.name}",
|
|
tofile="현재 설정",
|
|
))
|
|
|
|
if diff_lines:
|
|
for line in diff_lines:
|
|
if line.startswith('+') and not line.startswith('+++'):
|
|
console.print(f"[green]{line.rstrip()}[/green]")
|
|
elif line.startswith('-') and not line.startswith('---'):
|
|
console.print(f"[red]{line.rstrip()}[/red]")
|
|
elif line.startswith('@@'):
|
|
console.print(f"[cyan]{line.rstrip()}[/cyan]")
|
|
else:
|
|
console.print(line.rstrip())
|
|
else:
|
|
console.print("[green]변경사항 없음[/green]")
|
|
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command("export")
|
|
def export_config(
|
|
output: Annotated[Optional[str], typer.Option("-o", "--output", help="출력 파일 경로")] = None,
|
|
format: Annotated[str, typer.Option("-f", "--format", help="출력 형식 (yaml/json)")] = "yaml",
|
|
):
|
|
"""설정을 파일로 내보내기"""
|
|
try:
|
|
config = get_config()
|
|
zones = get_zones(config)
|
|
|
|
# 도메인 목록만 추출
|
|
export_data = {
|
|
"domains": [
|
|
{
|
|
"domain": extract_domain(z.get("routes_to_protect", [""])[0]),
|
|
"zone_id": z.get("zone_id"),
|
|
"action": z.get("default_action"),
|
|
"turnstile_enabled": z.get("turnstile", {}).get("enabled"),
|
|
"turnstile_mode": z.get("turnstile", {}).get("mode"),
|
|
}
|
|
for z in zones
|
|
],
|
|
"exported_at": datetime.now().isoformat(),
|
|
"total": len(zones),
|
|
}
|
|
|
|
if format == "json":
|
|
import json
|
|
content = json.dumps(export_data, indent=2, ensure_ascii=False)
|
|
else:
|
|
content = yaml.dump(export_data, default_flow_style=False, allow_unicode=True)
|
|
|
|
if output:
|
|
Path(output).write_text(content)
|
|
console.print(f"[green]✓ 내보내기 완료: {output}[/green]")
|
|
else:
|
|
console.print(content)
|
|
|
|
except BouncerError as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def history(
|
|
lines: Annotated[int, typer.Option("-n", "--lines", help="표시할 줄 수")] = 20,
|
|
):
|
|
"""변경 이력 조회"""
|
|
try:
|
|
if not LOG_FILE.exists():
|
|
console.print("[yellow]변경 이력이 없습니다[/yellow]")
|
|
return
|
|
|
|
content = LOG_FILE.read_text()
|
|
history_lines = content.strip().split('\n')
|
|
|
|
console.print("[bold]변경 이력:[/bold]\n")
|
|
for line in history_lines[-lines:]:
|
|
console.print(line)
|
|
|
|
except Exception as e:
|
|
console.print(f"[red]에러: {e}[/red]")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app()
|