- Split monolithic mcp/server.py (1874 lines) into haproxy_mcp/ package:
- config.py: Configuration constants and environment variables
- exceptions.py: Custom exception classes
- validation.py: Input validation functions
- haproxy_client.py: HAProxy Runtime API client with batch support
- file_ops.py: Atomic file operations with locking
- utils.py: CSV parsing utilities
- tools/: MCP tools organized by function
- domains.py: Domain management (3 tools)
- servers.py: Server management (7 tools)
- health.py: Health checks (3 tools)
- monitoring.py: Monitoring (4 tools)
- configuration.py: Config management (4 tools)
- Add haproxy_cmd_batch() for sending multiple commands in single TCP connection
- Optimize server operations: 1 connection instead of 2 per server
- Optimize startup restore: All servers in 1 connection (was 2×N)
- Update type hints to Python 3.9+ style (built-in generics)
- Remove unused imports and functions
- Update CLAUDE.md with new structure and performance notes
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
216 lines
6.9 KiB
Python
216 lines
6.9 KiB
Python
"""HAProxy Runtime API client functions."""
|
|
|
|
import socket
|
|
import subprocess
|
|
import select
|
|
import time
|
|
|
|
from .config import (
|
|
HAPROXY_SOCKET,
|
|
HAPROXY_CONTAINER,
|
|
SOCKET_TIMEOUT,
|
|
SOCKET_RECV_TIMEOUT,
|
|
MAX_RESPONSE_SIZE,
|
|
SUBPROCESS_TIMEOUT,
|
|
)
|
|
from .exceptions import HaproxyError
|
|
|
|
|
|
def haproxy_cmd(command: str) -> str:
    """Send one command line to the HAProxy Runtime API and return the reply.

    The command is written followed by a newline, the write side of the
    connection is shut down, and the reply is read until the peer closes
    the connection.

    Args:
        command: The HAProxy runtime API command to execute.

    Returns:
        The decoded response with leading/trailing whitespace stripped.

    Raises:
        HaproxyError: If the connection fails or times out, the reply takes
            longer than SOCKET_RECV_TIMEOUT seconds, the reply exceeds
            MAX_RESPONSE_SIZE bytes, or the reply cannot be decoded.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # SOCKET_TIMEOUT bounds the blocking connect/send phase.
            s.settimeout(SOCKET_TIMEOUT)
            s.connect(HAPROXY_SOCKET)
            s.sendall(f"{command}\n".encode())
            # Half-close so the peer sees end-of-input.
            s.shutdown(socket.SHUT_WR)

            # Non-blocking from here on: select() drives the recv loop.
            s.setblocking(False)
            chunks: list[bytes] = []
            received = 0
            # Monotonic clock: the deadline must not move when the wall
            # clock is adjusted while we are waiting.
            deadline = time.monotonic() + SOCKET_RECV_TIMEOUT

            while True:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise HaproxyError(f"Response timeout after {SOCKET_RECV_TIMEOUT} seconds")

                # Wake at least once per second to re-check the deadline.
                ready, _, _ = select.select([s], [], [], min(remaining, 1.0))
                if not ready:
                    continue

                try:
                    data = s.recv(8192)
                except BlockingIOError:
                    # select() can report readiness spuriously; just wait
                    # again (the deadline above still bounds the loop).
                    continue

                if not data:
                    # Peer closed the connection: the reply is complete.
                    break

                chunks.append(data)
                received += len(data)
                if received > MAX_RESPONSE_SIZE:
                    raise HaproxyError(f"Response exceeded {MAX_RESPONSE_SIZE} bytes limit")

            # Join once instead of growing a bytes object chunk by chunk.
            return b"".join(chunks).decode().strip()
    except socket.timeout as e:
        raise HaproxyError("Connection timeout") from e
    except ConnectionRefusedError as e:
        raise HaproxyError("Connection refused - HAProxy not running?") from e
    except UnicodeDecodeError as e:
        raise HaproxyError("Invalid UTF-8 in response") from e
    except HaproxyError:
        # Already in the module's own error type; pass through untouched.
        raise
    except Exception as e:
        raise HaproxyError(str(e)) from e
|
|
|
|
|
|
def haproxy_cmd_checked(command: str) -> str:
    """Run a single HAProxy command and reject replies that look like errors.

    Args:
        command: HAProxy command to execute.

    Returns:
        The reply from ``haproxy_cmd``, unchanged, once it has passed the
        error-indicator check.

    Raises:
        HaproxyError: If sending the command fails or the reply contains
            an error indicator.
    """
    reply = haproxy_cmd(command)
    # Raises when the reply text contains a known error marker.
    _check_response_for_errors(reply)
    return reply
|
|
|
|
|
|
def _check_response_for_errors(response: str) -> None:
|
|
"""Check HAProxy response for error indicators.
|
|
|
|
Args:
|
|
response: Response string from HAProxy
|
|
|
|
Raises:
|
|
HaproxyError: If response contains error indicators
|
|
"""
|
|
error_indicators = ["No such", "not found", "error", "failed", "invalid", "unknown"]
|
|
if response:
|
|
response_lower = response.lower()
|
|
for indicator in error_indicators:
|
|
if indicator.lower() in response_lower:
|
|
raise HaproxyError(f"HAProxy command failed: {response.strip()}")
|
|
|
|
|
|
def haproxy_cmd_batch(commands: list[str]) -> list[str]:
    """Send multiple commands to HAProxy in a single connection.

    The commands are joined with newlines and sent through one
    ``haproxy_cmd`` call, so the whole batch shares one TCP connection and
    the same timeout, size-limit and error-translation rules as a single
    command.

    Args:
        commands: List of HAProxy commands to execute.

    Returns:
        The stripped pieces of the combined reply, split on blank lines and
        padded with empty strings so there is at least one entry per
        command. An empty input yields an empty list.

    Raises:
        HaproxyError: If the connection fails or any piece of the reply
            contains an error indicator.
    """
    if not commands:
        return []

    if len(commands) == 1:
        # Nothing to batch: take the ordinary checked single-command path.
        return [haproxy_cmd_checked(commands[0])]

    # Reuse the single-command transport rather than duplicating its socket
    # loop; it appends the trailing newline and returns the stripped reply.
    # NOTE(review): commands go out newline-separated. Confirm the targeted
    # HAProxy version executes every line in non-interactive mode; the
    # management guide describes ';' for several commands on one line.
    full_response = haproxy_cmd("\n".join(commands))

    # Replies are assumed to be separated by blank lines; commands that
    # print nothing contribute empty strings.
    responses = full_response.split("\n\n") if full_response else []

    # Pad so callers can index one entry per command (no-op when the split
    # already produced enough pieces).
    responses.extend([""] * (len(commands) - len(responses)))

    checked: list[str] = []
    for raw in responses:
        piece = raw.strip()
        _check_response_for_errors(piece)
        checked.append(piece)
    return checked
|
|
|
|
|
|
def reload_haproxy() -> tuple[bool, str]:
    """Check the HAProxy configuration, then signal the container to reload.

    Runs ``haproxy -c`` inside the container via ``podman exec``; only if
    that exits with status 0 is ``USR2`` sent to the container with
    ``podman kill``.

    Returns:
        ``(True, "OK")`` when both commands exit with status 0, otherwise
        ``(False, message)`` describing which step failed, timed out, or
        could not be started.
    """
    try:
        check = subprocess.run(
            ["podman", "exec", HAPROXY_CONTAINER, "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"],
            capture_output=True,
            text=True,
            timeout=SUBPROCESS_TIMEOUT,
        )
        if check.returncode:
            # Do not signal a reload when validation did not succeed.
            return False, f"Config validation failed:\n{check.stderr}"

        signal_run = subprocess.run(
            ["podman", "kill", "--signal", "USR2", HAPROXY_CONTAINER],
            capture_output=True,
            text=True,
            timeout=SUBPROCESS_TIMEOUT,
        )
        if signal_run.returncode:
            return False, f"Reload failed: {signal_run.stderr}"
    except subprocess.TimeoutExpired:
        return False, f"Command timed out after {SUBPROCESS_TIMEOUT} seconds"
    except FileNotFoundError:
        return False, "podman command not found"
    except OSError as exc:
        return False, f"OS error: {exc}"

    return True, "OK"
|