Files
haproxy-mcp/mcp/server.py
kaffa 61dd4a69fc Improve code quality based on code review
Major improvements:
- Atomic file writes using temp file + rename pattern
- Structured logging with logging module (replaces print)
- StateField class for HAProxy state field indices
- Helper function get_backend_and_prefix() to reduce duplication
- Consistent exception chaining with 'from e'
- Proper fd/temp_path tracking to prevent resource leaks
- Added IOError handling in server management functions

Technical changes:
- save_map_file, save_servers_config, haproxy_save_state now use
  atomic writes with tempfile.mkstemp() + os.rename()
- Standardized on 'set server state ready' (was 'enable server')
- All magic numbers for state parsing replaced with StateField class

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 12:48:49 +00:00

1168 lines
37 KiB
Python

#!/usr/bin/env python3
"""HAProxy MCP Server - Direct Runtime API Integration
This module provides an MCP (Model Context Protocol) server for managing HAProxy
configuration and runtime state. It supports dynamic domain/server management
with HTTP backends (SSL termination at HAProxy frontend).
"""
import socket
import subprocess
import re
import json
import logging
import os
import tempfile
import time
import fcntl
from typing import Any, Dict, Generator, List, Optional, Set, Tuple
from mcp.server.fastmcp import FastMCP
# Configure structured logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
mcp = FastMCP("haproxy", host="0.0.0.0", port=8000)
# Constants
HAPROXY_SOCKET: Tuple[str, int] = ("localhost", 9999)
STATE_FILE: str = "/opt/haproxy/data/servers.state"
MAP_FILE: str = "/opt/haproxy/conf/domains.map"
MAP_FILE_CONTAINER: str = "/usr/local/etc/haproxy/domains.map"
SERVERS_FILE: str = "/opt/haproxy/conf/servers.json"
POOL_COUNT: int = 100
MAX_SLOTS: int = 10
# Validation patterns - compiled once for performance
DOMAIN_PATTERN = re.compile(
r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?'
r'(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$'
)
IP_PATTERN = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$')
# Backend/server names: alphanumeric, underscore, hyphen only
BACKEND_NAME_PATTERN = re.compile(r'^[a-zA-Z0-9_-]+$')
# Pattern for converting domain to backend name
NON_ALNUM_PATTERN = re.compile(r'[^a-zA-Z0-9]')
# Limits
MAX_RESPONSE_SIZE = 10 * 1024 * 1024 # 10 MB max response from HAProxy
SUBPROCESS_TIMEOUT = 30 # seconds
STARTUP_RETRY_COUNT = 10 # HAProxy ready check retries
STATE_MIN_COLUMNS = 19 # Minimum columns in HAProxy server state output
SOCKET_TIMEOUT = 5 # seconds for HAProxy socket connection
class HaproxyError(Exception):
"""HAProxy operation error"""
pass
# CSV field indices for HAProxy stats (show stat command)
class StatField:
"""HAProxy CSV stat field indices."""
PXNAME = 0 # Proxy name (frontend/backend)
SVNAME = 1 # Server name (or FRONTEND/BACKEND)
SCUR = 4 # Current sessions
SMAX = 6 # Max sessions
STATUS = 17 # Status (UP/DOWN/MAINT/etc)
WEIGHT = 18 # Server weight
CHECK_STATUS = 36 # Check status
# Field indices for HAProxy server state (show servers state command)
class StateField:
"""HAProxy server state field indices."""
BE_ID = 0 # Backend ID
BE_NAME = 1 # Backend name
SRV_ID = 2 # Server ID
SRV_NAME = 3 # Server name
SRV_ADDR = 4 # Server address
SRV_OP_STATE = 5 # Operational state
SRV_ADMIN_STATE = 6 # Admin state
SRV_PORT = 18 # Server port
def haproxy_cmd(command: str) -> str:
"""Send command to HAProxy Runtime API.
Args:
command: The HAProxy runtime API command to execute
Returns:
The response from HAProxy
Raises:
HaproxyError: If connection fails or response exceeds size limit
"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(SOCKET_TIMEOUT)
s.connect(HAPROXY_SOCKET)
s.sendall(f"{command}\n".encode())
s.shutdown(socket.SHUT_WR)
response = b""
while True:
data = s.recv(8192)
if not data:
break
response += data
if len(response) > MAX_RESPONSE_SIZE:
raise HaproxyError(f"Response exceeded {MAX_RESPONSE_SIZE} bytes limit")
return response.decode().strip()
except socket.timeout:
raise HaproxyError("Connection timeout")
except ConnectionRefusedError:
raise HaproxyError("Connection refused - HAProxy not running?")
except UnicodeDecodeError:
raise HaproxyError("Invalid UTF-8 in response")
except HaproxyError:
raise
except Exception as e:
raise HaproxyError(str(e)) from e
def reload_haproxy() -> Tuple[bool, str]:
"""Validate and reload HAProxy configuration.
Returns:
Tuple of (success, message)
"""
try:
validate = subprocess.run(
["podman", "exec", "haproxy", "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"],
capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
)
if validate.returncode != 0:
return False, f"Config validation failed:\n{validate.stderr}"
result = subprocess.run(
["podman", "kill", "--signal", "USR2", "haproxy"],
capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
)
if result.returncode != 0:
return False, f"Reload failed: {result.stderr}"
return True, "OK"
except subprocess.TimeoutExpired:
return False, f"Command timed out after {SUBPROCESS_TIMEOUT} seconds"
except FileNotFoundError:
return False, "podman command not found"
except OSError as e:
return False, f"OS error: {e}"
def validate_domain(domain: str) -> bool:
"""Validate domain format.
Args:
domain: The domain name to validate
Returns:
True if domain is valid, False otherwise
"""
if not domain or len(domain) > 253:
return False
return bool(DOMAIN_PATTERN.match(domain))
def validate_ip(ip: str, allow_empty: bool = False) -> bool:
"""Validate IPv4 address format.
Args:
ip: The IP address to validate
allow_empty: If True, empty string is considered valid
Returns:
True if IP is valid, False otherwise
"""
if not ip:
return allow_empty
if not IP_PATTERN.match(ip):
return False
return all(0 <= int(octet) <= 255 for octet in ip.split('.'))
def validate_backend_name(name: str) -> bool:
"""Validate backend or server name to prevent command injection.
Args:
name: The backend or server name to validate
Returns:
True if name contains only safe characters
"""
if not name or len(name) > 255:
return False
return bool(BACKEND_NAME_PATTERN.match(name))
def domain_to_backend(domain: str) -> str:
"""Convert domain to backend name (alphanumeric + underscore only).
Args:
domain: The domain name to convert
Returns:
Backend name with non-alphanumeric characters replaced by underscores
Raises:
ValueError: If resulting name is invalid
"""
result = NON_ALNUM_PATTERN.sub('_', domain)
if not validate_backend_name(result):
raise ValueError(f"Invalid backend name after conversion: {result}")
return result
def parse_stat_csv(stat_output: str) -> Generator[Dict[str, str], None, None]:
"""Parse HAProxy stat CSV output into structured data.
Args:
stat_output: Raw output from 'show stat' command
Yields:
Dictionaries with parsed stat fields for each row
"""
for line in stat_output.split("\n"):
if not line or line.startswith("#"):
continue
parts = line.split(",")
if len(parts) > StatField.STATUS:
yield {
"pxname": parts[StatField.PXNAME],
"svname": parts[StatField.SVNAME],
"scur": parts[StatField.SCUR] if len(parts) > StatField.SCUR else "0",
"smax": parts[StatField.SMAX] if len(parts) > StatField.SMAX else "0",
"status": parts[StatField.STATUS],
"weight": parts[StatField.WEIGHT] if len(parts) > StatField.WEIGHT else "0",
"check_status": parts[StatField.CHECK_STATUS] if len(parts) > StatField.CHECK_STATUS else "",
}
def validate_port(port: str) -> bool:
"""Validate port number is in valid range.
Args:
port: Port number as string
Returns:
True if port is valid (1-65535), False otherwise
"""
if not port or not port.isdigit():
return False
port_num = int(port)
return 1 <= port_num <= 65535
def get_map_contents() -> List[Tuple[str, str]]:
"""Read domains.map file and return list of (domain, backend) tuples.
Returns:
List of (domain, backend) tuples from the map file
"""
entries = []
try:
with open(MAP_FILE, "r", encoding="utf-8") as f:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
except OSError:
pass # Continue without lock if not supported
try:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split()
if len(parts) >= 2:
entries.append((parts[0], parts[1]))
finally:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except OSError:
pass
except FileNotFoundError:
pass
return entries
def find_available_pool() -> Optional[str]:
"""Find first unused pool from pool_1 to pool_{POOL_COUNT}.
Returns:
Pool name (e.g., 'pool_1') if available, None if all pools are used
"""
used_pools: Set[str] = set()
for domain, backend in get_map_contents():
if backend.startswith("pool_"):
used_pools.add(backend)
for i in range(1, POOL_COUNT + 1):
pool_name = f"pool_{i}"
if pool_name not in used_pools:
return pool_name
return None
def get_domain_backend(domain: str) -> Optional[str]:
"""Look up the backend for a domain from domains.map.
Args:
domain: The domain to look up
Returns:
Backend name if found, None otherwise
"""
for map_domain, backend in get_map_contents():
if map_domain == domain:
return backend
return None
def is_legacy_backend(backend: str) -> bool:
"""Check if backend is a legacy static backend (not a pool).
Args:
backend: Backend name to check
Returns:
True if this is a legacy backend, False if it's a pool
"""
return not backend.startswith("pool_")
def get_legacy_backend_name(domain: str) -> str:
"""Convert domain to legacy backend name format.
Args:
domain: Domain name
Returns:
Legacy backend name (e.g., 'api_example_com_backend')
"""
return f"{domain_to_backend(domain)}_backend"
def save_map_file(entries: List[Tuple[str, str]]) -> None:
"""Save entries to domains.map file atomically.
Uses temp file + rename for atomic write to prevent race conditions.
Args:
entries: List of (domain, backend) tuples to write
Raises:
IOError: If the file cannot be written
"""
dir_path = os.path.dirname(MAP_FILE)
fd = None
temp_path = None
try:
fd, temp_path = tempfile.mkstemp(dir=dir_path, prefix='.domains.map.')
with os.fdopen(fd, 'w', encoding='utf-8') as f:
fd = None # fd is now owned by the file object
f.write("# Domain to Backend mapping\n")
f.write("# Format: domain backend_name\n")
f.write("# Wildcard: .domain.com matches *.domain.com\n\n")
for domain, backend in entries:
f.write(f"{domain} {backend}\n")
os.rename(temp_path, MAP_FILE)
temp_path = None # Rename succeeded, don't unlink
except OSError as e:
raise IOError(f"Failed to save map file: {e}") from e
finally:
if fd is not None:
try:
os.close(fd)
except OSError:
pass
if temp_path is not None:
try:
os.unlink(temp_path)
except OSError:
pass
def load_servers_config() -> Dict[str, Any]:
"""Load servers configuration from JSON file with file locking.
Returns:
Dictionary with server configurations
"""
try:
with open(SERVERS_FILE, "r", encoding="utf-8") as f:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
except OSError:
logger.debug("File locking not supported for %s", SERVERS_FILE)
try:
return json.load(f)
finally:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except OSError:
pass
except FileNotFoundError:
return {}
except json.JSONDecodeError as e:
logger.warning("Corrupt config file %s: %s", SERVERS_FILE, e)
return {}
def save_servers_config(config: Dict[str, Any]) -> None:
"""Save servers configuration to JSON file atomically.
Uses temp file + rename for atomic write to prevent race conditions.
Args:
config: Dictionary with server configurations
"""
dir_path = os.path.dirname(SERVERS_FILE)
fd = None
temp_path = None
try:
fd, temp_path = tempfile.mkstemp(dir=dir_path, prefix='.servers.json.')
with os.fdopen(fd, 'w', encoding='utf-8') as f:
fd = None # fd is now owned by the file object
json.dump(config, f, indent=2)
os.rename(temp_path, SERVERS_FILE)
temp_path = None # Rename succeeded, don't unlink
except OSError as e:
raise IOError(f"Failed to save servers config: {e}") from e
finally:
if fd is not None:
try:
os.close(fd)
except OSError:
pass
if temp_path is not None:
try:
os.unlink(temp_path)
except OSError:
pass
def add_server_to_config(domain: str, slot: int, ip: str, http_port: int) -> None:
"""Add server configuration to persistent storage.
Args:
domain: Domain name
slot: Server slot (1 to MAX_SLOTS)
ip: Server IP address
http_port: HTTP port
"""
config = load_servers_config()
if domain not in config:
config[domain] = {}
config[domain][str(slot)] = {
"ip": ip,
"http_port": http_port
}
save_servers_config(config)
def remove_server_from_config(domain: str, slot: int) -> None:
"""Remove server configuration from persistent storage.
Args:
domain: Domain name
slot: Server slot to remove
"""
config = load_servers_config()
if domain in config and str(slot) in config[domain]:
del config[domain][str(slot)]
if not config[domain]:
del config[domain]
save_servers_config(config)
def remove_domain_from_config(domain: str) -> None:
"""Remove all server configurations for a domain.
Args:
domain: Domain name to remove
"""
config = load_servers_config()
if domain in config:
del config[domain]
save_servers_config(config)
def get_server_suffixes(http_port: int) -> List[Tuple[str, int]]:
"""Get server suffixes and ports based on port configuration.
Args:
http_port: HTTP port for backend
Returns:
List of (suffix, port) tuples - always HTTP only
"""
return [("", http_port)]
def get_backend_and_prefix(domain: str) -> Tuple[str, str]:
"""Look up backend and determine server name prefix for a domain.
Args:
domain: The domain name to look up
Returns:
Tuple of (backend_name, server_prefix)
Raises:
ValueError: If domain cannot be mapped to a valid backend
"""
backend = get_domain_backend(domain)
if not backend:
backend = get_legacy_backend_name(domain)
if backend.startswith("pool_"):
server_prefix = backend
else:
server_prefix = domain_to_backend(domain)
return backend, server_prefix
def restore_servers_from_config() -> int:
"""Restore all servers from configuration file.
Returns:
Number of servers restored
"""
config = load_servers_config()
restored = 0
for domain, slots in config.items():
backend = get_domain_backend(domain)
if not backend:
continue
try:
_, server_prefix = get_backend_and_prefix(domain)
except ValueError as e:
logger.warning("Invalid domain '%s': %s", domain, e)
continue
for slot_str, server_info in slots.items():
try:
slot = int(slot_str)
except ValueError:
logger.warning("Invalid slot '%s' for %s, skipping", slot_str, domain)
continue
ip = server_info.get("ip", "")
if not ip:
continue
try:
http_port = int(server_info.get("http_port", 80))
except (ValueError, TypeError):
logger.warning("Invalid port for %s slot %d, skipping", domain, slot)
continue
try:
for suffix, port in get_server_suffixes(http_port):
server = f"{server_prefix}{suffix}_{slot}"
haproxy_cmd(f"set server {backend}/{server} addr {ip} port {port}")
haproxy_cmd(f"set server {backend}/{server} state ready")
restored += 1
except HaproxyError as e:
logger.warning("Failed to restore %s slot %d: %s", domain, slot, e)
return restored
@mcp.tool()
def haproxy_list_domains() -> str:
"""List all configured domains with their backend servers.
Returns:
List of domains with their associated backend servers
"""
try:
domains = []
state = haproxy_cmd("show servers state")
# Build server map from HAProxy state
server_map: Dict[str, list] = {}
for line in state.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.SRV_ADDR] != "0.0.0.0":
backend = parts[StateField.BE_NAME]
if backend not in server_map:
server_map[backend] = []
server_map[backend].append(
f"{parts[StateField.SRV_NAME]}={parts[StateField.SRV_ADDR]}:{parts[StateField.SRV_PORT]}"
)
# Read from domains.map (skip wildcard entries starting with .)
seen_domains: Set[str] = set()
for domain, backend in get_map_contents():
if domain.startswith("."):
continue
if domain in seen_domains:
continue
seen_domains.add(domain)
servers = server_map.get(backend, ["(none)"])
backend_type = "pool" if backend.startswith("pool_") else "static"
domains.append(f"{domain} -> {backend} ({backend_type}): {', '.join(servers)}")
return "\n".join(domains) if domains else "No domains configured"
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_add_domain(domain: str, ip: str = "", http_port: int = 80) -> str:
"""Add a new domain to HAProxy using map-based routing (no reload required).
Args:
domain: The domain name to add (e.g., api.example.com)
ip: Optional IP address for initial server; if provided, adds to slot 1
http_port: HTTP port for the backend server (default: 80)
Returns:
Success message or error description
"""
# Validate inputs
if not validate_domain(domain):
return "Error: Invalid domain format"
if not validate_ip(ip, allow_empty=True):
return "Error: Invalid IP address format"
if not (1 <= http_port <= 65535):
return "Error: Port must be between 1 and 65535"
# Check if domain already exists
existing_backend = get_domain_backend(domain)
if existing_backend:
return f"Error: Domain {domain} already exists (mapped to {existing_backend})"
# Find available pool
pool = find_available_pool()
if not pool:
return f"Error: No available pools (all {POOL_COUNT} pools are in use)"
try:
# Update HAProxy map via Runtime API first (immediate effect)
haproxy_cmd(f"add map {MAP_FILE_CONTAINER} {domain} {pool}")
haproxy_cmd(f"add map {MAP_FILE_CONTAINER} .{domain} {pool}")
# Read current map entries and save to file (persistence)
entries = get_map_contents()
entries.append((domain, pool))
entries.append((f".{domain}", pool))
save_map_file(entries)
# If IP provided, add server to slot 1
if ip:
for suffix, port in get_server_suffixes(http_port):
server = f"{pool}{suffix}_1"
haproxy_cmd(f"set server {pool}/{server} addr {ip} port {port}")
haproxy_cmd(f"set server {pool}/{server} state ready")
# Save to persistent config
add_server_to_config(domain, 1, ip, http_port)
return f"Domain {domain} added to {pool} with server {ip}:{http_port}"
return f"Domain {domain} added to {pool} (no servers configured)"
except HaproxyError as e:
return f"Error: {e}"
except IOError as e:
return f"Error: Failed to update map file: {e}"
@mcp.tool()
def haproxy_remove_domain(domain: str) -> str:
"""Remove a domain from HAProxy (no reload required for pool-based domains).
Args:
domain: The domain name to remove
Returns:
Success message or error description
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
# Look up the domain in the map
backend = get_domain_backend(domain)
if not backend:
return f"Error: Domain {domain} not found"
# Check if this is a legacy backend (not a pool)
if is_legacy_backend(backend):
return f"Error: Cannot remove legacy domain {domain} (uses static backend {backend})"
try:
# Clear map entries via Runtime API first (immediate effect)
haproxy_cmd(f"del map {MAP_FILE_CONTAINER} {domain}")
haproxy_cmd(f"del map {MAP_FILE_CONTAINER} .{domain}")
# Remove entries from map file (persistence)
entries = get_map_contents()
new_entries = [(d, b) for d, b in entries if d != domain and d != f".{domain}"]
save_map_file(new_entries)
# Disable all servers in the pool (reset to 0.0.0.0:0)
for slot in range(1, MAX_SLOTS + 1):
server = f"{backend}_{slot}"
try:
haproxy_cmd(f"set server {backend}/{server} state maint")
haproxy_cmd(f"set server {backend}/{server} addr 0.0.0.0 port 0")
except HaproxyError:
pass # Ignore errors for individual servers
# Remove from persistent config
remove_domain_from_config(domain)
return f"Domain {domain} removed from {backend}"
except HaproxyError as e:
return f"Error: {e}"
except IOError as e:
return f"Error: Failed to update map file: {e}"
@mcp.tool()
def haproxy_list_servers(domain: str) -> str:
"""List all servers for a specific domain.
Args:
domain: The domain name to list servers for
Returns:
List of servers with their addresses and status
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
try:
backend, _ = get_backend_and_prefix(domain)
servers = []
state = haproxy_cmd("show servers state")
for line in state.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend:
addr = parts[StateField.SRV_ADDR]
status = "active" if addr != "0.0.0.0" else "disabled"
servers.append(
f"{parts[StateField.SRV_NAME]}: {addr}:{parts[StateField.SRV_PORT]} ({status})"
)
if not servers:
return f"Backend {backend} not found"
return f"Servers for {domain} ({backend}):\n" + "\n".join(servers)
except (HaproxyError, ValueError) as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_add_server(domain: str, slot: int, ip: str, http_port: int = 80) -> str:
"""Add a server to a domain's backend at specified slot.
Args:
domain: The domain name to add the server to
slot: Server slot number (1 to MAX_SLOTS)
ip: IP address of the server (required)
http_port: HTTP port (default: 80)
Returns:
Success message or error description
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
if not ip:
return "Error: IP address is required"
if not validate_ip(ip):
return "Error: Invalid IP address format"
if not (1 <= slot <= MAX_SLOTS):
return f"Error: Slot must be between 1 and {MAX_SLOTS}"
if not (1 <= http_port <= 65535):
return "Error: Port must be between 1 and 65535"
try:
backend, server_prefix = get_backend_and_prefix(domain)
results = []
for suffix, port in get_server_suffixes(http_port):
server = f"{server_prefix}{suffix}_{slot}"
haproxy_cmd(f"set server {backend}/{server} addr {ip} port {port}")
haproxy_cmd(f"set server {backend}/{server} state ready")
results.append(f"{server}{ip}:{port}")
# Save to persistent config
add_server_to_config(domain, slot, ip, http_port)
return f"Added to {domain} ({backend}) slot {slot}:\n" + "\n".join(results)
except (HaproxyError, ValueError, IOError) as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_remove_server(domain: str, slot: int) -> str:
"""Remove a server from a domain's backend at specified slot.
Args:
domain: The domain name to remove the server from
slot: Server slot number (1 to MAX_SLOTS) to remove
Returns:
Success message or error description
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
if not (1 <= slot <= MAX_SLOTS):
return f"Error: Slot must be between 1 and {MAX_SLOTS}"
try:
backend, server_prefix = get_backend_and_prefix(domain)
# HTTP only - single server per slot
server = f"{server_prefix}_{slot}"
haproxy_cmd(f"set server {backend}/{server} state maint")
haproxy_cmd(f"set server {backend}/{server} addr 0.0.0.0 port 0")
# Remove from persistent config
remove_server_from_config(domain, slot)
return f"Removed server at slot {slot} from {domain} ({backend})"
except (HaproxyError, ValueError, IOError) as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_stats() -> str:
"""Get HAProxy status and statistics.
Returns:
Key HAProxy metrics (name, version, uptime, connections, etc.)
"""
try:
result = haproxy_cmd("show info")
stats = {}
for line in result.split("\n"):
if ":" in line:
key, value = line.split(":", 1)
stats[key.strip()] = value.strip()
important = ["Name", "Version", "Uptime_sec", "CurrConns", "MaxConn", "Run_queue", "Tasks"]
output = []
for key in important:
if key in stats:
output.append(f"{key}: {stats[key]}")
return "\n".join(output) if output else result
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_backends() -> str:
"""List all HAProxy backends.
Returns:
List of all configured backend names
"""
try:
result = haproxy_cmd("show backend")
backends = [line for line in result.split("\n") if line and not line.startswith("#")]
return "Backends:\n" + "\n".join(f"{b}" for b in backends)
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_list_frontends() -> str:
"""List all HAProxy frontends with their status.
Returns:
List of frontends with status and session counts
"""
try:
result = haproxy_cmd("show stat")
frontends = []
for stat in parse_stat_csv(result):
if stat["svname"] == "FRONTEND":
frontends.append(
f"{stat['pxname']}: {stat['status']} (sessions: {stat['scur']})"
)
if not frontends:
return "No frontends found"
return "Frontends:\n" + "\n".join(frontends)
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_set_server_state(backend: str, server: str, state: str) -> str:
"""Set server state. States: ready (enable), drain (graceful shutdown), maint (maintenance/disable)
Args:
backend: Backend name (alphanumeric, underscore, hyphen only)
server: Server name within the backend
state: Target state - ready, drain, or maint
Returns:
Success message or error description
"""
if not validate_backend_name(backend):
return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
if not validate_backend_name(server):
return "Error: Invalid server name (use alphanumeric, underscore, hyphen only)"
if state not in ["ready", "drain", "maint"]:
return "Error: state must be 'ready', 'drain', or 'maint'"
try:
result = haproxy_cmd(f"set server {backend}/{server} state {state}")
return result if result else f"Server {backend}/{server} set to {state}"
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_get_server_health(backend: str = "") -> str:
"""Get health status of all servers or servers in a specific backend.
Args:
backend: Optional backend name to filter results
Returns:
Server health status or error description
"""
if backend and not validate_backend_name(backend):
return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
try:
result = haproxy_cmd("show stat")
servers = []
for stat in parse_stat_csv(result):
if stat["svname"] not in ["FRONTEND", "BACKEND", ""]:
if backend and stat["pxname"] != backend:
continue
servers.append(
f"{stat['pxname']}/{stat['svname']}: {stat['status']} "
f"(weight: {stat['weight']}, check: {stat['check_status']})"
)
return "\n".join(servers) if servers else "No servers found"
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_set_server_weight(backend: str, server: str, weight: int) -> str:
"""Set server weight (0-256). Weight 0 disables the server for new connections.
Args:
backend: Backend name (alphanumeric, underscore, hyphen only)
server: Server name within the backend
weight: Server weight from 0 to 256
Returns:
Success message or error description
"""
if not validate_backend_name(backend):
return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
if not validate_backend_name(server):
return "Error: Invalid server name (use alphanumeric, underscore, hyphen only)"
if not (0 <= weight <= 256):
return "Error: weight must be between 0 and 256"
try:
result = haproxy_cmd(f"set server {backend}/{server} weight {weight}")
return result if result else f"Server {backend}/{server} weight set to {weight}"
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_get_connections(backend: str = "") -> str:
"""Get active connections per server.
Args:
backend: Optional backend name to filter results
Returns:
Connection statistics or error description
"""
if backend and not validate_backend_name(backend):
return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
try:
result = haproxy_cmd("show stat")
connections = []
for stat in parse_stat_csv(result):
if backend and stat["pxname"] != backend:
continue
if stat["svname"] in ["FRONTEND", "BACKEND"]:
connections.append(
f"{stat['pxname']} ({stat['svname']}): {stat['scur']} current, {stat['smax']} max"
)
elif stat["svname"]:
connections.append(f" - {stat['svname']}: {stat['scur']} connections")
return "\n".join(connections) if connections else "No connection data"
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_reload() -> str:
"""Reload HAProxy configuration (validates config first).
Returns:
Success message or error details if validation/reload failed
"""
success, msg = reload_haproxy()
if not success:
return msg
return "HAProxy configuration reloaded successfully"
@mcp.tool()
def haproxy_check_config() -> str:
"""Validate HAProxy configuration file syntax.
Returns:
Validation result or error details
"""
try:
result = subprocess.run(
["podman", "exec", "haproxy", "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"],
capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
)
if result.returncode == 0:
return "Configuration is valid"
return f"Configuration errors:\n{result.stderr}"
except subprocess.TimeoutExpired:
return f"Error: Command timed out after {SUBPROCESS_TIMEOUT} seconds"
except FileNotFoundError:
return "Error: podman command not found"
except OSError as e:
return f"Error: OS error: {e}"
@mcp.tool()
def haproxy_save_state() -> str:
"""Save current server state to disk atomically.
Returns:
Success message or error description
"""
try:
state = haproxy_cmd("show servers state")
dir_path = os.path.dirname(STATE_FILE)
fd = None
temp_path = None
try:
fd, temp_path = tempfile.mkstemp(dir=dir_path, prefix='.servers.state.')
with os.fdopen(fd, 'w', encoding='utf-8') as f:
fd = None # fd is now owned by the file object
f.write(state)
os.rename(temp_path, STATE_FILE)
temp_path = None # Rename succeeded, don't unlink
except OSError as e:
raise IOError(f"Failed to save state: {e}") from e
finally:
if fd is not None:
try:
os.close(fd)
except OSError:
pass
if temp_path is not None:
try:
os.unlink(temp_path)
except OSError:
pass
return "Server state saved"
except HaproxyError as e:
return f"Error: {e}"
except IOError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_restore_state() -> str:
"""Restore server state from disk.
Returns:
Summary of restored servers or error description
"""
try:
with open(STATE_FILE, "r", encoding="utf-8") as f:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
except OSError:
pass # Continue without lock if not supported
try:
state = f.read()
finally:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except OSError:
pass
restored = 0
skipped = 0
for line in state.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and not line.startswith("#"):
backend = parts[StateField.BE_NAME]
server = parts[StateField.SRV_NAME]
addr = parts[StateField.SRV_ADDR]
port = parts[StateField.SRV_PORT]
# Skip disabled servers
if addr == "0.0.0.0":
continue
# Validate names from state file to prevent injection
if not validate_backend_name(backend) or not validate_backend_name(server):
skipped += 1
continue
# Validate IP and port
if not validate_ip(addr) or not validate_port(port):
skipped += 1
continue
haproxy_cmd(f"set server {backend}/{server} addr {addr} port {port}")
haproxy_cmd(f"set server {backend}/{server} state ready")
restored += 1
result = f"Server state restored ({restored} servers)"
if skipped:
result += f", {skipped} entries skipped due to validation"
return result
except FileNotFoundError:
return "Error: No saved state found"
except HaproxyError as e:
return f"Error: {e}"
def startup_restore() -> None:
"""Restore servers from config file on startup."""
# Wait for HAProxy to be ready
for _ in range(STARTUP_RETRY_COUNT):
try:
haproxy_cmd("show info")
break
except HaproxyError:
time.sleep(1)
else:
logger.warning("HAProxy not ready, skipping restore")
return
try:
count = restore_servers_from_config()
if count > 0:
logger.info("Restored %d servers from config", count)
except (HaproxyError, OSError, ValueError, json.JSONDecodeError) as e:
logger.warning("Failed to restore servers: %s", e)
if __name__ == "__main__":
startup_restore()
mcp.run(transport="streamable-http")