feat: Zero-downtime certificate management via Runtime API

Changes:
- Replace USR2 signal reload with HAProxy Runtime API for cert updates
  - new ssl cert → set ssl cert → commit ssl cert
  - No connection drops during certificate changes
- Add certificates.json for persistence (domain list only)
- Add haproxy_load_cert tool for manual certificate loading
- Auto-restore certificates on MCP startup
- Update startup sequence to load both servers and certificates

certificates.json format:
{
  "domains": ["inouter.com", "anvil.it.com"]
}

Paths derived from convention:
- Host: /opt/haproxy/certs/{domain}.pem
- Container: /etc/haproxy/certs/{domain}.pem

Total MCP tools: 28 → 29

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
kaffa
2026-02-02 04:23:28 +00:00
parent 7ebe204f89
commit 79254835e9
4 changed files with 292 additions and 53 deletions

View File

@@ -1,5 +1,6 @@
"""Certificate management tools for HAProxy MCP Server."""
import json
import os
import subprocess
from datetime import datetime
@@ -7,18 +8,161 @@ from typing import Annotated
from pydantic import Field
from ..config import logger, SUBPROCESS_TIMEOUT, HAPROXY_CONTAINER
from ..config import logger, SUBPROCESS_TIMEOUT
from ..validation import validate_domain
from ..haproxy_client import haproxy_cmd
from ..file_ops import atomic_write_file
# Certificate paths
ACME_SH = os.path.expanduser("~/.acme.sh/acme.sh")
ACME_HOME = os.path.expanduser("~/.acme.sh")
CERTS_DIR = "/opt/haproxy/certs"
CERTS_DIR_CONTAINER = "/etc/haproxy/certs"
CERTS_JSON = "/opt/haproxy/conf/certificates.json"
# Longer timeout for certificate operations (ACME can be slow)
CERT_TIMEOUT = 120
def get_pem_paths(domain: str) -> tuple[str, str]:
    """Build the conventional PEM file locations for a domain.

    The same file name, "<domain>.pem", is joined onto the host
    certificate directory and onto the container certificate directory.

    Args:
        domain: Domain name

    Returns:
        Tuple of (host_path, container_path)
    """
    pem_name = f"{domain}.pem"
    host_path = os.path.join(CERTS_DIR, pem_name)
    container_path = os.path.join(CERTS_DIR_CONTAINER, pem_name)
    return host_path, container_path
def load_cert_config() -> list[str]:
    """Load certificate domain list from JSON file.

    The file is expected to hold an object of the form
    ``{"domains": ["a.example", ...]}``. Anything else is treated as a
    corrupt file so that callers always receive a real list of strings.

    Returns:
        List of domain names. Empty when the file is missing, cannot be
        parsed, or does not have the expected shape.
    """
    try:
        with open(CERTS_JSON, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return []
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        logger.warning("Corrupt certificates.json: %s", e)
        return []

    # A valid JSON document may still be the wrong shape (top-level list,
    # "domains": null, ...). Without this check callers would crash on
    # `.get`, iteration or `.append`.
    domains = data.get("domains", []) if isinstance(data, dict) else None
    if not isinstance(domains, list):
        logger.warning(
            "Corrupt certificates.json: %s",
            'expected an object with a "domains" list',
        )
        return []

    # Keep only string entries; anything else cannot be a domain name.
    return [entry for entry in domains if isinstance(entry, str)]
def save_cert_config(domains: list[str]) -> None:
    """Persist the certificate domain list to the JSON config file.

    The list is stored sorted under the "domains" key and written through
    ``atomic_write_file``.

    Args:
        domains: List of domain names
    """
    payload = {"domains": sorted(domains)}
    serialized = json.dumps(payload, indent=2)
    atomic_write_file(CERTS_JSON, serialized)
def add_cert_to_config(domain: str) -> None:
    """Record a domain in the certificate config if it is not already listed.

    Args:
        domain: Domain name to add
    """
    known = load_cert_config()
    if domain in known:
        # Already recorded: leave the file untouched.
        return
    known.append(domain)
    save_cert_config(known)
def remove_cert_from_config(domain: str) -> None:
    """Drop a domain from the certificate config if it is listed.

    Args:
        domain: Domain name to remove
    """
    known = load_cert_config()
    if domain not in known:
        # Nothing to remove: leave the file untouched.
        return
    known.remove(domain)
    save_cert_config(known)
def load_cert_to_haproxy(domain: str) -> tuple[bool, str]:
    """Load a certificate into HAProxy via Runtime API (zero-downtime).

    Reads the domain's PEM file from the host path and pushes it to HAProxy
    with ``set ssl cert`` / ``commit ssl cert``, preceded by ``new ssl cert``
    when the certificate is not yet listed by ``show ssl cert``.

    Args:
        domain: Domain name

    Returns:
        Tuple of (success, message). On success the message is "updated"
        when the certificate was already listed, or "added" otherwise.
        On failure the message describes the problem.
    """
    host_path, container_path = get_pem_paths(domain)
    if not os.path.exists(host_path):
        return False, f"PEM file not found: {host_path}"

    try:
        with open(host_path, "r", encoding="utf-8") as f:
            pem_content = f.read()

        # A Runtime API "<<" payload is terminated by the first empty line
        # (HAProxy Management Guide, section 9.3). A blank line inside the
        # PEM, e.g. between concatenated chain certificates, would truncate
        # the upload, so blank lines are removed before sending.
        pem_lines = [line for line in pem_content.splitlines() if line.strip()]
        if not pem_lines:
            return False, f"PEM file is empty: {host_path}"
        payload = "\n".join(pem_lines)

        # `show ssl cert` prints one file name per line; entries of a pending
        # transaction carry a leading "*". Compare whole lines so that
        # e.g. "<path>.rsa" is not mistaken for "<path>".
        listing = haproxy_cmd("show ssl cert")
        already_loaded = any(
            entry.strip().lstrip("*") == container_path
            for entry in listing.splitlines()
        )

        if not already_loaded:
            haproxy_cmd(f"new ssl cert {container_path}")
        # The trailing empty line is the payload terminator.
        haproxy_cmd(f"set ssl cert {container_path} <<\n{payload}\n\n")
        # NOTE(review): the command responses are not inspected here; this
        # relies on haproxy_cmd raising when HAProxy reports an error.
        # Confirm against haproxy_client.
        haproxy_cmd(f"commit ssl cert {container_path}")
        return True, ("updated" if already_loaded else "added")
    except Exception as e:
        return False, str(e)
def unload_cert_from_haproxy(domain: str) -> tuple[bool, str]:
    """Unload a certificate from HAProxy via Runtime API.

    Args:
        domain: Domain name

    Returns:
        Tuple of (success, message). The message is "not loaded" when
        ``show ssl cert`` does not list the certificate, "unloaded" after
        ``del ssl cert`` was issued, or the error text on failure.
    """
    _, container_path = get_pem_paths(domain)

    try:
        # `show ssl cert` prints one file name per line; entries of a pending
        # transaction carry a leading "*". Compare whole lines so that a
        # longer name such as "<path>.rsa" does not count as a match.
        listing = haproxy_cmd("show ssl cert")
        is_loaded = any(
            entry.strip().lstrip("*") == container_path
            for entry in listing.splitlines()
        )
        if not is_loaded:
            return True, "not loaded"

        # NOTE(review): the response is not inspected; this relies on
        # haproxy_cmd raising when HAProxy refuses the deletion. Confirm
        # against haproxy_client.
        haproxy_cmd(f"del ssl cert {container_path}")
        return True, "unloaded"
    except Exception as e:
        return False, str(e)
def restore_certificates() -> int:
    """Restore all certificates from config to HAProxy on startup.

    Each configured domain is validated and then loaded through
    ``load_cert_to_haproxy``. Failures are logged and do not stop the
    remaining domains from being processed.

    Returns:
        Number of certificates restored
    """
    restored = 0
    for domain in load_cert_config():
        # Config entries end up inside Runtime API command strings, so apply
        # the same validation the tool entry points use before sending them.
        if not isinstance(domain, str) or not validate_domain(domain):
            logger.warning(
                "Skipping invalid domain in certificates.json: %r", domain
            )
            continue

        success, msg = load_cert_to_haproxy(domain)
        if success:
            restored += 1
            logger.debug("Certificate %s: %s", domain, msg)
        else:
            logger.warning("Failed to restore certificate %s: %s", domain, msg)
    return restored
def register_certificate_tools(mcp):
"""Register certificate management tools with MCP server."""
@@ -42,19 +186,23 @@ def register_certificate_tools(mcp):
if len(lines) <= 1:
return "No certificates found"
# Get HAProxy loaded certs
try:
haproxy_certs = haproxy_cmd("show ssl cert")
except Exception:
haproxy_certs = ""
# Parse and format output
# Format: Main_Domain KeyLength SAN_Domains Profile CA Created Renew
certs = []
for line in lines[1:]: # Skip header
parts = line.split()
if len(parts) >= 4:
domain = parts[0]
# Find CA and dates by looking for known patterns
ca = "unknown"
created = "unknown"
renew = "unknown"
for i, part in enumerate(parts):
for part in parts:
if "Google" in part or "LetsEncrypt" in part or "ZeroSSL" in part:
ca = part
elif part.endswith("Z") and "T" in part:
@@ -63,11 +211,16 @@ def register_certificate_tools(mcp):
else:
renew = part
# Check if PEM exists in HAProxy certs dir
pem_path = os.path.join(CERTS_DIR, f"{domain}.pem")
deployed = "deployed" if os.path.exists(pem_path) else "not deployed"
# Check deployment status
host_path, container_path = get_pem_paths(domain)
if container_path in haproxy_certs:
status = "loaded"
elif os.path.exists(host_path):
status = "file exists (not loaded)"
else:
status = "not deployed"
certs.append(f"{domain} ({ca})\n Created: {created}\n Renew: {renew}\n Status: {deployed}")
certs.append(f"{domain} ({ca})\n Created: {created}\n Renew: {renew}\n Status: {status}")
return "\n\n".join(certs) if certs else "No certificates found"
except subprocess.TimeoutExpired:
@@ -88,14 +241,14 @@ def register_certificate_tools(mcp):
if not validate_domain(domain):
return "Error: Invalid domain format"
pem_path = os.path.join(CERTS_DIR, f"{domain}.pem")
if not os.path.exists(pem_path):
host_path, container_path = get_pem_paths(domain)
if not os.path.exists(host_path):
return f"Error: Certificate not found for {domain}"
try:
# Use openssl to get certificate info
result = subprocess.run(
["openssl", "x509", "-in", pem_path, "-noout",
["openssl", "x509", "-in", host_path, "-noout",
"-subject", "-issuer", "-dates", "-ext", "subjectAltName"],
capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
)
@@ -103,13 +256,21 @@ def register_certificate_tools(mcp):
return f"Error reading certificate: {result.stderr}"
# Get file info
stat = os.stat(pem_path)
stat = os.stat(host_path)
modified = datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
# Check HAProxy status
try:
haproxy_certs = haproxy_cmd("show ssl cert")
loaded = "Yes" if container_path in haproxy_certs else "No"
except Exception:
loaded = "Unknown"
info = [
f"Certificate: {domain}",
f"File: {pem_path}",
f"File: {host_path}",
f"Modified: {modified}",
f"Loaded in HAProxy: {loaded}",
"---",
result.stdout.strip()
]
@@ -126,7 +287,7 @@ def register_certificate_tools(mcp):
) -> str:
"""Issue a new SSL/TLS certificate using acme.sh with Cloudflare DNS.
Automatically deploys to HAProxy after issuance.
Automatically deploys to HAProxy via Runtime API (zero-downtime).
Example: haproxy_issue_cert("example.com", wildcard=True)
"""
@@ -135,7 +296,6 @@ def register_certificate_tools(mcp):
# Check if CF_Token is available
if not os.environ.get("CF_Token"):
# Try to load from secrets file
secrets_file = os.path.expanduser("~/.secrets/cloudflare.ini")
if os.path.exists(secrets_file):
try:
@@ -156,9 +316,11 @@ def register_certificate_tools(mcp):
if os.path.exists(cert_dir):
return f"Error: Certificate for {domain} already exists. Use haproxy_renew_cert to renew."
# Build acme.sh command
pem_path = os.path.join(CERTS_DIR, f"{domain}.pem")
reload_cmd = f"cat {ACME_HOME}/{domain}_ecc/fullchain.cer {ACME_HOME}/{domain}_ecc/{domain}.key > {pem_path} && podman exec {HAPROXY_CONTAINER} kill -USR2 1"
# Build acme.sh command (without reload - we'll do it via Runtime API)
host_path, _ = get_pem_paths(domain)
# Create PEM after issuance
install_cmd = f"cat {ACME_HOME}/{domain}_ecc/fullchain.cer {ACME_HOME}/{domain}_ecc/{domain}.key > {host_path}"
cmd = [
ACME_SH, "--issue",
@@ -169,7 +331,7 @@ def register_certificate_tools(mcp):
if wildcard:
cmd.extend(["-d", f"*.{domain}"])
cmd.extend(["--reloadcmd", reload_cmd])
cmd.extend(["--reloadcmd", install_cmd])
try:
logger.info("Issuing certificate for %s", domain)
@@ -183,11 +345,17 @@ def register_certificate_tools(mcp):
error_msg = result.stderr or result.stdout
return f"Error issuing certificate:\n{error_msg}"
# Verify deployment
if os.path.exists(pem_path):
return f"Certificate issued and deployed for {domain}\nFile: {pem_path}"
# Load into HAProxy via Runtime API (zero-downtime)
if os.path.exists(host_path):
success, msg = load_cert_to_haproxy(domain)
if success:
# Save to config for persistence
add_cert_to_config(domain)
return f"Certificate issued and loaded for {domain} ({msg})"
else:
return f"Certificate issued but HAProxy loading failed: {msg}"
else:
return f"Certificate issued but deployment may have failed. Check {pem_path}"
return f"Certificate issued but PEM file not created. Check {host_path}"
except subprocess.TimeoutExpired:
return f"Error: Certificate issuance timed out after {CERT_TIMEOUT}s"
@@ -201,6 +369,8 @@ def register_certificate_tools(mcp):
) -> str:
"""Renew an existing certificate.
Uses Runtime API for zero-downtime reload.
Example: haproxy_renew_cert("example.com", force=True)
"""
if not validate_domain(domain):
@@ -224,14 +394,20 @@ def register_certificate_tools(mcp):
output = result.stdout + result.stderr
if "Cert success" in output or "Reload success" in output:
return f"Certificate renewed for {domain}"
elif "Skip" in output and "Not in renewal period" in output:
if "Skip" in output and "Not yet due" in output:
return f"Certificate for {domain} not due for renewal. Use force=True to force renewal."
elif result.returncode != 0:
return f"Error renewing certificate:\n{output}"
if "Cert success" in output or result.returncode == 0:
# Reload into HAProxy via Runtime API
success, msg = load_cert_to_haproxy(domain)
if success:
# Ensure in config
add_cert_to_config(domain)
return f"Certificate renewed and reloaded for {domain} ({msg})"
else:
return f"Certificate renewed but HAProxy reload failed: {msg}"
else:
return f"Renewal completed:\n{output}"
return f"Error renewing certificate:\n{output}"
except subprocess.TimeoutExpired:
return f"Error: Certificate renewal timed out after {CERT_TIMEOUT}s"
@@ -248,7 +424,7 @@ def register_certificate_tools(mcp):
logger.info("Running certificate renewal cron")
result = subprocess.run(
[ACME_SH, "--cron"],
capture_output=True, text=True, timeout=CERT_TIMEOUT * 3, # Longer timeout for all certs
capture_output=True, text=True, timeout=CERT_TIMEOUT * 3,
env={**os.environ, "HOME": os.path.expanduser("~")}
)
@@ -258,8 +434,15 @@ def register_certificate_tools(mcp):
renewed = output.count("Cert success")
skipped = output.count("Skip")
# Reload any renewed certs into HAProxy
if renewed > 0:
return f"Renewed {renewed} certificate(s), skipped {skipped}"
domains = load_cert_config()
reloaded = 0
for domain in domains:
success, _ = load_cert_to_haproxy(domain)
if success:
reloaded += 1
return f"Renewed {renewed} certificate(s), reloaded {reloaded} into HAProxy"
elif skipped > 0:
return f"No certificates due for renewal ({skipped} checked)"
elif result.returncode != 0:
@@ -286,14 +469,21 @@ def register_certificate_tools(mcp):
return "Error: Invalid domain format"
cert_dir = os.path.join(ACME_HOME, f"{domain}_ecc")
pem_path = os.path.join(CERTS_DIR, f"{domain}.pem")
host_path, _ = get_pem_paths(domain)
if not os.path.exists(cert_dir) and not os.path.exists(pem_path):
if not os.path.exists(cert_dir) and not os.path.exists(host_path):
return f"Error: No certificate found for {domain}"
errors = []
deleted = []
# Unload from HAProxy first (zero-downtime)
success, msg = unload_cert_from_haproxy(domain)
if success:
deleted.append(f"HAProxy ({msg})")
else:
errors.append(f"HAProxy unload: {msg}")
# Remove from acme.sh
if os.path.exists(cert_dir):
try:
@@ -305,28 +495,49 @@ def register_certificate_tools(mcp):
if result.returncode == 0:
deleted.append("acme.sh")
else:
errors.append(f"acme.sh removal: {result.stderr}")
errors.append(f"acme.sh: {result.stderr}")
except Exception as e:
errors.append(f"acme.sh removal: {e}")
errors.append(f"acme.sh: {e}")
# Remove PEM file
if os.path.exists(pem_path):
if os.path.exists(host_path):
try:
os.remove(pem_path)
deleted.append("HAProxy PEM")
# Signal HAProxy to reload certs
subprocess.run(
["podman", "exec", HAPROXY_CONTAINER, "kill", "-USR2", "1"],
capture_output=True, timeout=SUBPROCESS_TIMEOUT
)
os.remove(host_path)
deleted.append("PEM file")
except Exception as e:
errors.append(f"PEM removal: {e}")
errors.append(f"PEM file: {e}")
# Remove from config
remove_cert_from_config(domain)
result_parts = []
if deleted:
result_parts.append(f"Deleted from: {', '.join(deleted)}")
result_parts.append(f"Deleted: {', '.join(deleted)}")
if errors:
result_parts.append(f"Errors: {'; '.join(errors)}")
return "\n".join(result_parts) if result_parts else f"Certificate {domain} deleted"
@mcp.tool()
def haproxy_load_cert(
    domain: Annotated[str, Field(description="Domain name to load certificate for")]
) -> str:
    """Load/reload a certificate into HAProxy (zero-downtime).
    Use after manually updating a certificate file.
    Example: haproxy_load_cert("example.com")
    """
    # Reject malformed names before they are turned into paths or commands.
    if not validate_domain(domain):
        return "Error: Invalid domain format"

    pem_file = get_pem_paths(domain)[0]
    if not os.path.exists(pem_file):
        return f"Error: PEM file not found: {pem_file}"

    loaded, detail = load_cert_to_haproxy(domain)
    if not loaded:
        return f"Error loading certificate: {detail}"

    # Record the domain in the certificate config only after a successful load.
    add_cert_to_config(domain)
    return f"Certificate {domain} loaded into HAProxy ({detail})"

View File

@@ -95,7 +95,7 @@ def restore_servers_from_config() -> int:
def startup_restore() -> None:
"""Restore servers from config file on startup."""
"""Restore servers and certificates from config files on startup."""
# Wait for HAProxy to be ready
for _ in range(STARTUP_RETRY_COUNT):
try:
@@ -107,6 +107,7 @@ def startup_restore() -> None:
logger.warning("HAProxy not ready, skipping restore")
return
# Restore servers
try:
count = restore_servers_from_config()
if count > 0:
@@ -114,6 +115,15 @@ def startup_restore() -> None:
except (HaproxyError, OSError, ValueError) as e:
logger.warning("Failed to restore servers: %s", e)
# Restore certificates
try:
from .certificates import restore_certificates
cert_count = restore_certificates()
if cert_count > 0:
logger.info("Restored %d certificates from config", cert_count)
except Exception as e:
logger.warning("Failed to restore certificates: %s", e)
def register_config_tools(mcp):
"""Register configuration management tools with MCP server."""