From 7ebe204f893bbb6a688543f3a45d9021e67045dc Mon Sep 17 00:00:00 2001 From: kaffa Date: Mon, 2 Feb 2026 04:15:02 +0000 Subject: [PATCH] feat: Add certificate coverage check to haproxy_add_domain When adding a domain, now checks if an SSL certificate covers it: - Exact match: domain.com.pem - Wildcard match: parent.com.pem with *.parent.com SAN Output examples: - "SSL: Using certificate inouter.com (wildcard)" - "SSL: No certificate found. Use haproxy_issue_cert(...) to issue one." Co-Authored-By: Claude Opus 4.5 --- haproxy_mcp/tools/domains.py | 61 ++++++++++++++++++++++++++++++++++-- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/haproxy_mcp/tools/domains.py b/haproxy_mcp/tools/domains.py index ad2b5ec..aa5ad09 100644 --- a/haproxy_mcp/tools/domains.py +++ b/haproxy_mcp/tools/domains.py @@ -1,6 +1,8 @@ """Domain management tools for HAProxy MCP Server.""" import fcntl +import os +import subprocess from typing import Annotated from pydantic import Field @@ -12,6 +14,7 @@ from ..config import ( MAX_SLOTS, StateField, STATE_MIN_COLUMNS, + SUBPROCESS_TIMEOUT, logger, ) from ..exceptions import HaproxyError @@ -27,6 +30,51 @@ from ..file_ops import ( remove_domain_from_config, ) +# Certificate paths +CERTS_DIR = "/opt/haproxy/certs" + + +def check_certificate_coverage(domain: str) -> tuple[bool, str]: + """Check if a domain is covered by an existing certificate. + + Args: + domain: Domain name to check (e.g., api.example.com) + + Returns: + Tuple of (is_covered, certificate_name or message) + """ + if not os.path.isdir(CERTS_DIR): + return False, "Certificate directory not found" + + # Check for exact match first + exact_pem = os.path.join(CERTS_DIR, f"{domain}.pem") + if os.path.exists(exact_pem): + return True, domain + + # Check for wildcard coverage (e.g., api.example.com covered by *.example.com) + parts = domain.split(".") + if len(parts) >= 2: + # Try parent domain (example.com for api.example.com) + parent_domain = ".".join(parts[1:]) + parent_pem = os.path.join(CERTS_DIR, f"{parent_domain}.pem") + + if os.path.exists(parent_pem): + # Verify the certificate has wildcard SAN + try: + result = subprocess.run( + ["openssl", "x509", "-in", parent_pem, "-noout", "-ext", "subjectAltName"], + capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT + ) + if result.returncode == 0: + # Check if wildcard covers this domain + wildcard = f"*.{parent_domain}" + if wildcard in result.stdout: + return True, f"{parent_domain} (wildcard)" + except (subprocess.TimeoutExpired, OSError): + pass + + return False, "No matching certificate" + def register_domain_tools(mcp): """Register domain management tools with MCP server.""" @@ -160,9 +208,18 @@ def register_domain_tools(mcp): remove_server_from_config(domain, 1) return f"Domain {domain} added to {pool} but server config failed: {e}" - return f"Domain {domain} added to {pool} with server {ip}:{http_port}" + result = f"Domain {domain} added to {pool} with server {ip}:{http_port}" + else: + result = f"Domain {domain} added to {pool} (no servers configured)" - return f"Domain {domain} added to {pool} (no servers configured)" + # Check certificate coverage + cert_covered, cert_info = check_certificate_coverage(domain) + if cert_covered: + result += f"\nSSL: Using certificate {cert_info}" + else: + result += f"\nSSL: No certificate found. Use haproxy_issue_cert(\"{domain}\") to issue one." + + return result except HaproxyError as e: return f"Error: {e}"