Files
haproxy-mcp/haproxy_mcp/haproxy_client.py
kaffa 7bee373684 refactor: Modularize MCP server with command batching
- Split monolithic mcp/server.py (1874 lines) into haproxy_mcp/ package:
  - config.py: Configuration constants and environment variables
  - exceptions.py: Custom exception classes
  - validation.py: Input validation functions
  - haproxy_client.py: HAProxy Runtime API client with batch support
  - file_ops.py: Atomic file operations with locking
  - utils.py: CSV parsing utilities
  - tools/: MCP tools organized by function
    - domains.py: Domain management (3 tools)
    - servers.py: Server management (7 tools)
    - health.py: Health checks (3 tools)
    - monitoring.py: Monitoring (4 tools)
    - configuration.py: Config management (4 tools)

- Add haproxy_cmd_batch() for sending multiple commands in single TCP connection
- Optimize server operations: 1 connection instead of 2 per server
- Optimize startup restore: All servers in 1 connection (was 2×N)
- Update type hints to Python 3.9+ style (built-in generics)
- Remove unused imports and functions
- Update CLAUDE.md with new structure and performance notes

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 03:50:42 +00:00

216 lines
6.9 KiB
Python

"""HAProxy Runtime API client functions."""
import socket
import subprocess
import select
import time
from .config import (
HAPROXY_SOCKET,
HAPROXY_CONTAINER,
SOCKET_TIMEOUT,
SOCKET_RECV_TIMEOUT,
MAX_RESPONSE_SIZE,
SUBPROCESS_TIMEOUT,
)
from .exceptions import HaproxyError
def haproxy_cmd(command: str) -> str:
    """Send one command line to the HAProxy Runtime API and return the reply.

    The command is written followed by a newline, the write side of the
    connection is shut down, and the reply is read until the peer closes
    the connection.

    Args:
        command: The HAProxy runtime API command to execute.

    Returns:
        The decoded response with surrounding whitespace stripped.

    Raises:
        HaproxyError: If the connection fails or times out, the reply is not
            complete within SOCKET_RECV_TIMEOUT seconds, the reply exceeds
            MAX_RESPONSE_SIZE bytes, or the reply cannot be decoded.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(SOCKET_TIMEOUT)
            s.connect(HAPROXY_SOCKET)
            s.sendall(f"{command}\n".encode())
            s.shutdown(socket.SHUT_WR)

            # Non-blocking from here on: select() enforces the receive deadline.
            s.setblocking(False)

            # bytearray grows in place; bytes += would copy the buffer each chunk.
            response = bytearray()
            # Monotonic clock so wall-clock adjustments cannot stretch or
            # shrink the timeout.
            deadline = time.monotonic() + SOCKET_RECV_TIMEOUT

            while True:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise HaproxyError(f"Response timeout after {SOCKET_RECV_TIMEOUT} seconds")

                # Wake at least once per second to re-check the deadline.
                ready, _, _ = select.select([s], [], [], min(remaining, 1.0))
                if not ready:
                    continue

                try:
                    data = s.recv(8192)
                except BlockingIOError:
                    # select() reported readiness but no data was available;
                    # go back to waiting instead of failing the command.
                    continue

                if not data:
                    # Peer closed the connection: the reply is complete.
                    break

                response += data
                if len(response) > MAX_RESPONSE_SIZE:
                    raise HaproxyError(f"Response exceeded {MAX_RESPONSE_SIZE} bytes limit")

            return response.decode().strip()
    except socket.timeout as e:
        raise HaproxyError("Connection timeout") from e
    except ConnectionRefusedError as e:
        raise HaproxyError("Connection refused - HAProxy not running?") from e
    except UnicodeDecodeError as e:
        raise HaproxyError("Invalid UTF-8 in response") from e
    except HaproxyError:
        raise
    except Exception as e:
        # Any other failure is surfaced as the package's own error type.
        raise HaproxyError(str(e)) from e
def haproxy_cmd_checked(command: str) -> str:
    """Run a HAProxy command and reject replies that look like errors.

    Args:
        command: HAProxy command to execute.

    Returns:
        The reply from haproxy_cmd, unchanged.

    Raises:
        HaproxyError: If sending the command fails, or if the reply contains
            one of the error indicators checked by
            _check_response_for_errors.
    """
    reply = haproxy_cmd(command)
    # Raises on an error-looking reply; returns nothing otherwise.
    _check_response_for_errors(reply)
    return reply
def _check_response_for_errors(response: str) -> None:
"""Check HAProxy response for error indicators.
Args:
response: Response string from HAProxy
Raises:
HaproxyError: If response contains error indicators
"""
error_indicators = ["No such", "not found", "error", "failed", "invalid", "unknown"]
if response:
response_lower = response.lower()
for indicator in error_indicators:
if indicator.lower() in response_lower:
raise HaproxyError(f"HAProxy command failed: {response.strip()}")
def haproxy_cmd_batch(commands: list[str]) -> list[str]:
    """Send multiple commands to HAProxy over a single connection.

    The commands are joined with newlines and sent through one haproxy_cmd
    call, so only one connection is opened for the whole batch. The combined
    reply is then split on blank lines into per-command responses.

    NOTE(review): this assumes HAProxy executes every newline-separated
    command on the connection and separates the replies with an empty
    line - confirm against the HAProxy version in use.

    Args:
        commands: List of HAProxy commands to execute.

    Returns:
        The stripped responses, in reply order. The list is padded with
        empty strings up to len(commands); it is never truncated, so it can
        be longer than commands when a reply itself contains a blank line.

    Raises:
        HaproxyError: If the connection fails or any response contains an
            error indicator.
    """
    if not commands:
        return []

    if len(commands) == 1:
        return [haproxy_cmd_checked(commands[0])]

    # haproxy_cmd appends the trailing newline, enforces the timeout and size
    # limits, translates socket failures into HaproxyError, and returns the
    # decoded, stripped reply.
    full_response = haproxy_cmd("\n".join(commands))

    # Commands that print nothing contribute no text, so an empty overall
    # reply means "no output from any command".
    responses = full_response.split("\n\n") if full_response else []

    # Pad so callers can index one response per command; a non-positive
    # multiplier yields an empty list, leaving longer results untouched.
    responses.extend([""] * (len(commands) - len(responses)))

    responses = [resp.strip() for resp in responses]
    # Fail on the first response that looks like an error.
    for resp in responses:
        _check_response_for_errors(resp)

    return responses
def reload_haproxy() -> tuple[bool, str]:
    """Check the HAProxy configuration and, if valid, signal a reload.

    Two podman commands are run in order: `haproxy -c` inside the container
    to validate the configuration file, then `podman kill --signal USR2` on
    the container. The second command is skipped when validation fails.

    Returns:
        Tuple of (success, message). On success the message is "OK";
        otherwise it describes which step failed, including the command's
        stderr where available.
    """
    check_cmd = ["podman", "exec", HAPROXY_CONTAINER, "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"]
    signal_cmd = ["podman", "kill", "--signal", "USR2", HAPROXY_CONTAINER]
    # Both invocations capture text output and share the same time limit.
    run_opts = {"capture_output": True, "text": True, "timeout": SUBPROCESS_TIMEOUT}

    try:
        checked = subprocess.run(check_cmd, **run_opts)
        if checked.returncode != 0:
            return False, f"Config validation failed:\n{checked.stderr}"

        signalled = subprocess.run(signal_cmd, **run_opts)
        if signalled.returncode != 0:
            return False, f"Reload failed: {signalled.stderr}"
    except subprocess.TimeoutExpired:
        return False, f"Command timed out after {SUBPROCESS_TIMEOUT} seconds"
    except FileNotFoundError:
        # The podman executable itself could not be launched.
        return False, "podman command not found"
    except OSError as exc:
        return False, f"OS error: {exc}"

    return True, "OK"