"""Certificate management tools for HAProxy MCP Server.""" import os from datetime import datetime from typing import Annotated from pydantic import Field from ..config import ( logger, SUBPROCESS_TIMEOUT, CERTS_DIR, CERTS_DIR_CONTAINER, ACME_HOME, HAPROXY_PORT, REMOTE_MODE, ) from ..exceptions import HaproxyError from ..validation import validate_domain from ..haproxy_client import haproxy_cmd from ..file_ops import ( load_certs_config, add_cert_to_config, remove_cert_from_config, _read_file, ) from ..ssh_ops import run_command, remote_file_exists # acme.sh script path (derived from ACME_HOME) ACME_SH = f"{ACME_HOME}/acme.sh" # Longer timeout for certificate operations (ACME can be slow) CERT_TIMEOUT = 120 def _file_exists(path: str) -> bool: """Check file existence locally or remotely.""" if REMOTE_MODE: return remote_file_exists(path) return os.path.exists(path) def get_pem_paths(domain: str) -> tuple[str, str]: """Get host and container PEM paths for a domain.""" return ( f"{CERTS_DIR}/{domain}.pem", f"{CERTS_DIR_CONTAINER}/{domain}.pem", ) def load_cert_to_haproxy(domain: str) -> tuple[bool, str]: """Load a certificate into HAProxy via Runtime API (zero-downtime).""" host_path, container_path = get_pem_paths(domain) if not _file_exists(host_path): return False, f"PEM file not found: {host_path}" try: pem_content = _read_file(host_path) result = haproxy_cmd("show ssl cert") if container_path in result: haproxy_cmd(f"set ssl cert {container_path} <<\n{pem_content}\n") haproxy_cmd(f"commit ssl cert {container_path}") return True, "updated" else: haproxy_cmd(f"new ssl cert {container_path}") haproxy_cmd(f"set ssl cert {container_path} <<\n{pem_content}\n") haproxy_cmd(f"commit ssl cert {container_path}") return True, "added" except HaproxyError as e: logger.error("HAProxy error loading certificate %s: %s", domain, e) return False, str(e) except (IOError, OSError) as e: logger.error("File error loading certificate %s: %s", domain, e) return False, str(e) def unload_cert_from_haproxy(domain: str) -> tuple[bool, str]: """Unload a certificate from HAProxy via Runtime API.""" _, container_path = get_pem_paths(domain) try: result = haproxy_cmd("show ssl cert") if container_path not in result: return True, "not loaded" haproxy_cmd(f"del ssl cert {container_path}") return True, "unloaded" except HaproxyError as e: logger.error("HAProxy error unloading certificate %s: %s", domain, e) return False, str(e) def restore_certificates() -> int: """Restore all certificates from config to HAProxy on startup.""" domains = load_certs_config() restored = 0 for domain in domains: success, msg = load_cert_to_haproxy(domain) if success: restored += 1 logger.debug("Certificate %s: %s", domain, msg) else: logger.warning("Failed to restore certificate %s: %s", domain, msg) return restored def _haproxy_list_certs_impl() -> str: """Implementation of haproxy_list_certs.""" try: result = run_command([ACME_SH, "--list"], timeout=SUBPROCESS_TIMEOUT) if result.returncode != 0: return f"Error: {result.stderr}" lines = result.stdout.strip().split("\n") if len(lines) <= 1: return "No certificates found" try: haproxy_certs = haproxy_cmd("show ssl cert") except HaproxyError as e: logger.debug("Could not get HAProxy certs: %s", e) haproxy_certs = "" certs = [] for line in lines[1:]: parts = line.split() if len(parts) >= 4: domain = parts[0] ca = "unknown" created = "unknown" renew = "unknown" for part in parts: if "Google" in part or "LetsEncrypt" in part or "ZeroSSL" in part: ca = part elif part.endswith("Z") and "T" in part: if created == "unknown": created = part else: renew = part host_path, container_path = get_pem_paths(domain) if container_path in haproxy_certs: status = "loaded" elif _file_exists(host_path): status = "file exists (not loaded)" else: status = "not deployed" certs.append(f"• {domain} ({ca})\n Created: {created}\n Renew: {renew}\n Status: {status}") return "\n\n".join(certs) if certs else "No certificates found" except TimeoutError: return "Error: Command timed out" except FileNotFoundError: return "Error: acme.sh not found" except OSError as e: logger.error("Error listing certificates: %s", e) return f"Error: {e}" def _haproxy_cert_info_impl(domain: str) -> str: """Implementation of haproxy_cert_info.""" if not validate_domain(domain): return "Error: Invalid domain format" host_path, container_path = get_pem_paths(domain) if not _file_exists(host_path): return f"Error: Certificate not found for {domain}" try: result = run_command( ["openssl", "x509", "-in", host_path, "-noout", "-subject", "-issuer", "-dates", "-ext", "subjectAltName"], timeout=SUBPROCESS_TIMEOUT, ) if result.returncode != 0: return f"Error reading certificate: {result.stderr}" # Get file modification time stat_result = run_command(["stat", "-c", "%Y", host_path]) if stat_result.returncode == 0: ts = int(stat_result.stdout.strip()) modified = datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") else: modified = "unknown" try: haproxy_certs = haproxy_cmd("show ssl cert") loaded = "Yes" if container_path in haproxy_certs else "No" except HaproxyError as e: logger.debug("Could not check HAProxy cert status: %s", e) loaded = "Unknown" info = [ f"Certificate: {domain}", f"File: {host_path}", f"Modified: {modified}", f"Loaded in HAProxy: {loaded}", "---", result.stdout.strip() ] return "\n".join(info) except TimeoutError: return "Error: Command timed out" except OSError as e: logger.error("Error getting certificate info for %s: %s", domain, e) return f"Error: {e}" def _haproxy_issue_cert_impl(domain: str, wildcard: bool) -> str: """Implementation of haproxy_issue_cert.""" if not validate_domain(domain): return "Error: Invalid domain format" cert_dir = f"{ACME_HOME}/{domain}_ecc" if _file_exists(cert_dir): return f"Error: Certificate for {domain} already exists. Use haproxy_renew_cert to renew." host_path, container_path = get_pem_paths(domain) cert_dir = f"{ACME_HOME}/{domain}_ecc" reload_cmd = ( f"cat {cert_dir}/fullchain.cer {cert_dir}/{domain}.key > {host_path}" f' && printf "set ssl cert {container_path} <<\\n%s\\n\\n" "$(cat {host_path})" | nc localhost {HAPROXY_PORT}' f' && echo "commit ssl cert {container_path}" | nc localhost {HAPROXY_PORT}' ) cmd = [ACME_SH, "--issue", "--dns", "dns_cf", "-d", domain] if wildcard: cmd.extend(["-d", f"*.{domain}"]) cmd.extend(["--reloadcmd", reload_cmd]) try: logger.info("Issuing certificate for %s", domain) result = run_command(cmd, timeout=CERT_TIMEOUT) if result.returncode != 0: error_msg = result.stderr or result.stdout return f"Error issuing certificate:\n{error_msg}" if _file_exists(host_path): success, msg = load_cert_to_haproxy(domain) if success: add_cert_to_config(domain) return f"Certificate issued and loaded for {domain} ({msg})" else: return f"Certificate issued but HAProxy loading failed: {msg}" else: return f"Certificate issued but PEM file not created. Check {host_path}" except TimeoutError: return f"Error: Certificate issuance timed out after {CERT_TIMEOUT}s" except OSError as e: logger.error("Error issuing certificate for %s: %s", domain, e) return f"Error: {e}" def _haproxy_renew_cert_impl(domain: str, force: bool) -> str: """Implementation of haproxy_renew_cert.""" if not validate_domain(domain): return "Error: Invalid domain format" cert_dir = f"{ACME_HOME}/{domain}_ecc" if not _file_exists(cert_dir): return f"Error: No certificate found for {domain}. Use haproxy_issue_cert first." cmd = [ACME_SH, "--renew", "-d", domain] if force: cmd.append("--force") try: logger.info("Renewing certificate for %s", domain) result = run_command(cmd, timeout=CERT_TIMEOUT) output = result.stdout + result.stderr if "Skip" in output and "Not yet due" in output: return f"Certificate for {domain} not due for renewal. Use force=True to force renewal." if "Cert success" in output or result.returncode == 0: success, msg = load_cert_to_haproxy(domain) if success: add_cert_to_config(domain) return f"Certificate renewed and reloaded for {domain} ({msg})" else: return f"Certificate renewed but HAProxy reload failed: {msg}" else: return f"Error renewing certificate:\n{output}" except TimeoutError: return f"Error: Certificate renewal timed out after {CERT_TIMEOUT}s" except FileNotFoundError: return "Error: acme.sh not found" except OSError as e: logger.error("Error renewing certificate for %s: %s", domain, e) return f"Error: {e}" def _haproxy_renew_all_certs_impl() -> str: """Implementation of haproxy_renew_all_certs.""" try: logger.info("Running certificate renewal cron") result = run_command([ACME_SH, "--cron"], timeout=CERT_TIMEOUT * 3) output = result.stdout + result.stderr renewed = output.count("Cert success") skipped = output.count("Skip") if renewed > 0: domains = load_certs_config() reloaded = 0 for domain in domains: success, _ = load_cert_to_haproxy(domain) if success: reloaded += 1 return f"Renewed {renewed} certificate(s), reloaded {reloaded} into HAProxy" elif skipped > 0: return f"No certificates due for renewal ({skipped} checked)" elif result.returncode != 0: return f"Error running renewal:\n{output}" else: return "Renewal check completed" except TimeoutError: return "Error: Renewal cron timed out" except FileNotFoundError: return "Error: acme.sh not found" except OSError as e: logger.error("Error running certificate renewal cron: %s", e) return f"Error: {e}" def _haproxy_delete_cert_impl(domain: str) -> str: """Implementation of haproxy_delete_cert.""" if not validate_domain(domain): return "Error: Invalid domain format" cert_dir = f"{ACME_HOME}/{domain}_ecc" host_path, _ = get_pem_paths(domain) if not _file_exists(cert_dir) and not _file_exists(host_path): return f"Error: No certificate found for {domain}" errors = [] deleted = [] success, msg = unload_cert_from_haproxy(domain) if success: deleted.append(f"HAProxy ({msg})") else: errors.append(f"HAProxy unload: {msg}") if _file_exists(cert_dir): try: result = run_command( [ACME_SH, "--remove", "-d", domain], timeout=SUBPROCESS_TIMEOUT, ) if result.returncode == 0: deleted.append("acme.sh") else: errors.append(f"acme.sh: {result.stderr}") except (TimeoutError, OSError) as e: errors.append(f"acme.sh: {e}") if _file_exists(host_path): try: result = run_command(["rm", "-f", host_path]) if result.returncode == 0: deleted.append("PEM file") else: errors.append(f"PEM file: {result.stderr}") except OSError as e: errors.append(f"PEM file: {e}") remove_cert_from_config(domain) result_parts = [] if deleted: result_parts.append(f"Deleted: {', '.join(deleted)}") if errors: result_parts.append(f"Errors: {'; '.join(errors)}") return "\n".join(result_parts) if result_parts else f"Certificate {domain} deleted" def _haproxy_load_cert_impl(domain: str) -> str: """Implementation of haproxy_load_cert.""" if not validate_domain(domain): return "Error: Invalid domain format" host_path, _ = get_pem_paths(domain) if not _file_exists(host_path): return f"Error: PEM file not found: {host_path}" success, msg = load_cert_to_haproxy(domain) if success: add_cert_to_config(domain) return f"Certificate {domain} loaded into HAProxy ({msg})" else: return f"Error loading certificate: {msg}" def register_certificate_tools(mcp): """Register certificate management tools with MCP server.""" @mcp.tool() def haproxy_list_certs() -> str: """List all SSL/TLS certificates with expiry information. Returns: List of certificates with domain, CA, created date, and renewal date """ return _haproxy_list_certs_impl() @mcp.tool() def haproxy_cert_info( domain: Annotated[str, Field(description="Domain name to check (e.g., example.com)")] ) -> str: """Get detailed certificate information for a domain. Shows expiry date, issuer, SANs, and file paths. """ return _haproxy_cert_info_impl(domain) @mcp.tool() def haproxy_issue_cert( domain: Annotated[str, Field(description="Primary domain (e.g., example.com)")], wildcard: Annotated[bool, Field(default=True, description="Include wildcard (*.example.com). Default: true")] = True ) -> str: """Issue a new SSL/TLS certificate using acme.sh with Cloudflare DNS. Automatically deploys to HAProxy via Runtime API (zero-downtime). Example: haproxy_issue_cert("example.com", wildcard=True) """ return _haproxy_issue_cert_impl(domain, wildcard) @mcp.tool() def haproxy_renew_cert( domain: Annotated[str, Field(description="Domain name to renew (e.g., example.com)")], force: Annotated[bool, Field(default=False, description="Force renewal even if not due. Default: false")] = False ) -> str: """Renew an existing certificate. Uses Runtime API for zero-downtime reload. Example: haproxy_renew_cert("example.com", force=True) """ return _haproxy_renew_cert_impl(domain, force) @mcp.tool() def haproxy_renew_all_certs() -> str: """Renew all certificates that are due for renewal. This runs the acme.sh cron job to check and renew all certificates. """ return _haproxy_renew_all_certs_impl() @mcp.tool() def haproxy_delete_cert( domain: Annotated[str, Field(description="Domain name to delete certificate for")] ) -> str: """Delete a certificate from acme.sh and HAProxy. WARNING: This permanently removes the certificate. The domain will lose HTTPS. Example: haproxy_delete_cert("example.com") """ return _haproxy_delete_cert_impl(domain) @mcp.tool() def haproxy_load_cert( domain: Annotated[str, Field(description="Domain name to load certificate for")] ) -> str: """Load/reload a certificate into HAProxy (zero-downtime). Use after manually updating a certificate file. Example: haproxy_load_cert("example.com") """ return _haproxy_load_cert_impl(domain)