FastAPI-based SSL certificate automation server. - Google Public CA wildcard cert issuance via certbot - Cloudflare DNS-01 challenge with auto EAB key generation - APISIX multi-instance deployment with domain-instance mapping - Vault integration for all secrets - Bearer token auth, retry logic, Discord DM alerts - Auto-renewal scheduler (daily 03:00 UTC) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
72 lines
2.2 KiB
Python
72 lines
2.2 KiB
Python
import hashlib
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
|
|
|
|
from .config import AppConfig, ApisixInstance
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _ssl_id(domain: str) -> str:
|
|
"""도메인 기반의 안정적인 SSL ID 생성."""
|
|
return hashlib.md5(domain.encode()).hexdigest()[:16]
|
|
|
|
|
|
@retry(
|
|
stop=stop_after_attempt(3),
|
|
wait=wait_exponential(multiplier=1, min=2, max=10),
|
|
retry=retry_if_exception_type((httpx.ConnectError, httpx.TimeoutException)),
|
|
reraise=True,
|
|
)
|
|
async def _put_ssl(client: httpx.AsyncClient, url: str, payload: dict, headers: dict):
|
|
resp = await client.put(url, json=payload, headers=headers)
|
|
resp.raise_for_status()
|
|
return resp
|
|
|
|
|
|
async def deploy_certificate(
|
|
domain: str,
|
|
config: AppConfig,
|
|
instances: list[ApisixInstance] | None = None,
|
|
) -> list[dict]:
|
|
"""인증서를 APISIX 인스턴스들에 배포."""
|
|
targets = instances or config.apisix_instances
|
|
live_dir = Path(config.certbot_config_dir) / "live" / domain
|
|
|
|
cert_path = live_dir / "fullchain.pem"
|
|
key_path = live_dir / "privkey.pem"
|
|
|
|
if not cert_path.exists() or not key_path.exists():
|
|
return [{"instance": t.name, "success": False, "error": "Certificate files not found"} for t in targets]
|
|
|
|
cert_pem = cert_path.read_text()
|
|
key_pem = key_path.read_text()
|
|
ssl_id = _ssl_id(domain)
|
|
|
|
payload = {
|
|
"cert": cert_pem,
|
|
"key": key_pem,
|
|
"snis": [f"*.{domain}", domain],
|
|
}
|
|
|
|
results = []
|
|
async with httpx.AsyncClient(timeout=30) as client:
|
|
for inst in targets:
|
|
try:
|
|
await _put_ssl(
|
|
client,
|
|
f"{inst.admin_url}/apisix/admin/ssls/{ssl_id}",
|
|
payload,
|
|
{"X-API-KEY": inst.admin_key},
|
|
)
|
|
logger.info("Deployed %s to %s", domain, inst.name)
|
|
results.append({"instance": inst.name, "success": True})
|
|
except Exception as e:
|
|
logger.error("Failed to deploy %s to %s: %s", domain, inst.name, e)
|
|
results.append({"instance": inst.name, "success": False, "error": str(e)})
|
|
|
|
return results
|