import json import logging import os import re from dataclasses import dataclass, field import httpx logger = logging.getLogger(__name__) @dataclass class ApisixInstance: name: str admin_url: str admin_key: str @dataclass class AppConfig: cloudflare_api_token: str google_acme_server: str certbot_email: str dns_propagation_seconds: int gcp_project: str gcp_service_account_json: str apisix_instances: list[ApisixInstance] api_token: str = "" discord_bot_token: str = "" discord_alert_user_id: str = "" domain_instance_map: dict[str, list[str]] = field(default_factory=dict) certbot_config_dir: str = "/data/certbot/config" certbot_work_dir: str = "/data/certbot/work" certbot_logs_dir: str = "/data/certbot/logs" # --- Domain validation --- _DOMAIN_RE = re.compile( r"^(?:\*\.)?(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$" ) def validate_domain(domain: str) -> str | None: """도메인 유효성 검사. 유효하면 None, 아니면 에러 메시지.""" if not domain or len(domain) > 253: return "Invalid domain length" if ".." in domain or "/" in domain or "\\" in domain: return "Invalid characters in domain" if not _DOMAIN_RE.match(domain): return "Invalid domain format" return None # --- Vault --- def _vault_read(path: str) -> dict | None: """Vault KV v2에서 시크릿 읽기. 실패 시 None 반환.""" addr = os.environ.get("VAULT_ADDR", "") token = os.environ.get("VAULT_TOKEN", "") if not addr or not token: return None try: resp = httpx.get( f"{addr}/v1/secret/data/{path}", headers={"X-Vault-Token": token}, timeout=10, ) if resp.status_code == 200: return resp.json()["data"]["data"] if resp.status_code == 403: logger.error("Vault token expired or invalid for %s (403)", path) except Exception as e: logger.warning("Vault read failed for %s: %s", path, e) return None def load_config(path: str = "/data/config/config.json") -> AppConfig: with open(path) as f: raw = json.load(f) # --- Vault에서 시크릿 로드 --- vault_cf = _vault_read("cloudflare") vault_apisix = _vault_read("infra/apisix") vault_sa = _vault_read("google/ca/service-account") vault_cm = _vault_read("infra/cert-manager") vault_discord = _vault_read("discord/bot") # Cloudflare token: Vault → config → env cf_token = ( (vault_cf or {}).get("api_token") or raw.get("cloudflare_api_token") or os.environ.get("CLOUDFLARE_API_TOKEN", "") ) # GCP service account: Vault → config(파일경로 또는 JSON문자열) sa_json = (vault_sa or {}).get("service_account_json", "") if not sa_json: sa_json = raw.get("gcp_service_account_json", "") if sa_json and not sa_json.startswith("{"): with open(sa_json) as f: sa_json = f.read() # APISIX instances: 통일 admin_key를 Vault에서 로드 apisix_admin_key = (vault_apisix or {}).get("admin_key", "") instances = [] for inst in raw.get("apisix_instances", []): instances.append(ApisixInstance( name=inst["name"], admin_url=inst["admin_url"], admin_key=apisix_admin_key or inst.get("admin_key", ""), )) # API token: Vault → config → env api_token = ( (vault_cm or {}).get("api_token") or raw.get("api_token") or os.environ.get("CERT_MANAGER_API_TOKEN", "") ) # Discord: Vault → config discord_bot_token = (vault_discord or {}).get("bot_token") or raw.get("discord_bot_token", "") discord_alert_user_id = (vault_discord or {}).get("alert_user_id") or raw.get("discord_alert_user_id", "") # Domain-instance mapping domain_instance_map = raw.get("domain_instance_map", {}) vault_status = "connected" if vault_cf else "unavailable, using fallback" logger.info("Config loaded (vault: %s)", vault_status) return AppConfig( cloudflare_api_token=cf_token, google_acme_server=raw.get("google_acme_server", "https://dv.acme-v02.api.pki.goog/directory"), certbot_email=raw.get("certbot_email", ""), dns_propagation_seconds=raw.get("dns_propagation_seconds", 30), gcp_project=raw.get("gcp_project", ""), gcp_service_account_json=sa_json, apisix_instances=instances, api_token=api_token, discord_bot_token=discord_bot_token, discord_alert_user_id=discord_alert_user_id, domain_instance_map=domain_instance_map, )