import asyncio import logging import os import shutil import subprocess import tempfile from datetime import datetime, timezone from pathlib import Path from .config import AppConfig from .google_eab import create_eab_key logger = logging.getLogger(__name__) async def issue_certificate(domain: str, config: AppConfig) -> dict: """certbot으로 와일드카드 인증서 발급. 도메인별 디렉토리로 lock 분리.""" # EAB 키 자동 생성 try: eab = create_eab_key(config.gcp_service_account_json, config.gcp_project) except Exception as e: logger.error("Failed to create EAB key: %s", e) return {"domain": domain, "success": False, "error": f"EAB key creation failed: {e}"} # 도메인별 config/work/logs 디렉토리 (lock 완전 분리) config_dir = f"{config.certbot_config_dir}/{domain}" work_dir = f"{config.certbot_work_dir}/{domain}" logs_dir = f"{config.certbot_logs_dir}/{domain}" os.makedirs(config_dir, exist_ok=True) os.makedirs(work_dir, exist_ok=True) os.makedirs(logs_dir, exist_ok=True) # Cloudflare credentials 임시 파일 with tempfile.NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as f: f.write(f"dns_cloudflare_api_token = {config.cloudflare_api_token}\n") credentials_path = f.name os.chmod(credentials_path, 0o600) try: cmd = [ "certbot", "certonly", "--dns-cloudflare", "--dns-cloudflare-credentials", credentials_path, "--dns-cloudflare-propagation-seconds", str(config.dns_propagation_seconds), "--server", config.google_acme_server, "-d", f"*.{domain}", "-d", domain, "--email", config.certbot_email, "--eab-kid", eab["key_id"], "--eab-hmac-key", eab["b64_mac_key"], "--agree-tos", "--non-interactive", "--config-dir", config_dir, "--work-dir", work_dir, "--logs-dir", logs_dir, ] logger.info("Running certbot for %s", domain) proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await proc.communicate() if proc.returncode != 0: error_msg = stderr.decode().strip() or stdout.decode().strip() logger.error("certbot failed for %s: %s", domain, error_msg) return {"domain": domain, "success": False, "error": error_msg} # 인증서를 공용 디렉토리로 복사 src = Path(config_dir) / "live" / domain dst = Path(config.certbot_config_dir) / "live" / domain if src.exists(): dst.mkdir(parents=True, exist_ok=True) for f in ("fullchain.pem", "privkey.pem", "chain.pem", "cert.pem"): src_file = src / f if src_file.exists(): # symlink를 따라가서 실제 파일 복사 shutil.copy2(str(src_file.resolve()), str(dst / f)) logger.info("Certificate issued for %s", domain) return {"domain": domain, "success": True} finally: os.unlink(credentials_path) def get_certificate_info(domain: str, config: AppConfig) -> dict | None: """발급된 인증서의 경로와 만료일 반환.""" live_dir = Path(config.certbot_config_dir) / "live" / domain cert_path = live_dir / "fullchain.pem" key_path = live_dir / "privkey.pem" if not cert_path.exists() or not key_path.exists(): return None result = subprocess.run( ["openssl", "x509", "-enddate", "-noout", "-in", str(cert_path)], capture_output=True, text=True, ) if result.returncode != 0: return None line = result.stdout.strip() date_str = line.split("=", 1)[1] expiry = datetime.strptime(date_str, "%b %d %H:%M:%S %Y %Z").replace(tzinfo=timezone.utc) return { "domain": domain, "cert_path": str(cert_path), "key_path": str(key_path), "expiry": expiry.isoformat(), "days_remaining": (expiry - datetime.now(timezone.utc)).days, } def list_certificates(config: AppConfig) -> list[dict]: """발급된 모든 인증서 목록 반환.""" live_dir = Path(config.certbot_config_dir) / "live" if not live_dir.exists(): return [] certs = [] for entry in sorted(live_dir.iterdir()): if entry.is_dir() and not entry.name.startswith("."): info = get_certificate_info(entry.name, config) if info: certs.append(info) return certs