refactor: Move certificate config functions to file_ops.py

- Move load_certs_config, save_certs_config, add_cert_to_config,
  remove_cert_from_config from certificates.py to file_ops.py
- Add CERTS_FILE constant to config.py
- Add file locking for certificate config operations (was missing)
- Consistent pattern with servers.json handling

certificates.py: 543 → 503 lines
file_ops.py: 263 → 337 lines

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
kaffa
2026-02-02 04:26:55 +00:00
parent 79254835e9
commit 6ced2b42d4
3 changed files with 83 additions and 47 deletions

View File

@@ -1,6 +1,5 @@
"""Certificate management tools for HAProxy MCP Server."""
import json
import os
import subprocess
from datetime import datetime
@@ -11,14 +10,17 @@ from pydantic import Field
from ..config import logger, SUBPROCESS_TIMEOUT
from ..validation import validate_domain
from ..haproxy_client import haproxy_cmd
from ..file_ops import atomic_write_file
from ..file_ops import (
load_certs_config,
add_cert_to_config,
remove_cert_from_config,
)
# Certificate paths
ACME_SH = os.path.expanduser("~/.acme.sh/acme.sh")
ACME_HOME = os.path.expanduser("~/.acme.sh")
CERTS_DIR = "/opt/haproxy/certs"
CERTS_DIR_CONTAINER = "/etc/haproxy/certs"
CERTS_JSON = "/opt/haproxy/conf/certificates.json"
# Longer timeout for certificate operations (ACME can be slow)
CERT_TIMEOUT = 120
@@ -39,48 +41,6 @@ def get_pem_paths(domain: str) -> tuple[str, str]:
)
def load_cert_config() -> list[str]:
"""Load certificate domain list from JSON file.
Returns:
List of domain names
"""
try:
with open(CERTS_JSON, "r", encoding="utf-8") as f:
data = json.load(f)
return data.get("domains", [])
except FileNotFoundError:
return []
except json.JSONDecodeError as e:
logger.warning("Corrupt certificates.json: %s", e)
return []
def save_cert_config(domains: list[str]) -> None:
"""Save certificate domain list to JSON file atomically.
Args:
domains: List of domain names
"""
atomic_write_file(CERTS_JSON, json.dumps({"domains": sorted(domains)}, indent=2))
def add_cert_to_config(domain: str) -> None:
"""Add a domain to the certificate config."""
domains = load_cert_config()
if domain not in domains:
domains.append(domain)
save_cert_config(domains)
def remove_cert_from_config(domain: str) -> None:
"""Remove a domain from the certificate config."""
domains = load_cert_config()
if domain in domains:
domains.remove(domain)
save_cert_config(domains)
def load_cert_to_haproxy(domain: str) -> tuple[bool, str]:
"""Load a certificate into HAProxy via Runtime API (zero-downtime).
@@ -149,7 +109,7 @@ def restore_certificates() -> int:
Returns:
Number of certificates restored
"""
domains = load_cert_config()
domains = load_certs_config()
restored = 0
for domain in domains:
@@ -436,7 +396,7 @@ def register_certificate_tools(mcp):
# Reload any renewed certs into HAProxy
if renewed > 0:
domains = load_cert_config()
domains = load_certs_config()
reloaded = 0
for domain in domains:
success, _ = load_cert_to_haproxy(domain)