Files
haproxy-mcp/haproxy_mcp/tools/certificates.py
kaffa 06ab47aca8 refactor: Extract large functions, improve exception handling, remove duplicates
## Large function extraction
- servers.py: Extract 8 _impl functions from register_server_tools (449 lines)
- certificates.py: Extract 7 _impl functions from register_certificate_tools (386 lines)
- MCP tool wrappers now delegate to module-level implementation functions

## Exception handling improvements
- Replace 11 broad `except Exception` with specific types
- health.py: (OSError, subprocess.SubprocessError)
- configuration.py: (HaproxyError, IOError, OSError, ValueError)
- servers.py: (IOError, OSError, ValueError)
- certificates.py: FileNotFoundError, (subprocess.SubprocessError, OSError)

## Duplicate code extraction
- Add parse_servers_state() to utils.py (replaces 4 duplicate parsers)
- Add disable_server_slot() to utils.py (replaces duplicate patterns)
- Update health.py, servers.py, domains.py to use new helpers

## Other improvements
- Add TypedDict types in file_ops.py and health.py
- Set file permissions (0o600) for sensitive files
- Update tests to use specific exception types

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 13:23:51 +09:00

572 lines
19 KiB
Python

"""Certificate management tools for HAProxy MCP Server."""
import os
import subprocess
from datetime import datetime
from typing import Annotated
from pydantic import Field
from ..config import (
logger,
SUBPROCESS_TIMEOUT,
CERTS_DIR,
CERTS_DIR_CONTAINER,
ACME_HOME,
)
from ..exceptions import HaproxyError
from ..validation import validate_domain
from ..haproxy_client import haproxy_cmd
from ..file_ops import (
load_certs_config,
add_cert_to_config,
remove_cert_from_config,
)
# Path to the acme.sh executable, built by joining ACME_HOME with "acme.sh".
ACME_SH = os.path.join(ACME_HOME, "acme.sh")
# Timeout, in seconds, passed to subprocess.run for acme.sh issue/renew calls
# (ACME can be slow); the renew-all path uses three times this value.
CERT_TIMEOUT = 120
def get_pem_paths(domain: str) -> tuple[str, str]:
    """Build the PEM file locations for a domain.

    The file name is "<domain>.pem" in both the host certificate directory
    (CERTS_DIR) and the container certificate directory (CERTS_DIR_CONTAINER).

    Args:
        domain: Domain name

    Returns:
        Tuple of (host_path, container_path)
    """
    pem_name = f"{domain}.pem"
    host_path = os.path.join(CERTS_DIR, pem_name)
    container_path = os.path.join(CERTS_DIR_CONTAINER, pem_name)
    return host_path, container_path
def load_cert_to_haproxy(domain: str) -> tuple[bool, str]:
    """Load a certificate into HAProxy via Runtime API (zero-downtime).

    Reads the domain's PEM file from the host certificate directory and pushes
    it to HAProxy under the container path. If "show ssl cert" does not list
    the container path yet, the certificate entry is created first.

    Args:
        domain: Domain name

    Returns:
        Tuple of (success, message). On success the message is "updated" or
        "added"; on failure it is the error text.
    """
    host_path, container_path = get_pem_paths(domain)
    if not os.path.exists(host_path):
        return False, f"PEM file not found: {host_path}"
    try:
        with open(host_path, "r", encoding="utf-8") as f:
            pem_content = f.read()

        # A "<<" payload on the Runtime API ends at the first empty line, so
        # blank lines inside the PEM bundle (e.g. between chain entries) would
        # truncate the upload. Drop them and normalize line endings.
        pem_lines = [ln.strip() for ln in pem_content.splitlines() if ln.strip()]
        if not pem_lines:
            return False, f"PEM file is empty: {host_path}"
        payload = "\n".join(pem_lines) + "\n"

        already_loaded = container_path in haproxy_cmd("show ssl cert")
        if not already_loaded:
            # A new certificate needs an (empty) entry before it can be set.
            haproxy_cmd(f"new ssl cert {container_path}")

        try:
            haproxy_cmd(f"set ssl cert {container_path} <<\n{payload}\n")
            haproxy_cmd(f"commit ssl cert {container_path}")
        except HaproxyError:
            # Best effort: do not leave a half-finished transaction behind.
            try:
                haproxy_cmd(f"abort ssl cert {container_path}")
            except HaproxyError as abort_err:
                logger.debug(
                    "Could not abort ssl cert transaction for %s: %s",
                    domain, abort_err,
                )
            raise

        return True, ("updated" if already_loaded else "added")
    except HaproxyError as e:
        logger.error("HAProxy error loading certificate %s: %s", domain, e)
        return False, str(e)
    except (OSError, UnicodeDecodeError) as e:
        logger.error("File error loading certificate %s: %s", domain, e)
        return False, str(e)
def unload_cert_from_haproxy(domain: str) -> tuple[bool, str]:
    """Remove a domain's certificate from HAProxy using the Runtime API.

    Args:
        domain: Domain name

    Returns:
        Tuple of (success, message). The message is "not loaded" when the
        container path is absent from "show ssl cert", "unloaded" after a
        delete, or the error text on failure.
    """
    container_path = get_pem_paths(domain)[1]
    try:
        loaded_certs = haproxy_cmd("show ssl cert")
        if container_path in loaded_certs:
            haproxy_cmd(f"del ssl cert {container_path}")
            return True, "unloaded"
        # Nothing to delete counts as success.
        return True, "not loaded"
    except HaproxyError as err:
        logger.error("HAProxy error unloading certificate %s: %s", domain, err)
        return False, str(err)
def restore_certificates() -> int:
    """Load every certificate listed in the certs config into HAProxy.

    Failures are logged as warnings and do not stop the loop.

    Returns:
        Number of certificates restored
    """
    restored_count = 0
    for cert_domain in load_certs_config():
        ok, detail = load_cert_to_haproxy(cert_domain)
        if not ok:
            logger.warning("Failed to restore certificate %s: %s", cert_domain, detail)
            continue
        restored_count += 1
        logger.debug("Certificate %s: %s", cert_domain, detail)
    return restored_count
# =============================================================================
# Implementation functions (module-level)
# =============================================================================
def _haproxy_list_certs_impl() -> str:
    """Implementation of haproxy_list_certs.

    Runs "acme.sh --list", skips the first output line as the header, and for
    each row with at least four whitespace-separated fields reports the domain,
    CA, created/renew timestamps and whether the PEM is loaded in HAProxy,
    present on disk only, or not deployed.

    Returns:
        Formatted certificate list, "No certificates found", or an error message.
    """
    known_cas = ("Google", "LetsEncrypt", "ZeroSSL")
    try:
        proc = subprocess.run(
            [ACME_SH, "--list"],
            capture_output=True,
            text=True,
            timeout=SUBPROCESS_TIMEOUT,
            env={**os.environ, "HOME": os.path.expanduser("~")},
        )
        if proc.returncode != 0:
            return f"Error: {proc.stderr}"

        rows = proc.stdout.strip().split("\n")
        if len(rows) <= 1:
            return "No certificates found"

        # If HAProxy is unreachable, still list certificates; none will show as loaded.
        try:
            haproxy_certs = haproxy_cmd("show ssl cert")
        except HaproxyError as err:
            logger.debug("Could not get HAProxy certs: %s", err)
            haproxy_certs = ""

        entries = []
        for row in rows[1:]:
            fields = row.split()
            if len(fields) < 4:
                continue
            domain = fields[0]
            ca = created = renew = "unknown"
            for field in fields:
                if any(ca_name in field for ca_name in known_cas):
                    ca = field
                elif field.endswith("Z") and "T" in field:
                    # First timestamp-looking field is the creation time,
                    # any later one is the renewal time.
                    if created == "unknown":
                        created = field
                    else:
                        renew = field

            host_path, container_path = get_pem_paths(domain)
            if container_path in haproxy_certs:
                status = "loaded"
            elif os.path.exists(host_path):
                status = "file exists (not loaded)"
            else:
                status = "not deployed"
            entries.append(
                f"{domain} ({ca})\n Created: {created}\n Renew: {renew}\n Status: {status}"
            )

        if not entries:
            return "No certificates found"
        return "\n\n".join(entries)
    except subprocess.TimeoutExpired:
        return "Error: Command timed out"
    except FileNotFoundError:
        return "Error: acme.sh not found"
    except subprocess.SubprocessError as err:
        logger.error("Subprocess error listing certificates: %s", err)
        return f"Error: {err}"
    except OSError as err:
        logger.error("OS error listing certificates: %s", err)
        return f"Error: {err}"
def _haproxy_cert_info_impl(domain: str) -> str:
    """Implementation of haproxy_cert_info.

    Validates the domain, runs "openssl x509" on the host PEM file to print
    subject, issuer, dates and subjectAltName, and adds the file's modification
    time and whether HAProxy lists the container path.

    Args:
        domain: Domain name to inspect.

    Returns:
        Multi-line certificate summary or an error message.
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"

    host_path, container_path = get_pem_paths(domain)
    if not os.path.exists(host_path):
        return f"Error: Certificate not found for {domain}"

    openssl_cmd = [
        "openssl", "x509", "-in", host_path, "-noout",
        "-subject", "-issuer", "-dates", "-ext", "subjectAltName",
    ]
    try:
        proc = subprocess.run(
            openssl_cmd,
            capture_output=True,
            text=True,
            timeout=SUBPROCESS_TIMEOUT,
        )
        if proc.returncode != 0:
            return f"Error reading certificate: {proc.stderr}"

        mtime = os.stat(host_path).st_mtime
        modified = datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M:%S")

        # HAProxy being unreachable is not fatal; report the state as unknown.
        try:
            loaded = "Yes" if container_path in haproxy_cmd("show ssl cert") else "No"
        except HaproxyError as err:
            logger.debug("Could not check HAProxy cert status: %s", err)
            loaded = "Unknown"

        return "\n".join((
            f"Certificate: {domain}",
            f"File: {host_path}",
            f"Modified: {modified}",
            f"Loaded in HAProxy: {loaded}",
            "---",
            proc.stdout.strip(),
        ))
    except subprocess.TimeoutExpired:
        return "Error: Command timed out"
    except (subprocess.SubprocessError, OSError) as err:
        logger.error("Error getting certificate info for %s: %s", domain, err)
        return f"Error: {err}"
def _haproxy_issue_cert_impl(domain: str, wildcard: bool) -> str:
    """Implementation of haproxy_issue_cert.

    Issues a certificate with acme.sh using the Cloudflare DNS hook (dns_cf).
    The acme.sh reload command concatenates the full chain and key into the
    host PEM file. The PEM is then loaded into HAProxy through the Runtime API
    and the domain is recorded in the certs config.

    If CF_Token is not in the environment, it is read from the first
    non-comment "<key containing 'token'> = <value>" line of
    ~/.secrets/cloudflare.ini and exported into os.environ.

    Args:
        domain: Primary domain for the certificate.
        wildcard: When true, also request "*.<domain>".

    Returns:
        Human-readable success or error message.
    """
    import shlex  # local import: only needed to quote the reload command

    if not validate_domain(domain):
        return "Error: Invalid domain format"

    # Check if CF_Token is available
    if not os.environ.get("CF_Token"):
        secrets_file = os.path.expanduser("~/.secrets/cloudflare.ini")
        if os.path.exists(secrets_file):
            try:
                with open(secrets_file, encoding="utf-8") as f:
                    for line in f:
                        entry = line.strip()
                        # Skip blanks and commented-out entries.
                        if not entry or entry.startswith(("#", ";")):
                            continue
                        key, sep, value = entry.partition("=")
                        # Only the key decides whether this is the token line.
                        if not sep or "token" not in key.lower():
                            continue
                        token = value.strip().strip('"').strip("'")
                        if token:
                            os.environ["CF_Token"] = token
                            break
            except (OSError, UnicodeDecodeError) as e:
                logger.warning("Failed to read Cloudflare token: %s", e)
        if not os.environ.get("CF_Token"):
            return "Error: CF_Token not set. Export CF_Token or add to ~/.secrets/cloudflare.ini"

    # Check if certificate already exists
    cert_dir = os.path.join(ACME_HOME, f"{domain}_ecc")
    if os.path.exists(cert_dir):
        return f"Error: Certificate for {domain} already exists. Use haproxy_renew_cert to renew."

    # Build acme.sh command (without reload - we'll do it via Runtime API).
    # The reload command is run through a shell, so every path is quoted.
    host_path, _ = get_pem_paths(domain)
    fullchain_path = f"{ACME_HOME}/{domain}_ecc/fullchain.cer"
    key_path = f"{ACME_HOME}/{domain}_ecc/{domain}.key"
    install_cmd = (
        f"cat {shlex.quote(fullchain_path)} {shlex.quote(key_path)}"
        f" > {shlex.quote(host_path)}"
    )

    cmd = [ACME_SH, "--issue", "--dns", "dns_cf", "-d", domain]
    if wildcard:
        cmd.extend(["-d", f"*.{domain}"])
    cmd.extend(["--reloadcmd", install_cmd])

    try:
        logger.info("Issuing certificate for %s", domain)
        try:
            result = subprocess.run(
                cmd,
                capture_output=True, text=True, timeout=CERT_TIMEOUT,
                env={**os.environ, "HOME": os.path.expanduser("~")},
            )
        except FileNotFoundError:
            # Same message the list/renew implementations use.
            return "Error: acme.sh not found"

        if result.returncode != 0:
            error_msg = result.stderr or result.stdout
            return f"Error issuing certificate:\n{error_msg}"

        if not os.path.exists(host_path):
            return f"Certificate issued but PEM file not created. Check {host_path}"

        # Load into HAProxy via Runtime API (zero-downtime)
        success, msg = load_cert_to_haproxy(domain)
        if not success:
            return f"Certificate issued but HAProxy loading failed: {msg}"

        # Save to config for persistence
        add_cert_to_config(domain)
        return f"Certificate issued and loaded for {domain} ({msg})"
    except subprocess.TimeoutExpired:
        return f"Error: Certificate issuance timed out after {CERT_TIMEOUT}s"
    except (subprocess.SubprocessError, OSError) as e:
        logger.error("Error issuing certificate for %s: %s", domain, e)
        return f"Error: {e}"
def _haproxy_renew_cert_impl(domain: str, force: bool) -> str:
    """Implementation of haproxy_renew_cert.

    Runs "acme.sh --renew" for a domain that already has an acme.sh
    certificate directory, then reloads the PEM into HAProxy and makes sure
    the domain is in the certs config.

    Args:
        domain: Domain name to renew.
        force: When true, pass --force to acme.sh.

    Returns:
        Human-readable success, skip or error message.
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"

    if not os.path.exists(os.path.join(ACME_HOME, f"{domain}_ecc")):
        return f"Error: No certificate found for {domain}. Use haproxy_issue_cert first."

    renew_cmd = [ACME_SH, "--renew", "-d", domain]
    if force:
        renew_cmd.append("--force")

    try:
        logger.info("Renewing certificate for %s", domain)
        proc = subprocess.run(
            renew_cmd,
            capture_output=True,
            text=True,
            timeout=CERT_TIMEOUT,
            env={**os.environ, "HOME": os.path.expanduser("~")},
        )
        output = proc.stdout + proc.stderr

        if "Skip" in output and "Not yet due" in output:
            return f"Certificate for {domain} not due for renewal. Use force=True to force renewal."

        renewed = "Cert success" in output or proc.returncode == 0
        if not renewed:
            return f"Error renewing certificate:\n{output}"

        # Push the renewed PEM into HAProxy via the Runtime API.
        ok, detail = load_cert_to_haproxy(domain)
        if not ok:
            return f"Certificate renewed but HAProxy reload failed: {detail}"

        add_cert_to_config(domain)
        return f"Certificate renewed and reloaded for {domain} ({detail})"
    except subprocess.TimeoutExpired:
        return f"Error: Certificate renewal timed out after {CERT_TIMEOUT}s"
    except FileNotFoundError:
        return "Error: acme.sh not found"
    except (subprocess.SubprocessError, OSError) as err:
        logger.error("Error renewing certificate for %s: %s", domain, err)
        return f"Error: {err}"
def _haproxy_renew_all_certs_impl() -> str:
    """Implementation of haproxy_renew_all_certs.

    Runs "acme.sh --cron" with three times CERT_TIMEOUT, counts "Cert success"
    and "Skip" occurrences in the combined output, and reloads every
    certificate from the certs config into HAProxy when at least one renewal
    succeeded.

    A non-zero exit status is reported as a failure even when the output also
    contains skips or successes, so a failed renewal is never summarised as
    "nothing due".

    Returns:
        Human-readable summary or error message.
    """
    try:
        logger.info("Running certificate renewal cron")
        result = subprocess.run(
            [ACME_SH, "--cron"],
            capture_output=True, text=True, timeout=CERT_TIMEOUT * 3,
            env={**os.environ, "HOME": os.path.expanduser("~")},
        )
        output = result.stdout + result.stderr

        # Count renewals
        renewed = output.count("Cert success")
        skipped = output.count("Skip")
        failed = result.returncode != 0

        if renewed > 0:
            # Reload every configured cert; the output does not say which
            # PEM files changed.
            reloaded = sum(
                1 for domain in load_certs_config()
                if load_cert_to_haproxy(domain)[0]
            )
            summary = f"Renewed {renewed} certificate(s), reloaded {reloaded} into HAProxy"
            if failed:
                summary += (
                    f"\nWarning: acme.sh exited with code {result.returncode}:\n{output}"
                )
            return summary
        if failed:
            return f"Error running renewal:\n{output}"
        if skipped > 0:
            return f"No certificates due for renewal ({skipped} checked)"
        return "Renewal check completed"
    except subprocess.TimeoutExpired:
        return "Error: Renewal cron timed out"
    except FileNotFoundError:
        return "Error: acme.sh not found"
    except (subprocess.SubprocessError, OSError) as e:
        logger.error("Error running certificate renewal cron: %s", e)
        return f"Error: {e}"
def _haproxy_delete_cert_impl(domain: str) -> str:
    """Implementation of haproxy_delete_cert.

    Unloads the certificate from HAProxy, runs "acme.sh --remove" when the
    acme.sh certificate directory exists, deletes the host PEM file when
    present, and removes the domain from the certs config. Each step's outcome
    is collected; a failing step does not stop the later ones.

    Args:
        domain: Domain whose certificate should be deleted.

    Returns:
        Summary with a "Deleted: ..." line and/or an "Errors: ..." line, or an
        error message when the domain is invalid or nothing exists to delete.
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"

    cert_dir = os.path.join(ACME_HOME, f"{domain}_ecc")
    host_path = get_pem_paths(domain)[0]
    if not (os.path.exists(cert_dir) or os.path.exists(host_path)):
        return f"Error: No certificate found for {domain}"

    removed = []
    failures = []

    # Step 1: take the certificate out of the running HAProxy first.
    ok, detail = unload_cert_from_haproxy(domain)
    if ok:
        removed.append(f"HAProxy ({detail})")
    else:
        failures.append(f"HAProxy unload: {detail}")

    # Step 2: drop it from acme.sh's records.
    if os.path.exists(cert_dir):
        try:
            proc = subprocess.run(
                [ACME_SH, "--remove", "-d", domain],
                capture_output=True,
                text=True,
                timeout=SUBPROCESS_TIMEOUT,
                env={**os.environ, "HOME": os.path.expanduser("~")},
            )
        except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError) as err:
            failures.append(f"acme.sh: {err}")
        else:
            if proc.returncode == 0:
                removed.append("acme.sh")
            else:
                failures.append(f"acme.sh: {proc.stderr}")

    # Step 3: delete the combined PEM on the host.
    if os.path.exists(host_path):
        try:
            os.remove(host_path)
        except OSError as err:
            failures.append(f"PEM file: {err}")
        else:
            removed.append("PEM file")

    # Step 4: forget the domain in the persisted config.
    remove_cert_from_config(domain)

    summary = []
    if removed:
        summary.append(f"Deleted: {', '.join(removed)}")
    if failures:
        summary.append(f"Errors: {'; '.join(failures)}")
    if not summary:
        return f"Certificate {domain} deleted"
    return "\n".join(summary)
def _haproxy_load_cert_impl(domain: str) -> str:
    """Implementation of haproxy_load_cert.

    Validates the domain, requires the host PEM file to exist, loads it into
    HAProxy and, on success, records the domain in the certs config.

    Args:
        domain: Domain whose PEM file should be loaded.

    Returns:
        Human-readable success or error message.
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"

    pem_path = get_pem_paths(domain)[0]
    if not os.path.exists(pem_path):
        return f"Error: PEM file not found: {pem_path}"

    ok, detail = load_cert_to_haproxy(domain)
    if not ok:
        return f"Error loading certificate: {detail}"

    add_cert_to_config(domain)
    return f"Certificate {domain} loaded into HAProxy ({detail})"
# =============================================================================
# MCP Tool Registration
# =============================================================================
def register_certificate_tools(mcp) -> None:
    """Register certificate management tools with MCP server.

    Defines seven tool functions decorated with ``mcp.tool()``. Each one is a
    thin wrapper that forwards its arguments to the matching module-level
    ``_haproxy_*_impl`` function and returns its string result.

    Args:
        mcp: Object exposing a ``tool()`` decorator factory.
            NOTE(review): presumably the MCP server instance, and the wrapper
            docstrings and Field descriptions presumably become the tool
            metadata - confirm before rewording them.
    """
    # List certificates known to acme.sh.
    @mcp.tool()
    def haproxy_list_certs() -> str:
        """List all SSL/TLS certificates with expiry information.
        Returns:
        List of certificates with domain, CA, created date, and renewal date
        """
        return _haproxy_list_certs_impl()

    # Show openssl details and load status for one domain.
    @mcp.tool()
    def haproxy_cert_info(
        domain: Annotated[str, Field(description="Domain name to check (e.g., example.com)")]
    ) -> str:
        """Get detailed certificate information for a domain.
        Shows expiry date, issuer, SANs, and file paths.
        """
        return _haproxy_cert_info_impl(domain)

    # Issue a new certificate; wildcard defaults to True.
    @mcp.tool()
    def haproxy_issue_cert(
        domain: Annotated[str, Field(description="Primary domain (e.g., example.com)")],
        wildcard: Annotated[bool, Field(default=True, description="Include wildcard (*.example.com). Default: true")]
    ) -> str:
        """Issue a new SSL/TLS certificate using acme.sh with Cloudflare DNS.
        Automatically deploys to HAProxy via Runtime API (zero-downtime).
        Example: haproxy_issue_cert("example.com", wildcard=True)
        """
        return _haproxy_issue_cert_impl(domain, wildcard)

    # Renew one certificate; force defaults to False.
    @mcp.tool()
    def haproxy_renew_cert(
        domain: Annotated[str, Field(description="Domain name to renew (e.g., example.com)")],
        force: Annotated[bool, Field(default=False, description="Force renewal even if not due. Default: false")]
    ) -> str:
        """Renew an existing certificate.
        Uses Runtime API for zero-downtime reload.
        Example: haproxy_renew_cert("example.com", force=True)
        """
        return _haproxy_renew_cert_impl(domain, force)

    # Run the acme.sh cron renewal over all certificates.
    @mcp.tool()
    def haproxy_renew_all_certs() -> str:
        """Renew all certificates that are due for renewal.
        This runs the acme.sh cron job to check and renew all certificates.
        """
        return _haproxy_renew_all_certs_impl()

    # Delete a certificate from HAProxy, acme.sh, disk and config.
    @mcp.tool()
    def haproxy_delete_cert(
        domain: Annotated[str, Field(description="Domain name to delete certificate for")]
    ) -> str:
        """Delete a certificate from acme.sh and HAProxy.
        WARNING: This permanently removes the certificate. The domain will lose HTTPS.
        Example: haproxy_delete_cert("example.com")
        """
        return _haproxy_delete_cert_impl(domain)

    # Load or reload an existing PEM file into HAProxy.
    @mcp.tool()
    def haproxy_load_cert(
        domain: Annotated[str, Field(description="Domain name to load certificate for")]
    ) -> str:
        """Load/reload a certificate into HAProxy (zero-downtime).
        Use after manually updating a certificate file.
        Example: haproxy_load_cert("example.com")
        """
        return _haproxy_load_cert_impl(domain)