refactor: Modularize MCP server with command batching

- Split monolithic mcp/server.py (1874 lines) into haproxy_mcp/ package:
  - config.py: Configuration constants and environment variables
  - exceptions.py: Custom exception classes
  - validation.py: Input validation functions
  - haproxy_client.py: HAProxy Runtime API client with batch support
  - file_ops.py: Atomic file operations with locking
  - utils.py: CSV parsing utilities
  - tools/: MCP tools organized by function
    - domains.py: Domain management (3 tools)
    - servers.py: Server management (7 tools)
    - health.py: Health checks (3 tools)
    - monitoring.py: Monitoring (4 tools)
    - configuration.py: Config management (4 tools)

- Add haproxy_cmd_batch() for sending multiple commands in single TCP connection
- Optimize server operations: 1 connection instead of 2 per server
- Optimize startup restore: All servers in 1 connection (was 2×N)
- Update type hints to Python 3.9+ style (built-in generics)
- Remove unused imports and functions
- Update CLAUDE.md with new structure and performance notes

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
kaffa
2026-02-02 03:50:42 +00:00
parent a3d5d61454
commit 7bee373684
19 changed files with 2035 additions and 1876 deletions

View File

@@ -319,6 +319,12 @@ api.example.com → pool_6
- **Input validation**: Domain format, IP (v4/v6), port range, slot limits - **Input validation**: Domain format, IP (v4/v6), port range, slot limits
- **Bulk limits**: Max 10 servers per bulk add, 10KB JSON size limit - **Bulk limits**: Max 10 servers per bulk add, 10KB JSON size limit
### Performance Optimization
- **Command batching**: Multiple HAProxy commands sent in single TCP connection
- Server config (addr + state): 1 connection instead of 2
- Startup restore: All servers restored in 1 connection (was 2×N for N servers)
- Example: 7 servers restored = 1 connection (was 14 connections)
## HAProxy Runtime API ## HAProxy Runtime API
```bash ```bash
@@ -345,8 +351,20 @@ echo "set server pool_1/pool_1_1 state ready" | nc localhost 9999
``` ```
/opt/haproxy/ /opt/haproxy/
├── mcp/ # MCP server (streamable-http) ├── haproxy_mcp/ # MCP server package (streamable-http)
── server.py # Main MCP server (~1700 lines, 22 tools) ── server.py # Main entry point
│ ├── config.py # Configuration and constants
│ ├── exceptions.py # Exception classes
│ ├── validation.py # Input validation
│ ├── haproxy_client.py # HAProxy Runtime API client
│ ├── file_ops.py # File I/O operations
│ ├── utils.py # Parsing utilities
│ └── tools/ # MCP tools (22 total)
│ ├── domains.py # Domain management (3 tools)
│ ├── servers.py # Server management (7 tools)
│ ├── health.py # Health checks (3 tools)
│ ├── monitoring.py # Monitoring (4 tools)
│ └── configuration.py # Config management (4 tools)
├── conf/ ├── conf/
│ ├── haproxy.cfg # Main HAProxy config (100 pool backends) │ ├── haproxy.cfg # Main HAProxy config (100 pool backends)
│ ├── domains.map # Domain → Pool mapping │ ├── domains.map # Domain → Pool mapping

1
haproxy_mcp/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""HAProxy MCP Server Package."""

8
haproxy_mcp/__main__.py Normal file
View File

@@ -0,0 +1,8 @@
"""Entry point for running haproxy_mcp as a module."""
from .server import mcp
from .tools.configuration import startup_restore
if __name__ == "__main__":
startup_restore()
mcp.run(transport="streamable-http")

81
haproxy_mcp/config.py Normal file
View File

@@ -0,0 +1,81 @@
"""Configuration constants and environment variables for HAProxy MCP Server."""
import os
import re
import logging
# Configure structured logging
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=getattr(logging, log_level, logging.INFO),
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("haproxy_mcp")
# MCP Server configuration
MCP_HOST: str = os.getenv("MCP_HOST", "0.0.0.0")
MCP_PORT: int = int(os.getenv("MCP_PORT", "8000"))
# HAProxy Runtime API configuration
HAPROXY_HOST: str = os.getenv("HAPROXY_HOST", "localhost")
HAPROXY_PORT: int = int(os.getenv("HAPROXY_PORT", "9999"))
HAPROXY_SOCKET: tuple[str, int] = (HAPROXY_HOST, HAPROXY_PORT)
# File paths (configurable via environment)
STATE_FILE: str = os.getenv("HAPROXY_STATE_FILE", "/opt/haproxy/data/servers.state")
MAP_FILE: str = os.getenv("HAPROXY_MAP_FILE", "/opt/haproxy/conf/domains.map")
MAP_FILE_CONTAINER: str = os.getenv("HAPROXY_MAP_FILE_CONTAINER", "/usr/local/etc/haproxy/domains.map")
SERVERS_FILE: str = os.getenv("HAPROXY_SERVERS_FILE", "/opt/haproxy/conf/servers.json")
# Pool configuration
POOL_COUNT: int = int(os.getenv("HAPROXY_POOL_COUNT", "100"))
MAX_SLOTS: int = int(os.getenv("HAPROXY_MAX_SLOTS", "10"))
# Container configuration
HAPROXY_CONTAINER: str = os.getenv("HAPROXY_CONTAINER", "haproxy")
# Validation patterns - compiled once for performance
DOMAIN_PATTERN = re.compile(
r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?'
r'(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$'
)
# Backend/server names: alphanumeric, underscore, hyphen only
BACKEND_NAME_PATTERN = re.compile(r'^[a-zA-Z0-9_-]+$')
# Pattern for converting domain to backend name
NON_ALNUM_PATTERN = re.compile(r'[^a-zA-Z0-9]')
# Limits
MAX_RESPONSE_SIZE = 10 * 1024 * 1024 # 10 MB max response from HAProxy
SUBPROCESS_TIMEOUT = 30 # seconds
STARTUP_RETRY_COUNT = 10 # HAProxy ready check retries
STATE_MIN_COLUMNS = 19 # Minimum columns in HAProxy server state output
SOCKET_TIMEOUT = 5 # seconds for HAProxy socket connection
SOCKET_RECV_TIMEOUT = 30 # seconds for HAProxy socket recv loop
MAX_BULK_SERVERS = 10 # Max servers per bulk add call
MAX_SERVERS_JSON_SIZE = 10000 # Max size of servers JSON in haproxy_add_servers
# CSV field indices for HAProxy stats (show stat command)
class StatField:
"""HAProxy CSV stat field indices."""
PXNAME = 0 # Proxy name (frontend/backend)
SVNAME = 1 # Server name (or FRONTEND/BACKEND)
SCUR = 4 # Current sessions
SMAX = 6 # Max sessions
STATUS = 17 # Status (UP/DOWN/MAINT/etc)
WEIGHT = 18 # Server weight
CHECK_STATUS = 36 # Check status
# Field indices for HAProxy server state (show servers state command)
class StateField:
"""HAProxy server state field indices."""
BE_ID = 0 # Backend ID
BE_NAME = 1 # Backend name
SRV_ID = 2 # Server ID
SRV_NAME = 3 # Server name
SRV_ADDR = 4 # Server address
SRV_OP_STATE = 5 # Operational state
SRV_ADMIN_STATE = 6 # Admin state
SRV_PORT = 18 # Server port

11
haproxy_mcp/exceptions.py Normal file
View File

@@ -0,0 +1,11 @@
"""Exception classes for HAProxy MCP Server."""
class HaproxyError(Exception):
"""HAProxy operation error."""
pass
class NoAvailablePoolError(HaproxyError):
"""All pool backends are in use."""
pass

262
haproxy_mcp/file_ops.py Normal file
View File

@@ -0,0 +1,262 @@
"""File I/O operations for HAProxy MCP Server."""
import fcntl
import json
import os
import tempfile
from typing import Any, Optional
from .config import (
MAP_FILE,
SERVERS_FILE,
logger,
)
from .validation import domain_to_backend
def atomic_write_file(file_path: str, content: str) -> None:
"""Write content to file atomically using temp file + rename.
Args:
file_path: Target file path
content: Content to write
Raises:
IOError: If write fails
"""
dir_path = os.path.dirname(file_path)
fd = None
temp_path = None
try:
fd, temp_path = tempfile.mkstemp(dir=dir_path, prefix='.tmp.')
with os.fdopen(fd, 'w', encoding='utf-8') as f:
fd = None # fd is now owned by the file object
f.write(content)
os.rename(temp_path, file_path)
temp_path = None # Rename succeeded
except OSError as e:
raise IOError(f"Failed to write {file_path}: {e}") from e
finally:
if fd is not None:
try:
os.close(fd)
except OSError:
pass
if temp_path is not None:
try:
os.unlink(temp_path)
except OSError:
pass
def get_map_contents() -> list[tuple[str, str]]:
"""Read domains.map file and return list of (domain, backend) tuples.
Returns:
List of (domain, backend) tuples from the map file
"""
entries = []
try:
with open(MAP_FILE, "r", encoding="utf-8") as f:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
except OSError:
pass # Continue without lock if not supported
try:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split()
if len(parts) >= 2:
entries.append((parts[0], parts[1]))
finally:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except OSError:
pass
except FileNotFoundError:
pass
return entries
def save_map_file(entries: list[tuple[str, str]]) -> None:
"""Save entries to domains.map file atomically.
Uses temp file + rename for atomic write to prevent race conditions.
Args:
entries: List of (domain, backend) tuples to write
Raises:
IOError: If the file cannot be written
"""
lines = [
"# Domain to Backend mapping\n",
"# Format: domain backend_name\n",
"# Wildcard: .domain.com matches *.domain.com\n\n",
]
for domain, backend in entries:
lines.append(f"{domain} {backend}\n")
atomic_write_file(MAP_FILE, "".join(lines))
def get_domain_backend(domain: str) -> Optional[str]:
"""Look up the backend for a domain from domains.map.
Args:
domain: The domain to look up
Returns:
Backend name if found, None otherwise
"""
for map_domain, backend in get_map_contents():
if map_domain == domain:
return backend
return None
def is_legacy_backend(backend: str) -> bool:
"""Check if backend is a legacy static backend (not a pool).
Args:
backend: Backend name to check
Returns:
True if this is a legacy backend, False if it's a pool
"""
return not backend.startswith("pool_")
def get_legacy_backend_name(domain: str) -> str:
"""Convert domain to legacy backend name format.
Args:
domain: Domain name
Returns:
Legacy backend name (e.g., 'api_example_com_backend')
"""
return f"{domain_to_backend(domain)}_backend"
def get_backend_and_prefix(domain: str) -> tuple[str, str]:
"""Look up backend and determine server name prefix for a domain.
Args:
domain: The domain name to look up
Returns:
Tuple of (backend_name, server_prefix)
Raises:
ValueError: If domain cannot be mapped to a valid backend
"""
backend = get_domain_backend(domain)
if not backend:
backend = get_legacy_backend_name(domain)
if backend.startswith("pool_"):
server_prefix = backend
else:
server_prefix = domain_to_backend(domain)
return backend, server_prefix
def load_servers_config() -> dict[str, Any]:
"""Load servers configuration from JSON file with file locking.
Returns:
Dictionary with server configurations
"""
try:
with open(SERVERS_FILE, "r", encoding="utf-8") as f:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
except OSError:
logger.debug("File locking not supported for %s", SERVERS_FILE)
try:
return json.load(f)
finally:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except OSError:
pass
except FileNotFoundError:
return {}
except json.JSONDecodeError as e:
logger.warning("Corrupt config file %s: %s", SERVERS_FILE, e)
return {}
def save_servers_config(config: dict[str, Any]) -> None:
"""Save servers configuration to JSON file atomically.
Uses temp file + rename for atomic write to prevent race conditions.
Args:
config: Dictionary with server configurations
"""
atomic_write_file(SERVERS_FILE, json.dumps(config, indent=2))
def add_server_to_config(domain: str, slot: int, ip: str, http_port: int) -> None:
"""Add server configuration to persistent storage with file locking.
Args:
domain: Domain name
slot: Server slot (1 to MAX_SLOTS)
ip: Server IP address
http_port: HTTP port
"""
lock_path = f"{SERVERS_FILE}.lock"
with open(lock_path, 'w') as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
try:
config = load_servers_config()
if domain not in config:
config[domain] = {}
config[domain][str(slot)] = {"ip": ip, "http_port": http_port}
save_servers_config(config)
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
def remove_server_from_config(domain: str, slot: int) -> None:
"""Remove server configuration from persistent storage with file locking.
Args:
domain: Domain name
slot: Server slot to remove
"""
lock_path = f"{SERVERS_FILE}.lock"
with open(lock_path, 'w') as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
try:
config = load_servers_config()
if domain in config and str(slot) in config[domain]:
del config[domain][str(slot)]
if not config[domain]:
del config[domain]
save_servers_config(config)
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
def remove_domain_from_config(domain: str) -> None:
"""Remove domain from persistent config with file locking.
Args:
domain: Domain name to remove
"""
lock_path = f"{SERVERS_FILE}.lock"
with open(lock_path, 'w') as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
try:
config = load_servers_config()
if domain in config:
del config[domain]
save_servers_config(config)
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)

View File

@@ -0,0 +1,215 @@
"""HAProxy Runtime API client functions."""
import socket
import subprocess
import select
import time
from .config import (
HAPROXY_SOCKET,
HAPROXY_CONTAINER,
SOCKET_TIMEOUT,
SOCKET_RECV_TIMEOUT,
MAX_RESPONSE_SIZE,
SUBPROCESS_TIMEOUT,
)
from .exceptions import HaproxyError
def haproxy_cmd(command: str) -> str:
"""Send command to HAProxy Runtime API.
Args:
command: The HAProxy runtime API command to execute
Returns:
The response from HAProxy
Raises:
HaproxyError: If connection fails, times out, or response exceeds size limit
"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(SOCKET_TIMEOUT)
s.connect(HAPROXY_SOCKET)
s.sendall(f"{command}\n".encode())
s.shutdown(socket.SHUT_WR)
# Set socket to non-blocking for select-based recv loop
s.setblocking(False)
response = b""
start_time = time.time()
while True:
# Check for overall timeout
elapsed = time.time() - start_time
if elapsed >= SOCKET_RECV_TIMEOUT:
raise HaproxyError(f"Response timeout after {SOCKET_RECV_TIMEOUT} seconds")
# Wait for data with timeout (remaining time)
remaining = SOCKET_RECV_TIMEOUT - elapsed
ready, _, _ = select.select([s], [], [], min(remaining, 1.0))
if ready:
data = s.recv(8192)
if not data:
break
response += data
if len(response) > MAX_RESPONSE_SIZE:
raise HaproxyError(f"Response exceeded {MAX_RESPONSE_SIZE} bytes limit")
return response.decode().strip()
except socket.timeout:
raise HaproxyError("Connection timeout")
except ConnectionRefusedError:
raise HaproxyError("Connection refused - HAProxy not running?")
except UnicodeDecodeError:
raise HaproxyError("Invalid UTF-8 in response")
except HaproxyError:
raise
except Exception as e:
raise HaproxyError(str(e)) from e
def haproxy_cmd_checked(command: str) -> str:
"""Send command to HAProxy and raise on error response.
Args:
command: HAProxy command to execute
Returns:
Command response
Raises:
HaproxyError: If HAProxy returns an error message
"""
result = haproxy_cmd(command)
_check_response_for_errors(result)
return result
def _check_response_for_errors(response: str) -> None:
"""Check HAProxy response for error indicators.
Args:
response: Response string from HAProxy
Raises:
HaproxyError: If response contains error indicators
"""
error_indicators = ["No such", "not found", "error", "failed", "invalid", "unknown"]
if response:
response_lower = response.lower()
for indicator in error_indicators:
if indicator.lower() in response_lower:
raise HaproxyError(f"HAProxy command failed: {response.strip()}")
def haproxy_cmd_batch(commands: list[str]) -> list[str]:
"""Send multiple commands to HAProxy in a single connection.
This is more efficient than multiple haproxy_cmd calls as it reuses
the same TCP connection for all commands.
Args:
commands: List of HAProxy commands to execute
Returns:
List of responses for each command
Raises:
HaproxyError: If connection fails or any command returns an error
"""
if not commands:
return []
if len(commands) == 1:
return [haproxy_cmd_checked(commands[0])]
# Send all commands separated by newlines
combined = "\n".join(commands)
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(SOCKET_TIMEOUT)
s.connect(HAPROXY_SOCKET)
s.sendall(f"{combined}\n".encode())
s.shutdown(socket.SHUT_WR)
# Set socket to non-blocking for select-based recv loop
s.setblocking(False)
response = b""
start_time = time.time()
while True:
elapsed = time.time() - start_time
if elapsed >= SOCKET_RECV_TIMEOUT:
raise HaproxyError(f"Response timeout after {SOCKET_RECV_TIMEOUT} seconds")
remaining = SOCKET_RECV_TIMEOUT - elapsed
ready, _, _ = select.select([s], [], [], min(remaining, 1.0))
if ready:
data = s.recv(8192)
if not data:
break
response += data
if len(response) > MAX_RESPONSE_SIZE:
raise HaproxyError(f"Response exceeded {MAX_RESPONSE_SIZE} bytes limit")
full_response = response.decode().strip()
# Split responses - HAProxy separates responses with empty lines
# For commands that return nothing, we get empty strings
responses = full_response.split("\n\n") if full_response else [""] * len(commands)
# If we got fewer responses than commands, pad with empty strings
while len(responses) < len(commands):
responses.append("")
# Check each response for errors
for i, resp in enumerate(responses):
resp = resp.strip()
_check_response_for_errors(resp)
responses[i] = resp
return responses
except socket.timeout:
raise HaproxyError("Connection timeout")
except ConnectionRefusedError:
raise HaproxyError("Connection refused - HAProxy not running?")
except UnicodeDecodeError:
raise HaproxyError("Invalid UTF-8 in response")
except HaproxyError:
raise
except Exception as e:
raise HaproxyError(str(e)) from e
def reload_haproxy() -> tuple[bool, str]:
"""Validate and reload HAProxy configuration.
Returns:
Tuple of (success, message)
"""
try:
validate = subprocess.run(
["podman", "exec", HAPROXY_CONTAINER, "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"],
capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
)
if validate.returncode != 0:
return False, f"Config validation failed:\n{validate.stderr}"
result = subprocess.run(
["podman", "kill", "--signal", "USR2", HAPROXY_CONTAINER],
capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
)
if result.returncode != 0:
return False, f"Reload failed: {result.stderr}"
return True, "OK"
except subprocess.TimeoutExpired:
return False, f"Command timed out after {SUBPROCESS_TIMEOUT} seconds"
except FileNotFoundError:
return False, "podman command not found"
except OSError as e:
return False, f"OS error: {e}"

36
haproxy_mcp/server.py Normal file
View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python3
"""HAProxy MCP Server - Direct Runtime API Integration
This module provides an MCP (Model Context Protocol) server for managing HAProxy
configuration and runtime state. It supports dynamic domain/server management
with HTTP backends (SSL termination at HAProxy frontend).
Environment Variables:
MCP_HOST: Host to bind MCP server (default: 0.0.0.0)
MCP_PORT: Port for MCP server (default: 8000)
HAPROXY_HOST: HAProxy Runtime API host (default: localhost)
HAPROXY_PORT: HAProxy Runtime API port (default: 9999)
HAPROXY_STATE_FILE: Path to server state file
HAPROXY_MAP_FILE: Path to domains.map file
HAPROXY_MAP_FILE_CONTAINER: Container path for domains.map
HAPROXY_SERVERS_FILE: Path to servers.json file
HAPROXY_POOL_COUNT: Number of pool backends (default: 100)
HAPROXY_MAX_SLOTS: Max servers per pool (default: 10)
"""
from mcp.server.fastmcp import FastMCP
from .config import MCP_HOST, MCP_PORT
from .tools import register_all_tools
from .tools.configuration import startup_restore
# Initialize MCP Server
mcp = FastMCP("haproxy", host=MCP_HOST, port=MCP_PORT)
# Register all tools
register_all_tools(mcp)
if __name__ == "__main__":
startup_restore()
mcp.run(transport="streamable-http")

View File

@@ -0,0 +1,20 @@
"""MCP Tools for HAProxy management."""
from .domains import register_domain_tools
from .servers import register_server_tools
from .health import register_health_tools
from .monitoring import register_monitoring_tools
from .configuration import register_config_tools
def register_all_tools(mcp):
"""Register all MCP tools with the server.
Args:
mcp: FastMCP server instance
"""
register_domain_tools(mcp)
register_server_tools(mcp)
register_health_tools(mcp)
register_monitoring_tools(mcp)
register_config_tools(mcp)

View File

@@ -0,0 +1,262 @@
"""Configuration management tools for HAProxy MCP Server."""
import fcntl
import subprocess
import time
from ..config import (
STATE_FILE,
HAPROXY_CONTAINER,
SUBPROCESS_TIMEOUT,
STARTUP_RETRY_COUNT,
StateField,
STATE_MIN_COLUMNS,
logger,
)
from ..exceptions import HaproxyError
from ..validation import validate_ip, validate_port, validate_backend_name
from ..haproxy_client import haproxy_cmd, haproxy_cmd_batch, reload_haproxy
from ..file_ops import (
atomic_write_file,
load_servers_config,
get_domain_backend,
get_backend_and_prefix,
)
def restore_servers_from_config() -> int:
"""Restore all servers from configuration file.
Uses batched commands for efficiency - single TCP connection for all servers.
Returns:
Number of servers restored
"""
config = load_servers_config()
if not config:
return 0
# Build batch of all commands
commands: list[str] = []
server_info_list: list[tuple[str, str]] = [] # For logging on failure
for domain, slots in config.items():
backend = get_domain_backend(domain)
if not backend:
continue
try:
_, server_prefix = get_backend_and_prefix(domain)
except ValueError as e:
logger.warning("Invalid domain '%s': %s", domain, e)
continue
for slot_str, server_info in slots.items():
try:
slot = int(slot_str)
except ValueError:
logger.warning("Invalid slot '%s' for %s, skipping", slot_str, domain)
continue
ip = server_info.get("ip", "")
if not ip:
continue
try:
port = int(server_info.get("http_port", 80))
except (ValueError, TypeError):
logger.warning("Invalid port for %s slot %d, skipping", domain, slot)
continue
server = f"{server_prefix}_{slot}"
commands.append(f"set server {backend}/{server} addr {ip} port {port}")
commands.append(f"set server {backend}/{server} state ready")
server_info_list.append((backend, server))
if not commands:
return 0
# Execute all commands in single batch
try:
haproxy_cmd_batch(commands)
return len(server_info_list)
except HaproxyError as e:
logger.warning("Batch restore failed: %s", e)
# Fallback: try individual commands
restored = 0
for i in range(0, len(commands), 2):
try:
haproxy_cmd_batch([commands[i], commands[i + 1]])
restored += 1
except HaproxyError as e2:
backend, server = server_info_list[i // 2]
logger.warning("Failed to restore %s/%s: %s", backend, server, e2)
return restored
def startup_restore() -> None:
"""Restore servers from config file on startup."""
# Wait for HAProxy to be ready
for _ in range(STARTUP_RETRY_COUNT):
try:
haproxy_cmd("show info")
break
except HaproxyError:
time.sleep(1)
else:
logger.warning("HAProxy not ready, skipping restore")
return
try:
count = restore_servers_from_config()
if count > 0:
logger.info("Restored %d servers from config", count)
except (HaproxyError, OSError, ValueError) as e:
logger.warning("Failed to restore servers: %s", e)
def register_config_tools(mcp):
"""Register configuration management tools with MCP server."""
@mcp.tool()
def haproxy_reload() -> str:
"""Reload HAProxy configuration (validates config first).
After reload, automatically restores server configurations from servers.json.
Returns:
Success message with restored server count, or error details if failed
"""
success, msg = reload_haproxy()
if not success:
return msg
# Restore servers from config after reload
try:
restored = restore_servers_from_config()
return f"HAProxy configuration reloaded successfully ({restored} servers restored)"
except Exception as e:
logger.error("Failed to restore servers after reload: %s", e)
return f"HAProxy reloaded but server restore failed: {e}"
@mcp.tool()
def haproxy_check_config() -> str:
"""Validate HAProxy configuration file syntax.
Returns:
Validation result or error details
"""
try:
result = subprocess.run(
["podman", "exec", HAPROXY_CONTAINER, "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"],
capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
)
if result.returncode == 0:
return "Configuration is valid"
return f"Configuration errors:\n{result.stderr}"
except subprocess.TimeoutExpired:
return f"Error: Command timed out after {SUBPROCESS_TIMEOUT} seconds"
except FileNotFoundError:
return "Error: podman command not found"
except OSError as e:
return f"Error: OS error: {e}"
@mcp.tool()
def haproxy_save_state() -> str:
"""Save current server state to disk atomically.
Returns:
Success message or error description
"""
try:
state = haproxy_cmd("show servers state")
atomic_write_file(STATE_FILE, state)
return "Server state saved"
except HaproxyError as e:
return f"Error: {e}"
except IOError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_restore_state() -> str:
"""Restore server state from disk.
Uses batched commands for efficiency.
Returns:
Summary of restored servers or error description
"""
try:
with open(STATE_FILE, "r", encoding="utf-8") as f:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
except OSError:
pass # Continue without lock if not supported
try:
state = f.read()
finally:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except OSError:
pass
# Build batch of all commands
commands: list[str] = []
server_info_list: list[tuple[str, str]] = []
skipped = 0
for line in state.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and not line.startswith("#"):
backend = parts[StateField.BE_NAME]
server = parts[StateField.SRV_NAME]
addr = parts[StateField.SRV_ADDR]
port = parts[StateField.SRV_PORT]
# Skip disabled servers
if addr == "0.0.0.0":
continue
# Validate names from state file to prevent injection
if not validate_backend_name(backend) or not validate_backend_name(server):
skipped += 1
continue
# Validate IP and port
if not validate_ip(addr) or not validate_port(port):
skipped += 1
continue
commands.append(f"set server {backend}/{server} addr {addr} port {port}")
commands.append(f"set server {backend}/{server} state ready")
server_info_list.append((backend, server))
if not commands:
result = "No servers to restore"
if skipped:
result += f", {skipped} entries skipped due to validation"
return result
# Execute all commands in single batch
try:
haproxy_cmd_batch(commands)
restored = len(server_info_list)
except HaproxyError:
# Fallback: try individual pairs
restored = 0
for i in range(0, len(commands), 2):
try:
haproxy_cmd_batch([commands[i], commands[i + 1]])
restored += 1
except HaproxyError as e:
backend, server = server_info_list[i // 2]
logger.warning("Failed to restore %s/%s: %s", backend, server, e)
result = f"Server state restored ({restored} servers)"
if skipped:
result += f", {skipped} entries skipped due to validation"
return result
except FileNotFoundError:
return "Error: No saved state found"
except HaproxyError as e:
return f"Error: {e}"

View File

@@ -0,0 +1,223 @@
"""Domain management tools for HAProxy MCP Server."""
import fcntl
from typing import Annotated
from pydantic import Field
from ..config import (
MAP_FILE,
MAP_FILE_CONTAINER,
POOL_COUNT,
MAX_SLOTS,
StateField,
STATE_MIN_COLUMNS,
logger,
)
from ..exceptions import HaproxyError
from ..validation import validate_domain, validate_ip
from ..haproxy_client import haproxy_cmd
from ..file_ops import (
get_map_contents,
save_map_file,
get_domain_backend,
is_legacy_backend,
add_server_to_config,
remove_server_from_config,
remove_domain_from_config,
)
def register_domain_tools(mcp):
"""Register domain management tools with MCP server."""
@mcp.tool()
def haproxy_list_domains(
include_wildcards: Annotated[bool, Field(default=False, description="Include wildcard entries (.example.com). Default: False")]
) -> str:
"""List all configured domains with their backend servers."""
try:
domains = []
state = haproxy_cmd("show servers state")
# Build server map from HAProxy state
server_map: dict[str, list] = {}
for line in state.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.SRV_ADDR] != "0.0.0.0":
backend = parts[StateField.BE_NAME]
if backend not in server_map:
server_map[backend] = []
server_map[backend].append(
f"{parts[StateField.SRV_NAME]}={parts[StateField.SRV_ADDR]}:{parts[StateField.SRV_PORT]}"
)
# Read from domains.map
seen_domains: set[str] = set()
for domain, backend in get_map_contents():
# Skip wildcard entries unless explicitly requested
if domain.startswith(".") and not include_wildcards:
continue
if domain in seen_domains:
continue
seen_domains.add(domain)
servers = server_map.get(backend, ["(none)"])
if domain.startswith("."):
backend_type = "wildcard"
elif backend.startswith("pool_"):
backend_type = "pool"
else:
backend_type = "static"
domains.append(f"{domain} -> {backend} ({backend_type}): {', '.join(servers)}")
return "\n".join(domains) if domains else "No domains configured"
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_add_domain(
domain: Annotated[str, Field(description="Domain name to add (e.g., api.example.com, example.com)")],
ip: Annotated[str, Field(default="", description="Optional: Initial server IP. If provided, adds server to slot 1")],
http_port: Annotated[int, Field(default=80, description="HTTP port for backend server (default: 80)")]
) -> str:
"""Add a new domain to HAProxy (no reload required).
Creates domain→pool mapping. Use haproxy_add_server to add more servers later.
Example: haproxy_add_domain("api.example.com", ip="10.0.0.1", http_port=8080)
"""
# Validate inputs
if domain.startswith("."):
return "Error: Domain cannot start with '.' (wildcard entries are added automatically)"
if not validate_domain(domain):
return "Error: Invalid domain format"
if not validate_ip(ip, allow_empty=True):
return "Error: Invalid IP address format"
if not (1 <= http_port <= 65535):
return "Error: Port must be between 1 and 65535"
# Use file locking for the entire pool allocation operation
lock_path = f"{MAP_FILE}.lock"
with open(lock_path, 'w') as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
try:
# Read map contents once for both existence check and pool lookup
entries = get_map_contents()
# Check if domain already exists (using cached entries)
for domain_entry, backend in entries:
if domain_entry == domain:
return f"Error: Domain {domain} already exists (mapped to {backend})"
# Find available pool (using cached entries)
used_pools: set[str] = set()
for _, backend in entries:
if backend.startswith("pool_"):
used_pools.add(backend)
pool = None
for i in range(1, POOL_COUNT + 1):
pool_name = f"pool_{i}"
if pool_name not in used_pools:
pool = pool_name
break
if not pool:
return f"Error: All {POOL_COUNT} pool backends are in use"
try:
# Save to disk first (atomic write for persistence)
entries.append((domain, pool))
entries.append((f".{domain}", pool))
try:
save_map_file(entries)
except IOError as e:
return f"Error: Failed to save map file: {e}"
# Then update HAProxy map via Runtime API
try:
haproxy_cmd(f"add map {MAP_FILE_CONTAINER} {domain} {pool}")
haproxy_cmd(f"add map {MAP_FILE_CONTAINER} .{domain} {pool}")
except HaproxyError as e:
# Rollback: remove the domain we just added from entries and re-save
rollback_entries = [(d, b) for d, b in entries if d != domain and d != f".{domain}"]
try:
save_map_file(rollback_entries)
except IOError:
logger.error("Failed to rollback map file after HAProxy error")
return f"Error: Failed to update HAProxy map: {e}"
# If IP provided, add server to slot 1
if ip:
# Save server config to disk first
add_server_to_config(domain, 1, ip, http_port)
try:
server = f"{pool}_1"
haproxy_cmd(f"set server {pool}/{server} addr {ip} port {http_port}")
haproxy_cmd(f"set server {pool}/{server} state ready")
except HaproxyError as e:
# Rollback server config on failure
remove_server_from_config(domain, 1)
return f"Domain {domain} added to {pool} but server config failed: {e}"
return f"Domain {domain} added to {pool} with server {ip}:{http_port}"
return f"Domain {domain} added to {pool} (no servers configured)"
except HaproxyError as e:
return f"Error: {e}"
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
@mcp.tool()
def haproxy_remove_domain(
domain: Annotated[str, Field(description="Domain name to remove (e.g., api.example.com)")]
) -> str:
"""Remove a domain from HAProxy (no reload required)."""
if not validate_domain(domain):
return "Error: Invalid domain format"
# Look up the domain in the map
backend = get_domain_backend(domain)
if not backend:
return f"Error: Domain {domain} not found"
# Check if this is a legacy backend (not a pool)
if is_legacy_backend(backend):
return f"Error: Cannot remove legacy domain {domain} (uses static backend {backend})"
try:
# Save to disk first (atomic write for persistence)
entries = get_map_contents()
new_entries = [(d, b) for d, b in entries if d != domain and d != f".{domain}"]
save_map_file(new_entries)
# Remove from persistent server config
remove_domain_from_config(domain)
# Clear map entries via Runtime API (immediate effect)
haproxy_cmd(f"del map {MAP_FILE_CONTAINER} {domain}")
try:
haproxy_cmd(f"del map {MAP_FILE_CONTAINER} .{domain}")
except HaproxyError as e:
logger.warning("Failed to remove wildcard entry for %s: %s", domain, e)
# Disable all servers in the pool (reset to 0.0.0.0:0)
for slot in range(1, MAX_SLOTS + 1):
server = f"{backend}_{slot}"
try:
haproxy_cmd(f"set server {backend}/{server} state maint")
haproxy_cmd(f"set server {backend}/{server} addr 0.0.0.0 port 0")
except HaproxyError as e:
logger.warning(
"Failed to clear server %s/%s for domain %s: %s",
backend, server, domain, e
)
# Continue with remaining cleanup
return f"Domain {domain} removed from {backend}"
except IOError as e:
return f"Error: Failed to update map file: {e}"
except HaproxyError as e:
return f"Error: {e}"

211
haproxy_mcp/tools/health.py Normal file
View File

@@ -0,0 +1,211 @@
"""Health check tools for HAProxy MCP Server."""
import json
import os
import subprocess
import time
from typing import Annotated, Any
from pydantic import Field
from ..config import (
MAP_FILE,
SERVERS_FILE,
HAPROXY_CONTAINER,
StateField,
STATE_MIN_COLUMNS,
)
from ..exceptions import HaproxyError
from ..validation import validate_domain, validate_backend_name
from ..haproxy_client import haproxy_cmd
from ..file_ops import get_backend_and_prefix
from ..utils import parse_stat_csv
def register_health_tools(mcp):
"""Register health check tools with MCP server."""
@mcp.tool()
def haproxy_health() -> str:
"""Check overall system health (MCP server, HAProxy, config files).
Use this for monitoring integration. Returns "healthy" if all components are OK.
Returns:
JSON with:
- status: "healthy" or "unhealthy"
- components.mcp: MCP server status
- components.haproxy: HAProxy connectivity, version, uptime
- components.config_files: map_file and servers_file accessibility
Example:
# Returns: {"status": "healthy", "components": {"mcp": {"status": "ok"}, ...}}
"""
result: dict[str, Any] = {
"status": "healthy",
"timestamp": time.time(),
"components": {
"mcp": {"status": "ok"},
"haproxy": {"status": "unknown"},
"config_files": {"status": "unknown"}
}
}
# Check HAProxy connectivity
try:
info = haproxy_cmd("show info")
for line in info.split("\n"):
if line.startswith("Version:"):
result["components"]["haproxy"]["version"] = line.split(":", 1)[1].strip()
elif line.startswith("Uptime_sec:"):
result["components"]["haproxy"]["uptime_sec"] = int(line.split(":", 1)[1].strip())
result["components"]["haproxy"]["status"] = "ok"
except HaproxyError as e:
result["components"]["haproxy"]["status"] = "error"
result["components"]["haproxy"]["error"] = str(e)
result["status"] = "degraded"
# Check container status
try:
container_result = subprocess.run(
["podman", "inspect", "--format", "{{.State.Status}}", HAPROXY_CONTAINER],
capture_output=True, text=True, timeout=5
)
if container_result.returncode == 0:
container_status = container_result.stdout.strip()
result["components"]["container"] = {
"status": "ok" if container_status == "running" else container_status,
"state": container_status
}
else:
result["components"]["container"] = {"status": "error", "error": container_result.stderr.strip()}
result["status"] = "unhealthy"
except subprocess.TimeoutExpired:
result["components"]["container"] = {"status": "timeout"}
result["status"] = "unhealthy"
except Exception as e:
result["components"]["container"] = {"status": "error", "error": str(e)}
# Check configuration files
files_ok = True
file_status: dict[str, str] = {}
for name, path in [("map_file", MAP_FILE), ("servers_file", SERVERS_FILE)]:
if os.path.exists(path):
file_status[name] = "ok"
else:
file_status[name] = "missing"
files_ok = False
result["components"]["config_files"]["files"] = file_status
result["components"]["config_files"]["status"] = "ok" if files_ok else "warning"
if not files_ok:
result["status"] = "degraded"
return json.dumps(result, indent=2)
@mcp.tool()
def haproxy_domain_health(
domain: Annotated[str, Field(description="Domain name to check health for (e.g., api.example.com)")]
) -> str:
"""Check health status of backend servers for a specific domain.
Returns JSON with status (healthy/degraded/down/no_servers), server list, and counts.
"""
if not validate_domain(domain):
return json.dumps({"error": "Invalid domain format"})
try:
backend, _ = get_backend_and_prefix(domain)
except ValueError as e:
return json.dumps({"error": str(e)})
result: dict[str, Any] = {
"domain": domain,
"backend": backend,
"status": "unknown",
"servers": [],
"healthy_count": 0,
"total_count": 0
}
try:
# Get server states
state_output = haproxy_cmd("show servers state")
stat_output = haproxy_cmd("show stat")
# Build status map from stat output (has UP/DOWN/MAINT status)
status_map: dict[str, dict[str, str]] = {}
for stat in parse_stat_csv(stat_output):
if stat["pxname"] == backend and stat["svname"] not in ["FRONTEND", "BACKEND"]:
status_map[stat["svname"]] = {
"status": stat["status"],
"check_status": stat["check_status"],
"weight": stat["weight"]
}
# Parse server state for address info
for line in state_output.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend:
server_name = parts[StateField.SRV_NAME]
addr = parts[StateField.SRV_ADDR]
port = parts[StateField.SRV_PORT]
# Skip disabled servers (0.0.0.0)
if addr == "0.0.0.0":
continue
server_info: dict[str, Any] = {
"name": server_name,
"addr": f"{addr}:{port}",
"status": "unknown"
}
# Get status from stat output
if server_name in status_map:
server_info["status"] = status_map[server_name]["status"]
server_info["check_status"] = status_map[server_name]["check_status"]
server_info["weight"] = status_map[server_name]["weight"]
result["servers"].append(server_info)
result["total_count"] += 1
if server_info["status"] == "UP":
result["healthy_count"] += 1
# Determine overall status
if result["total_count"] == 0:
result["status"] = "no_servers"
elif result["healthy_count"] == result["total_count"]:
result["status"] = "healthy"
elif result["healthy_count"] > 0:
result["status"] = "degraded"
else:
result["status"] = "down"
return json.dumps(result, indent=2)
except HaproxyError as e:
return json.dumps({"error": str(e)})
@mcp.tool()
def haproxy_get_server_health(
backend: Annotated[str, Field(default="", description="Optional: Backend name to filter (e.g., 'pool_1'). Empty = all backends")]
) -> str:
"""Get health status of all servers (low-level view). For domain-specific, use haproxy_domain_health."""
if backend and not validate_backend_name(backend):
return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
try:
result = haproxy_cmd("show stat")
servers = []
for stat in parse_stat_csv(result):
if stat["svname"] not in ["FRONTEND", "BACKEND", ""]:
if backend and stat["pxname"] != backend:
continue
servers.append(
f"{stat['pxname']}/{stat['svname']}: {stat['status']} "
f"(weight: {stat['weight']}, check: {stat['check_status']})"
)
return "\n".join(servers) if servers else "No servers found"
except HaproxyError as e:
return f"Error: {e}"

View File

@@ -0,0 +1,97 @@
"""Monitoring tools for HAProxy MCP Server."""
from typing import Annotated
from pydantic import Field
from ..exceptions import HaproxyError
from ..validation import validate_backend_name
from ..haproxy_client import haproxy_cmd
from ..utils import parse_stat_csv
def register_monitoring_tools(mcp):
"""Register monitoring tools with MCP server."""
@mcp.tool()
def haproxy_stats() -> str:
"""Get HAProxy status and statistics.
Returns:
Key HAProxy metrics (name, version, uptime, connections, etc.)
"""
try:
result = haproxy_cmd("show info")
stats = {}
for line in result.split("\n"):
if ":" in line:
key, value = line.split(":", 1)
stats[key.strip()] = value.strip()
important = ["Name", "Version", "Uptime_sec", "CurrConns", "MaxConn", "Run_queue", "Tasks"]
output = []
for key in important:
if key in stats:
output.append(f"{key}: {stats[key]}")
return "\n".join(output) if output else result
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_backends() -> str:
"""List all HAProxy backends.
Returns:
List of all configured backend names
"""
try:
result = haproxy_cmd("show backend")
backends = [line for line in result.split("\n") if line and not line.startswith("#")]
return "Backends:\n" + "\n".join(f"{b}" for b in backends)
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_list_frontends() -> str:
"""List all HAProxy frontends with their status.
Returns:
List of frontends with status and session counts
"""
try:
result = haproxy_cmd("show stat")
frontends = []
for stat in parse_stat_csv(result):
if stat["svname"] == "FRONTEND":
frontends.append(
f"{stat['pxname']}: {stat['status']} (sessions: {stat['scur']})"
)
if not frontends:
return "No frontends found"
return "Frontends:\n" + "\n".join(frontends)
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_get_connections(
backend: Annotated[str, Field(default="", description="Optional: Backend name to filter (e.g., 'pool_1'). Empty = all backends")]
) -> str:
"""Get active connection counts per server for monitoring traffic distribution."""
if backend and not validate_backend_name(backend):
return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
try:
result = haproxy_cmd("show stat")
connections = []
for stat in parse_stat_csv(result):
if backend and stat["pxname"] != backend:
continue
if stat["svname"] in ["FRONTEND", "BACKEND"]:
connections.append(
f"{stat['pxname']} ({stat['svname']}): {stat['scur']} current, {stat['smax']} max"
)
elif stat["svname"]:
connections.append(f" - {stat['svname']}: {stat['scur']} connections")
return "\n".join(connections) if connections else "No connection data"
except HaproxyError as e:
return f"Error: {e}"

View File

@@ -0,0 +1,475 @@
"""Server management tools for HAProxy MCP Server."""
import json
import time
from typing import Annotated
from pydantic import Field
from ..config import (
MAX_SLOTS,
MAX_BULK_SERVERS,
MAX_SERVERS_JSON_SIZE,
StateField,
StatField,
STATE_MIN_COLUMNS,
)
from ..exceptions import HaproxyError
from ..validation import validate_domain, validate_ip, validate_backend_name
from ..haproxy_client import haproxy_cmd, haproxy_cmd_checked, haproxy_cmd_batch
from ..file_ops import (
get_backend_and_prefix,
load_servers_config,
add_server_to_config,
remove_server_from_config,
)
def configure_server_slot(backend: str, server_prefix: str, slot: int, ip: str, http_port: int) -> str:
"""Configure a server slot in HAProxy.
Args:
backend: Backend name (e.g., 'pool_1')
server_prefix: Server name prefix (e.g., 'pool_1')
slot: Slot number (1-10)
ip: Server IP address
http_port: HTTP port
Returns:
Server name that was configured
Raises:
HaproxyError: If HAProxy command fails
"""
server = f"{server_prefix}_{slot}"
# Batch both commands in single TCP connection
haproxy_cmd_batch([
f"set server {backend}/{server} addr {ip} port {http_port}",
f"set server {backend}/{server} state ready"
])
return server
def register_server_tools(mcp):
"""Register server management tools with MCP server."""
@mcp.tool()
def haproxy_list_servers(
domain: Annotated[str, Field(description="Domain name to list servers for (e.g., api.example.com)")]
) -> str:
"""List all servers for a domain with slot numbers, addresses, and status (UP/DOWN/MAINT).
Example:
haproxy_list_servers("api.example.com")
# Output: pool_1_1: 10.0.0.1:8080 (UP)
# pool_1_2: 10.0.0.2:8080 (UP)
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
try:
backend, _ = get_backend_and_prefix(domain)
servers = []
state = haproxy_cmd("show servers state")
for line in state.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend:
addr = parts[StateField.SRV_ADDR]
status = "active" if addr != "0.0.0.0" else "disabled"
servers.append(
f"{parts[StateField.SRV_NAME]}: {addr}:{parts[StateField.SRV_PORT]} ({status})"
)
if not servers:
return f"Backend {backend} not found"
return f"Servers for {domain} ({backend}):\n" + "\n".join(servers)
except (HaproxyError, ValueError) as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_add_server(
domain: Annotated[str, Field(description="Domain name to add server to (e.g., api.example.com)")],
slot: Annotated[int, Field(description="Server slot number 1-10, or 0 for auto-select next available slot")],
ip: Annotated[str, Field(description="Server IP address (IPv4 like 10.0.0.1 or IPv6 like 2001:db8::1)")],
http_port: Annotated[int, Field(default=80, description="HTTP port for backend connection (default: 80)")]
) -> str:
"""Add a server to a domain's backend pool for load balancing.
Each domain can have up to 10 servers (slots 1-10). HAProxy distributes traffic
across all configured servers using round-robin.
Example: haproxy_add_server("api.example.com", slot=1, ip="10.0.0.1", http_port=8080)
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
if not ip:
return "Error: IP address is required"
if not validate_ip(ip):
return "Error: Invalid IP address format"
if not (1 <= http_port <= 65535):
return "Error: Port must be between 1 and 65535"
try:
backend, server_prefix = get_backend_and_prefix(domain)
# Auto-select slot if slot <= 0
if slot <= 0:
state = haproxy_cmd("show servers state")
used_slots: set[int] = set()
for line in state.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend:
if parts[StateField.SRV_ADDR] != "0.0.0.0":
# Extract slot number from server name (e.g., pool_1_3 -> 3)
server_name = parts[StateField.SRV_NAME]
try:
used_slots.add(int(server_name.rsplit("_", 1)[1]))
except (ValueError, IndexError):
pass
for s in range(1, MAX_SLOTS + 1):
if s not in used_slots:
slot = s
break
else:
return f"Error: No available slots (all {MAX_SLOTS} slots in use)"
elif not (1 <= slot <= MAX_SLOTS):
return f"Error: Slot must be between 1 and {MAX_SLOTS}, or 0/-1 for auto-select"
# Save to persistent config FIRST (disk-first pattern)
add_server_to_config(domain, slot, ip, http_port)
try:
server = configure_server_slot(backend, server_prefix, slot, ip, http_port)
return f"Added to {domain} ({backend}) slot {slot}:\n{server}{ip}:{http_port}"
except HaproxyError as e:
# Rollback config on HAProxy failure
remove_server_from_config(domain, slot)
return f"Error: {e}"
except (ValueError, IOError) as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_add_servers(
domain: Annotated[str, Field(description="Domain name to add servers to (e.g., api.example.com)")],
servers: Annotated[str, Field(description='JSON array of servers. Each object: {"slot": 1-10, "ip": "10.0.0.1", "http_port": 80}. Example: \'[{"slot":1,"ip":"10.0.0.1"},{"slot":2,"ip":"10.0.0.2"}]\'')]
) -> str:
"""Add multiple servers to a domain's backend at once (bulk operation).
Example: haproxy_add_servers("api.example.com", '[{"slot":1,"ip":"10.0.0.1"},{"slot":2,"ip":"10.0.0.2"}]')
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
# Check JSON size before parsing
if len(servers) > MAX_SERVERS_JSON_SIZE:
return f"Error: servers JSON exceeds maximum size ({MAX_SERVERS_JSON_SIZE} bytes)"
# Parse JSON array
try:
server_list = json.loads(servers)
except json.JSONDecodeError as e:
return f"Error: Invalid JSON - {e}"
if not isinstance(server_list, list):
return "Error: servers must be a JSON array"
if not server_list:
return "Error: servers array is empty"
if len(server_list) > MAX_BULK_SERVERS:
return f"Error: Cannot add more than {MAX_BULK_SERVERS} servers at once"
# Validate all servers first before adding any
validated_servers = []
validation_errors = []
for i, srv in enumerate(server_list):
if not isinstance(srv, dict):
validation_errors.append(f"Server {i+1}: must be an object")
continue
# Extract and validate slot
slot = srv.get("slot")
if slot is None:
validation_errors.append(f"Server {i+1}: missing 'slot' field")
continue
try:
slot = int(slot)
except (ValueError, TypeError):
validation_errors.append(f"Server {i+1}: slot must be an integer")
continue
if not (1 <= slot <= MAX_SLOTS):
validation_errors.append(f"Server {i+1}: slot must be between 1 and {MAX_SLOTS}")
continue
# Extract and validate IP
ip = srv.get("ip")
if not ip:
validation_errors.append(f"Server {i+1}: missing 'ip' field")
continue
if not validate_ip(ip):
validation_errors.append(f"Server {i+1}: invalid IP address '{ip}'")
continue
# Extract and validate port
http_port = srv.get("http_port", 80)
try:
http_port = int(http_port)
except (ValueError, TypeError):
validation_errors.append(f"Server {i+1}: http_port must be an integer")
continue
if not (1 <= http_port <= 65535):
validation_errors.append(f"Server {i+1}: port must be between 1 and 65535")
continue
validated_servers.append({"slot": slot, "ip": ip, "http_port": http_port})
# Return validation errors if any
if validation_errors:
return "Validation errors:\n" + "\n".join(f"{e}" for e in validation_errors)
# Check for duplicate slots
slots = [s["slot"] for s in validated_servers]
if len(slots) != len(set(slots)):
return "Error: Duplicate slot numbers in servers array"
# Get backend info
try:
backend, server_prefix = get_backend_and_prefix(domain)
except ValueError as e:
return f"Error: {e}"
# Save ALL servers to config FIRST (disk-first pattern)
for server_config in validated_servers:
slot = server_config["slot"]
ip = server_config["ip"]
http_port = server_config["http_port"]
add_server_to_config(domain, slot, ip, http_port)
# Then update HAProxy
added = []
errors = []
failed_slots = []
try:
for server_config in validated_servers:
slot = server_config["slot"]
ip = server_config["ip"]
http_port = server_config["http_port"]
try:
configure_server_slot(backend, server_prefix, slot, ip, http_port)
added.append(f"slot {slot}: {ip}:{http_port}")
except HaproxyError as e:
failed_slots.append(slot)
errors.append(f"slot {slot}: {e}")
except Exception as e:
# Rollback all saved configs on unexpected error
for server_config in validated_servers:
remove_server_from_config(domain, server_config["slot"])
return f"Error: {e}"
# Rollback failed slots from config
for slot in failed_slots:
remove_server_from_config(domain, slot)
# Build result message
result_parts = []
if added:
result_parts.append(f"Added {len(added)} servers to {domain} ({backend}):")
result_parts.extend(f"{s}" for s in added)
if errors:
result_parts.append(f"Failed to add {len(errors)} servers:")
result_parts.extend(f"{e}" for e in errors)
return "\n".join(result_parts) if result_parts else "No servers added"
@mcp.tool()
def haproxy_remove_server(
domain: Annotated[str, Field(description="Domain name to remove server from (e.g., api.example.com)")],
slot: Annotated[int, Field(description="Server slot number to remove (1-10)")]
) -> str:
"""Remove a server from a domain's backend at specified slot.
Example: haproxy_remove_server("api.example.com", slot=2)
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
if not (1 <= slot <= MAX_SLOTS):
return f"Error: Slot must be between 1 and {MAX_SLOTS}"
try:
backend, server_prefix = get_backend_and_prefix(domain)
# Get current server info for potential rollback
config = load_servers_config()
old_config = config.get(domain, {}).get(str(slot), {})
# Remove from persistent config FIRST (disk-first pattern)
remove_server_from_config(domain, slot)
try:
# HTTP only - single server per slot
server = f"{server_prefix}_{slot}"
# Batch both commands in single TCP connection
haproxy_cmd_batch([
f"set server {backend}/{server} state maint",
f"set server {backend}/{server} addr 0.0.0.0 port 0"
])
return f"Removed server at slot {slot} from {domain} ({backend})"
except HaproxyError as e:
# Rollback: re-add config if HAProxy command failed
if old_config:
add_server_to_config(domain, slot, old_config.get("ip", ""), old_config.get("http_port", 80))
return f"Error: {e}"
except (ValueError, IOError) as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_set_domain_state(
domain: Annotated[str, Field(description="Domain name (e.g., api.example.com)")],
state: Annotated[str, Field(description="Target state: 'ready' (normal), 'drain' (stop new connections), or 'maint' (maintenance)")]
) -> str:
"""Set state for all servers of a domain at once.
Example: haproxy_set_domain_state("api.example.com", state="drain")
Example:
# Put all servers in maintenance for deployment
haproxy_set_domain_state("api.example.com", "maint")
# Re-enable all servers after deployment
haproxy_set_domain_state("api.example.com", "ready")
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
if state not in ["ready", "drain", "maint"]:
return "Error: State must be 'ready', 'drain', or 'maint'"
try:
backend, _ = get_backend_and_prefix(domain)
except ValueError as e:
return f"Error: {e}"
# Get active servers for this domain
try:
servers_state = haproxy_cmd("show servers state")
except HaproxyError as e:
return f"Error: {e}"
changed = []
errors = []
for line in servers_state.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend:
server_name = parts[StateField.SRV_NAME]
addr = parts[StateField.SRV_ADDR]
# Only change state for configured servers (not 0.0.0.0)
if addr != "0.0.0.0":
try:
haproxy_cmd_checked(f"set server {backend}/{server_name} state {state}")
changed.append(server_name)
except HaproxyError as e:
errors.append(f"{server_name}: {e}")
if not changed and not errors:
return f"No active servers found for {domain}"
result = f"Set {len(changed)} servers to '{state}' for {domain}"
if changed:
result += ":\n" + "\n".join(f"{s}" for s in changed)
if errors:
result += f"\n\nErrors ({len(errors)}):\n" + "\n".join(f"{e}" for e in errors)
return result
@mcp.tool()
def haproxy_wait_drain(
domain: Annotated[str, Field(description="Domain name to wait for (e.g., api.example.com)")],
timeout: Annotated[int, Field(default=30, description="Maximum seconds to wait (default: 30, max: 300)")]
) -> str:
"""Wait for all active connections to drain from a domain's servers.
Use after setting servers to 'drain' state before maintenance.
Example: haproxy_wait_drain("api.example.com", timeout=60)
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
if not (1 <= timeout <= 300):
return "Error: Timeout must be between 1 and 300 seconds"
try:
backend, _ = get_backend_and_prefix(domain)
except ValueError as e:
return f"Error: {e}"
start_time = time.time()
while time.time() - start_time < timeout:
try:
stats = haproxy_cmd("show stat")
total_connections = 0
for line in stats.split("\n"):
parts = line.split(",")
if len(parts) > StatField.SCUR and parts[0] == backend and parts[1] not in ["FRONTEND", "BACKEND", ""]:
try:
scur = int(parts[StatField.SCUR]) if parts[StatField.SCUR] else 0
total_connections += scur
except ValueError:
pass
if total_connections == 0:
elapsed = int(time.time() - start_time)
return f"All connections drained for {domain} ({elapsed}s)"
time.sleep(1)
except HaproxyError as e:
return f"Error checking connections: {e}"
return f"Timeout: Connections still active after {timeout}s"
@mcp.tool()
def haproxy_set_server_state(
backend: Annotated[str, Field(description="Backend name (e.g., 'pool_1')")],
server: Annotated[str, Field(description="Server name (e.g., 'pool_1_1')")],
state: Annotated[str, Field(description="'ready' (enable), 'drain' (graceful shutdown), or 'maint' (maintenance)")]
) -> str:
"""Set server state for maintenance or traffic control.
Example: haproxy_set_server_state("pool_1", "pool_1_2", "maint")
"""
if not validate_backend_name(backend):
return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
if not validate_backend_name(server):
return "Error: Invalid server name (use alphanumeric, underscore, hyphen only)"
if state not in ["ready", "drain", "maint"]:
return "Error: state must be 'ready', 'drain', or 'maint'"
try:
haproxy_cmd_checked(f"set server {backend}/{server} state {state}")
return f"Server {backend}/{server} set to {state}"
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_set_server_weight(
backend: Annotated[str, Field(description="Backend name (e.g., 'pool_1')")],
server: Annotated[str, Field(description="Server name (e.g., 'pool_1_1')")],
weight: Annotated[int, Field(description="Weight 0-256 (higher = more traffic, 0 = disabled)")]
) -> str:
"""Set server weight for load balancing ratio control.
Example: haproxy_set_server_weight("pool_1", "pool_1_1", weight=2)
"""
if not validate_backend_name(backend):
return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
if not validate_backend_name(server):
return "Error: Invalid server name (use alphanumeric, underscore, hyphen only)"
if not (0 <= weight <= 256):
return "Error: weight must be between 0 and 256"
try:
haproxy_cmd_checked(f"set server {backend}/{server} weight {weight}")
return f"Server {backend}/{server} weight set to {weight}"
except HaproxyError as e:
return f"Error: {e}"

29
haproxy_mcp/utils.py Normal file
View File

@@ -0,0 +1,29 @@
"""Utility functions for HAProxy MCP Server."""
from typing import Dict, Generator
from .config import StatField
def parse_stat_csv(stat_output: str) -> Generator[Dict[str, str], None, None]:
"""Parse HAProxy stat CSV output into structured data.
Args:
stat_output: Raw output from 'show stat' command
Yields:
Dictionaries with parsed stat fields for each row
"""
for line in stat_output.split("\n"):
if not line or line.startswith("#"):
continue
parts = line.split(",")
if len(parts) > StatField.STATUS:
yield {
"pxname": parts[StatField.PXNAME],
"svname": parts[StatField.SVNAME],
"scur": parts[StatField.SCUR] if len(parts) > StatField.SCUR else "0",
"smax": parts[StatField.SMAX] if len(parts) > StatField.SMAX else "0",
"status": parts[StatField.STATUS],
"weight": parts[StatField.WEIGHT] if len(parts) > StatField.WEIGHT else "0",
"check_status": parts[StatField.CHECK_STATUS] if len(parts) > StatField.CHECK_STATUS else "",
}

View File

84
haproxy_mcp/validation.py Normal file
View File

@@ -0,0 +1,84 @@
"""Input validation functions for HAProxy MCP Server."""
import ipaddress
from .config import DOMAIN_PATTERN, BACKEND_NAME_PATTERN, NON_ALNUM_PATTERN
def validate_domain(domain: str) -> bool:
"""Validate domain format.
Args:
domain: The domain name to validate
Returns:
True if domain is valid, False otherwise
"""
if not domain or len(domain) > 253:
return False
return bool(DOMAIN_PATTERN.match(domain))
def validate_ip(ip: str, allow_empty: bool = False) -> bool:
"""Validate IPv4 or IPv6 address format.
Args:
ip: The IP address to validate
allow_empty: If True, empty string is considered valid
Returns:
True if IP is valid, False otherwise
"""
if not ip:
return allow_empty
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False
def validate_port(port: str) -> bool:
"""Validate port number is in valid range.
Args:
port: Port number as string
Returns:
True if port is valid (1-65535), False otherwise
"""
if not port or not port.isdigit():
return False
port_num = int(port)
return 1 <= port_num <= 65535
def validate_backend_name(name: str) -> bool:
"""Validate backend or server name to prevent command injection.
Args:
name: The backend or server name to validate
Returns:
True if name contains only safe characters
"""
if not name or len(name) > 255:
return False
return bool(BACKEND_NAME_PATTERN.match(name))
def domain_to_backend(domain: str) -> str:
"""Convert domain to backend name (alphanumeric + underscore only).
Args:
domain: The domain name to convert
Returns:
Backend name with non-alphanumeric characters replaced by underscores
Raises:
ValueError: If resulting name is invalid
"""
result = NON_ALNUM_PATTERN.sub('_', domain)
if not validate_backend_name(result):
raise ValueError(f"Invalid backend name after conversion: {result}")
return result

File diff suppressed because it is too large Load Diff