Files
haproxy-mcp/haproxy_mcp/tools/certificates.py
kaffa dbacb86d60 feat: Add certificate management tools (6 new MCP tools)
New tools for SSL/TLS certificate management via acme.sh:
- haproxy_list_certs: List all certificates with expiry info
- haproxy_cert_info: Get detailed certificate info (expiry, issuer, SANs)
- haproxy_issue_cert: Issue new certificate via Cloudflare DNS validation
- haproxy_renew_cert: Renew specific certificate (with force option)
- haproxy_renew_all_certs: Renew all certificates due for renewal
- haproxy_delete_cert: Delete certificate from acme.sh and HAProxy

Features:
- Automatic PEM deployment to HAProxy certs directory
- HAProxy hot-reload after certificate changes (USR2 signal)
- Cloudflare DNS validation with CF_Token support
- Wildcard certificate support

Total MCP tools: 22 → 28

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 03:55:26 +00:00

333 lines
12 KiB
Python

"""Certificate management tools for HAProxy MCP Server."""
import os
import subprocess
from datetime import datetime
from typing import Annotated
from pydantic import Field
from ..config import logger, SUBPROCESS_TIMEOUT, HAPROXY_CONTAINER
from ..validation import validate_domain
# Certificate paths
ACME_SH = os.path.expanduser("~/.acme.sh/acme.sh")
ACME_HOME = os.path.expanduser("~/.acme.sh")
CERTS_DIR = "/opt/haproxy/certs"
# Longer timeout for certificate operations (ACME can be slow)
CERT_TIMEOUT = 120
def register_certificate_tools(mcp):
"""Register certificate management tools with MCP server."""
@mcp.tool()
def haproxy_list_certs() -> str:
"""List all SSL/TLS certificates with expiry information.
Returns:
List of certificates with domain, CA, created date, and renewal date
"""
try:
result = subprocess.run(
[ACME_SH, "--list"],
capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT,
env={**os.environ, "HOME": os.path.expanduser("~")}
)
if result.returncode != 0:
return f"Error: {result.stderr}"
lines = result.stdout.strip().split("\n")
if len(lines) <= 1:
return "No certificates found"
# Parse and format output
# Format: Main_Domain KeyLength SAN_Domains Profile CA Created Renew
certs = []
for line in lines[1:]: # Skip header
parts = line.split()
if len(parts) >= 4:
domain = parts[0]
# Find CA and dates by looking for known patterns
ca = "unknown"
created = "unknown"
renew = "unknown"
for i, part in enumerate(parts):
if "Google" in part or "LetsEncrypt" in part or "ZeroSSL" in part:
ca = part
elif part.endswith("Z") and "T" in part:
if created == "unknown":
created = part
else:
renew = part
# Check if PEM exists in HAProxy certs dir
pem_path = os.path.join(CERTS_DIR, f"{domain}.pem")
deployed = "deployed" if os.path.exists(pem_path) else "not deployed"
certs.append(f"{domain} ({ca})\n Created: {created}\n Renew: {renew}\n Status: {deployed}")
return "\n\n".join(certs) if certs else "No certificates found"
except subprocess.TimeoutExpired:
return "Error: Command timed out"
except FileNotFoundError:
return "Error: acme.sh not found"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_cert_info(
domain: Annotated[str, Field(description="Domain name to check (e.g., example.com)")]
) -> str:
"""Get detailed certificate information for a domain.
Shows expiry date, issuer, SANs, and file paths.
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
pem_path = os.path.join(CERTS_DIR, f"{domain}.pem")
if not os.path.exists(pem_path):
return f"Error: Certificate not found for {domain}"
try:
# Use openssl to get certificate info
result = subprocess.run(
["openssl", "x509", "-in", pem_path, "-noout",
"-subject", "-issuer", "-dates", "-ext", "subjectAltName"],
capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
)
if result.returncode != 0:
return f"Error reading certificate: {result.stderr}"
# Get file info
stat = os.stat(pem_path)
modified = datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S")
info = [
f"Certificate: {domain}",
f"File: {pem_path}",
f"Modified: {modified}",
"---",
result.stdout.strip()
]
return "\n".join(info)
except subprocess.TimeoutExpired:
return "Error: Command timed out"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_issue_cert(
domain: Annotated[str, Field(description="Primary domain (e.g., example.com)")],
wildcard: Annotated[bool, Field(default=True, description="Include wildcard (*.example.com). Default: true")]
) -> str:
"""Issue a new SSL/TLS certificate using acme.sh with Cloudflare DNS.
Automatically deploys to HAProxy after issuance.
Example: haproxy_issue_cert("example.com", wildcard=True)
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
# Check if CF_Token is available
if not os.environ.get("CF_Token"):
# Try to load from secrets file
secrets_file = os.path.expanduser("~/.secrets/cloudflare.ini")
if os.path.exists(secrets_file):
try:
with open(secrets_file) as f:
for line in f:
if "=" in line and "token" in line.lower():
token = line.split("=", 1)[1].strip().strip('"').strip("'")
os.environ["CF_Token"] = token
break
except Exception as e:
logger.warning("Failed to read Cloudflare token: %s", e)
if not os.environ.get("CF_Token"):
return "Error: CF_Token not set. Export CF_Token or add to ~/.secrets/cloudflare.ini"
# Check if certificate already exists
cert_dir = os.path.join(ACME_HOME, f"{domain}_ecc")
if os.path.exists(cert_dir):
return f"Error: Certificate for {domain} already exists. Use haproxy_renew_cert to renew."
# Build acme.sh command
pem_path = os.path.join(CERTS_DIR, f"{domain}.pem")
reload_cmd = f"cat {ACME_HOME}/{domain}_ecc/fullchain.cer {ACME_HOME}/{domain}_ecc/{domain}.key > {pem_path} && podman exec {HAPROXY_CONTAINER} kill -USR2 1"
cmd = [
ACME_SH, "--issue",
"--dns", "dns_cf",
"-d", domain
]
if wildcard:
cmd.extend(["-d", f"*.{domain}"])
cmd.extend(["--reloadcmd", reload_cmd])
try:
logger.info("Issuing certificate for %s", domain)
result = subprocess.run(
cmd,
capture_output=True, text=True, timeout=CERT_TIMEOUT,
env={**os.environ, "HOME": os.path.expanduser("~")}
)
if result.returncode != 0:
error_msg = result.stderr or result.stdout
return f"Error issuing certificate:\n{error_msg}"
# Verify deployment
if os.path.exists(pem_path):
return f"Certificate issued and deployed for {domain}\nFile: {pem_path}"
else:
return f"Certificate issued but deployment may have failed. Check {pem_path}"
except subprocess.TimeoutExpired:
return f"Error: Certificate issuance timed out after {CERT_TIMEOUT}s"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_renew_cert(
domain: Annotated[str, Field(description="Domain name to renew (e.g., example.com)")],
force: Annotated[bool, Field(default=False, description="Force renewal even if not due. Default: false")]
) -> str:
"""Renew an existing certificate.
Example: haproxy_renew_cert("example.com", force=True)
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
cert_dir = os.path.join(ACME_HOME, f"{domain}_ecc")
if not os.path.exists(cert_dir):
return f"Error: No certificate found for {domain}. Use haproxy_issue_cert first."
cmd = [ACME_SH, "--renew", "-d", domain]
if force:
cmd.append("--force")
try:
logger.info("Renewing certificate for %s", domain)
result = subprocess.run(
cmd,
capture_output=True, text=True, timeout=CERT_TIMEOUT,
env={**os.environ, "HOME": os.path.expanduser("~")}
)
output = result.stdout + result.stderr
if "Cert success" in output or "Reload success" in output:
return f"Certificate renewed for {domain}"
elif "Skip" in output and "Not in renewal period" in output:
return f"Certificate for {domain} not due for renewal. Use force=True to force renewal."
elif result.returncode != 0:
return f"Error renewing certificate:\n{output}"
else:
return f"Renewal completed:\n{output}"
except subprocess.TimeoutExpired:
return f"Error: Certificate renewal timed out after {CERT_TIMEOUT}s"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_renew_all_certs() -> str:
"""Renew all certificates that are due for renewal.
This runs the acme.sh cron job to check and renew all certificates.
"""
try:
logger.info("Running certificate renewal cron")
result = subprocess.run(
[ACME_SH, "--cron"],
capture_output=True, text=True, timeout=CERT_TIMEOUT * 3, # Longer timeout for all certs
env={**os.environ, "HOME": os.path.expanduser("~")}
)
output = result.stdout + result.stderr
# Count renewals
renewed = output.count("Cert success")
skipped = output.count("Skip")
if renewed > 0:
return f"Renewed {renewed} certificate(s), skipped {skipped}"
elif skipped > 0:
return f"No certificates due for renewal ({skipped} checked)"
elif result.returncode != 0:
return f"Error running renewal:\n{output}"
else:
return "Renewal check completed"
except subprocess.TimeoutExpired:
return "Error: Renewal cron timed out"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_delete_cert(
domain: Annotated[str, Field(description="Domain name to delete certificate for")]
) -> str:
"""Delete a certificate from acme.sh and HAProxy.
WARNING: This permanently removes the certificate. The domain will lose HTTPS.
Example: haproxy_delete_cert("example.com")
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
cert_dir = os.path.join(ACME_HOME, f"{domain}_ecc")
pem_path = os.path.join(CERTS_DIR, f"{domain}.pem")
if not os.path.exists(cert_dir) and not os.path.exists(pem_path):
return f"Error: No certificate found for {domain}"
errors = []
deleted = []
# Remove from acme.sh
if os.path.exists(cert_dir):
try:
result = subprocess.run(
[ACME_SH, "--remove", "-d", domain],
capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT,
env={**os.environ, "HOME": os.path.expanduser("~")}
)
if result.returncode == 0:
deleted.append("acme.sh")
else:
errors.append(f"acme.sh removal: {result.stderr}")
except Exception as e:
errors.append(f"acme.sh removal: {e}")
# Remove PEM file
if os.path.exists(pem_path):
try:
os.remove(pem_path)
deleted.append("HAProxy PEM")
# Signal HAProxy to reload certs
subprocess.run(
["podman", "exec", HAPROXY_CONTAINER, "kill", "-USR2", "1"],
capture_output=True, timeout=SUBPROCESS_TIMEOUT
)
except Exception as e:
errors.append(f"PEM removal: {e}")
result_parts = []
if deleted:
result_parts.append(f"Deleted from: {', '.join(deleted)}")
if errors:
result_parts.append(f"Errors: {'; '.join(errors)}")
return "\n".join(result_parts) if result_parts else f"Certificate {domain} deleted"