Files
haproxy-mcp/haproxy_mcp/tools/certificates.py
kaffa 5c0af11735 Fix list_certs to use PEM files + openssl instead of acme.sh
acme.sh --list failed in uv/systemd context. Now scans certs directory
directly and uses openssl for cert details + HAProxy show ssl cert API
for loaded status.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 14:40:11 +09:00

501 lines
17 KiB
Python

"""Certificate management tools for HAProxy MCP Server."""
import os
import subprocess
from datetime import datetime
from typing import Annotated
from pydantic import Field
from ..config import (
logger,
SUBPROCESS_TIMEOUT,
CERTS_DIR,
CERTS_DIR_CONTAINER,
ACME_HOME,
HAPROXY_PORT,
REMOTE_MODE,
)
from ..exceptions import HaproxyError
from ..validation import validate_domain
from ..haproxy_client import haproxy_cmd
from ..file_ops import (
load_certs_config,
add_cert_to_config,
remove_cert_from_config,
_read_file,
)
from ..ssh_ops import run_command, remote_file_exists
# acme.sh script path (derived from ACME_HOME)
ACME_SH = f"{ACME_HOME}/acme.sh"
# Longer timeout for certificate operations (ACME can be slow)
CERT_TIMEOUT = 120
def _file_exists(path: str) -> bool:
"""Check file existence locally or remotely."""
if REMOTE_MODE:
return remote_file_exists(path)
return os.path.exists(path)
def get_pem_paths(domain: str) -> tuple[str, str]:
"""Get host and container PEM paths for a domain."""
return (
f"{CERTS_DIR}/{domain}.pem",
f"{CERTS_DIR_CONTAINER}/{domain}.pem",
)
def load_cert_to_haproxy(domain: str) -> tuple[bool, str]:
"""Load a certificate into HAProxy via Runtime API (zero-downtime)."""
host_path, container_path = get_pem_paths(domain)
if not _file_exists(host_path):
return False, f"PEM file not found: {host_path}"
try:
pem_content = _read_file(host_path)
result = haproxy_cmd("show ssl cert")
if container_path in result:
haproxy_cmd(f"set ssl cert {container_path} <<\n{pem_content}\n")
haproxy_cmd(f"commit ssl cert {container_path}")
return True, "updated"
else:
haproxy_cmd(f"new ssl cert {container_path}")
haproxy_cmd(f"set ssl cert {container_path} <<\n{pem_content}\n")
haproxy_cmd(f"commit ssl cert {container_path}")
return True, "added"
except HaproxyError as e:
logger.error("HAProxy error loading certificate %s: %s", domain, e)
return False, str(e)
except (IOError, OSError) as e:
logger.error("File error loading certificate %s: %s", domain, e)
return False, str(e)
def unload_cert_from_haproxy(domain: str) -> tuple[bool, str]:
"""Unload a certificate from HAProxy via Runtime API."""
_, container_path = get_pem_paths(domain)
try:
result = haproxy_cmd("show ssl cert")
if container_path not in result:
return True, "not loaded"
haproxy_cmd(f"del ssl cert {container_path}")
return True, "unloaded"
except HaproxyError as e:
logger.error("HAProxy error unloading certificate %s: %s", domain, e)
return False, str(e)
def restore_certificates() -> int:
"""Restore all certificates from config to HAProxy on startup."""
domains = load_certs_config()
restored = 0
for domain in domains:
success, msg = load_cert_to_haproxy(domain)
if success:
restored += 1
logger.debug("Certificate %s: %s", domain, msg)
else:
logger.warning("Failed to restore certificate %s: %s", domain, msg)
return restored
def _haproxy_list_certs_impl() -> str:
"""Implementation of haproxy_list_certs."""
try:
# Get loaded certs from HAProxy Runtime API
try:
haproxy_certs = haproxy_cmd("show ssl cert")
except HaproxyError as e:
logger.debug("Could not get HAProxy certs: %s", e)
haproxy_certs = ""
# Scan PEM files in certs directory
pem_files = sorted(
f for f in os.listdir(CERTS_DIR)
if f.endswith(".pem")
) if os.path.isdir(CERTS_DIR) else []
if not pem_files:
return "No certificates found"
certs = []
for pem_file in pem_files:
domain = pem_file[:-4] # strip .pem
host_path = f"{CERTS_DIR}/{pem_file}"
_, container_path = get_pem_paths(domain)
# Get cert details via openssl
issuer = "unknown"
not_after = "unknown"
sans = ""
try:
result = run_command(
["openssl", "x509", "-in", host_path, "-noout",
"-issuer", "-enddate", "-ext", "subjectAltName"],
timeout=SUBPROCESS_TIMEOUT,
)
if result.returncode == 0:
for line in result.stdout.split("\n"):
line = line.strip()
if line.startswith("issuer="):
# Extract CN or O from issuer
for field in line.split(","):
field = field.strip()
if field.startswith("CN =") or field.startswith("CN="):
issuer = field.split("=", 1)[1].strip()
elif field.startswith("O =") or field.startswith("O="):
if issuer == "unknown":
issuer = field.split("=", 1)[1].strip()
elif line.startswith("notAfter="):
not_after = line.split("=", 1)[1].strip()
elif "DNS:" in line:
sans = ", ".join(
s.strip().replace("DNS:", "")
for s in line.split(",")
if "DNS:" in s
)
except (TimeoutError, OSError):
pass
status = "loaded" if container_path in haproxy_certs else "file only"
san_info = f"\n SANs: {sans}" if sans else ""
certs.append(
f"{domain} ({issuer})\n Expires: {not_after}{san_info}\n Status: {status}"
)
return "\n\n".join(certs) if certs else "No certificates found"
except OSError as e:
logger.error("Error listing certificates: %s", e)
return f"Error: {e}"
def _haproxy_cert_info_impl(domain: str) -> str:
"""Implementation of haproxy_cert_info."""
if not validate_domain(domain):
return "Error: Invalid domain format"
host_path, container_path = get_pem_paths(domain)
if not _file_exists(host_path):
return f"Error: Certificate not found for {domain}"
try:
result = run_command(
["openssl", "x509", "-in", host_path, "-noout",
"-subject", "-issuer", "-dates", "-ext", "subjectAltName"],
timeout=SUBPROCESS_TIMEOUT,
)
if result.returncode != 0:
return f"Error reading certificate: {result.stderr}"
# Get file modification time
stat_result = run_command(["stat", "-c", "%Y", host_path])
if stat_result.returncode == 0:
ts = int(stat_result.stdout.strip())
modified = datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")
else:
modified = "unknown"
try:
haproxy_certs = haproxy_cmd("show ssl cert")
loaded = "Yes" if container_path in haproxy_certs else "No"
except HaproxyError as e:
logger.debug("Could not check HAProxy cert status: %s", e)
loaded = "Unknown"
info = [
f"Certificate: {domain}",
f"File: {host_path}",
f"Modified: {modified}",
f"Loaded in HAProxy: {loaded}",
"---",
result.stdout.strip()
]
return "\n".join(info)
except (TimeoutError, subprocess.TimeoutExpired):
return "Error: Command timed out"
except OSError as e:
logger.error("Error getting certificate info for %s: %s", domain, e)
return f"Error: {e}"
def _haproxy_issue_cert_impl(domain: str, wildcard: bool) -> str:
"""Implementation of haproxy_issue_cert."""
if not validate_domain(domain):
return "Error: Invalid domain format"
cert_dir = f"{ACME_HOME}/{domain}_ecc"
if _file_exists(cert_dir):
return f"Error: Certificate for {domain} already exists. Use haproxy_renew_cert to renew."
host_path, container_path = get_pem_paths(domain)
cert_dir = f"{ACME_HOME}/{domain}_ecc"
reload_cmd = (
f"cat {cert_dir}/fullchain.cer {cert_dir}/{domain}.key > {host_path}"
f' && printf "set ssl cert {container_path} <<\\n%s\\n\\n" "$(cat {host_path})" | nc localhost {HAPROXY_PORT}'
f' && echo "commit ssl cert {container_path}" | nc localhost {HAPROXY_PORT}'
)
cmd = [ACME_SH, "--issue", "--dns", "dns_cf", "-d", domain]
if wildcard:
cmd.extend(["-d", f"*.{domain}"])
cmd.extend(["--days", "60", "--reloadcmd", reload_cmd])
try:
logger.info("Issuing certificate for %s", domain)
result = run_command(cmd, timeout=CERT_TIMEOUT)
if result.returncode != 0:
error_msg = result.stderr or result.stdout
return f"Error issuing certificate:\n{error_msg}"
if _file_exists(host_path):
success, msg = load_cert_to_haproxy(domain)
if success:
add_cert_to_config(domain)
return f"Certificate issued and loaded for {domain} ({msg})"
else:
return f"Certificate issued but HAProxy loading failed: {msg}"
else:
return f"Certificate issued but PEM file not created. Check {host_path}"
except (TimeoutError, subprocess.TimeoutExpired):
return f"Error: Certificate issuance timed out after {CERT_TIMEOUT}s"
except OSError as e:
logger.error("Error issuing certificate for %s: %s", domain, e)
return f"Error: {e}"
def _haproxy_renew_cert_impl(domain: str, force: bool) -> str:
"""Implementation of haproxy_renew_cert."""
if not validate_domain(domain):
return "Error: Invalid domain format"
cert_dir = f"{ACME_HOME}/{domain}_ecc"
if not _file_exists(cert_dir):
return f"Error: No certificate found for {domain}. Use haproxy_issue_cert first."
cmd = [ACME_SH, "--renew", "-d", domain]
if force:
cmd.append("--force")
try:
logger.info("Renewing certificate for %s", domain)
result = run_command(cmd, timeout=CERT_TIMEOUT)
output = result.stdout + result.stderr
if "Skip" in output and "Not yet due" in output:
return f"Certificate for {domain} not due for renewal. Use force=True to force renewal."
if "Cert success" in output or result.returncode == 0:
success, msg = load_cert_to_haproxy(domain)
if success:
add_cert_to_config(domain)
return f"Certificate renewed and reloaded for {domain} ({msg})"
else:
return f"Certificate renewed but HAProxy reload failed: {msg}"
else:
return f"Error renewing certificate:\n{output}"
except (TimeoutError, subprocess.TimeoutExpired):
return f"Error: Certificate renewal timed out after {CERT_TIMEOUT}s"
except FileNotFoundError:
return "Error: acme.sh not found"
except OSError as e:
logger.error("Error renewing certificate for %s: %s", domain, e)
return f"Error: {e}"
def _haproxy_renew_all_certs_impl() -> str:
"""Implementation of haproxy_renew_all_certs."""
try:
logger.info("Running certificate renewal cron")
result = run_command([ACME_SH, "--cron"], timeout=CERT_TIMEOUT * 3)
output = result.stdout + result.stderr
renewed = output.count("Cert success")
skipped = output.count("Skip")
if renewed > 0:
domains = load_certs_config()
reloaded = 0
for domain in domains:
success, _ = load_cert_to_haproxy(domain)
if success:
reloaded += 1
return f"Renewed {renewed} certificate(s), reloaded {reloaded} into HAProxy"
elif skipped > 0:
return f"No certificates due for renewal ({skipped} checked)"
elif result.returncode != 0:
return f"Error running renewal:\n{output}"
else:
return "Renewal check completed"
except (TimeoutError, subprocess.TimeoutExpired):
return "Error: Renewal cron timed out"
except FileNotFoundError:
return "Error: acme.sh not found"
except OSError as e:
logger.error("Error running certificate renewal cron: %s", e)
return f"Error: {e}"
def _haproxy_delete_cert_impl(domain: str) -> str:
"""Implementation of haproxy_delete_cert."""
if not validate_domain(domain):
return "Error: Invalid domain format"
cert_dir = f"{ACME_HOME}/{domain}_ecc"
host_path, _ = get_pem_paths(domain)
if not _file_exists(cert_dir) and not _file_exists(host_path):
return f"Error: No certificate found for {domain}"
errors = []
deleted = []
success, msg = unload_cert_from_haproxy(domain)
if success:
deleted.append(f"HAProxy ({msg})")
else:
errors.append(f"HAProxy unload: {msg}")
if _file_exists(cert_dir):
try:
result = run_command(
[ACME_SH, "--remove", "-d", domain],
timeout=SUBPROCESS_TIMEOUT,
)
if result.returncode == 0:
deleted.append("acme.sh")
else:
errors.append(f"acme.sh: {result.stderr}")
except (TimeoutError, OSError) as e:
errors.append(f"acme.sh: {e}")
if _file_exists(host_path):
try:
result = run_command(["rm", "-f", host_path])
if result.returncode == 0:
deleted.append("PEM file")
else:
errors.append(f"PEM file: {result.stderr}")
except OSError as e:
errors.append(f"PEM file: {e}")
remove_cert_from_config(domain)
result_parts = []
if deleted:
result_parts.append(f"Deleted: {', '.join(deleted)}")
if errors:
result_parts.append(f"Errors: {'; '.join(errors)}")
return "\n".join(result_parts) if result_parts else f"Certificate {domain} deleted"
def _haproxy_load_cert_impl(domain: str) -> str:
"""Implementation of haproxy_load_cert."""
if not validate_domain(domain):
return "Error: Invalid domain format"
host_path, _ = get_pem_paths(domain)
if not _file_exists(host_path):
return f"Error: PEM file not found: {host_path}"
success, msg = load_cert_to_haproxy(domain)
if success:
add_cert_to_config(domain)
return f"Certificate {domain} loaded into HAProxy ({msg})"
else:
return f"Error loading certificate: {msg}"
def register_certificate_tools(mcp):
"""Register certificate management tools with MCP server."""
@mcp.tool()
def haproxy_list_certs() -> str:
"""List all SSL/TLS certificates with expiry information.
Returns:
List of certificates with domain, CA, created date, and renewal date
"""
return _haproxy_list_certs_impl()
@mcp.tool()
def haproxy_cert_info(
domain: Annotated[str, Field(description="Domain name to check (e.g., example.com)")]
) -> str:
"""Get detailed certificate information for a domain.
Shows expiry date, issuer, SANs, and file paths.
"""
return _haproxy_cert_info_impl(domain)
@mcp.tool()
def haproxy_issue_cert(
domain: Annotated[str, Field(description="Primary domain (e.g., example.com)")],
wildcard: Annotated[bool, Field(default=True, description="Include wildcard (*.example.com). Default: true")] = True
) -> str:
"""Issue a new SSL/TLS certificate using acme.sh with Cloudflare DNS.
Automatically deploys to HAProxy via Runtime API (zero-downtime).
Example: haproxy_issue_cert("example.com", wildcard=True)
"""
return _haproxy_issue_cert_impl(domain, wildcard)
@mcp.tool()
def haproxy_renew_cert(
domain: Annotated[str, Field(description="Domain name to renew (e.g., example.com)")],
force: Annotated[bool, Field(default=False, description="Force renewal even if not due. Default: false")] = False
) -> str:
"""Renew an existing certificate.
Uses Runtime API for zero-downtime reload.
Example: haproxy_renew_cert("example.com", force=True)
"""
return _haproxy_renew_cert_impl(domain, force)
@mcp.tool()
def haproxy_renew_all_certs() -> str:
"""Renew all certificates that are due for renewal.
This runs the acme.sh cron job to check and renew all certificates.
"""
return _haproxy_renew_all_certs_impl()
@mcp.tool()
def haproxy_delete_cert(
domain: Annotated[str, Field(description="Domain name to delete certificate for")]
) -> str:
"""Delete a certificate from acme.sh and HAProxy.
WARNING: This permanently removes the certificate. The domain will lose HTTPS.
Example: haproxy_delete_cert("example.com")
"""
return _haproxy_delete_cert_impl(domain)
@mcp.tool()
def haproxy_load_cert(
domain: Annotated[str, Field(description="Domain name to load certificate for")]
) -> str:
"""Load/reload a certificate into HAProxy (zero-downtime).
Use after manually updating a certificate file.
Example: haproxy_load_cert("example.com")
"""
return _haproxy_load_cert_impl(domain)