"""Certificate management tools for HAProxy MCP Server.""" import os import subprocess from datetime import datetime from typing import Annotated from pydantic import Field from ..config import ( logger, SUBPROCESS_TIMEOUT, CERTS_DIR, CERTS_DIR_CONTAINER, ACME_HOME, ) from ..exceptions import HaproxyError from ..validation import validate_domain from ..haproxy_client import haproxy_cmd from ..file_ops import ( load_certs_config, add_cert_to_config, remove_cert_from_config, ) # acme.sh script path (derived from ACME_HOME) ACME_SH = os.path.join(ACME_HOME, "acme.sh") # Longer timeout for certificate operations (ACME can be slow) CERT_TIMEOUT = 120 def get_pem_paths(domain: str) -> tuple[str, str]: """Get host and container PEM paths for a domain. Args: domain: Domain name Returns: Tuple of (host_path, container_path) """ return ( os.path.join(CERTS_DIR, f"{domain}.pem"), os.path.join(CERTS_DIR_CONTAINER, f"{domain}.pem") ) def load_cert_to_haproxy(domain: str) -> tuple[bool, str]: """Load a certificate into HAProxy via Runtime API (zero-downtime). Args: domain: Domain name Returns: Tuple of (success, message) """ host_path, container_path = get_pem_paths(domain) if not os.path.exists(host_path): return False, f"PEM file not found: {host_path}" try: # Read PEM content with open(host_path, "r", encoding="utf-8") as f: pem_content = f.read() # Check if cert already loaded result = haproxy_cmd("show ssl cert") if container_path in result: # Update existing cert haproxy_cmd(f"set ssl cert {container_path} <<\n{pem_content}\n") haproxy_cmd(f"commit ssl cert {container_path}") return True, "updated" else: # Add new cert haproxy_cmd(f"new ssl cert {container_path}") haproxy_cmd(f"set ssl cert {container_path} <<\n{pem_content}\n") haproxy_cmd(f"commit ssl cert {container_path}") return True, "added" except HaproxyError as e: logger.error("HAProxy error loading certificate %s: %s", domain, e) return False, str(e) except (IOError, OSError) as e: logger.error("File error loading certificate %s: %s", domain, e) return False, str(e) def unload_cert_from_haproxy(domain: str) -> tuple[bool, str]: """Unload a certificate from HAProxy via Runtime API. Args: domain: Domain name Returns: Tuple of (success, message) """ _, container_path = get_pem_paths(domain) try: # Check if cert is loaded result = haproxy_cmd("show ssl cert") if container_path not in result: return True, "not loaded" # Delete from HAProxy runtime haproxy_cmd(f"del ssl cert {container_path}") return True, "unloaded" except HaproxyError as e: logger.error("HAProxy error unloading certificate %s: %s", domain, e) return False, str(e) def restore_certificates() -> int: """Restore all certificates from config to HAProxy on startup. Returns: Number of certificates restored """ domains = load_certs_config() restored = 0 for domain in domains: success, msg = load_cert_to_haproxy(domain) if success: restored += 1 logger.debug("Certificate %s: %s", domain, msg) else: logger.warning("Failed to restore certificate %s: %s", domain, msg) return restored # ============================================================================= # Implementation functions (module-level) # ============================================================================= def _haproxy_list_certs_impl() -> str: """Implementation of haproxy_list_certs.""" try: result = subprocess.run( [ACME_SH, "--list"], capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT, env={**os.environ, "HOME": os.path.expanduser("~")} ) if result.returncode != 0: return f"Error: {result.stderr}" lines = result.stdout.strip().split("\n") if len(lines) <= 1: return "No certificates found" # Get HAProxy loaded certs try: haproxy_certs = haproxy_cmd("show ssl cert") except HaproxyError as e: logger.debug("Could not get HAProxy certs: %s", e) haproxy_certs = "" # Parse and format output certs = [] for line in lines[1:]: # Skip header parts = line.split() if len(parts) >= 4: domain = parts[0] ca = "unknown" created = "unknown" renew = "unknown" for part in parts: if "Google" in part or "LetsEncrypt" in part or "ZeroSSL" in part: ca = part elif part.endswith("Z") and "T" in part: if created == "unknown": created = part else: renew = part # Check deployment status host_path, container_path = get_pem_paths(domain) if container_path in haproxy_certs: status = "loaded" elif os.path.exists(host_path): status = "file exists (not loaded)" else: status = "not deployed" certs.append(f"• {domain} ({ca})\n Created: {created}\n Renew: {renew}\n Status: {status}") return "\n\n".join(certs) if certs else "No certificates found" except subprocess.TimeoutExpired: return "Error: Command timed out" except FileNotFoundError: return "Error: acme.sh not found" except subprocess.SubprocessError as e: logger.error("Subprocess error listing certificates: %s", e) return f"Error: {e}" except OSError as e: logger.error("OS error listing certificates: %s", e) return f"Error: {e}" def _haproxy_cert_info_impl(domain: str) -> str: """Implementation of haproxy_cert_info.""" if not validate_domain(domain): return "Error: Invalid domain format" host_path, container_path = get_pem_paths(domain) if not os.path.exists(host_path): return f"Error: Certificate not found for {domain}" try: # Use openssl to get certificate info result = subprocess.run( ["openssl", "x509", "-in", host_path, "-noout", "-subject", "-issuer", "-dates", "-ext", "subjectAltName"], capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT ) if result.returncode != 0: return f"Error reading certificate: {result.stderr}" # Get file info stat = os.stat(host_path) modified = datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S") # Check HAProxy status try: haproxy_certs = haproxy_cmd("show ssl cert") loaded = "Yes" if container_path in haproxy_certs else "No" except HaproxyError as e: logger.debug("Could not check HAProxy cert status: %s", e) loaded = "Unknown" info = [ f"Certificate: {domain}", f"File: {host_path}", f"Modified: {modified}", f"Loaded in HAProxy: {loaded}", "---", result.stdout.strip() ] return "\n".join(info) except subprocess.TimeoutExpired: return "Error: Command timed out" except (subprocess.SubprocessError, OSError) as e: logger.error("Error getting certificate info for %s: %s", domain, e) return f"Error: {e}" def _haproxy_issue_cert_impl(domain: str, wildcard: bool) -> str: """Implementation of haproxy_issue_cert.""" if not validate_domain(domain): return "Error: Invalid domain format" # Check if CF_Token is available if not os.environ.get("CF_Token"): secrets_file = os.path.expanduser("~/.secrets/cloudflare.ini") if os.path.exists(secrets_file): try: with open(secrets_file) as f: for line in f: if "=" in line and "token" in line.lower(): token = line.split("=", 1)[1].strip().strip('"').strip("'") os.environ["CF_Token"] = token break except (IOError, OSError) as e: logger.warning("Failed to read Cloudflare token: %s", e) if not os.environ.get("CF_Token"): return "Error: CF_Token not set. Export CF_Token or add to ~/.secrets/cloudflare.ini" # Check if certificate already exists cert_dir = os.path.join(ACME_HOME, f"{domain}_ecc") if os.path.exists(cert_dir): return f"Error: Certificate for {domain} already exists. Use haproxy_renew_cert to renew." # Build acme.sh command (without reload - we'll do it via Runtime API) host_path, _ = get_pem_paths(domain) # Create PEM after issuance install_cmd = f"cat {ACME_HOME}/{domain}_ecc/fullchain.cer {ACME_HOME}/{domain}_ecc/{domain}.key > {host_path}" cmd = [ ACME_SH, "--issue", "--dns", "dns_cf", "-d", domain ] if wildcard: cmd.extend(["-d", f"*.{domain}"]) cmd.extend(["--reloadcmd", install_cmd]) try: logger.info("Issuing certificate for %s", domain) result = subprocess.run( cmd, capture_output=True, text=True, timeout=CERT_TIMEOUT, env={**os.environ, "HOME": os.path.expanduser("~")} ) if result.returncode != 0: error_msg = result.stderr or result.stdout return f"Error issuing certificate:\n{error_msg}" # Load into HAProxy via Runtime API (zero-downtime) if os.path.exists(host_path): success, msg = load_cert_to_haproxy(domain) if success: # Save to config for persistence add_cert_to_config(domain) return f"Certificate issued and loaded for {domain} ({msg})" else: return f"Certificate issued but HAProxy loading failed: {msg}" else: return f"Certificate issued but PEM file not created. Check {host_path}" except subprocess.TimeoutExpired: return f"Error: Certificate issuance timed out after {CERT_TIMEOUT}s" except (subprocess.SubprocessError, OSError) as e: logger.error("Error issuing certificate for %s: %s", domain, e) return f"Error: {e}" def _haproxy_renew_cert_impl(domain: str, force: bool) -> str: """Implementation of haproxy_renew_cert.""" if not validate_domain(domain): return "Error: Invalid domain format" cert_dir = os.path.join(ACME_HOME, f"{domain}_ecc") if not os.path.exists(cert_dir): return f"Error: No certificate found for {domain}. Use haproxy_issue_cert first." cmd = [ACME_SH, "--renew", "-d", domain] if force: cmd.append("--force") try: logger.info("Renewing certificate for %s", domain) result = subprocess.run( cmd, capture_output=True, text=True, timeout=CERT_TIMEOUT, env={**os.environ, "HOME": os.path.expanduser("~")} ) output = result.stdout + result.stderr if "Skip" in output and "Not yet due" in output: return f"Certificate for {domain} not due for renewal. Use force=True to force renewal." if "Cert success" in output or result.returncode == 0: # Reload into HAProxy via Runtime API success, msg = load_cert_to_haproxy(domain) if success: # Ensure in config add_cert_to_config(domain) return f"Certificate renewed and reloaded for {domain} ({msg})" else: return f"Certificate renewed but HAProxy reload failed: {msg}" else: return f"Error renewing certificate:\n{output}" except subprocess.TimeoutExpired: return f"Error: Certificate renewal timed out after {CERT_TIMEOUT}s" except FileNotFoundError: return "Error: acme.sh not found" except (subprocess.SubprocessError, OSError) as e: logger.error("Error renewing certificate for %s: %s", domain, e) return f"Error: {e}" def _haproxy_renew_all_certs_impl() -> str: """Implementation of haproxy_renew_all_certs.""" try: logger.info("Running certificate renewal cron") result = subprocess.run( [ACME_SH, "--cron"], capture_output=True, text=True, timeout=CERT_TIMEOUT * 3, env={**os.environ, "HOME": os.path.expanduser("~")} ) output = result.stdout + result.stderr # Count renewals renewed = output.count("Cert success") skipped = output.count("Skip") # Reload any renewed certs into HAProxy if renewed > 0: domains = load_certs_config() reloaded = 0 for domain in domains: success, _ = load_cert_to_haproxy(domain) if success: reloaded += 1 return f"Renewed {renewed} certificate(s), reloaded {reloaded} into HAProxy" elif skipped > 0: return f"No certificates due for renewal ({skipped} checked)" elif result.returncode != 0: return f"Error running renewal:\n{output}" else: return "Renewal check completed" except subprocess.TimeoutExpired: return "Error: Renewal cron timed out" except FileNotFoundError: return "Error: acme.sh not found" except (subprocess.SubprocessError, OSError) as e: logger.error("Error running certificate renewal cron: %s", e) return f"Error: {e}" def _haproxy_delete_cert_impl(domain: str) -> str: """Implementation of haproxy_delete_cert.""" if not validate_domain(domain): return "Error: Invalid domain format" cert_dir = os.path.join(ACME_HOME, f"{domain}_ecc") host_path, _ = get_pem_paths(domain) if not os.path.exists(cert_dir) and not os.path.exists(host_path): return f"Error: No certificate found for {domain}" errors = [] deleted = [] # Unload from HAProxy first (zero-downtime) success, msg = unload_cert_from_haproxy(domain) if success: deleted.append(f"HAProxy ({msg})") else: errors.append(f"HAProxy unload: {msg}") # Remove from acme.sh if os.path.exists(cert_dir): try: result = subprocess.run( [ACME_SH, "--remove", "-d", domain], capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT, env={**os.environ, "HOME": os.path.expanduser("~")} ) if result.returncode == 0: deleted.append("acme.sh") else: errors.append(f"acme.sh: {result.stderr}") except (subprocess.TimeoutExpired, subprocess.SubprocessError, OSError) as e: errors.append(f"acme.sh: {e}") # Remove PEM file if os.path.exists(host_path): try: os.remove(host_path) deleted.append("PEM file") except OSError as e: errors.append(f"PEM file: {e}") # Remove from config remove_cert_from_config(domain) result_parts = [] if deleted: result_parts.append(f"Deleted: {', '.join(deleted)}") if errors: result_parts.append(f"Errors: {'; '.join(errors)}") return "\n".join(result_parts) if result_parts else f"Certificate {domain} deleted" def _haproxy_load_cert_impl(domain: str) -> str: """Implementation of haproxy_load_cert.""" if not validate_domain(domain): return "Error: Invalid domain format" host_path, _ = get_pem_paths(domain) if not os.path.exists(host_path): return f"Error: PEM file not found: {host_path}" success, msg = load_cert_to_haproxy(domain) if success: add_cert_to_config(domain) return f"Certificate {domain} loaded into HAProxy ({msg})" else: return f"Error loading certificate: {msg}" # ============================================================================= # MCP Tool Registration # ============================================================================= def register_certificate_tools(mcp): """Register certificate management tools with MCP server.""" @mcp.tool() def haproxy_list_certs() -> str: """List all SSL/TLS certificates with expiry information. Returns: List of certificates with domain, CA, created date, and renewal date """ return _haproxy_list_certs_impl() @mcp.tool() def haproxy_cert_info( domain: Annotated[str, Field(description="Domain name to check (e.g., example.com)")] ) -> str: """Get detailed certificate information for a domain. Shows expiry date, issuer, SANs, and file paths. """ return _haproxy_cert_info_impl(domain) @mcp.tool() def haproxy_issue_cert( domain: Annotated[str, Field(description="Primary domain (e.g., example.com)")], wildcard: Annotated[bool, Field(default=True, description="Include wildcard (*.example.com). Default: true")] ) -> str: """Issue a new SSL/TLS certificate using acme.sh with Cloudflare DNS. Automatically deploys to HAProxy via Runtime API (zero-downtime). Example: haproxy_issue_cert("example.com", wildcard=True) """ return _haproxy_issue_cert_impl(domain, wildcard) @mcp.tool() def haproxy_renew_cert( domain: Annotated[str, Field(description="Domain name to renew (e.g., example.com)")], force: Annotated[bool, Field(default=False, description="Force renewal even if not due. Default: false")] ) -> str: """Renew an existing certificate. Uses Runtime API for zero-downtime reload. Example: haproxy_renew_cert("example.com", force=True) """ return _haproxy_renew_cert_impl(domain, force) @mcp.tool() def haproxy_renew_all_certs() -> str: """Renew all certificates that are due for renewal. This runs the acme.sh cron job to check and renew all certificates. """ return _haproxy_renew_all_certs_impl() @mcp.tool() def haproxy_delete_cert( domain: Annotated[str, Field(description="Domain name to delete certificate for")] ) -> str: """Delete a certificate from acme.sh and HAProxy. WARNING: This permanently removes the certificate. The domain will lose HTTPS. Example: haproxy_delete_cert("example.com") """ return _haproxy_delete_cert_impl(domain) @mcp.tool() def haproxy_load_cert( domain: Annotated[str, Field(description="Domain name to load certificate for")] ) -> str: """Load/reload a certificate into HAProxy (zero-downtime). Use after manually updating a certificate file. Example: haproxy_load_cert("example.com") """ return _haproxy_load_cert_impl(domain)