## Large function extraction
- servers.py: Extract 8 _impl functions from register_server_tools (449 lines)
- certificates.py: Extract 7 _impl functions from register_certificate_tools (386 lines)
- MCP tool wrappers now delegate to module-level implementation functions

## Exception handling improvements
- Replace 11 broad `except Exception` with specific types
- health.py: (OSError, subprocess.SubprocessError)
- configuration.py: (HaproxyError, IOError, OSError, ValueError)
- servers.py: (IOError, OSError, ValueError)
- certificates.py: FileNotFoundError, (subprocess.SubprocessError, OSError)

## Duplicate code extraction
- Add parse_servers_state() to utils.py (replaces 4 duplicate parsers)
- Add disable_server_slot() to utils.py (replaces duplicate patterns)
- Update health.py, servers.py, domains.py to use new helpers

## Other improvements
- Add TypedDict types in file_ops.py and health.py
- Set file permissions (0o600) for sensitive files
- Update tests to use specific exception types

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
81 lines
2.8 KiB
Python
81 lines
2.8 KiB
Python
"""Utility functions for HAProxy MCP Server."""
|
|
|
|
from typing import Dict, Generator
|
|
from .config import StatField, StateField, STATE_MIN_COLUMNS
|
|
from .haproxy_client import haproxy_cmd_batch
|
|
|
|
|
|
def parse_servers_state(state_output: str) -> dict[str, dict[str, dict[str, str]]]:
    """Parse 'show servers state' output.

    Each row is split on whitespace and indexed with the ``StateField``
    column positions. Rows with fewer than ``STATE_MIN_COLUMNS`` columns are
    ignored. Comment rows whose first token starts with ``#`` (the
    column-name header HAProxy prints) are ignored as well, so the header is
    never reported as a backend.

    Args:
        state_output: Raw output from HAProxy 'show servers state' command

    Returns:
        Nested dict: {backend: {server: {addr: str, port: str, state: str}}}.
        "state" is an empty string when the row is too short to contain the
        operational-state column. If the same backend/server pair appears
        more than once, the last row wins.

    Example:
        state = haproxy_cmd("show servers state")
        parsed = parse_servers_state(state)
        # parsed["pool_1"]["pool_1_1"] == {"addr": "10.0.0.1", "port": "8080", "state": "2"}
    """
    result: dict[str, dict[str, dict[str, str]]] = {}
    for line in state_output.split("\n"):
        parts = line.split()
        # Blank rows and rows without enough columns carry no server data.
        if not parts or len(parts) < STATE_MIN_COLUMNS:
            continue
        # The header row starts with '#'; without this guard its column
        # names would be stored as if they were a backend and a server.
        if parts[0].startswith("#"):
            continue

        backend = parts[StateField.BE_NAME]
        server = parts[StateField.SRV_NAME]
        # The operational state is optional: fall back to "" when absent.
        state = parts[StateField.SRV_OP_STATE] if len(parts) > StateField.SRV_OP_STATE else ""

        result.setdefault(backend, {})[server] = {
            "addr": parts[StateField.SRV_ADDR],
            "port": parts[StateField.SRV_PORT],
            "state": state,
        }
    return result
|
|
|
|
|
|
def disable_server_slot(backend: str, server: str) -> None:
    """Put a server slot into maintenance and reset its address.

    Sends two commands to HAProxy in a single batch: the first sets the
    server state to ``maint``, the second sets its address to ``0.0.0.0``
    and its port to ``0``.

    Args:
        backend: Backend name (e.g., 'pool_1')
        server: Server name (e.g., 'pool_1_1')

    Raises:
        HaproxyError: If HAProxy command fails (raised by
            ``haproxy_cmd_batch``; nothing is caught here).
    """
    slot = f"{backend}/{server}"
    commands = [
        f"set server {slot} state maint",
        f"set server {slot} addr 0.0.0.0 port 0",
    ]
    haproxy_cmd_batch(commands)
|
|
|
|
|
|
def parse_stat_csv(stat_output: str) -> Generator[Dict[str, str], None, None]:
    """Yield one dict per usable row of HAProxy 'show stat' CSV output.

    Empty lines and lines starting with ``#`` are skipped, as are rows that
    are too short to contain the ``StatField.STATUS`` column.

    Args:
        stat_output: Raw output from 'show stat' command

    Yields:
        Dict with the keys "pxname", "svname", "scur", "smax", "status",
        "weight" and "check_status". When a row is too short to contain one
        of the optional columns, "scur", "smax" and "weight" default to "0"
        and "check_status" defaults to "".
    """
    for row in stat_output.split("\n"):
        # Nothing to parse on empty lines or '#' comment/header lines.
        if row == "" or row.startswith("#"):
            continue

        columns = row.split(",")
        column_count = len(columns)
        # A row without a status column is not reported at all.
        if not column_count > StatField.STATUS:
            continue

        # Built key by key so the dict keeps the established key order.
        record = {
            "pxname": columns[StatField.PXNAME],
            "svname": columns[StatField.SVNAME],
        }
        record["scur"] = columns[StatField.SCUR] if column_count > StatField.SCUR else "0"
        record["smax"] = columns[StatField.SMAX] if column_count > StatField.SMAX else "0"
        record["status"] = columns[StatField.STATUS]
        record["weight"] = columns[StatField.WEIGHT] if column_count > StatField.WEIGHT else "0"
        record["check_status"] = (
            columns[StatField.CHECK_STATUS] if column_count > StatField.CHECK_STATUS else ""
        )
        yield record
|