refactor: migrate data storage from JSON/map files to SQLite
Replace servers.json, certificates.json, and map file parsing with SQLite (WAL mode) as single source of truth. HAProxy map files are now generated from SQLite via sync_map_files(). Key changes: - Add db.py with schema, connection management, and JSON migration - Add DB_FILE config constant - Delegate file_ops.py functions to db.py - Refactor domains.py to use file_ops instead of direct list manipulation - Fix subprocess.TimeoutExpired not caught (doesn't inherit TimeoutError) - Add DB health check in health.py - Init DB on startup in server.py and __main__.py - Update all 359 tests to use SQLite-backed functions Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -6,6 +6,8 @@ from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from haproxy_mcp.file_ops import add_cert_to_config
|
||||
|
||||
|
||||
class TestGetPemPaths:
|
||||
"""Tests for get_pem_paths function."""
|
||||
@@ -127,8 +129,7 @@ class TestRestoreCertificates:
|
||||
def test_restore_certificates_success(self, patch_config_paths, tmp_path, mock_socket_class, mock_select):
|
||||
"""Restore certificates successfully."""
|
||||
# Save config
|
||||
with open(patch_config_paths["certs_file"], "w") as f:
|
||||
json.dump({"domains": ["example.com"]}, f)
|
||||
add_cert_to_config("example.com")
|
||||
|
||||
# Create PEM
|
||||
certs_dir = tmp_path / "certs"
|
||||
@@ -283,11 +284,17 @@ class TestHaproxyCertInfo:
|
||||
pem_file = tmp_path / "example.com.pem"
|
||||
pem_file.write_text("cert content")
|
||||
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=0,
|
||||
stdout="subject=CN = example.com\nissuer=CN = Google Trust Services\nnotBefore=Jan 1 00:00:00 2024 GMT\nnotAfter=Apr 1 00:00:00 2024 GMT",
|
||||
stderr=""
|
||||
)
|
||||
def subprocess_side_effect(*args, **kwargs):
|
||||
cmd = args[0] if args else kwargs.get("args", [])
|
||||
if isinstance(cmd, list) and "stat" in cmd:
|
||||
return MagicMock(returncode=0, stdout="1704067200", stderr="")
|
||||
return MagicMock(
|
||||
returncode=0,
|
||||
stdout="subject=CN = example.com\nissuer=CN = Google Trust Services\nnotBefore=Jan 1 00:00:00 2024 GMT\nnotAfter=Apr 1 00:00:00 2024 GMT",
|
||||
stderr=""
|
||||
)
|
||||
|
||||
mock_subprocess.side_effect = subprocess_side_effect
|
||||
|
||||
mock_sock = mock_socket_class(responses={
|
||||
"show ssl cert": "/etc/haproxy/certs/example.com.pem",
|
||||
@@ -337,25 +344,33 @@ class TestHaproxyIssueCert:
|
||||
assert "Error" in result
|
||||
assert "Invalid domain" in result
|
||||
|
||||
def test_issue_cert_no_cf_token(self, tmp_path):
|
||||
def test_issue_cert_no_cf_token(self, tmp_path, mock_subprocess):
|
||||
"""Fail when CF_Token is not set."""
|
||||
acme_sh = str(tmp_path / "acme.sh")
|
||||
mock_subprocess.return_value = MagicMock(
|
||||
returncode=1,
|
||||
stdout="",
|
||||
stderr="CF_Token is not set. Please export CF_Token environment variable.",
|
||||
)
|
||||
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path)):
|
||||
with patch("os.path.exists", return_value=False):
|
||||
from haproxy_mcp.tools.certificates import register_certificate_tools
|
||||
mcp = MagicMock()
|
||||
registered_tools = {}
|
||||
with patch("haproxy_mcp.tools.certificates.ACME_SH", acme_sh):
|
||||
with patch("os.path.exists", return_value=False):
|
||||
from haproxy_mcp.tools.certificates import register_certificate_tools
|
||||
mcp = MagicMock()
|
||||
registered_tools = {}
|
||||
|
||||
def capture_tool():
|
||||
def decorator(func):
|
||||
registered_tools[func.__name__] = func
|
||||
return func
|
||||
return decorator
|
||||
def capture_tool():
|
||||
def decorator(func):
|
||||
registered_tools[func.__name__] = func
|
||||
return func
|
||||
return decorator
|
||||
|
||||
mcp.tool = capture_tool
|
||||
register_certificate_tools(mcp)
|
||||
mcp.tool = capture_tool
|
||||
register_certificate_tools(mcp)
|
||||
|
||||
result = registered_tools["haproxy_issue_cert"](domain="example.com", wildcard=True)
|
||||
result = registered_tools["haproxy_issue_cert"](domain="example.com", wildcard=True)
|
||||
|
||||
assert "CF_Token" in result
|
||||
|
||||
@@ -845,8 +860,8 @@ class TestHaproxyRenewAllCertsMultiple:
|
||||
def test_renew_all_certs_multiple_renewals(self, mock_subprocess, mock_socket_class, mock_select, patch_config_paths, tmp_path):
|
||||
"""Renew multiple certificates successfully."""
|
||||
# Write config with multiple domains
|
||||
with open(patch_config_paths["certs_file"], "w") as f:
|
||||
json.dump({"domains": ["example.com", "example.org"]}, f)
|
||||
add_cert_to_config("example.com")
|
||||
add_cert_to_config("example.org")
|
||||
|
||||
# Create PEM files
|
||||
certs_dir = tmp_path / "certs"
|
||||
@@ -1038,30 +1053,32 @@ class TestHaproxyDeleteCertPartialFailure:
|
||||
"show ssl cert": "", # Not loaded
|
||||
})
|
||||
|
||||
# Mock os.remove to fail
|
||||
def mock_remove(path):
|
||||
if "example.com.pem" in str(path):
|
||||
raise PermissionError("Permission denied")
|
||||
raise FileNotFoundError()
|
||||
# Mock subprocess to succeed for acme.sh remove but fail for rm (PEM removal)
|
||||
def subprocess_side_effect(*args, **kwargs):
|
||||
cmd = args[0] if args else kwargs.get("args", [])
|
||||
if isinstance(cmd, list) and cmd[0] == "rm":
|
||||
return MagicMock(returncode=1, stdout="", stderr="Permission denied")
|
||||
return MagicMock(returncode=0, stdout="", stderr="")
|
||||
|
||||
mock_subprocess.side_effect = subprocess_side_effect
|
||||
|
||||
with patch("haproxy_mcp.tools.certificates.ACME_HOME", str(tmp_path / "acme")):
|
||||
with patch("haproxy_mcp.tools.certificates.CERTS_DIR", str(certs_dir)):
|
||||
with patch("socket.socket", return_value=mock_sock):
|
||||
with patch("os.remove", side_effect=mock_remove):
|
||||
from haproxy_mcp.tools.certificates import register_certificate_tools
|
||||
mcp = MagicMock()
|
||||
registered_tools = {}
|
||||
from haproxy_mcp.tools.certificates import register_certificate_tools
|
||||
mcp = MagicMock()
|
||||
registered_tools = {}
|
||||
|
||||
def capture_tool():
|
||||
def decorator(func):
|
||||
registered_tools[func.__name__] = func
|
||||
return func
|
||||
return decorator
|
||||
def capture_tool():
|
||||
def decorator(func):
|
||||
registered_tools[func.__name__] = func
|
||||
return func
|
||||
return decorator
|
||||
|
||||
mcp.tool = capture_tool
|
||||
register_certificate_tools(mcp)
|
||||
mcp.tool = capture_tool
|
||||
register_certificate_tools(mcp)
|
||||
|
||||
result = registered_tools["haproxy_delete_cert"](domain="example.com")
|
||||
result = registered_tools["haproxy_delete_cert"](domain="example.com")
|
||||
|
||||
# Should report partial success (acme.sh deleted) and error (PEM failed)
|
||||
assert "Deleted" in result
|
||||
@@ -1118,8 +1135,8 @@ class TestRestoreCertificatesFailure:
|
||||
def test_restore_certificates_partial_failure(self, patch_config_paths, tmp_path, mock_socket_class, mock_select):
|
||||
"""Handle partial failure when restoring certificates."""
|
||||
# Save config with multiple domains
|
||||
with open(patch_config_paths["certs_file"], "w") as f:
|
||||
json.dump({"domains": ["example.com", "missing.com"]}, f)
|
||||
add_cert_to_config("example.com")
|
||||
add_cert_to_config("missing.com")
|
||||
|
||||
# Create only one PEM file
|
||||
certs_dir = tmp_path / "certs"
|
||||
|
||||
Reference in New Issue
Block a user