"""Unit tests for certificate management tools.""" import json import os from unittest.mock import patch, MagicMock import pytest class TestGetPemPaths: """Tests for get_pem_paths function.""" def test_get_pem_paths(self): """Get correct PEM paths.""" from haproxy_mcp.tools.certificates import get_pem_paths host_path, container_path = get_pem_paths("example.com") assert host_path == "/opt/haproxy/certs/example.com.pem" assert container_path == "/etc/haproxy/certs/example.com.pem" class TestLoadCertToHaproxy: """Tests for load_cert_to_haproxy function.""" def test_load_cert_file_not_found(self, tmp_path): """Fail when PEM file doesn't exist.""" from haproxy_mcp.tools.certificates import load_cert_to_haproxy with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(tmp_path)): success, msg = load_cert_to_haproxy("example.com") assert success is False assert "not found" in msg.lower() def test_load_cert_new_cert(self, tmp_path, mock_socket_class, mock_select): """Load new certificate into HAProxy.""" # Create PEM file certs_dir = tmp_path / "certs" certs_dir.mkdir() pem_file = certs_dir / "example.com.pem" pem_file.write_text("-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----") mock_sock = mock_socket_class(responses={ "show ssl cert": "", # No existing cert "new ssl cert": "", "set ssl cert": "", "commit ssl cert": "", }) with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(certs_dir)): with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import load_cert_to_haproxy success, msg = load_cert_to_haproxy("example.com") assert success is True assert msg == "added" def test_load_cert_update_existing(self, tmp_path, mock_socket_class, mock_select): """Update existing certificate in HAProxy.""" certs_dir = tmp_path / "certs" certs_dir.mkdir() pem_file = certs_dir / "example.com.pem" pem_file.write_text("-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----") mock_sock = mock_socket_class(responses={ "show ssl cert": "/etc/haproxy/certs/example.com.pem", "set ssl cert": "", "commit ssl cert": "", }) with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(certs_dir)): with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import load_cert_to_haproxy success, msg = load_cert_to_haproxy("example.com") assert success is True assert msg == "updated" class TestUnloadCertFromHaproxy: """Tests for unload_cert_from_haproxy function.""" def test_unload_cert_not_loaded(self, mock_socket_class, mock_select): """Unload cert that's not loaded.""" mock_sock = mock_socket_class(responses={ "show ssl cert": "", }) with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import unload_cert_from_haproxy success, msg = unload_cert_from_haproxy("example.com") assert success is True assert msg == "not loaded" def test_unload_cert_success(self, mock_socket_class, mock_select): """Unload certificate successfully.""" mock_sock = mock_socket_class(responses={ "show ssl cert": "/etc/haproxy/certs/example.com.pem", "del ssl cert": "", }) with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import unload_cert_from_haproxy success, msg = unload_cert_from_haproxy("example.com") assert success is True assert msg == "unloaded" class TestRestoreCertificates: """Tests for restore_certificates function.""" def test_restore_no_certificates(self, patch_config_paths): """No certificates to restore.""" from haproxy_mcp.tools.certificates import restore_certificates result = restore_certificates() assert result == 0 def test_restore_certificates_success(self, patch_config_paths, tmp_path, mock_socket_class, mock_select): """Restore certificates successfully.""" # Save config with open(patch_config_paths["certs_file"], "w") as f: json.dump({"domains": ["example.com"]}, f) # Create PEM certs_dir = tmp_path / "certs" certs_dir.mkdir() pem_file = certs_dir / "example.com.pem" pem_file.write_text("cert content") mock_sock = mock_socket_class(responses={ "show ssl cert": "", "new ssl cert": "", "set ssl cert": "", "commit ssl cert": "", }) with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(certs_dir)): with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import restore_certificates result = restore_certificates() assert result == 1 class TestHaproxyListCerts: """Tests for haproxy_list_certs tool function.""" def test_list_certs_no_acme(self, mock_subprocess): """acme.sh not found.""" mock_subprocess.side_effect = FileNotFoundError() from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_list_certs"]() assert "not found" in result.lower() def test_list_certs_empty(self, mock_subprocess): """No certificates found.""" mock_subprocess.return_value = MagicMock( returncode=0, stdout="Main_Domain KeyLength SAN_Domains CA Created Renew\n", stderr="" ) from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_list_certs"]() assert "No certificates" in result def test_list_certs_success(self, mock_subprocess, mock_socket_class, mock_select, tmp_path): """List certificates successfully.""" mock_subprocess.return_value = MagicMock( returncode=0, stdout="Main_Domain KeyLength SAN_Domains CA Created Renew\nexample.com ec-256 *.example.com Google 2024-01-01T00:00:00Z 2024-03-01T00:00:00Z\n", stderr="" ) mock_sock = mock_socket_class(responses={ "show ssl cert": "", }) with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(tmp_path)): with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_list_certs"]() assert "example.com" in result class TestHaproxyCertInfo: """Tests for haproxy_cert_info tool function.""" def test_cert_info_invalid_domain(self): """Reject invalid domain format.""" from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_cert_info"](domain="-invalid") assert "Error" in result assert "Invalid domain" in result def test_cert_info_not_found(self, tmp_path): """Certificate not found.""" with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(tmp_path)): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_cert_info"](domain="example.com") assert "Error" in result assert "not found" in result.lower() def test_cert_info_success(self, tmp_path, mock_subprocess, mock_socket_class, mock_select): """Get certificate info successfully.""" # Create PEM file pem_file = tmp_path / "example.com.pem" pem_file.write_text("cert content") mock_subprocess.return_value = MagicMock( returncode=0, stdout="subject=CN = example.com\nissuer=CN = Google Trust Services\nnotBefore=Jan 1 00:00:00 2024 GMT\nnotAfter=Apr 1 00:00:00 2024 GMT", stderr="" ) mock_sock = mock_socket_class(responses={ "show ssl cert": "/etc/haproxy/certs/example.com.pem", }) with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(tmp_path)): with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_cert_info"](domain="example.com") assert "example.com" in result assert "Loaded in HAProxy: Yes" in result class TestHaproxyIssueCert: """Tests for haproxy_issue_cert tool function.""" def test_issue_cert_invalid_domain(self): """Reject invalid domain format.""" from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_issue_cert"](domain="-invalid", wildcard=True) assert "Error" in result assert "Invalid domain" in result def test_issue_cert_no_cf_token(self, tmp_path): """Fail when CF_Token is not set.""" with patch.dict(os.environ, {}, clear=True): with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path)): with patch("os.path.exists", return_value=False): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_issue_cert"](domain="example.com", wildcard=True) assert "CF_Token" in result def test_issue_cert_already_exists(self, tmp_path): """Fail when certificate already exists.""" cert_dir = tmp_path / "example.com_ecc" cert_dir.mkdir() with patch.dict(os.environ, {"CF_Token": "test_token"}): with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path)): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_issue_cert"](domain="example.com", wildcard=True) assert "already exists" in result class TestHaproxyRenewCert: """Tests for haproxy_renew_cert tool function.""" def test_renew_cert_invalid_domain(self): """Reject invalid domain format.""" from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_renew_cert"](domain="-invalid", force=False) assert "Error" in result assert "Invalid domain" in result def test_renew_cert_not_found(self, tmp_path): """Fail when certificate doesn't exist.""" with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path)): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_renew_cert"](domain="example.com", force=False) assert "Error" in result assert "No certificate found" in result def test_renew_cert_not_due(self, tmp_path, mock_subprocess): """Certificate not due for renewal.""" cert_dir = tmp_path / "example.com_ecc" cert_dir.mkdir() mock_subprocess.return_value = MagicMock( returncode=0, stdout="Skip, Next renewal time is: ...\n", stderr="Not yet due for renewal" ) with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path)): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_renew_cert"](domain="example.com", force=False) assert "not due for renewal" in result class TestHaproxyRenewAllCerts: """Tests for haproxy_renew_all_certs tool function.""" def test_renew_all_no_renewals(self, mock_subprocess): """No certificates due for renewal.""" mock_subprocess.return_value = MagicMock( returncode=0, stdout="Checking: example.com\nSkip, Next renewal time...", stderr="" ) from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_renew_all_certs"]() assert "No certificates due" in result or "checked" in result class TestHaproxyDeleteCert: """Tests for haproxy_delete_cert tool function.""" def test_delete_cert_invalid_domain(self): """Reject invalid domain format.""" from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_delete_cert"](domain="-invalid") assert "Error" in result assert "Invalid domain" in result def test_delete_cert_not_found(self, tmp_path): """Fail when certificate doesn't exist.""" with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path)): with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(tmp_path)): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_delete_cert"](domain="example.com") assert "Error" in result assert "No certificate found" in result def test_delete_cert_success(self, tmp_path, mock_subprocess, mock_socket_class, mock_select, patch_config_paths): """Delete certificate successfully.""" # Create cert dir and PEM cert_dir = tmp_path / "acme" / "example.com_ecc" cert_dir.mkdir(parents=True) certs_dir = tmp_path / "certs" certs_dir.mkdir() pem_file = certs_dir / "example.com.pem" pem_file.write_text("cert") mock_subprocess.return_value = MagicMock(returncode=0, stdout="", stderr="") mock_sock = mock_socket_class(responses={ "show ssl cert": "", }) with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path / "acme")): with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(certs_dir)): with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_delete_cert"](domain="example.com") assert "Deleted" in result class TestHaproxyLoadCert: """Tests for haproxy_load_cert tool function.""" def test_load_cert_invalid_domain(self): """Reject invalid domain format.""" from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_load_cert"](domain="-invalid") assert "Error" in result assert "Invalid domain" in result def test_load_cert_not_found(self, tmp_path): """Fail when PEM file doesn't exist.""" with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(tmp_path)): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_load_cert"](domain="example.com") assert "Error" in result assert "not found" in result.lower() def test_load_cert_success(self, tmp_path, mock_socket_class, mock_select, patch_config_paths): """Load certificate successfully.""" pem_file = tmp_path / "example.com.pem" pem_file.write_text("cert content") mock_sock = mock_socket_class(responses={ "show ssl cert": "", "new ssl cert": "", "set ssl cert": "", "commit ssl cert": "", }) with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(tmp_path)): with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_load_cert"](domain="example.com") assert "loaded" in result.lower() assert "example.com" in result class TestHaproxyIssueCertTimeout: """Tests for haproxy_issue_cert timeout scenarios.""" def test_issue_cert_acme_timeout(self, tmp_path, mock_subprocess): """Handle acme.sh timeout during certificate issuance.""" import subprocess mock_subprocess.side_effect = subprocess.TimeoutExpired("acme.sh", 120) with patch.dict(os.environ, {"CF_Token": "test_token"}): with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path)): with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(tmp_path)): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_issue_cert"]( domain="example.com", wildcard=True ) assert "timed out" in result.lower() def test_issue_cert_acme_failure(self, tmp_path, mock_subprocess): """Handle acme.sh failure during certificate issuance.""" mock_subprocess.return_value = MagicMock( returncode=1, stdout="", stderr="DNS verification failed" ) with patch.dict(os.environ, {"CF_Token": "test_token"}): with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path)): with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(tmp_path)): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_issue_cert"]( domain="example.com", wildcard=True ) assert "Error" in result assert "DNS verification failed" in result def test_issue_cert_success(self, tmp_path, mock_subprocess, mock_socket_class, mock_select, patch_config_paths): """Successfully issue a certificate.""" # Create certs directory certs_dir = tmp_path / "certs" certs_dir.mkdir() mock_subprocess.return_value = MagicMock( returncode=0, stdout="Cert success", stderr="" ) # Create PEM file (simulating acme.sh reloadcmd) pem_file = certs_dir / "example.com.pem" def create_pem_file(*args, **kwargs): pem_file.write_text("cert content") return MagicMock(returncode=0, stdout="", stderr="") mock_subprocess.side_effect = create_pem_file mock_sock = mock_socket_class(responses={ "show ssl cert": "", "new ssl cert": "", "set ssl cert": "", "commit ssl cert": "", }) with patch.dict(os.environ, {"CF_Token": "test_token"}): with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path)): with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(certs_dir)): with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_issue_cert"]( domain="example.com", wildcard=True ) assert "issued" in result.lower() or "loaded" in result.lower() class TestHaproxyRenewCertTimeout: """Tests for haproxy_renew_cert timeout scenarios.""" def test_renew_cert_timeout(self, tmp_path, mock_subprocess): """Handle acme.sh timeout during certificate renewal.""" import subprocess cert_dir = tmp_path / "example.com_ecc" cert_dir.mkdir() mock_subprocess.side_effect = subprocess.TimeoutExpired("acme.sh", 120) with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path)): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_renew_cert"]( domain="example.com", force=True ) assert "timed out" in result.lower() def test_renew_cert_success(self, tmp_path, mock_subprocess, mock_socket_class, mock_select, patch_config_paths): """Successfully renew a certificate.""" cert_dir = tmp_path / "example.com_ecc" cert_dir.mkdir() certs_dir = tmp_path / "certs" certs_dir.mkdir() pem_file = certs_dir / "example.com.pem" pem_file.write_text("cert content") mock_subprocess.return_value = MagicMock( returncode=0, stdout="Cert success", stderr="" ) mock_sock = mock_socket_class(responses={ "show ssl cert": "", "new ssl cert": "", "set ssl cert": "", "commit ssl cert": "", }) with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path)): with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(certs_dir)): with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_renew_cert"]( domain="example.com", force=True ) assert "renewed" in result.lower() class TestHaproxyRenewAllCertsMultiple: """Tests for haproxy_renew_all_certs with multiple certificates.""" def test_renew_all_certs_multiple_renewals(self, mock_subprocess, mock_socket_class, mock_select, patch_config_paths, tmp_path): """Renew multiple certificates successfully.""" # Write config with multiple domains with open(patch_config_paths["certs_file"], "w") as f: json.dump({"domains": ["example.com", "example.org"]}, f) # Create PEM files certs_dir = tmp_path / "certs" certs_dir.mkdir() (certs_dir / "example.com.pem").write_text("cert1") (certs_dir / "example.org.pem").write_text("cert2") mock_subprocess.return_value = MagicMock( returncode=0, stdout="Cert success\nCert success", # Two successful renewals stderr="" ) mock_sock = mock_socket_class(responses={ "show ssl cert": "", "new ssl cert": "", "set ssl cert": "", "commit ssl cert": "", }) with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(certs_dir)): with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_renew_all_certs"]() assert "Renewed 2" in result assert "reloaded" in result.lower() def test_renew_all_certs_timeout(self, mock_subprocess): """Handle timeout during renewal cron.""" import subprocess mock_subprocess.side_effect = subprocess.TimeoutExpired("acme.sh", 360) from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_renew_all_certs"]() assert "timed out" in result.lower() def test_renew_all_certs_error(self, mock_subprocess): """Handle error during renewal cron.""" mock_subprocess.return_value = MagicMock( returncode=1, stdout="", stderr="ACME server error" ) from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_renew_all_certs"]() assert "Error" in result or "ACME server error" in result class TestHaproxyDeleteCertPartialFailure: """Tests for haproxy_delete_cert partial failure scenarios.""" def test_delete_cert_haproxy_unload_failure(self, tmp_path, mock_subprocess, mock_socket_class, mock_select, patch_config_paths): """Handle HAProxy unload failure during certificate deletion.""" # Create cert dir and PEM cert_dir = tmp_path / "acme" / "example.com_ecc" cert_dir.mkdir(parents=True) certs_dir = tmp_path / "certs" certs_dir.mkdir() pem_file = certs_dir / "example.com.pem" pem_file.write_text("cert") # Mock acme.sh removal success mock_subprocess.return_value = MagicMock(returncode=0, stdout="", stderr="") # Mock HAProxy to fail on unload mock_sock = mock_socket_class(responses={ "show ssl cert": "/etc/haproxy/certs/example.com.pem", "del ssl cert": "error: unable to delete certificate", }) with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path / "acme")): with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(certs_dir)): with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_delete_cert"](domain="example.com") # Should still delete acme.sh and PEM even if HAProxy unload fails assert "Deleted" in result or "acme.sh" in result def test_delete_cert_acme_removal_failure(self, tmp_path, mock_subprocess, mock_socket_class, mock_select, patch_config_paths): """Handle acme.sh removal failure during certificate deletion.""" # Create cert dir and PEM cert_dir = tmp_path / "acme" / "example.com_ecc" cert_dir.mkdir(parents=True) certs_dir = tmp_path / "certs" certs_dir.mkdir() pem_file = certs_dir / "example.com.pem" pem_file.write_text("cert") # Mock acme.sh removal failure mock_subprocess.return_value = MagicMock( returncode=1, stdout="", stderr="Failed to remove certificate" ) mock_sock = mock_socket_class(responses={ "show ssl cert": "", # Not loaded }) with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path / "acme")): with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(certs_dir)): with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_delete_cert"](domain="example.com") # Should report partial success (PEM deleted) and error (acme.sh failed) assert "Deleted" in result or "PEM" in result assert "Errors" in result or "acme.sh" in result def test_delete_cert_pem_removal_failure(self, tmp_path, mock_subprocess, mock_socket_class, mock_select, patch_config_paths): """Handle PEM file removal failure during certificate deletion.""" # Create cert dir but make PEM read-only cert_dir = tmp_path / "acme" / "example.com_ecc" cert_dir.mkdir(parents=True) certs_dir = tmp_path / "certs" certs_dir.mkdir() pem_file = certs_dir / "example.com.pem" pem_file.write_text("cert") # Mock acme.sh removal success mock_subprocess.return_value = MagicMock(returncode=0, stdout="", stderr="") mock_sock = mock_socket_class(responses={ "show ssl cert": "", # Not loaded }) # Mock os.remove to fail def mock_remove(path): if "example.com.pem" in str(path): raise PermissionError("Permission denied") raise FileNotFoundError() with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path / "acme")): with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(certs_dir)): with patch("socket.socket", return_value=mock_sock): with patch("os.remove", side_effect=mock_remove): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_delete_cert"](domain="example.com") # Should report partial success (acme.sh deleted) and error (PEM failed) assert "Deleted" in result assert "Errors" in result or "Permission" in result class TestLoadCertToHaproxyError: """Tests for load_cert_to_haproxy error handling.""" def test_load_cert_exception_handling(self, tmp_path): """Handle exception during certificate loading.""" certs_dir = tmp_path / "certs" certs_dir.mkdir() pem_file = certs_dir / "example.com.pem" pem_file.write_text("cert content") # Mock haproxy_cmd to raise HaproxyError from haproxy_mcp.exceptions import HaproxyError with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(certs_dir)): with patch("haproxy_mcp.tools.certificates.haproxy_cmd", side_effect=HaproxyError("Connection failed")): from haproxy_mcp.tools.certificates import load_cert_to_haproxy success, msg = load_cert_to_haproxy("example.com") # load_cert_to_haproxy catches exceptions and returns False, msg assert success is False assert "Connection failed" in msg class TestUnloadCertFromHaproxyError: """Tests for unload_cert_from_haproxy error handling.""" def test_unload_cert_haproxy_error(self, mock_socket_class, mock_select): """Handle HAProxy command error during certificate unloading.""" # Mock HAProxy to return error on del ssl cert mock_sock = mock_socket_class(responses={ "show ssl cert": "/etc/haproxy/certs/example.com.pem", "del ssl cert": "error: certificate in use", }) with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import unload_cert_from_haproxy success, msg = unload_cert_from_haproxy("example.com") # unload_cert_from_haproxy catches exceptions and returns False, msg # or may return True "unloaded" since the mock doesn't raise exception assert success is True or "error" in msg.lower() class TestRestoreCertificatesFailure: """Tests for restore_certificates failure scenarios.""" def test_restore_certificates_partial_failure(self, patch_config_paths, tmp_path, mock_socket_class, mock_select): """Handle partial failure when restoring certificates.""" # Save config with multiple domains with open(patch_config_paths["certs_file"], "w") as f: json.dump({"domains": ["example.com", "missing.com"]}, f) # Create only one PEM file certs_dir = tmp_path / "certs" certs_dir.mkdir() (certs_dir / "example.com.pem").write_text("cert content") mock_sock = mock_socket_class(responses={ "show ssl cert": "", "new ssl cert": "", "set ssl cert": "", "commit ssl cert": "", }) with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(certs_dir)): with patch("socket.socket", return_value=mock_sock): from haproxy_mcp.tools.certificates import restore_certificates result = restore_certificates() # Should restore 1 (example.com exists), skip 1 (missing.com doesn't exist) assert result == 1 class TestHaproxyListCertsTimeout: """Tests for haproxy_list_certs timeout scenarios.""" def test_list_certs_timeout(self, mock_subprocess): """Handle timeout during certificate listing.""" import subprocess mock_subprocess.side_effect = subprocess.TimeoutExpired("acme.sh", 30) from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_list_certs"]() assert "timed out" in result.lower() class TestHaproxyCertInfoTimeout: """Tests for haproxy_cert_info timeout scenarios.""" def test_cert_info_timeout(self, tmp_path, mock_subprocess): """Handle timeout during certificate info retrieval.""" import subprocess pem_file = tmp_path / "example.com.pem" pem_file.write_text("cert content") mock_subprocess.side_effect = subprocess.TimeoutExpired("openssl", 30) with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(tmp_path)): from haproxy_mcp.tools.certificates import register_certificate_tools mcp = MagicMock() registered_tools = {} def capture_tool(): def decorator(func): registered_tools[func.__name__] = func return func return decorator mcp.tool = capture_tool register_certificate_tools(mcp) result = registered_tools["haproxy_cert_info"](domain="example.com") assert "timed out" in result.lower()