Files
haproxy-mcp/mcp/server.py
root 432154c850 Initial commit: HAProxy MCP Server
- Zero-reload domain management with map-based routing
- 100 pool backends with 10 server slots each
- Runtime API integration for dynamic configuration
- Auto-restore servers from persistent config on startup
- 17 MCP tools for domain/server management

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 11:37:06 +00:00

1138 lines
37 KiB
Python

#!/usr/bin/env python3
"""HAProxy MCP Server - Direct Runtime API Integration
This module provides an MCP (Model Context Protocol) server for managing HAProxy
configuration and runtime state. It supports dynamic domain/server management
with HTTP, HTTPS, and HTTP/3 (QUIC) protocols.
"""
import socket
import subprocess
import re
import json
import sys
import time
import fcntl
from typing import Dict, Generator, List, Optional, Set, Tuple
from mcp.server.fastmcp import FastMCP
# MCP server instance named "haproxy"; the tool functions below register on it.
# NOTE(review): host "0.0.0.0" presumably binds every network interface on
# port 8000 - confirm that exposing these management tools that widely is intended.
mcp = FastMCP("haproxy", host="0.0.0.0", port=8000)
# Constants
HAPROXY_SOCKET: Tuple[str, int] = ("localhost", 9999)
STATE_FILE: str = "/opt/haproxy/data/servers.state"
MAP_FILE: str = "/opt/haproxy/conf/domains.map"
MAP_FILE_CONTAINER: str = "/usr/local/etc/haproxy/domains.map"
SERVERS_FILE: str = "/opt/haproxy/conf/servers.json"
POOL_COUNT: int = 100
MAX_SLOTS: int = 10
# Validation patterns - compiled once for performance.
# They end with \Z rather than $: "$" also matches just before a trailing
# newline, which would let values such as "example.com\n" pass validation and
# terminate a Runtime API command early.
DOMAIN_PATTERN = re.compile(
    r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?'
    r'(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*\Z'
)
# [0-9] instead of \d so that non-ASCII Unicode digits are rejected
IP_PATTERN = re.compile(r'^([0-9]{1,3}\.){3}[0-9]{1,3}\Z')
# Backend/server names: alphanumeric, underscore, hyphen only
BACKEND_NAME_PATTERN = re.compile(r'^[a-zA-Z0-9_-]+\Z')
# Pattern for converting domain to backend name
NON_ALNUM_PATTERN = re.compile(r'[^a-zA-Z0-9]')
# Limits
MAX_RESPONSE_SIZE = 10 * 1024 * 1024  # 10 MB max response from HAProxy
SUBPROCESS_TIMEOUT = 30  # seconds
STARTUP_RETRY_COUNT = 10  # HAProxy ready check retries
STATE_MIN_COLUMNS = 19  # Minimum columns in HAProxy server state output
SOCKET_TIMEOUT = 5  # seconds for HAProxy socket connection
class HaproxyError(Exception):
    """Raised when an HAProxy Runtime API operation fails."""
# CSV field indices for HAProxy stats (show stat command)
class StatField:
    """Zero-based column positions in the CSV emitted by ``show stat``.

    The leading columns of that CSV are, in order:
    pxname, svname, qcur, qmax, scur, smax, slim, stot, ...
    """
    PXNAME = 0  # Proxy name (frontend/backend)
    SVNAME = 1  # Server name (or FRONTEND/BACKEND)
    SCUR = 4  # Current sessions
    SMAX = 5  # Max sessions (index 6 is "slim", the configured session limit)
    STATUS = 17  # Status (UP/DOWN/MAINT/etc)
    WEIGHT = 18  # Server weight
    CHECK_STATUS = 36  # Check status
def haproxy_cmd(command: str) -> str:
    """Send a single command to the HAProxy Runtime API and return the reply.

    Args:
        command: The HAProxy runtime API command to execute (one line)

    Returns:
        The decoded response from HAProxy with surrounding whitespace stripped

    Raises:
        HaproxyError: If the command contains a newline, the connection fails
            or times out, the response is not valid UTF-8, or the response
            exceeds MAX_RESPONSE_SIZE
    """
    # The command is terminated with "\n" on the wire, so an embedded newline
    # would smuggle a second command into the same connection.
    if "\n" in command:
        raise HaproxyError("Invalid command: embedded newline")
    chunks: List[bytes] = []
    received = 0
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(SOCKET_TIMEOUT)
            s.connect(HAPROXY_SOCKET)
            s.sendall(f"{command}\n".encode())
            # Half-close so the receive loop below sees EOF after the reply
            s.shutdown(socket.SHUT_WR)
            while True:
                data = s.recv(8192)
                if not data:
                    break
                chunks.append(data)
                received += len(data)
                if received > MAX_RESPONSE_SIZE:
                    raise HaproxyError(f"Response exceeded {MAX_RESPONSE_SIZE} bytes limit")
        return b"".join(chunks).decode().strip()
    except HaproxyError:
        raise
    except socket.timeout as e:
        raise HaproxyError("Connection timeout") from e
    except ConnectionRefusedError as e:
        raise HaproxyError("Connection refused - HAProxy not running?") from e
    except UnicodeDecodeError as e:
        raise HaproxyError("Invalid UTF-8 in response") from e
    except Exception as e:
        # Normalise every other failure so callers only need to catch HaproxyError
        raise HaproxyError(str(e)) from e
def reload_haproxy() -> Tuple[bool, str]:
    """Check the HAProxy configuration, then signal the container to reload.

    Runs ``haproxy -c`` inside the ``haproxy`` podman container and, only if
    that succeeds, sends USR2 to the container.

    Returns:
        Tuple of (success, message); message is "OK" on success
    """
    check_cmd = ["podman", "exec", "haproxy", "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"]
    signal_cmd = ["podman", "kill", "--signal", "USR2", "haproxy"]
    run_opts = {"capture_output": True, "text": True, "timeout": SUBPROCESS_TIMEOUT}
    try:
        checked = subprocess.run(check_cmd, **run_opts)
        if checked.returncode != 0:
            return False, f"Config validation failed:\n{checked.stderr}"
        signalled = subprocess.run(signal_cmd, **run_opts)
        if signalled.returncode != 0:
            return False, f"Reload failed: {signalled.stderr}"
    except subprocess.TimeoutExpired:
        return False, f"Command timed out after {SUBPROCESS_TIMEOUT} seconds"
    except FileNotFoundError:
        return False, "podman command not found"
    except OSError as e:
        return False, f"OS error: {e}"
    return True, "OK"
def validate_domain(domain: str) -> bool:
    """Validate domain format.

    Args:
        domain: The domain name to validate

    Returns:
        True if domain is non-empty, at most 253 characters and matches
        DOMAIN_PATTERN in full, False otherwise
    """
    if not domain or len(domain) > 253:
        return False
    # fullmatch: a "$"-anchored match() would accept a trailing newline,
    # which must never reach a Runtime API command line.
    return DOMAIN_PATTERN.fullmatch(domain) is not None
def validate_ip(ip: str, allow_empty: bool = False) -> bool:
    """Validate dotted-quad IPv4 address format.

    Args:
        ip: The IP address to validate
        allow_empty: If True, empty string is considered valid

    Returns:
        True if ip consists of exactly four groups of 1-3 ASCII digits, each
        in the range 0-255, separated by dots; False otherwise
    """
    if not ip:
        return allow_empty
    octets = ip.split(".")
    if len(octets) != 4:
        return False
    for octet in octets:
        # isascii() rejects Unicode digits; isdigit() rejects signs,
        # whitespace and a trailing newline.
        if not (1 <= len(octet) <= 3 and octet.isascii() and octet.isdigit()):
            return False
        if int(octet) > 255:
            return False
    return True
def validate_backend_name(name: str) -> bool:
    """Validate backend or server name to prevent command injection.

    Args:
        name: The backend or server name to validate

    Returns:
        True if name is 1-255 characters long and matches
        BACKEND_NAME_PATTERN in full
    """
    if not name or len(name) > 255:
        return False
    # fullmatch: a "$"-anchored match() would let "name\n" through
    return BACKEND_NAME_PATTERN.fullmatch(name) is not None
def domain_to_backend(domain: str) -> str:
    """Derive a backend name from a domain.

    Every character outside [a-zA-Z0-9] is replaced with an underscore.

    Args:
        domain: The domain name to convert

    Returns:
        The converted backend name

    Raises:
        ValueError: If the converted name fails validate_backend_name
    """
    candidate = NON_ALNUM_PATTERN.sub('_', domain)
    if validate_backend_name(candidate):
        return candidate
    raise ValueError(f"Invalid backend name after conversion: {candidate}")
def parse_stat_csv(stat_output: str) -> Generator[Dict[str, str], None, None]:
    """Turn raw ``show stat`` CSV text into one dictionary per data row.

    Empty lines, lines starting with "#" and rows too short to contain the
    STATUS column are skipped.

    Args:
        stat_output: Raw output from 'show stat' command

    Yields:
        Dictionaries with keys pxname, svname, scur, smax, status, weight
        and check_status
    """
    def column(fields: List[str], index: int, default: str) -> str:
        # Fall back to the default when the row has no such column
        return fields[index] if len(fields) > index else default

    for row in stat_output.split("\n"):
        if not row or row.startswith("#"):
            continue
        fields = row.split(",")
        if len(fields) <= StatField.STATUS:
            continue
        yield {
            "pxname": fields[StatField.PXNAME],
            "svname": fields[StatField.SVNAME],
            "scur": column(fields, StatField.SCUR, "0"),
            "smax": column(fields, StatField.SMAX, "0"),
            "status": fields[StatField.STATUS],
            "weight": column(fields, StatField.WEIGHT, "0"),
            "check_status": column(fields, StatField.CHECK_STATUS, ""),
        }
def validate_port(port: str) -> bool:
    """Validate that a port given as text is a number in the valid range.

    Args:
        port: Port number as string

    Returns:
        True if port is made of ASCII digits and lies in 1-65535,
        False otherwise
    """
    # str.isdigit() alone accepts characters such as "²" that int() cannot
    # parse, so require ASCII as well.
    if not port or not (port.isascii() and port.isdigit()):
        return False
    try:
        port_num = int(port)
    except ValueError:
        # e.g. a digit string longer than the interpreter's int-conversion limit
        return False
    return 1 <= port_num <= 65535
def get_map_contents() -> List[Tuple[str, str]]:
    """Parse MAP_FILE into (domain, backend) pairs.

    Blank lines, comment lines and lines with fewer than two
    whitespace-separated fields are ignored. A missing file yields an
    empty list.

    Returns:
        List of (domain, backend) tuples from the map file
    """
    pairs: List[Tuple[str, str]] = []
    try:
        with open(MAP_FILE, "r", encoding="utf-8") as handle:
            try:
                fcntl.flock(handle.fileno(), fcntl.LOCK_SH)
            except OSError:
                pass  # Continue without lock if not supported
            try:
                for raw in handle:
                    fields = raw.split()
                    # Short-circuit keeps fields[0] safe on empty lines
                    if len(fields) < 2 or fields[0].startswith("#"):
                        continue
                    pairs.append((fields[0], fields[1]))
            finally:
                try:
                    fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
                except OSError:
                    pass
    except FileNotFoundError:
        pass
    return pairs
def find_available_pool() -> Optional[str]:
    """Pick the lowest-numbered pool that no map entry refers to.

    Returns:
        Pool name (e.g., 'pool_1') if available, None if pool_1 through
        pool_{POOL_COUNT} are all in use
    """
    taken: Set[str] = {
        backend
        for _, backend in get_map_contents()
        if backend.startswith("pool_")
    }
    candidates = (f"pool_{number}" for number in range(1, POOL_COUNT + 1))
    return next((name for name in candidates if name not in taken), None)
def get_domain_backend(domain: str) -> Optional[str]:
    """Return the backend that the map file assigns to a domain.

    Args:
        domain: The domain to look up (compared exactly)

    Returns:
        Backend name of the first matching entry, None if there is none
    """
    matches = (
        backend
        for mapped, backend in get_map_contents()
        if mapped == domain
    )
    return next(matches, None)
def is_legacy_backend(backend: str) -> bool:
    """Tell whether a backend is a legacy static backend rather than a pool.

    Args:
        backend: Backend name to check

    Returns:
        False when the name starts with "pool_", True otherwise
    """
    if backend.startswith("pool_"):
        return False
    return True
def get_legacy_backend_name(domain: str) -> str:
    """Build the legacy backend name for a domain.

    Args:
        domain: Domain name

    Returns:
        The domain_to_backend() form of the domain followed by "_backend"
        (e.g., 'api_example_com_backend')
    """
    prefix = domain_to_backend(domain)
    return f"{prefix}_backend"
def save_map_file(entries: List[Tuple[str, str]]) -> None:
    """Rewrite the domains.map file under an exclusive lock.

    The file is opened without truncation and only emptied once the lock is
    held, so a reader holding the shared lock does not observe a
    half-written file. Data is flushed before the lock is released.

    Args:
        entries: List of (domain, backend) tuples to write

    Raises:
        IOError: If the file cannot be written
    """
    header = (
        "# Domain to Backend mapping\n"
        "# Format: domain backend_name\n"
        "# Wildcard: .domain.com matches *.domain.com\n\n"
    )
    body = "".join(f"{domain} {backend}\n" for domain, backend in entries)
    # "a" creates the file if needed but, unlike "w", does not truncate on open
    with open(MAP_FILE, "a", encoding="utf-8") as f:
        try:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
        except OSError:
            pass  # Continue without lock if not supported
        try:
            f.seek(0)
            f.truncate()
            f.write(header + body)
            # Push buffered data to the file while the lock is still held
            f.flush()
        finally:
            try:
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)
            except OSError:
                pass
def load_servers_config() -> Dict:
    """Load servers configuration from JSON file with file locking.

    Returns:
        Dictionary with server configurations; an empty dictionary when the
        file is missing, is not valid JSON, or does not hold a JSON object
    """
    try:
        with open(SERVERS_FILE, "r", encoding="utf-8") as f:
            try:
                fcntl.flock(f.fileno(), fcntl.LOCK_SH)
            except OSError:
                pass  # Continue without lock if not supported
            try:
                config = json.load(f)
            finally:
                try:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
                except OSError:
                    pass
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as e:
        print(f"Warning: Corrupt config file {SERVERS_FILE}: {e}", file=sys.stderr)
        return {}
    # Callers index the result by domain and call .items(); any other JSON
    # top-level type (list, string, number, null) is treated as corrupt.
    if not isinstance(config, dict):
        print(
            f"Warning: Corrupt config file {SERVERS_FILE}: expected a JSON object",
            file=sys.stderr,
        )
        return {}
    return config
def save_servers_config(config: Dict) -> None:
    """Save servers configuration to JSON file with file locking.

    The configuration is serialized before the file is touched, so a
    serialization error leaves the existing file intact. The file is only
    emptied after the exclusive lock has been requested, and data is
    flushed before the lock is released.

    Args:
        config: Dictionary with server configurations

    Raises:
        TypeError: If config contains values that cannot be JSON-encoded
        OSError: If the file cannot be written
    """
    payload = json.dumps(config, indent=2)
    # "a" creates the file if needed but, unlike "w", does not truncate on open
    with open(SERVERS_FILE, "a", encoding="utf-8") as f:
        try:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
        except OSError:
            pass  # Continue without lock if not supported
        try:
            f.seek(0)
            f.truncate()
            f.write(payload)
            # Push buffered data to the file while the lock is still held
            f.flush()
        finally:
            try:
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)
            except OSError:
                pass
def add_server_to_config(domain: str, slot: int, ip: str, http_port: int, https_port: int) -> None:
    """Record (or overwrite) one server slot of a domain in the persistent config.

    Args:
        domain: Domain name
        slot: Server slot (1 to MAX_SLOTS); stored under its string form
        ip: Server IP address
        http_port: HTTP port
        https_port: HTTPS port
    """
    config = load_servers_config()
    entry = {
        "ip": ip,
        "http_port": http_port,
        "https_port": https_port
    }
    config.setdefault(domain, {})[str(slot)] = entry
    save_servers_config(config)
def remove_server_from_config(domain: str, slot: int) -> None:
    """Drop one server slot of a domain from the persistent config.

    The domain entry itself is removed once its last slot is gone. Nothing
    is written when the slot is not present.

    Args:
        domain: Domain name
        slot: Server slot to remove
    """
    config = load_servers_config()
    slot_key = str(slot)
    if domain not in config or slot_key not in config[domain]:
        return
    domain_slots = config[domain]
    del domain_slots[slot_key]
    if not domain_slots:
        del config[domain]
    save_servers_config(config)
def remove_domain_from_config(domain: str) -> None:
    """Delete every stored server entry of a domain from the persistent config.

    Nothing is written when the domain has no entry.

    Args:
        domain: Domain name to remove
    """
    config = load_servers_config()
    if domain not in config:
        return
    config.pop(domain)
    save_servers_config(config)
def get_server_suffixes(http_port: int, https_port: int) -> List[Tuple[str, int]]:
    """Map a port configuration to the server-name suffixes it uses.

    Args:
        http_port: HTTP port
        https_port: HTTPS port

    Returns:
        List of (suffix, port) tuples: plain, "_ssl" and "_h3" entries when
        the ports are exactly 80/443, otherwise a single plain entry on
        http_port
    """
    uses_default_ports = (http_port, https_port) == (80, 443)
    if not uses_default_ports:
        # Custom port: HTTP only
        return [("", http_port)]
    # Default ports: HTTP + HTTPS + HTTP/3
    return [("", 80), ("_ssl", 443), ("_h3", 443)]
def restore_servers_from_config() -> int:
    """Re-apply every server stored in the persistent config to HAProxy.

    Domains without a map entry are skipped. Entries with a malformed
    structure, an invalid IP address or out-of-range ports are skipped with
    a warning on stderr instead of being sent to the Runtime API.

    Returns:
        Number of server slots restored
    """
    config = load_servers_config()
    restored = 0
    for domain, slots in config.items():
        backend = get_domain_backend(domain)
        if not backend:
            continue
        try:
            if backend.startswith("pool_"):
                server_prefix = backend
            else:
                server_prefix = domain_to_backend(domain)
        except ValueError as e:
            print(f"Warning: Invalid domain '{domain}': {e}", file=sys.stderr)
            continue
        if not isinstance(slots, dict):
            print(f"Warning: Invalid slot table for {domain}, skipping", file=sys.stderr)
            continue
        for slot_str, server_info in slots.items():
            try:
                slot = int(slot_str)
            except ValueError:
                print(f"Warning: Invalid slot '{slot_str}' for {domain}, skipping", file=sys.stderr)
                continue
            if not isinstance(server_info, dict):
                print(f"Warning: Invalid entry for {domain} slot {slot}, skipping", file=sys.stderr)
                continue
            ip = server_info.get("ip", "")
            if not ip:
                continue
            # The file may have been edited by hand; never pass unchecked
            # text into a Runtime API command.
            if not isinstance(ip, str) or not validate_ip(ip):
                print(f"Warning: Invalid IP for {domain} slot {slot}, skipping", file=sys.stderr)
                continue
            try:
                http_port = int(server_info.get("http_port", 80))
                https_port = int(server_info.get("https_port", 443))
            except (ValueError, TypeError):
                print(f"Warning: Invalid port for {domain} slot {slot}, skipping", file=sys.stderr)
                continue
            if not (1 <= http_port <= 65535 and 1 <= https_port <= 65535):
                print(f"Warning: Invalid port for {domain} slot {slot}, skipping", file=sys.stderr)
                continue
            try:
                for suffix, port in get_server_suffixes(http_port, https_port):
                    server = f"{server_prefix}{suffix}_{slot}"
                    haproxy_cmd(f"set server {backend}/{server} addr {ip} port {port}")
                    haproxy_cmd(f"set server {backend}/{server} state ready")
                restored += 1
            except HaproxyError as e:
                print(f"Warning: Failed to restore {domain} slot {slot}: {e}", file=sys.stderr)
    return restored
@mcp.tool()
def haproxy_list_domains() -> str:
    """List all configured domains with their backend servers.

    Server addresses come from ``show servers state``; domains come from the
    map file. Wildcard entries (leading ".") and repeated domains are
    omitted.

    Returns:
        One line per domain, "No domains configured" when there are none,
        or an "Error: ..." message if HAProxy cannot be queried
    """
    try:
        state = haproxy_cmd("show servers state")
    except HaproxyError as e:
        return f"Error: {e}"

    # Group "name=addr:port" strings of configured servers by backend name
    servers_by_backend: Dict[str, list] = {}
    for row in state.split("\n"):
        cols = row.split()
        if len(cols) < STATE_MIN_COLUMNS or cols[4] == "0.0.0.0":
            continue
        servers_by_backend.setdefault(cols[1], []).append(
            f"{cols[3]}={cols[4]}:{cols[18]}"
        )

    listing = []
    seen: Set[str] = set()
    for domain, backend in get_map_contents():
        if domain.startswith(".") or domain in seen:
            continue
        seen.add(domain)
        kind = "pool" if backend.startswith("pool_") else "static"
        members = ", ".join(servers_by_backend.get(backend, ["(none)"]))
        listing.append(f"{domain} -> {backend} ({kind}): {members}")
    if not listing:
        return "No domains configured"
    return "\n".join(listing)
@mcp.tool()
def haproxy_add_domain(domain: str, ip: str = "", http_port: int = 80, https_port: int = 443) -> str:
    """Add a new domain to HAProxy using map-based routing (no reload required).

    The domain and its wildcard form are added to the runtime map, then
    persisted to the map file. If anything fails before the map file has
    been written, the runtime map entries added so far are removed again
    (best effort) so runtime state and the file do not diverge.

    Args:
        domain: The domain name to add (e.g., api.example.com)
        ip: Optional IP address for initial server; if provided, adds to slot 1
        http_port: HTTP port for the backend server (default: 80)
        https_port: HTTPS port for the backend server (default: 443)

    Returns:
        Success message or error description
    """
    # Validate inputs
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    if not validate_ip(ip, allow_empty=True):
        return "Error: Invalid IP address format"
    if not (1 <= http_port <= 65535) or not (1 <= https_port <= 65535):
        return "Error: Port must be between 1 and 65535"
    # Check if domain already exists
    existing_backend = get_domain_backend(domain)
    if existing_backend:
        return f"Error: Domain {domain} already exists (mapped to {existing_backend})"
    # Find available pool
    pool = find_available_pool()
    if not pool:
        return f"Error: No available pools (all {POOL_COUNT} pools are in use)"

    map_keys = (domain, f".{domain}")
    added_keys = []
    map_persisted = False
    try:
        # Update HAProxy map via Runtime API first (immediate effect)
        for key in map_keys:
            haproxy_cmd(f"add map {MAP_FILE_CONTAINER} {key} {pool}")
            added_keys.append(key)
        # Read current map entries and save to file (persistence)
        entries = get_map_contents()
        entries.extend((key, pool) for key in map_keys)
        save_map_file(entries)
        map_persisted = True
        # If IP provided, add server to slot 1
        if ip:
            suffixes = get_server_suffixes(http_port, https_port)
            for suffix, port in suffixes:
                server = f"{pool}{suffix}_1"
                haproxy_cmd(f"set server {pool}/{server} addr {ip} port {port}")
                haproxy_cmd(f"set server {pool}/{server} state ready")
            # Save to persistent config
            add_server_to_config(domain, 1, ip, http_port, https_port)
            if len(suffixes) == 1:
                return f"Domain {domain} added to {pool} with server {ip}:{http_port} (HTTP only)"
            return f"Domain {domain} added to {pool} with server {ip}:{http_port}/{https_port}"
        return f"Domain {domain} added to {pool} (no servers configured)"
    except HaproxyError as e:
        error = f"Error: {e}"
    except IOError as e:
        error = f"Error: Failed to update map file: {e}"

    if not map_persisted:
        # The pool still looks free on disk; undo the runtime entries so a
        # later domain cannot end up sharing it.
        for key in added_keys:
            try:
                haproxy_cmd(f"del map {MAP_FILE_CONTAINER} {key}")
            except HaproxyError:
                pass  # Best effort; the original error is what gets reported
    return error
@mcp.tool()
def haproxy_remove_domain(domain: str) -> str:
    """Remove a domain from HAProxy (no reload required for pool-based domains).

    Deletes the domain and its wildcard form from the runtime map and the
    map file, puts every server slot of the pool into maintenance with
    address 0.0.0.0:0, and drops the domain from the persistent config.
    Domains mapped to a non-pool backend are refused.

    Args:
        domain: The domain name to remove

    Returns:
        Success message or error description
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    backend = get_domain_backend(domain)
    if not backend:
        return f"Error: Domain {domain} not found"
    if is_legacy_backend(backend):
        return f"Error: Cannot remove legacy domain {domain} (uses static backend {backend})"

    wildcard = f".{domain}"
    try:
        # Runtime map first so the change takes effect immediately
        haproxy_cmd(f"del map {MAP_FILE_CONTAINER} {domain}")
        haproxy_cmd(f"del map {MAP_FILE_CONTAINER} {wildcard}")
        # Then persist the map without the two entries
        kept = [
            entry for entry in get_map_contents()
            if entry[0] not in (domain, wildcard)
        ]
        save_map_file(kept)
        # Reset every slot of the pool to 0.0.0.0:0
        for slot in range(1, MAX_SLOTS + 1):
            for suffix in ("", "_ssl", "_h3"):
                target = f"{backend}/{backend}{suffix}_{slot}"
                try:
                    haproxy_cmd(f"set server {target} state maint")
                    haproxy_cmd(f"set server {target} addr 0.0.0.0 port 0")
                except HaproxyError:
                    pass  # Ignore errors for individual servers
        remove_domain_from_config(domain)
        return f"Domain {domain} removed from {backend}"
    except HaproxyError as e:
        return f"Error: {e}"
    except IOError as e:
        return f"Error: Failed to update map file: {e}"
@mcp.tool()
def haproxy_list_servers(domain: str) -> str:
    """List all servers for a specific domain.

    The backend is taken from the map file, falling back to the legacy
    "<domain>_backend" naming when the domain has no map entry.

    Args:
        domain: The domain name to list servers for

    Returns:
        List of servers with their addresses and status, or an error message
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    try:
        backend = get_domain_backend(domain) or get_legacy_backend_name(domain)
        lines = []
        for row in haproxy_cmd("show servers state").split("\n"):
            cols = row.split()
            if len(cols) < STATE_MIN_COLUMNS or cols[1] != backend:
                continue
            # An address of 0.0.0.0 marks an unused slot
            status = "disabled" if cols[4] == "0.0.0.0" else "active"
            lines.append(f"{cols[3]}: {cols[4]}:{cols[18]} ({status})")
        if lines:
            return f"Servers for {domain} ({backend}):\n" + "\n".join(lines)
        return f"Backend {backend} not found"
    except (HaproxyError, ValueError) as e:
        return f"Error: {e}"
@mcp.tool()
def haproxy_add_server(domain: str, slot: int, ip: str, http_port: int = 80, https_port: int = 443) -> str:
    """Add a server to a domain's backend at specified slot.

    Sets address/port and state "ready" for each server name that
    get_server_suffixes() yields for the port pair, then stores the server
    in the persistent config.

    Args:
        domain: The domain name to add the server to
        slot: Server slot number (1 to MAX_SLOTS)
        ip: IP address of the server (required)
        http_port: HTTP port (default: 80)
        https_port: HTTPS port (default: 443)

    Returns:
        Success message listing "server -> ip:port" per configured server,
        or error description
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    if not ip:
        return "Error: IP address is required"
    if not validate_ip(ip):
        return "Error: Invalid IP address format"
    if not (1 <= slot <= MAX_SLOTS):
        return f"Error: Slot must be between 1 and {MAX_SLOTS}"
    if not (1 <= http_port <= 65535) or not (1 <= https_port <= 65535):
        return "Error: Port must be between 1 and 65535"
    try:
        # Look up backend from map
        backend = get_domain_backend(domain)
        if not backend:
            # Fall back to legacy naming convention
            backend = get_legacy_backend_name(domain)
        # Determine server name prefix based on backend type
        if backend.startswith("pool_"):
            # Pool backends use pool_N_slot naming
            server_prefix = backend
        else:
            # Legacy backends use domain-based naming
            server_prefix = domain_to_backend(domain)
        results = []
        for suffix, port in get_server_suffixes(http_port, https_port):
            server = f"{server_prefix}{suffix}_{slot}"
            haproxy_cmd(f"set server {backend}/{server} addr {ip} port {port}")
            haproxy_cmd(f"set server {backend}/{server} state ready")
            # Separator keeps the server name and the address readable
            results.append(f"{server} -> {ip}:{port}")
        # Save to persistent config
        add_server_to_config(domain, slot, ip, http_port, https_port)
        return f"Added to {domain} ({backend}) slot {slot}:\n" + "\n".join(results)
    except (HaproxyError, ValueError) as e:
        return f"Error: {e}"
@mcp.tool()
def haproxy_remove_server(domain: str, slot: int) -> str:
    """Remove a server from a domain's backend at specified slot.

    Each affected server is put into maintenance and reset to 0.0.0.0:0.
    The set of server-name suffixes is taken from the stored port
    configuration of the slot; without a usable stored entry all three
    suffixes are reset.

    Args:
        domain: The domain name to remove the server from
        slot: Server slot number (1 to MAX_SLOTS) to remove

    Returns:
        Success message or error description
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    if not (1 <= slot <= MAX_SLOTS):
        return f"Error: Slot must be between 1 and {MAX_SLOTS}"
    try:
        # Map lookup first, legacy naming convention as fallback
        backend = get_domain_backend(domain) or get_legacy_backend_name(domain)
        # Pool servers are named after the pool, legacy ones after the domain
        server_prefix = backend if backend.startswith("pool_") else domain_to_backend(domain)

        suffixes = ["", "_ssl", "_h3"]  # Default: remove all
        config = load_servers_config()
        slot_key = str(slot)
        if domain in config and slot_key in config[domain]:
            stored = config[domain][slot_key]
            try:
                stored_http = int(stored.get("http_port", 80))
                stored_https = int(stored.get("https_port", 443))
                suffixes = [
                    suffix
                    for suffix, _port in get_server_suffixes(stored_http, stored_https)
                ]
            except (ValueError, TypeError):
                pass  # Use default suffixes

        for suffix in suffixes:
            target = f"{backend}/{server_prefix}{suffix}_{slot}"
            haproxy_cmd(f"set server {target} state maint")
            haproxy_cmd(f"set server {target} addr 0.0.0.0 port 0")
        remove_server_from_config(domain, slot)
        return f"Removed server at slot {slot} from {domain} ({backend})"
    except (HaproxyError, ValueError) as e:
        return f"Error: {e}"
@mcp.tool()
def haproxy_stats() -> str:
    """Get HAProxy status and statistics.

    Parses the "key: value" lines of ``show info`` and reports a fixed
    selection of keys. If none of them is present the raw reply is returned.

    Returns:
        Key HAProxy metrics (name, version, uptime, connections, etc.)
    """
    try:
        raw = haproxy_cmd("show info")
    except HaproxyError as e:
        return f"Error: {e}"
    info = {}
    for line in raw.split("\n"):
        name, separator, value = line.partition(":")
        if separator:
            info[name.strip()] = value.strip()
    wanted = ("Name", "Version", "Uptime_sec", "CurrConns", "MaxConn", "Run_queue", "Tasks")
    report = [f"{key}: {info[key]}" for key in wanted if key in info]
    if report:
        return "\n".join(report)
    return raw
@mcp.tool()
def haproxy_backends() -> str:
    """List all HAProxy backends.

    Returns:
        "Backends:" followed by the non-empty, non-comment lines of
        ``show backend``, or an error message
    """
    try:
        reply = haproxy_cmd("show backend")
    except HaproxyError as e:
        return f"Error: {e}"
    names = []
    for line in reply.split("\n"):
        if line and not line.startswith("#"):
            names.append(line)
    return "Backends:\n" + "\n".join(names)
@mcp.tool()
def haproxy_list_frontends() -> str:
    """List all HAProxy frontends with their status.

    Returns:
        List of frontends with status and current session counts,
        "No frontends found", or an error message
    """
    try:
        reply = haproxy_cmd("show stat")
        lines = [
            f"{row['pxname']}: {row['status']} (sessions: {row['scur']})"
            for row in parse_stat_csv(reply)
            if row["svname"] == "FRONTEND"
        ]
    except HaproxyError as e:
        return f"Error: {e}"
    if lines:
        return "Frontends:\n" + "\n".join(lines)
    return "No frontends found"
@mcp.tool()
def haproxy_set_server_state(backend: str, server: str, state: str) -> str:
    """Set server state. States: ready (enable), drain (graceful shutdown), maint (maintenance/disable)

    Args:
        backend: Backend name (alphanumeric, underscore, hyphen only)
        server: Server name within the backend
        state: Target state - ready, drain, or maint

    Returns:
        HAProxy's reply if it sent one, otherwise a confirmation message;
        an error description on invalid input or failure
    """
    for label, value in (("backend", backend), ("server", server)):
        if not validate_backend_name(value):
            return f"Error: Invalid {label} name (use alphanumeric, underscore, hyphen only)"
    if state not in ("ready", "drain", "maint"):
        return "Error: state must be 'ready', 'drain', or 'maint'"
    try:
        reply = haproxy_cmd(f"set server {backend}/{server} state {state}")
    except HaproxyError as e:
        return f"Error: {e}"
    return reply or f"Server {backend}/{server} set to {state}"
@mcp.tool()
def haproxy_get_server_health(backend: str = "") -> str:
    """Get health status of all servers or servers in a specific backend.

    FRONTEND/BACKEND summary rows and rows without a server name are left out.

    Args:
        backend: Optional backend name to filter results

    Returns:
        Server health status or error description
    """
    if backend and not validate_backend_name(backend):
        return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
    try:
        reply = haproxy_cmd("show stat")
        report = []
        for row in parse_stat_csv(reply):
            if row["svname"] in ("FRONTEND", "BACKEND", ""):
                continue
            if backend and row["pxname"] != backend:
                continue
            report.append(
                f"{row['pxname']}/{row['svname']}: {row['status']} "
                f"(weight: {row['weight']}, check: {row['check_status']})"
            )
    except HaproxyError as e:
        return f"Error: {e}"
    if report:
        return "\n".join(report)
    return "No servers found"
@mcp.tool()
def haproxy_set_server_weight(backend: str, server: str, weight: int) -> str:
    """Set server weight (0-256). Weight 0 disables the server for new connections.

    Args:
        backend: Backend name (alphanumeric, underscore, hyphen only)
        server: Server name within the backend
        weight: Server weight from 0 to 256

    Returns:
        HAProxy's reply if it sent one, otherwise a confirmation message;
        an error description on invalid input or failure
    """
    for label, value in (("backend", backend), ("server", server)):
        if not validate_backend_name(value):
            return f"Error: Invalid {label} name (use alphanumeric, underscore, hyphen only)"
    if not (0 <= weight <= 256):
        return "Error: weight must be between 0 and 256"
    try:
        reply = haproxy_cmd(f"set server {backend}/{server} weight {weight}")
    except HaproxyError as e:
        return f"Error: {e}"
    return reply or f"Server {backend}/{server} weight set to {weight}"
@mcp.tool()
def haproxy_get_connections(backend: str = "") -> str:
    """Get active connections per server.

    FRONTEND/BACKEND rows report current and max sessions; every other
    named row reports current sessions only.

    Args:
        backend: Optional backend name to filter results

    Returns:
        Connection statistics or error description
    """
    if backend and not validate_backend_name(backend):
        return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
    try:
        reply = haproxy_cmd("show stat")
        report = []
        for row in parse_stat_csv(reply):
            if backend and row["pxname"] != backend:
                continue
            name = row["svname"]
            if name in ("FRONTEND", "BACKEND"):
                report.append(
                    f"{row['pxname']} ({name}): {row['scur']} current, {row['smax']} max"
                )
            elif name:
                report.append(f" - {name}: {row['scur']} connections")
    except HaproxyError as e:
        return f"Error: {e}"
    if report:
        return "\n".join(report)
    return "No connection data"
@mcp.tool()
def haproxy_reload() -> str:
    """Reload HAProxy configuration (validates config first).

    Returns:
        Success message, or the failure message from reload_haproxy()
    """
    ok, message = reload_haproxy()
    return "HAProxy configuration reloaded successfully" if ok else message
@mcp.tool()
def haproxy_check_config() -> str:
    """Validate HAProxy configuration file syntax.

    Runs ``haproxy -c`` inside the ``haproxy`` podman container.

    Returns:
        Validation result or error details
    """
    check_cmd = ["podman", "exec", "haproxy", "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"]
    try:
        outcome = subprocess.run(
            check_cmd, capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
        )
    except subprocess.TimeoutExpired:
        return f"Error: Command timed out after {SUBPROCESS_TIMEOUT} seconds"
    except FileNotFoundError:
        return "Error: podman command not found"
    except OSError as e:
        return f"Error: OS error: {e}"
    if outcome.returncode != 0:
        return f"Configuration errors:\n{outcome.stderr}"
    return "Configuration is valid"
@mcp.tool()
def haproxy_save_state() -> str:
    """Save current server state to disk.

    Writes the reply of ``show servers state`` to STATE_FILE. The file is
    opened without truncation and only emptied once the exclusive lock has
    been requested; data is flushed before the lock is released, so a
    reader holding the shared lock does not see a partial file.

    Returns:
        Success message or error description
    """
    try:
        state = haproxy_cmd("show servers state")
        # "a" creates the file if needed but, unlike "w", does not truncate on open
        with open(STATE_FILE, "a", encoding="utf-8") as f:
            try:
                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
            except OSError:
                pass  # Continue without lock if not supported
            try:
                f.seek(0)
                f.truncate()
                f.write(state)
                # Push buffered data to the file while the lock is still held
                f.flush()
            finally:
                try:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
                except OSError:
                    pass
        return "Server state saved"
    except HaproxyError as e:
        return f"Error: {e}"
    except IOError as e:
        return f"Error: Failed to save state: {e}"
@mcp.tool()
def haproxy_restore_state() -> str:
    """Restore server state from disk.

    Reads STATE_FILE and, for each server row with a real address,
    re-applies address/port and enables the server. Rows whose names,
    address or port fail validation are counted as skipped.

    Returns:
        Summary of restored servers or error description
    """
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            try:
                fcntl.flock(f.fileno(), fcntl.LOCK_SH)
            except OSError:
                pass  # Continue without lock if not supported
            try:
                saved = f.read()
            finally:
                try:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
                except OSError:
                    pass

        restored = 0
        skipped = 0
        for line in saved.split("\n"):
            if line.startswith("#"):
                continue
            cols = line.split()
            if len(cols) < STATE_MIN_COLUMNS:
                continue
            backend, server, addr, port = cols[1], cols[3], cols[4], cols[18]
            # Skip disabled servers
            if addr == "0.0.0.0":
                continue
            # Names, address and port come from a file; validate them
            # before they are placed into a command.
            names_ok = validate_backend_name(backend) and validate_backend_name(server)
            if not names_ok or not (validate_ip(addr) and validate_port(port)):
                skipped += 1
                continue
            haproxy_cmd(f"set server {backend}/{server} addr {addr} port {port}")
            haproxy_cmd(f"enable server {backend}/{server}")
            restored += 1

        summary = f"Server state restored ({restored} servers)"
        if skipped:
            summary += f", {skipped} entries skipped due to validation"
        return summary
    except FileNotFoundError:
        return "Error: No saved state found"
    except HaproxyError as e:
        return f"Error: {e}"
def startup_restore() -> None:
    """Wait for HAProxy to answer, then restore servers from the config file.

    HAProxy is probed with ``show info`` up to STARTUP_RETRY_COUNT times,
    one second apart. If it never answers, the restore is skipped. All
    outcomes are reported on stderr.
    """
    ready = False
    for _ in range(STARTUP_RETRY_COUNT):
        try:
            haproxy_cmd("show info")
        except HaproxyError:
            time.sleep(1)
            continue
        ready = True
        break
    if not ready:
        print("Warning: HAProxy not ready, skipping restore", file=sys.stderr)
        return

    try:
        count = restore_servers_from_config()
    except (HaproxyError, OSError, ValueError, json.JSONDecodeError) as e:
        print(f"Warning: Failed to restore servers: {e}", file=sys.stderr)
        return
    if count > 0:
        print(f"Restored {count} servers from config", file=sys.stderr)
# Script entry point: restore servers from the persistent config first,
# then start the MCP server using the "streamable-http" transport.
if __name__ == "__main__":
    startup_restore()
    mcp.run(transport="streamable-http")