Files
haproxy-mcp/haproxy_mcp/tools/certificates.py
kappa e40d69a1b1 feat: Add SSH remote execution for HAProxy on remote host
MCP server can now manage HAProxy running on a remote host via SSH.
When SSH_HOST env var is set, all file I/O and subprocess commands
(podman, acme.sh, openssl) are routed through SSH instead of local exec.

- Add ssh_ops.py module with remote_exec, run_command, file I/O helpers
- Modify file_ops.py to support remote reads/writes via SSH
- Update all tools (domains, certificates, health, configuration) for SSH
- Fix domains.py: replace direct fcntl usage with file_lock context manager
- Add openssh-client to Docker image for SSH connectivity
- Update k8s deployment with SSH env vars and SSH key secret mount

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 22:56:54 +09:00

479 lines
16 KiB
Python

"""Certificate management tools for HAProxy MCP Server."""
import os
from datetime import datetime
from typing import Annotated
from pydantic import Field
from ..config import (
logger,
SUBPROCESS_TIMEOUT,
CERTS_DIR,
CERTS_DIR_CONTAINER,
ACME_HOME,
REMOTE_MODE,
)
from ..exceptions import HaproxyError
from ..validation import validate_domain
from ..haproxy_client import haproxy_cmd
from ..file_ops import (
load_certs_config,
add_cert_to_config,
remove_cert_from_config,
_read_file,
)
from ..ssh_ops import run_command, remote_file_exists
# acme.sh script path (derived from ACME_HOME)
ACME_SH = f"{ACME_HOME}/acme.sh"
# Longer timeout for certificate operations (ACME can be slow)
CERT_TIMEOUT = 120
def _file_exists(path: str) -> bool:
    """Return whether *path* exists, checking locally or remotely.

    When ``REMOTE_MODE`` is truthy the check is delegated to
    ``remote_file_exists``; otherwise ``os.path.exists`` is used.
    """
    exists = remote_file_exists if REMOTE_MODE else os.path.exists
    return exists(path)
def get_pem_paths(domain: str) -> tuple[str, str]:
    """Build the PEM file paths for *domain*.

    Returns:
        A ``(host_path, container_path)`` tuple: ``<domain>.pem`` under
        ``CERTS_DIR`` and under ``CERTS_DIR_CONTAINER`` respectively.
    """
    pem_name = f"{domain}.pem"
    host_pem = f"{CERTS_DIR}/{pem_name}"
    container_pem = f"{CERTS_DIR_CONTAINER}/{pem_name}"
    return host_pem, container_pem
def load_cert_to_haproxy(domain: str) -> tuple[bool, str]:
    """Load a certificate into HAProxy via Runtime API (zero-downtime).

    Reads the domain's PEM from the host path and pushes it to HAProxy with
    ``set ssl cert`` / ``commit ssl cert``. If the container path is not yet
    listed by ``show ssl cert``, a ``new ssl cert`` entry is created first.

    Args:
        domain: Domain whose ``<domain>.pem`` file should be loaded.

    Returns:
        ``(True, "updated")`` when an already-listed certificate was replaced,
        ``(True, "added")`` when a new entry was created, or
        ``(False, <error message>)`` when the PEM file is missing, cannot be
        read, or HAProxy rejects a command.
    """
    host_path, container_path = get_pem_paths(domain)
    if not _file_exists(host_path):
        return False, f"PEM file not found: {host_path}"

    # Tracks whether a `set ssl cert` may have left an uncommitted transaction
    # behind, so the error path knows it has something to clean up.
    transaction_pending = False
    try:
        pem_content = _read_file(host_path)
        already_loaded = container_path in haproxy_cmd("show ssl cert")
        if not already_loaded:
            haproxy_cmd(f"new ssl cert {container_path}")
        transaction_pending = True
        haproxy_cmd(f"set ssl cert {container_path} <<\n{pem_content}\n")
        haproxy_cmd(f"commit ssl cert {container_path}")
        transaction_pending = False
        return True, ("updated" if already_loaded else "added")
    except HaproxyError as e:
        logger.error("HAProxy error loading certificate %s: %s", domain, e)
        if transaction_pending:
            # Best-effort cleanup: a transaction left open for this file can
            # make HAProxy refuse `set ssl cert` for other certificates.
            try:
                haproxy_cmd(f"abort ssl cert {container_path}")
            except (HaproxyError, OSError) as abort_err:
                logger.debug(
                    "Could not abort ssl cert transaction for %s: %s",
                    domain,
                    abort_err,
                )
        return False, str(e)
    except (IOError, OSError) as e:
        logger.error("File error loading certificate %s: %s", domain, e)
        return False, str(e)
def unload_cert_from_haproxy(domain: str) -> tuple[bool, str]:
    """Remove a domain's certificate from HAProxy via the Runtime API.

    Returns:
        ``(True, "not loaded")`` if the container path is absent from
        ``show ssl cert``, ``(True, "unloaded")`` after a successful
        ``del ssl cert``, or ``(False, <error message>)`` on ``HaproxyError``.
    """
    container_pem = get_pem_paths(domain)[1]
    try:
        loaded_certs = haproxy_cmd("show ssl cert")
        if container_pem in loaded_certs:
            haproxy_cmd(f"del ssl cert {container_pem}")
            outcome = "unloaded"
        else:
            outcome = "not loaded"
    except HaproxyError as e:
        logger.error("HAProxy error unloading certificate %s: %s", domain, e)
        return False, str(e)
    return True, outcome
def restore_certificates() -> int:
    """Reload every certificate listed in the certs config into HAProxy.

    Returns:
        The number of domains for which ``load_cert_to_haproxy`` succeeded.
        Failures are logged as warnings and do not stop the loop.
    """
    count = 0
    for name in load_certs_config():
        ok, detail = load_cert_to_haproxy(name)
        if not ok:
            logger.warning("Failed to restore certificate %s: %s", name, detail)
            continue
        count += 1
        logger.debug("Certificate %s: %s", name, detail)
    return count
def _haproxy_list_certs_impl() -> str:
    """Implementation of haproxy_list_certs.

    Runs ``acme.sh --list``, skips the header row, and for every row with at
    least four whitespace-separated fields reports the domain, the CA, the
    created/renew timestamps, and whether the PEM is loaded in HAProxy,
    present on disk, or not deployed.

    Returns:
        A human-readable listing, ``"No certificates found"``, or a string
        starting with ``"Error:"``.
    """
    ca_markers = ("Google", "LetsEncrypt", "ZeroSSL")
    try:
        listing = run_command([ACME_SH, "--list"], timeout=SUBPROCESS_TIMEOUT)
        if listing.returncode != 0:
            return f"Error: {listing.stderr}"

        rows = listing.stdout.strip().split("\n")
        if len(rows) <= 1:
            return "No certificates found"

        # HAProxy being unreachable is not fatal here: statuses then fall
        # back to the on-disk check only.
        try:
            loaded_certs = haproxy_cmd("show ssl cert")
        except HaproxyError as e:
            logger.debug("Could not get HAProxy certs: %s", e)
            loaded_certs = ""

        entries = []
        for row in rows[1:]:
            fields = row.split()
            if len(fields) < 4:
                continue

            name = fields[0]
            ca = created = renew = "unknown"
            for field in fields:
                if any(marker in field for marker in ca_markers):
                    ca = field
                elif field.endswith("Z") and "T" in field:
                    # First timestamp-looking field is the creation date,
                    # any later one is taken as the renewal date.
                    if created == "unknown":
                        created = field
                    else:
                        renew = field

            host_pem, container_pem = get_pem_paths(name)
            if container_pem in loaded_certs:
                status = "loaded"
            elif _file_exists(host_pem):
                status = "file exists (not loaded)"
            else:
                status = "not deployed"

            entries.append(f"{name} ({ca})\n Created: {created}\n Renew: {renew}\n Status: {status}")

        if not entries:
            return "No certificates found"
        return "\n\n".join(entries)
    except TimeoutError:
        return "Error: Command timed out"
    except FileNotFoundError:
        return "Error: acme.sh not found"
    except OSError as e:
        logger.error("Error listing certificates: %s", e)
        return f"Error: {e}"
def _haproxy_cert_info_impl(domain: str) -> str:
    """Implementation of haproxy_cert_info.

    Validates the domain, then reports the subject, issuer, validity dates,
    and subjectAltName of its PEM file (via ``openssl x509``), together with
    the file's modification time and whether HAProxy currently lists it.

    Args:
        domain: Domain name whose certificate should be inspected.

    Returns:
        A multi-line description, or a string starting with ``"Error"`` when
        the domain is invalid, the PEM is missing, or a command fails.
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    host_path, container_path = get_pem_paths(domain)
    if not _file_exists(host_path):
        return f"Error: Certificate not found for {domain}"
    try:
        result = run_command(
            ["openssl", "x509", "-in", host_path, "-noout",
             "-subject", "-issuer", "-dates", "-ext", "subjectAltName"],
            timeout=SUBPROCESS_TIMEOUT,
        )
        if result.returncode != 0:
            return f"Error reading certificate: {result.stderr}"

        # Get file modification time. This is purely informational, so any
        # problem obtaining or parsing it degrades to "unknown" rather than
        # failing the whole request.
        modified = "unknown"
        stat_result = run_command(
            ["stat", "-c", "%Y", host_path],
            timeout=SUBPROCESS_TIMEOUT,
        )
        if stat_result.returncode == 0:
            try:
                ts = int(stat_result.stdout.strip())
                modified = datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")
            except (ValueError, OverflowError) as e:
                logger.debug(
                    "Could not parse modification time for %s: %s", host_path, e
                )

        try:
            haproxy_certs = haproxy_cmd("show ssl cert")
            loaded = "Yes" if container_path in haproxy_certs else "No"
        except HaproxyError as e:
            logger.debug("Could not check HAProxy cert status: %s", e)
            loaded = "Unknown"

        info = [
            f"Certificate: {domain}",
            f"File: {host_path}",
            f"Modified: {modified}",
            f"Loaded in HAProxy: {loaded}",
            "---",
            result.stdout.strip(),
        ]
        return "\n".join(info)
    except TimeoutError:
        return "Error: Command timed out"
    except OSError as e:
        logger.error("Error getting certificate info for %s: %s", domain, e)
        return f"Error: {e}"
def _haproxy_issue_cert_impl(domain: str, wildcard: bool) -> str:
    """Implementation of haproxy_issue_cert.

    Issues a certificate with ``acme.sh --issue --dns dns_cf`` and registers
    a ``--reloadcmd`` that concatenates the full chain and key into the
    domain's PEM file. On success the PEM is loaded into HAProxy and the
    domain is added to the certs config.

    Args:
        domain: Primary domain to issue the certificate for.
        wildcard: When true, ``*.<domain>`` is requested as an extra name.

    Returns:
        A success message, or a string starting with ``"Error"`` /
        ``"Certificate issued but"`` describing what went wrong.
    """
    # Function-scope import keeps this block self-contained.
    import shlex

    if not validate_domain(domain):
        return "Error: Invalid domain format"
    cert_dir = f"{ACME_HOME}/{domain}_ecc"
    if _file_exists(cert_dir):
        return f"Error: Certificate for {domain} already exists. Use haproxy_renew_cert to renew."
    host_path, _ = get_pem_paths(domain)

    # The reload command is a shell string, so every path is quoted.
    # shlex.quote leaves ordinary paths untouched; it only matters when a
    # configured directory contains spaces or shell metacharacters.
    fullchain = shlex.quote(f"{cert_dir}/fullchain.cer")
    key_file = shlex.quote(f"{cert_dir}/{domain}.key")
    install_cmd = f"cat {fullchain} {key_file} > {shlex.quote(host_path)}"

    cmd = [ACME_SH, "--issue", "--dns", "dns_cf", "-d", domain]
    if wildcard:
        cmd.extend(["-d", f"*.{domain}"])
    cmd.extend(["--reloadcmd", install_cmd])
    try:
        logger.info("Issuing certificate for %s", domain)
        result = run_command(cmd, timeout=CERT_TIMEOUT)
        if result.returncode != 0:
            error_msg = result.stderr or result.stdout
            return f"Error issuing certificate:\n{error_msg}"
        if not _file_exists(host_path):
            return f"Certificate issued but PEM file not created. Check {host_path}"
        success, msg = load_cert_to_haproxy(domain)
        if not success:
            return f"Certificate issued but HAProxy loading failed: {msg}"
        add_cert_to_config(domain)
        return f"Certificate issued and loaded for {domain} ({msg})"
    except TimeoutError:
        return f"Error: Certificate issuance timed out after {CERT_TIMEOUT}s"
    except FileNotFoundError:
        # Same message the list/renew implementations give for this case.
        return "Error: acme.sh not found"
    except OSError as e:
        logger.error("Error issuing certificate for %s: %s", domain, e)
        return f"Error: {e}"
def _haproxy_renew_cert_impl(domain: str, force: bool) -> str:
    """Implementation of haproxy_renew_cert.

    Runs ``acme.sh --renew`` for *domain* (adding ``--force`` when requested)
    and, if the output contains "Cert success" or the command exits with 0,
    reloads the certificate into HAProxy and records it in the certs config.

    Returns:
        A status message; failures are reported as strings, not exceptions.
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    if not _file_exists(f"{ACME_HOME}/{domain}_ecc"):
        return f"Error: No certificate found for {domain}. Use haproxy_issue_cert first."

    renew_cmd = [ACME_SH, "--renew", "-d", domain] + (["--force"] if force else [])
    try:
        logger.info("Renewing certificate for %s", domain)
        proc = run_command(renew_cmd, timeout=CERT_TIMEOUT)
        combined = proc.stdout + proc.stderr

        # The "skipped, not yet due" case is checked before success/failure.
        if "Skip" in combined and "Not yet due" in combined:
            return f"Certificate for {domain} not due for renewal. Use force=True to force renewal."

        if "Cert success" not in combined and proc.returncode != 0:
            return f"Error renewing certificate:\n{combined}"

        loaded, detail = load_cert_to_haproxy(domain)
        if not loaded:
            return f"Certificate renewed but HAProxy reload failed: {detail}"
        add_cert_to_config(domain)
        return f"Certificate renewed and reloaded for {domain} ({detail})"
    except TimeoutError:
        return f"Error: Certificate renewal timed out after {CERT_TIMEOUT}s"
    except FileNotFoundError:
        return "Error: acme.sh not found"
    except OSError as e:
        logger.error("Error renewing certificate for %s: %s", domain, e)
        return f"Error: {e}"
def _haproxy_renew_all_certs_impl() -> str:
    """Implementation of haproxy_renew_all_certs.

    Runs ``acme.sh --cron`` and counts "Cert success" and "Skip" occurrences
    in its combined output. When at least one certificate was renewed, every
    domain in the certs config is reloaded into HAProxy.

    Returns:
        A summary string, or a string starting with ``"Error"``.
    """
    try:
        logger.info("Running certificate renewal cron")
        proc = run_command([ACME_SH, "--cron"], timeout=CERT_TIMEOUT * 3)
        combined = proc.stdout + proc.stderr

        renewed = combined.count("Cert success")
        if renewed > 0:
            reloaded = sum(
                1 for name in load_certs_config() if load_cert_to_haproxy(name)[0]
            )
            return f"Renewed {renewed} certificate(s), reloaded {reloaded} into HAProxy"

        skipped = combined.count("Skip")
        if skipped > 0:
            return f"No certificates due for renewal ({skipped} checked)"
        if proc.returncode != 0:
            return f"Error running renewal:\n{combined}"
        return "Renewal check completed"
    except TimeoutError:
        return "Error: Renewal cron timed out"
    except FileNotFoundError:
        return "Error: acme.sh not found"
    except OSError as e:
        logger.error("Error running certificate renewal cron: %s", e)
        return f"Error: {e}"
def _haproxy_delete_cert_impl(domain: str) -> str:
    """Implementation of haproxy_delete_cert.

    Removes a certificate in up to three steps (unload from HAProxy,
    ``acme.sh --remove``, delete the PEM file), then drops the domain from
    the certs config. Each step's outcome is collected, so a failure in one
    step does not prevent the others from running.

    Returns:
        A summary with ``Deleted:`` and/or ``Errors:`` lines, or an
        ``"Error: ..."`` string when the domain is invalid or nothing exists.
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"

    acme_dir = f"{ACME_HOME}/{domain}_ecc"
    pem_path = get_pem_paths(domain)[0]
    if not (_file_exists(acme_dir) or _file_exists(pem_path)):
        return f"Error: No certificate found for {domain}"

    removed = []
    failures = []

    unloaded, detail = unload_cert_from_haproxy(domain)
    if unloaded:
        removed.append(f"HAProxy ({detail})")
    else:
        failures.append(f"HAProxy unload: {detail}")

    if _file_exists(acme_dir):
        try:
            proc = run_command(
                [ACME_SH, "--remove", "-d", domain],
                timeout=SUBPROCESS_TIMEOUT,
            )
        except (TimeoutError, OSError) as e:
            failures.append(f"acme.sh: {e}")
        else:
            if proc.returncode == 0:
                removed.append("acme.sh")
            else:
                failures.append(f"acme.sh: {proc.stderr}")

    if _file_exists(pem_path):
        try:
            proc = run_command(["rm", "-f", pem_path])
        except OSError as e:
            failures.append(f"PEM file: {e}")
        else:
            if proc.returncode == 0:
                removed.append("PEM file")
            else:
                failures.append(f"PEM file: {proc.stderr}")

    # The config entry is removed regardless of the step outcomes above.
    remove_cert_from_config(domain)

    summary = []
    if removed:
        summary.append(f"Deleted: {', '.join(removed)}")
    if failures:
        summary.append(f"Errors: {'; '.join(failures)}")
    if not summary:
        return f"Certificate {domain} deleted"
    return "\n".join(summary)
def _haproxy_load_cert_impl(domain: str) -> str:
    """Implementation of haproxy_load_cert.

    Validates the domain, checks that its PEM file exists, loads it into
    HAProxy, and on success records the domain in the certs config.

    Returns:
        A success message or a string starting with ``"Error"``.
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"

    pem_path = get_pem_paths(domain)[0]
    if not _file_exists(pem_path):
        return f"Error: PEM file not found: {pem_path}"

    loaded, detail = load_cert_to_haproxy(domain)
    if not loaded:
        return f"Error loading certificate: {detail}"
    add_cert_to_config(domain)
    return f"Certificate {domain} loaded into HAProxy ({detail})"
def register_certificate_tools(mcp) -> None:
    """Register certificate management tools with MCP server.

    Defines one thin wrapper per certificate operation and registers each
    with ``mcp.tool()``. Every wrapper delegates directly to the matching
    module-level ``_haproxy_*_impl`` function.

    Args:
        mcp: Object exposing a ``tool()`` decorator factory.
    """
    # NOTE(review): the wrapper docstrings and the Annotated/Field metadata
    # below are presumably what mcp.tool() exposes to clients as tool
    # descriptions and parameter schemas; confirm before rewording them.

    @mcp.tool()
    def haproxy_list_certs() -> str:
        """List all SSL/TLS certificates with expiry information.
        Returns:
        List of certificates with domain, CA, created date, and renewal date
        """
        return _haproxy_list_certs_impl()

    @mcp.tool()
    def haproxy_cert_info(
        domain: Annotated[str, Field(description="Domain name to check (e.g., example.com)")]
    ) -> str:
        """Get detailed certificate information for a domain.
        Shows expiry date, issuer, SANs, and file paths.
        """
        return _haproxy_cert_info_impl(domain)

    @mcp.tool()
    def haproxy_issue_cert(
        domain: Annotated[str, Field(description="Primary domain (e.g., example.com)")],
        wildcard: Annotated[bool, Field(default=True, description="Include wildcard (*.example.com). Default: true")] = True
    ) -> str:
        """Issue a new SSL/TLS certificate using acme.sh with Cloudflare DNS.
        Automatically deploys to HAProxy via Runtime API (zero-downtime).
        Example: haproxy_issue_cert("example.com", wildcard=True)
        """
        return _haproxy_issue_cert_impl(domain, wildcard)

    @mcp.tool()
    def haproxy_renew_cert(
        domain: Annotated[str, Field(description="Domain name to renew (e.g., example.com)")],
        force: Annotated[bool, Field(default=False, description="Force renewal even if not due. Default: false")] = False
    ) -> str:
        """Renew an existing certificate.
        Uses Runtime API for zero-downtime reload.
        Example: haproxy_renew_cert("example.com", force=True)
        """
        return _haproxy_renew_cert_impl(domain, force)

    @mcp.tool()
    def haproxy_renew_all_certs() -> str:
        """Renew all certificates that are due for renewal.
        This runs the acme.sh cron job to check and renew all certificates.
        """
        return _haproxy_renew_all_certs_impl()

    @mcp.tool()
    def haproxy_delete_cert(
        domain: Annotated[str, Field(description="Domain name to delete certificate for")]
    ) -> str:
        """Delete a certificate from acme.sh and HAProxy.
        WARNING: This permanently removes the certificate. The domain will lose HTTPS.
        Example: haproxy_delete_cert("example.com")
        """
        return _haproxy_delete_cert_impl(domain)

    @mcp.tool()
    def haproxy_load_cert(
        domain: Annotated[str, Field(description="Domain name to load certificate for")]
    ) -> str:
        """Load/reload a certificate into HAProxy (zero-downtime).
        Use after manually updating a certificate file.
        Example: haproxy_load_cert("example.com")
        """
        return _haproxy_load_cert_impl(domain)