Files
cert-manager/app/certbot.py
kappa 1cd1f0cfc2 Initial commit: cert-manager API server
FastAPI-based SSL certificate automation server.
- Google Public CA wildcard cert issuance via certbot
- Cloudflare DNS-01 challenge with auto EAB key generation
- APISIX multi-instance deployment with domain-instance mapping
- Vault integration for all secrets
- Bearer token auth, retry logic, Discord DM alerts
- Auto-renewal scheduler (daily 03:00 UTC)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 17:39:14 +09:00

130 lines
4.6 KiB
Python

import asyncio
import logging
import os
import shutil
import subprocess
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from .config import AppConfig
from .google_eab import create_eab_key
logger = logging.getLogger(__name__)
async def issue_certificate(domain: str, config: AppConfig) -> dict:
"""certbot으로 와일드카드 인증서 발급. 도메인별 디렉토리로 lock 분리."""
# EAB 키 자동 생성
try:
eab = create_eab_key(config.gcp_service_account_json, config.gcp_project)
except Exception as e:
logger.error("Failed to create EAB key: %s", e)
return {"domain": domain, "success": False, "error": f"EAB key creation failed: {e}"}
# 도메인별 config/work/logs 디렉토리 (lock 완전 분리)
config_dir = f"{config.certbot_config_dir}/{domain}"
work_dir = f"{config.certbot_work_dir}/{domain}"
logs_dir = f"{config.certbot_logs_dir}/{domain}"
os.makedirs(config_dir, exist_ok=True)
os.makedirs(work_dir, exist_ok=True)
os.makedirs(logs_dir, exist_ok=True)
# Cloudflare credentials 임시 파일
with tempfile.NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as f:
f.write(f"dns_cloudflare_api_token = {config.cloudflare_api_token}\n")
credentials_path = f.name
os.chmod(credentials_path, 0o600)
try:
cmd = [
"certbot", "certonly",
"--dns-cloudflare",
"--dns-cloudflare-credentials", credentials_path,
"--dns-cloudflare-propagation-seconds", str(config.dns_propagation_seconds),
"--server", config.google_acme_server,
"-d", f"*.{domain}",
"-d", domain,
"--email", config.certbot_email,
"--eab-kid", eab["key_id"],
"--eab-hmac-key", eab["b64_mac_key"],
"--agree-tos",
"--non-interactive",
"--config-dir", config_dir,
"--work-dir", work_dir,
"--logs-dir", logs_dir,
]
logger.info("Running certbot for %s", domain)
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
error_msg = stderr.decode().strip() or stdout.decode().strip()
logger.error("certbot failed for %s: %s", domain, error_msg)
return {"domain": domain, "success": False, "error": error_msg}
# 인증서를 공용 디렉토리로 복사
src = Path(config_dir) / "live" / domain
dst = Path(config.certbot_config_dir) / "live" / domain
if src.exists():
dst.mkdir(parents=True, exist_ok=True)
for f in ("fullchain.pem", "privkey.pem", "chain.pem", "cert.pem"):
src_file = src / f
if src_file.exists():
# symlink를 따라가서 실제 파일 복사
shutil.copy2(str(src_file.resolve()), str(dst / f))
logger.info("Certificate issued for %s", domain)
return {"domain": domain, "success": True}
finally:
os.unlink(credentials_path)
def get_certificate_info(domain: str, config: AppConfig) -> dict | None:
"""발급된 인증서의 경로와 만료일 반환."""
live_dir = Path(config.certbot_config_dir) / "live" / domain
cert_path = live_dir / "fullchain.pem"
key_path = live_dir / "privkey.pem"
if not cert_path.exists() or not key_path.exists():
return None
result = subprocess.run(
["openssl", "x509", "-enddate", "-noout", "-in", str(cert_path)],
capture_output=True, text=True,
)
if result.returncode != 0:
return None
line = result.stdout.strip()
date_str = line.split("=", 1)[1]
expiry = datetime.strptime(date_str, "%b %d %H:%M:%S %Y %Z").replace(tzinfo=timezone.utc)
return {
"domain": domain,
"cert_path": str(cert_path),
"key_path": str(key_path),
"expiry": expiry.isoformat(),
"days_remaining": (expiry - datetime.now(timezone.utc)).days,
}
def list_certificates(config: AppConfig) -> list[dict]:
"""발급된 모든 인증서 목록 반환."""
live_dir = Path(config.certbot_config_dir) / "live"
if not live_dir.exists():
return []
certs = []
for entry in sorted(live_dir.iterdir()):
if entry.is_dir() and not entry.name.startswith("."):
info = get_certificate_info(entry.name, config)
if info:
certs.append(info)
return certs