FastAPI-based SSL certificate automation server. - Google Public CA wildcard cert issuance via certbot - Cloudflare DNS-01 challenge with auto EAB key generation - APISIX multi-instance deployment with domain-instance mapping - Vault integration for all secrets - Bearer token auth, retry logic, Discord DM alerts - Auto-renewal scheduler (daily 03:00 UTC) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
130 lines
4.6 KiB
Python
130 lines
4.6 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from .config import AppConfig
|
|
from .google_eab import create_eab_key
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def issue_certificate(domain: str, config: AppConfig) -> dict:
|
|
"""certbot으로 와일드카드 인증서 발급. 도메인별 디렉토리로 lock 분리."""
|
|
# EAB 키 자동 생성
|
|
try:
|
|
eab = create_eab_key(config.gcp_service_account_json, config.gcp_project)
|
|
except Exception as e:
|
|
logger.error("Failed to create EAB key: %s", e)
|
|
return {"domain": domain, "success": False, "error": f"EAB key creation failed: {e}"}
|
|
|
|
# 도메인별 config/work/logs 디렉토리 (lock 완전 분리)
|
|
config_dir = f"{config.certbot_config_dir}/{domain}"
|
|
work_dir = f"{config.certbot_work_dir}/{domain}"
|
|
logs_dir = f"{config.certbot_logs_dir}/{domain}"
|
|
os.makedirs(config_dir, exist_ok=True)
|
|
os.makedirs(work_dir, exist_ok=True)
|
|
os.makedirs(logs_dir, exist_ok=True)
|
|
|
|
# Cloudflare credentials 임시 파일
|
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".ini", delete=False) as f:
|
|
f.write(f"dns_cloudflare_api_token = {config.cloudflare_api_token}\n")
|
|
credentials_path = f.name
|
|
os.chmod(credentials_path, 0o600)
|
|
|
|
try:
|
|
cmd = [
|
|
"certbot", "certonly",
|
|
"--dns-cloudflare",
|
|
"--dns-cloudflare-credentials", credentials_path,
|
|
"--dns-cloudflare-propagation-seconds", str(config.dns_propagation_seconds),
|
|
"--server", config.google_acme_server,
|
|
"-d", f"*.{domain}",
|
|
"-d", domain,
|
|
"--email", config.certbot_email,
|
|
"--eab-kid", eab["key_id"],
|
|
"--eab-hmac-key", eab["b64_mac_key"],
|
|
"--agree-tos",
|
|
"--non-interactive",
|
|
"--config-dir", config_dir,
|
|
"--work-dir", work_dir,
|
|
"--logs-dir", logs_dir,
|
|
]
|
|
|
|
logger.info("Running certbot for %s", domain)
|
|
proc = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
stdout, stderr = await proc.communicate()
|
|
|
|
if proc.returncode != 0:
|
|
error_msg = stderr.decode().strip() or stdout.decode().strip()
|
|
logger.error("certbot failed for %s: %s", domain, error_msg)
|
|
return {"domain": domain, "success": False, "error": error_msg}
|
|
|
|
# 인증서를 공용 디렉토리로 복사
|
|
src = Path(config_dir) / "live" / domain
|
|
dst = Path(config.certbot_config_dir) / "live" / domain
|
|
if src.exists():
|
|
dst.mkdir(parents=True, exist_ok=True)
|
|
for f in ("fullchain.pem", "privkey.pem", "chain.pem", "cert.pem"):
|
|
src_file = src / f
|
|
if src_file.exists():
|
|
# symlink를 따라가서 실제 파일 복사
|
|
shutil.copy2(str(src_file.resolve()), str(dst / f))
|
|
|
|
logger.info("Certificate issued for %s", domain)
|
|
return {"domain": domain, "success": True}
|
|
finally:
|
|
os.unlink(credentials_path)
|
|
|
|
|
|
def get_certificate_info(domain: str, config: AppConfig) -> dict | None:
|
|
"""발급된 인증서의 경로와 만료일 반환."""
|
|
live_dir = Path(config.certbot_config_dir) / "live" / domain
|
|
cert_path = live_dir / "fullchain.pem"
|
|
key_path = live_dir / "privkey.pem"
|
|
|
|
if not cert_path.exists() or not key_path.exists():
|
|
return None
|
|
|
|
result = subprocess.run(
|
|
["openssl", "x509", "-enddate", "-noout", "-in", str(cert_path)],
|
|
capture_output=True, text=True,
|
|
)
|
|
if result.returncode != 0:
|
|
return None
|
|
|
|
line = result.stdout.strip()
|
|
date_str = line.split("=", 1)[1]
|
|
expiry = datetime.strptime(date_str, "%b %d %H:%M:%S %Y %Z").replace(tzinfo=timezone.utc)
|
|
|
|
return {
|
|
"domain": domain,
|
|
"cert_path": str(cert_path),
|
|
"key_path": str(key_path),
|
|
"expiry": expiry.isoformat(),
|
|
"days_remaining": (expiry - datetime.now(timezone.utc)).days,
|
|
}
|
|
|
|
|
|
def list_certificates(config: AppConfig) -> list[dict]:
|
|
"""발급된 모든 인증서 목록 반환."""
|
|
live_dir = Path(config.certbot_config_dir) / "live"
|
|
if not live_dir.exists():
|
|
return []
|
|
|
|
certs = []
|
|
for entry in sorted(live_dir.iterdir()):
|
|
if entry.is_dir() and not entry.name.startswith("."):
|
|
info = get_certificate_info(entry.name, config)
|
|
if info:
|
|
certs.append(info)
|
|
return certs
|