Files
haproxy-mcp/haproxy_mcp/tools/certificates.py
kaffa 6ced2b42d4 refactor: Move certificate config functions to file_ops.py
- Move load_certs_config, save_certs_config, add_cert_to_config,
  remove_cert_from_config from certificates.py to file_ops.py
- Add CERTS_FILE constant to config.py
- Add file locking for certificate config operations (was missing)
- Consistent pattern with servers.json handling

certificates.py: 543 → 503 lines
file_ops.py: 263 → 337 lines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 04:26:55 +00:00

504 lines
17 KiB
Python

"""Certificate management tools for HAProxy MCP Server."""
import os
import subprocess
from datetime import datetime
from typing import Annotated
from pydantic import Field
from ..config import logger, SUBPROCESS_TIMEOUT
from ..validation import validate_domain
from ..haproxy_client import haproxy_cmd
from ..file_ops import (
load_certs_config,
add_cert_to_config,
remove_cert_from_config,
)
# Filesystem locations used by the certificate tools.
# acme.sh state directory and the acme.sh script inside it (both under the
# current user's home directory).
ACME_HOME = os.path.expanduser("~/.acme.sh")
ACME_SH = os.path.expanduser("~/.acme.sh/acme.sh")
# Directory holding the combined PEM files, under the two names used in this
# module: the "container" path is the one passed to HAProxy Runtime API
# commands, the "host" path is the one read from this process.
# NOTE(review): presumably the same directory mounted into the HAProxy
# container - confirm against the deployment.
CERTS_DIR_CONTAINER = "/etc/haproxy/certs"
CERTS_DIR = "/opt/haproxy/certs"
# Subprocess timeout in seconds for acme.sh issue/renew calls, which can be
# slow because they wait on ACME validation.
CERT_TIMEOUT = 120
def get_pem_paths(domain: str) -> tuple[str, str]:
    """Build the PEM file locations for a domain's certificate.

    The file name is ``<domain>.pem`` in both the host and the container
    certificate directories.

    Args:
        domain: Domain name

    Returns:
        Tuple of (host_path, container_path)
    """
    pem_name = f"{domain}.pem"
    host_path = os.path.join(CERTS_DIR, pem_name)
    container_path = os.path.join(CERTS_DIR_CONTAINER, pem_name)
    return host_path, container_path
def load_cert_to_haproxy(domain: str) -> tuple[bool, str]:
    """Load a certificate into HAProxy via Runtime API (zero-downtime).

    Reads the domain's PEM file from the host certificate directory and
    pushes its content to HAProxy under the container path. A certificate
    that already appears in ``show ssl cert`` is updated in place; otherwise
    a new entry is created first.

    Args:
        domain: Domain name

    Returns:
        Tuple of (success, message). On success the message is "updated" or
        "added"; on failure it is the error description.
    """
    host_path, container_path = get_pem_paths(domain)
    if not os.path.exists(host_path):
        return False, f"PEM file not found: {host_path}"
    try:
        with open(host_path, "r", encoding="utf-8") as f:
            pem_content = f.read()

        # The Runtime API payload ("<<") is terminated by the first empty
        # line. A PEM with blank lines between its blocks (common when files
        # are concatenated) would be cut short, so blank lines are dropped.
        # Trailing whitespace / CR characters are removed as well.
        payload = "\n".join(
            line.rstrip() for line in pem_content.splitlines() if line.strip()
        )
        if not payload:
            return False, f"PEM file is empty: {host_path}"

        already_loaded = container_path in haproxy_cmd("show ssl cert")
        if not already_loaded:
            # An entry must exist before content can be set on it.
            haproxy_cmd(f"new ssl cert {container_path}")

        # The payload always ends with exactly one empty line, whether or not
        # the file on disk ended with a newline.
        haproxy_cmd(f"set ssl cert {container_path} <<\n{payload}\n\n")
        haproxy_cmd(f"commit ssl cert {container_path}")
        return True, ("updated" if already_loaded else "added")
    except Exception as e:
        # Best-effort boundary: callers expect a (False, message) result
        # rather than an exception.
        return False, str(e)
def unload_cert_from_haproxy(domain: str) -> tuple[bool, str]:
    """Remove a domain's certificate from HAProxy via Runtime API.

    Args:
        domain: Domain name

    Returns:
        Tuple of (success, message). The message is "unloaded" when a delete
        command was sent, "not loaded" when the certificate was not listed
        by HAProxy, or the error text on failure.
    """
    container_path = get_pem_paths(domain)[1]
    try:
        loaded_certs = haproxy_cmd("show ssl cert")
        if container_path in loaded_certs:
            # Drop the certificate from the running HAProxy instance.
            haproxy_cmd(f"del ssl cert {container_path}")
            outcome = "unloaded"
        else:
            # Nothing to remove; this still counts as success.
            outcome = "not loaded"
    except Exception as exc:
        return False, str(exc)
    return True, outcome
def restore_certificates() -> int:
    """Load every certificate listed in the certs config into HAProxy.

    Failures are logged as warnings and do not stop the remaining
    certificates from being processed.

    Returns:
        Number of certificates restored
    """
    restored_count = 0
    for cert_domain in load_certs_config():
        ok, detail = load_cert_to_haproxy(cert_domain)
        if not ok:
            logger.warning("Failed to restore certificate %s: %s", cert_domain, detail)
            continue
        restored_count += 1
        logger.debug("Certificate %s: %s", cert_domain, detail)
    return restored_count
def _acme_env() -> dict[str, str]:
    """Build the environment passed to acme.sh subprocesses.

    Copies the current environment and sets HOME to the expanded "~".
    """
    return {**os.environ, "HOME": os.path.expanduser("~")}


def _read_cloudflare_token(secrets_file: str) -> str:
    """Read a Cloudflare API token from an ini-style secrets file.

    Returns the value of the first ``key = value`` line that mentions
    "token" (case-insensitive), with surrounding whitespace and quotes
    removed. Blank lines and comment lines ("#" or ";") are skipped so a
    commented-out token is never picked up. Returns an empty string when no
    matching line exists.
    """
    with open(secrets_file, encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith(("#", ";")):
                continue
            if "=" in line and "token" in line.lower():
                return line.split("=", 1)[1].strip().strip('"').strip("'")
    return ""


def register_certificate_tools(mcp):
    """Register certificate management tools with MCP server.

    Args:
        mcp: Server object whose ``tool()`` method returns the decorator used
            to register each function defined below.
    """

    @mcp.tool()
    def haproxy_list_certs() -> str:
        """List all SSL/TLS certificates with expiry information.

        Returns:
            List of certificates with domain, CA, created date, and renewal date
        """
        try:
            result = subprocess.run(
                [ACME_SH, "--list"],
                capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT,
                env=_acme_env(),
            )
            if result.returncode != 0:
                return f"Error: {result.stderr}"

            lines = result.stdout.strip().split("\n")
            if len(lines) <= 1:
                # Only the header line (or nothing) was printed.
                return "No certificates found"

            # Get HAProxy loaded certs; listing still works without HAProxy.
            try:
                haproxy_certs = haproxy_cmd("show ssl cert")
            except Exception:
                haproxy_certs = ""

            known_cas = ("Google", "LetsEncrypt", "ZeroSSL")
            certs = []
            for line in lines[1:]:  # Skip header
                parts = line.split()
                if len(parts) < 4:
                    continue
                domain = parts[0]
                ca = "unknown"
                created = "unknown"
                renew = "unknown"
                for part in parts:
                    if any(name in part for name in known_cas):
                        ca = part
                    elif part.endswith("Z") and "T" in part:
                        # Timestamp-looking column: the first one seen is
                        # taken as the creation date, later ones as renewal.
                        if created == "unknown":
                            created = part
                        else:
                            renew = part

                # Check deployment status
                host_path, container_path = get_pem_paths(domain)
                if container_path in haproxy_certs:
                    status = "loaded"
                elif os.path.exists(host_path):
                    status = "file exists (not loaded)"
                else:
                    status = "not deployed"
                certs.append(
                    f"{domain} ({ca})\n Created: {created}\n Renew: {renew}\n Status: {status}"
                )
            return "\n\n".join(certs) if certs else "No certificates found"
        except subprocess.TimeoutExpired:
            return "Error: Command timed out"
        except FileNotFoundError:
            return "Error: acme.sh not found"
        except Exception as e:
            return f"Error: {e}"

    @mcp.tool()
    def haproxy_cert_info(
        domain: Annotated[str, Field(description="Domain name to check (e.g., example.com)")]
    ) -> str:
        """Get detailed certificate information for a domain.

        Shows expiry date, issuer, SANs, and file paths.
        """
        if not validate_domain(domain):
            return "Error: Invalid domain format"
        host_path, container_path = get_pem_paths(domain)
        if not os.path.exists(host_path):
            return f"Error: Certificate not found for {domain}"
        try:
            # Use openssl to get certificate info
            result = subprocess.run(
                ["openssl", "x509", "-in", host_path, "-noout",
                 "-subject", "-issuer", "-dates", "-ext", "subjectAltName"],
                capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
            )
            if result.returncode != 0:
                return f"Error reading certificate: {result.stderr}"

            # Get file info
            stat = os.stat(host_path)
            modified = datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S")

            # Check HAProxy status
            try:
                haproxy_certs = haproxy_cmd("show ssl cert")
                loaded = "Yes" if container_path in haproxy_certs else "No"
            except Exception:
                loaded = "Unknown"

            info = [
                f"Certificate: {domain}",
                f"File: {host_path}",
                f"Modified: {modified}",
                f"Loaded in HAProxy: {loaded}",
                "---",
                result.stdout.strip()
            ]
            return "\n".join(info)
        except subprocess.TimeoutExpired:
            return "Error: Command timed out"
        except Exception as e:
            return f"Error: {e}"

    @mcp.tool()
    def haproxy_issue_cert(
        domain: Annotated[str, Field(description="Primary domain (e.g., example.com)")],
        wildcard: Annotated[bool, Field(default=True, description="Include wildcard (*.example.com). Default: true")]
    ) -> str:
        """Issue a new SSL/TLS certificate using acme.sh with Cloudflare DNS.

        Automatically deploys to HAProxy via Runtime API (zero-downtime).

        Example: haproxy_issue_cert("example.com", wildcard=True)
        """
        import shlex  # local import: only needed to quote the reload command

        if not validate_domain(domain):
            return "Error: Invalid domain format"

        # Check if CF_Token is available; fall back to the secrets file.
        if not os.environ.get("CF_Token"):
            secrets_file = os.path.expanduser("~/.secrets/cloudflare.ini")
            if os.path.exists(secrets_file):
                try:
                    token = _read_cloudflare_token(secrets_file)
                except (OSError, UnicodeError) as e:
                    logger.warning("Failed to read Cloudflare token: %s", e)
                else:
                    if token:
                        # Exported so the acme.sh subprocess inherits it.
                        os.environ["CF_Token"] = token
            if not os.environ.get("CF_Token"):
                return "Error: CF_Token not set. Export CF_Token or add to ~/.secrets/cloudflare.ini"

        # Check if certificate already exists. The check looks for the issued
        # chain file rather than the directory alone, so that a directory left
        # behind by a failed issuance does not block a retry.
        cert_dir = os.path.join(ACME_HOME, f"{domain}_ecc")
        fullchain_path = os.path.join(cert_dir, "fullchain.cer")
        key_path = os.path.join(cert_dir, f"{domain}.key")
        if os.path.exists(fullchain_path):
            return f"Error: Certificate for {domain} already exists. Use haproxy_renew_cert to renew."

        # Build acme.sh command (without reload - we'll do it via Runtime API)
        host_path, _ = get_pem_paths(domain)
        # Create the combined PEM (chain + key) after issuance. The command is
        # a shell string, so every path is quoted.
        install_cmd = (
            f"cat {shlex.quote(fullchain_path)} {shlex.quote(key_path)}"
            f" > {shlex.quote(host_path)}"
        )
        cmd = [
            ACME_SH, "--issue",
            "--dns", "dns_cf",
            "-d", domain
        ]
        if wildcard:
            cmd.extend(["-d", f"*.{domain}"])
        cmd.extend(["--reloadcmd", install_cmd])

        try:
            logger.info("Issuing certificate for %s", domain)
            result = subprocess.run(
                cmd,
                capture_output=True, text=True, timeout=CERT_TIMEOUT,
                env=_acme_env(),
            )
            if result.returncode != 0:
                error_msg = result.stderr or result.stdout
                return f"Error issuing certificate:\n{error_msg}"

            # Load into HAProxy via Runtime API (zero-downtime)
            if not os.path.exists(host_path):
                return f"Certificate issued but PEM file not created. Check {host_path}"
            success, msg = load_cert_to_haproxy(domain)
            if not success:
                return f"Certificate issued but HAProxy loading failed: {msg}"
            # Save to config for persistence
            add_cert_to_config(domain)
            return f"Certificate issued and loaded for {domain} ({msg})"
        except subprocess.TimeoutExpired:
            return f"Error: Certificate issuance timed out after {CERT_TIMEOUT}s"
        except Exception as e:
            return f"Error: {e}"

    @mcp.tool()
    def haproxy_renew_cert(
        domain: Annotated[str, Field(description="Domain name to renew (e.g., example.com)")],
        force: Annotated[bool, Field(default=False, description="Force renewal even if not due. Default: false")]
    ) -> str:
        """Renew an existing certificate.

        Uses Runtime API for zero-downtime reload.

        Example: haproxy_renew_cert("example.com", force=True)
        """
        if not validate_domain(domain):
            return "Error: Invalid domain format"
        cert_dir = os.path.join(ACME_HOME, f"{domain}_ecc")
        if not os.path.exists(cert_dir):
            return f"Error: No certificate found for {domain}. Use haproxy_issue_cert first."

        cmd = [ACME_SH, "--renew", "-d", domain]
        if force:
            cmd.append("--force")

        try:
            logger.info("Renewing certificate for %s", domain)
            result = subprocess.run(
                cmd,
                capture_output=True, text=True, timeout=CERT_TIMEOUT,
                env=_acme_env(),
            )
            output = result.stdout + result.stderr
            # A skipped renewal is checked before the exit status is consulted.
            if "Skip" in output and "Not yet due" in output:
                return f"Certificate for {domain} not due for renewal. Use force=True to force renewal."
            if "Cert success" in output or result.returncode == 0:
                # Reload into HAProxy via Runtime API
                success, msg = load_cert_to_haproxy(domain)
                if not success:
                    return f"Certificate renewed but HAProxy reload failed: {msg}"
                # Ensure in config
                add_cert_to_config(domain)
                return f"Certificate renewed and reloaded for {domain} ({msg})"
            return f"Error renewing certificate:\n{output}"
        except subprocess.TimeoutExpired:
            return f"Error: Certificate renewal timed out after {CERT_TIMEOUT}s"
        except Exception as e:
            return f"Error: {e}"

    @mcp.tool()
    def haproxy_renew_all_certs() -> str:
        """Renew all certificates that are due for renewal.

        This runs the acme.sh cron job to check and renew all certificates.
        """
        try:
            logger.info("Running certificate renewal cron")
            result = subprocess.run(
                [ACME_SH, "--cron"],
                capture_output=True, text=True, timeout=CERT_TIMEOUT * 3,
                env=_acme_env(),
            )
            output = result.stdout + result.stderr

            # Count renewals from the acme.sh output markers.
            renewed = output.count("Cert success")
            skipped = output.count("Skip")

            if renewed > 0:
                # The output is not mapped back to domains, so every
                # configured certificate is reloaded into HAProxy.
                reloaded = 0
                for domain in load_certs_config():
                    success, _ = load_cert_to_haproxy(domain)
                    if success:
                        reloaded += 1
                return f"Renewed {renewed} certificate(s), reloaded {reloaded} into HAProxy"
            if skipped > 0:
                return f"No certificates due for renewal ({skipped} checked)"
            if result.returncode != 0:
                return f"Error running renewal:\n{output}"
            return "Renewal check completed"
        except subprocess.TimeoutExpired:
            return "Error: Renewal cron timed out"
        except Exception as e:
            return f"Error: {e}"

    @mcp.tool()
    def haproxy_delete_cert(
        domain: Annotated[str, Field(description="Domain name to delete certificate for")]
    ) -> str:
        """Delete a certificate from acme.sh and HAProxy.

        WARNING: This permanently removes the certificate. The domain will lose HTTPS.

        Example: haproxy_delete_cert("example.com")
        """
        if not validate_domain(domain):
            return "Error: Invalid domain format"
        cert_dir = os.path.join(ACME_HOME, f"{domain}_ecc")
        host_path, _ = get_pem_paths(domain)
        if not os.path.exists(cert_dir) and not os.path.exists(host_path):
            return f"Error: No certificate found for {domain}"

        errors = []
        deleted = []

        # Unload from HAProxy first (zero-downtime)
        success, msg = unload_cert_from_haproxy(domain)
        if success:
            deleted.append(f"HAProxy ({msg})")
        else:
            errors.append(f"HAProxy unload: {msg}")

        # Remove from acme.sh
        if os.path.exists(cert_dir):
            try:
                result = subprocess.run(
                    [ACME_SH, "--remove", "-d", domain],
                    capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT,
                    env=_acme_env(),
                )
                if result.returncode == 0:
                    deleted.append("acme.sh")
                else:
                    errors.append(f"acme.sh: {result.stderr}")
            except Exception as e:
                errors.append(f"acme.sh: {e}")

        # Remove PEM file
        if os.path.exists(host_path):
            try:
                os.remove(host_path)
                deleted.append("PEM file")
            except Exception as e:
                errors.append(f"PEM file: {e}")

        # Remove from config. A failure here is reported alongside the other
        # steps instead of discarding the summary of what was already deleted.
        try:
            remove_cert_from_config(domain)
        except Exception as e:
            errors.append(f"config: {e}")

        result_parts = []
        if deleted:
            result_parts.append(f"Deleted: {', '.join(deleted)}")
        if errors:
            result_parts.append(f"Errors: {'; '.join(errors)}")
        return "\n".join(result_parts) if result_parts else f"Certificate {domain} deleted"

    @mcp.tool()
    def haproxy_load_cert(
        domain: Annotated[str, Field(description="Domain name to load certificate for")]
    ) -> str:
        """Load/reload a certificate into HAProxy (zero-downtime).

        Use after manually updating a certificate file.

        Example: haproxy_load_cert("example.com")
        """
        if not validate_domain(domain):
            return "Error: Invalid domain format"
        host_path, _ = get_pem_paths(domain)
        if not os.path.exists(host_path):
            return f"Error: PEM file not found: {host_path}"
        success, msg = load_cert_to_haproxy(domain)
        if not success:
            return f"Error loading certificate: {msg}"
        # Record the domain in the certs config after a successful load.
        add_cert_to_config(domain)
        return f"Certificate {domain} loaded into HAProxy ({msg})"