Initial commit: cert-manager API server

FastAPI-based SSL certificate automation server.
- Google Public CA wildcard cert issuance via certbot
- Cloudflare DNS-01 challenge with auto EAB key generation
- APISIX multi-instance deployment with domain-instance mapping
- Vault integration for all secrets
- Bearer token auth, retry logic, Discord DM alerts
- Auto-renewal scheduler (daily 03:00 UTC)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
kappa
2026-02-28 17:39:14 +09:00
commit 1cd1f0cfc2
12 changed files with 782 additions and 0 deletions

129
app/certbot.py Normal file
View File

@@ -0,0 +1,129 @@
import asyncio
import logging
import os
import shutil
import subprocess
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from .config import AppConfig
from .google_eab import create_eab_key
logger = logging.getLogger(__name__)
async def issue_certificate(domain: str, config: AppConfig) -> dict:
"""certbot으로 와일드카드 인증서 발급. 도메인별 디렉토리로 lock 분리."""
# EAB 키 자동 생성
try:
eab = create_eab_key(config.gcp_service_account_json, config.gcp_project)
except Exception as e:
logger.error("Failed to create EAB key: %s", e)
return {"domain": domain, "success": False, "error": f"EAB key creation failed: {e}"}
# 도메인별 config/work/logs 디렉토리 (lock 완전 분리)
config_dir = f"{config.certbot_config_dir}/{domain}"
work_dir = f"{config.certbot_work_dir}/{domain}"
logs_dir = f"{config.certbot_logs_dir}/{domain}"
os.makedirs(config_dir, exist_ok=True)
os.makedirs(work_dir, exist_ok=True)
os.makedirs(logs_dir, exist_ok=True)
# Cloudflare credentials 임시 파일
with tempfile.NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as f:
f.write(f"dns_cloudflare_api_token = {config.cloudflare_api_token}\n")
credentials_path = f.name
os.chmod(credentials_path, 0o600)
try:
cmd = [
"certbot", "certonly",
"--dns-cloudflare",
"--dns-cloudflare-credentials", credentials_path,
"--dns-cloudflare-propagation-seconds", str(config.dns_propagation_seconds),
"--server", config.google_acme_server,
"-d", f"*.{domain}",
"-d", domain,
"--email", config.certbot_email,
"--eab-kid", eab["key_id"],
"--eab-hmac-key", eab["b64_mac_key"],
"--agree-tos",
"--non-interactive",
"--config-dir", config_dir,
"--work-dir", work_dir,
"--logs-dir", logs_dir,
]
logger.info("Running certbot for %s", domain)
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
error_msg = stderr.decode().strip() or stdout.decode().strip()
logger.error("certbot failed for %s: %s", domain, error_msg)
return {"domain": domain, "success": False, "error": error_msg}
# 인증서를 공용 디렉토리로 복사
src = Path(config_dir) / "live" / domain
dst = Path(config.certbot_config_dir) / "live" / domain
if src.exists():
dst.mkdir(parents=True, exist_ok=True)
for f in ("fullchain.pem", "privkey.pem", "chain.pem", "cert.pem"):
src_file = src / f
if src_file.exists():
# symlink를 따라가서 실제 파일 복사
shutil.copy2(str(src_file.resolve()), str(dst / f))
logger.info("Certificate issued for %s", domain)
return {"domain": domain, "success": True}
finally:
os.unlink(credentials_path)
def get_certificate_info(domain: str, config: AppConfig) -> dict | None:
"""발급된 인증서의 경로와 만료일 반환."""
live_dir = Path(config.certbot_config_dir) / "live" / domain
cert_path = live_dir / "fullchain.pem"
key_path = live_dir / "privkey.pem"
if not cert_path.exists() or not key_path.exists():
return None
result = subprocess.run(
["openssl", "x509", "-enddate", "-noout", "-in", str(cert_path)],
capture_output=True, text=True,
)
if result.returncode != 0:
return None
line = result.stdout.strip()
date_str = line.split("=", 1)[1]
expiry = datetime.strptime(date_str, "%b %d %H:%M:%S %Y %Z").replace(tzinfo=timezone.utc)
return {
"domain": domain,
"cert_path": str(cert_path),
"key_path": str(key_path),
"expiry": expiry.isoformat(),
"days_remaining": (expiry - datetime.now(timezone.utc)).days,
}
def list_certificates(config: AppConfig) -> list[dict]:
"""발급된 모든 인증서 목록 반환."""
live_dir = Path(config.certbot_config_dir) / "live"
if not live_dir.exists():
return []
certs = []
for entry in sorted(live_dir.iterdir()):
if entry.is_dir() and not entry.name.startswith("."):
info = get_certificate_info(entry.name, config)
if info:
certs.append(info)
return certs