Files
haproxy-mcp/mcp/server.py
kaffa f17b02fddf fix: Auto-restore servers after haproxy_reload
haproxy_reload now automatically restores server configurations from
servers.json after a successful reload, preventing service disruption.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 13:32:03 +00:00

1433 lines
47 KiB
Python

#!/usr/bin/env python3
"""HAProxy MCP Server - Direct Runtime API Integration
This module provides an MCP (Model Context Protocol) server for managing HAProxy
configuration and runtime state. It supports dynamic domain/server management
with HTTP backends (SSL termination at HAProxy frontend).
Environment Variables:
MCP_HOST: Host to bind MCP server (default: 0.0.0.0)
MCP_PORT: Port for MCP server (default: 8000)
HAPROXY_HOST: HAProxy Runtime API host (default: localhost)
HAPROXY_PORT: HAProxy Runtime API port (default: 9999)
HAPROXY_STATE_FILE: Path to server state file
HAPROXY_MAP_FILE: Path to domains.map file
HAPROXY_MAP_FILE_CONTAINER: Container path for domains.map
HAPROXY_SERVERS_FILE: Path to servers.json file
HAPROXY_POOL_COUNT: Number of pool backends (default: 100)
HAPROXY_MAX_SLOTS: Max servers per pool (default: 10)
"""
import socket
import subprocess
import re
import json
import logging
import os
import tempfile
import time
import fcntl
from typing import Any, Dict, Generator, List, Optional, Set, Tuple
from mcp.server.fastmcp import FastMCP
# Configure structured logging.
# LOG_LEVEL is read from the environment; a name that is not an attribute of
# the logging module falls back to INFO through the getattr() default.
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=getattr(logging, log_level, logging.INFO),
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# MCP Server configuration.
# NOTE: int() is applied at import time, so a non-numeric MCP_PORT raises
# ValueError while the module is being imported.
MCP_HOST: str = os.getenv("MCP_HOST", "0.0.0.0")
MCP_PORT: int = int(os.getenv("MCP_PORT", "8000"))
mcp = FastMCP("haproxy", host=MCP_HOST, port=MCP_PORT)
# HAProxy Runtime API configuration.
# HAPROXY_SOCKET is the (host, port) pair passed to socket.connect().
HAPROXY_HOST: str = os.getenv("HAPROXY_HOST", "localhost")
HAPROXY_PORT: int = int(os.getenv("HAPROXY_PORT", "9999"))
HAPROXY_SOCKET: Tuple[str, int] = (HAPROXY_HOST, HAPROXY_PORT)
# File paths (configurable via environment).
# MAP_FILE is the path this process reads and writes; MAP_FILE_CONTAINER is the
# path used as the map identifier in Runtime API "add map"/"del map" commands.
STATE_FILE: str = os.getenv("HAPROXY_STATE_FILE", "/opt/haproxy/data/servers.state")
MAP_FILE: str = os.getenv("HAPROXY_MAP_FILE", "/opt/haproxy/conf/domains.map")
MAP_FILE_CONTAINER: str = os.getenv("HAPROXY_MAP_FILE_CONTAINER", "/usr/local/etc/haproxy/domains.map")
SERVERS_FILE: str = os.getenv("HAPROXY_SERVERS_FILE", "/opt/haproxy/conf/servers.json")
# Pool configuration.
# POOL_COUNT bounds the pool_1..pool_N backend names; MAX_SLOTS bounds the
# per-domain server slot numbers accepted by the tools.
POOL_COUNT: int = int(os.getenv("HAPROXY_POOL_COUNT", "100"))
MAX_SLOTS: int = int(os.getenv("HAPROXY_MAX_SLOTS", "10"))
# Validation patterns - compiled once for performance.
# Every anchored pattern ends with \Z instead of $: in Python, $ also matches
# just before a trailing newline, which would let values such as "pool_1\n"
# pass validation and then be interpolated into Runtime API commands.
# Domain: dot-separated labels of 1-63 characters, each starting and ending
# with an ASCII letter or digit, hyphens allowed in between.
DOMAIN_PATTERN = re.compile(
    r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?'
    r'(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*\Z'
)
# IPv4 dotted quad shape only (range is checked by the caller). [0-9] is used
# rather than \d because \d also matches non-ASCII Unicode digits.
IP_PATTERN = re.compile(r'^([0-9]{1,3}\.){3}[0-9]{1,3}\Z')
# Backend/server names: alphanumeric, underscore, hyphen only
BACKEND_NAME_PATTERN = re.compile(r'^[a-zA-Z0-9_-]+\Z')
# Pattern for converting domain to backend name
NON_ALNUM_PATTERN = re.compile(r'[^a-zA-Z0-9]')
# Limits and timeouts shared by the helpers in this module
MAX_RESPONSE_SIZE = 10 * 1024 * 1024 # 10 MB max response from HAProxy
SUBPROCESS_TIMEOUT = 30 # seconds
STARTUP_RETRY_COUNT = 10 # HAProxy ready check retries
STATE_MIN_COLUMNS = 19 # Minimum columns in HAProxy server state output
SOCKET_TIMEOUT = 5 # seconds for HAProxy socket connection
class HaproxyError(Exception):
    """Raised when an HAProxy operation fails."""
class StatField:
    """Column positions in the CSV rows returned by ``show stat``."""

    # Proxy name (frontend/backend)
    PXNAME = 0
    # Server name (or FRONTEND/BACKEND)
    SVNAME = 1
    # Current sessions
    SCUR = 4
    # Max sessions
    SMAX = 6
    # Status (UP/DOWN/MAINT/etc)
    STATUS = 17
    # Server weight
    WEIGHT = 18
    # Check status
    CHECK_STATUS = 36
class StateField:
    """Column positions in the rows returned by ``show servers state``."""

    # Backend ID
    BE_ID = 0
    # Backend name
    BE_NAME = 1
    # Server ID
    SRV_ID = 2
    # Server name
    SRV_NAME = 3
    # Server address
    SRV_ADDR = 4
    # Operational state
    SRV_OP_STATE = 5
    # Admin state
    SRV_ADMIN_STATE = 6
    # Server port
    SRV_PORT = 18
def haproxy_cmd(command: str) -> str:
    """Send a single command to the HAProxy Runtime API and return the reply.

    Opens a TCP connection to HAPROXY_SOCKET, sends the command followed by a
    newline, half-closes the write side and reads until HAProxy closes the
    connection.

    Args:
        command: The HAProxy runtime API command to execute

    Returns:
        The decoded response with surrounding whitespace stripped. The text is
        returned as-is; it is not inspected for HAProxy-level error messages.

    Raises:
        HaproxyError: If the connection times out or is refused, the response
            exceeds MAX_RESPONSE_SIZE, the response is not valid UTF-8, or any
            other error occurs while talking to the socket.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(SOCKET_TIMEOUT)
            s.connect(HAPROXY_SOCKET)
            s.sendall(f"{command}\n".encode())
            # Signal end-of-request so HAProxy answers and closes the stream.
            s.shutdown(socket.SHUT_WR)
            # Collect chunks and join once; repeated bytes concatenation would
            # re-copy the whole buffer for every chunk received.
            chunks: List[bytes] = []
            received = 0
            while True:
                data = s.recv(8192)
                if not data:
                    break
                chunks.append(data)
                received += len(data)
                if received > MAX_RESPONSE_SIZE:
                    raise HaproxyError(f"Response exceeded {MAX_RESPONSE_SIZE} bytes limit")
        return b"".join(chunks).decode().strip()
    except socket.timeout as e:
        raise HaproxyError("Connection timeout") from e
    except ConnectionRefusedError as e:
        raise HaproxyError("Connection refused - HAProxy not running?") from e
    except UnicodeDecodeError as e:
        raise HaproxyError("Invalid UTF-8 in response") from e
    except HaproxyError:
        # Already in the caller-facing form (size limit); pass through as-is.
        raise
    except Exception as e:
        raise HaproxyError(str(e)) from e
def reload_haproxy() -> Tuple[bool, str]:
    """Check the HAProxy configuration and, if it is valid, trigger a reload.

    The configuration is validated inside the "haproxy" container first; only
    when that succeeds is SIGUSR2 sent to the container.

    Returns:
        Tuple of (success, message). On success the message is "OK"; otherwise
        it describes which step failed.
    """
    check_cmd = ["podman", "exec", "haproxy", "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"]
    signal_cmd = ["podman", "kill", "--signal", "USR2", "haproxy"]
    try:
        checked = subprocess.run(
            check_cmd, capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
        )
        if checked.returncode != 0:
            return False, f"Config validation failed:\n{checked.stderr}"

        signalled = subprocess.run(
            signal_cmd, capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
        )
        if signalled.returncode != 0:
            return False, f"Reload failed: {signalled.stderr}"
    except subprocess.TimeoutExpired:
        return False, f"Command timed out after {SUBPROCESS_TIMEOUT} seconds"
    except FileNotFoundError:
        return False, "podman command not found"
    except OSError as e:
        return False, f"OS error: {e}"
    return True, "OK"
def validate_domain(domain: str) -> bool:
    """Validate domain format.

    The whole string must match DOMAIN_PATTERN. fullmatch() is used instead of
    match() because a pattern ending in ``$`` also matches just before a
    trailing newline, which would let a domain with an appended line break
    through to the Runtime API commands built from it.

    Args:
        domain: The domain name to validate

    Returns:
        True if domain is non-empty, at most 253 characters long and matches
        the pattern in full; False otherwise
    """
    if not domain or len(domain) > 253:
        return False
    return DOMAIN_PATTERN.fullmatch(domain) is not None
def validate_ip(ip: str, allow_empty: bool = False) -> bool:
    """Validate IPv4 address format (dotted quad, each octet 0-255).

    Each octet must consist of one to three ASCII digits. Checking the octets
    directly rejects inputs with a trailing newline and non-ASCII digit
    characters, both of which a ``^...$`` regex with ``\\d`` would accept.

    Args:
        ip: The IP address to validate
        allow_empty: If True, empty string is considered valid

    Returns:
        True if IP is valid, False otherwise
    """
    if not ip:
        return allow_empty
    octets = ip.split(".")
    if len(octets) != 4:
        return False
    for octet in octets:
        # isascii() guards isdigit(), which is also true for non-ASCII digits.
        if not (1 <= len(octet) <= 3 and octet.isascii() and octet.isdigit()):
            return False
        if int(octet) > 255:
            return False
    return True
def validate_backend_name(name: str) -> bool:
    """Validate backend or server name to prevent command injection.

    The whole string must match BACKEND_NAME_PATTERN. fullmatch() is used
    instead of match() because a pattern ending in ``$`` also matches just
    before a trailing newline, so a name with an appended line break would
    otherwise be accepted.

    Args:
        name: The backend or server name to validate

    Returns:
        True if name is 1-255 characters long and contains only safe
        characters, False otherwise
    """
    if not name or len(name) > 255:
        return False
    return BACKEND_NAME_PATTERN.fullmatch(name) is not None
def domain_to_backend(domain: str) -> str:
    """Derive a backend name from a domain.

    Every character matched by NON_ALNUM_PATTERN is replaced with an
    underscore and the result is checked with validate_backend_name().

    Args:
        domain: The domain name to convert

    Returns:
        The converted backend name

    Raises:
        ValueError: If the converted name fails backend-name validation
    """
    backend_name = NON_ALNUM_PATTERN.sub('_', domain)
    if validate_backend_name(backend_name):
        return backend_name
    raise ValueError(f"Invalid backend name after conversion: {backend_name}")
def parse_stat_csv(stat_output: str) -> Generator[Dict[str, str], None, None]:
    """Turn raw ``show stat`` CSV text into one dictionary per data row.

    Empty lines, lines starting with "#", and rows too short to contain the
    status column are skipped. Columns that are absent from a row fall back
    to "0" (scur, smax, weight) or "" (check_status).

    Args:
        stat_output: Raw output from 'show stat' command

    Yields:
        Dictionaries with keys pxname, svname, scur, smax, status, weight and
        check_status
    """
    for row in stat_output.split("\n"):
        if not row or row.startswith("#"):
            continue
        cols = row.split(",")
        col_count = len(cols)
        if col_count <= StatField.STATUS:
            continue

        def pick(index: int, default: str) -> str:
            # Return the column when present, otherwise the given default.
            return cols[index] if col_count > index else default

        yield {
            "pxname": cols[StatField.PXNAME],
            "svname": cols[StatField.SVNAME],
            "scur": pick(StatField.SCUR, "0"),
            "smax": pick(StatField.SMAX, "0"),
            "status": cols[StatField.STATUS],
            "weight": pick(StatField.WEIGHT, "0"),
            "check_status": pick(StatField.CHECK_STATUS, ""),
        }
def validate_port(port: str) -> bool:
    """Validate that a string is a port number in the valid range.

    Only ASCII digits are accepted: str.isdigit() alone is also true for
    characters such as superscript digits, which int() cannot parse and which
    would make this function raise instead of returning False.

    Args:
        port: Port number as string

    Returns:
        True if port is valid (1-65535), False otherwise
    """
    if not port or not (port.isascii() and port.isdigit()):
        return False
    # Leading zeros do not change the value; once they are removed, anything
    # longer than five digits is out of range. Checking the length first also
    # keeps arbitrarily long digit strings away from int().
    significant = port.lstrip("0")
    if len(significant) > 5:
        return False
    return 1 <= int(significant or "0") <= 65535
def get_map_contents() -> List[Tuple[str, str]]:
    """Parse MAP_FILE into (domain, backend) pairs.

    Blank lines, lines starting with "#" and lines with fewer than two
    whitespace-separated fields are ignored. A shared lock is requested while
    reading; if locking fails with OSError the file is read without it.

    Returns:
        List of (domain, backend) tuples in file order; an empty list when the
        file does not exist
    """
    pairs: List[Tuple[str, str]] = []
    try:
        with open(MAP_FILE, "r", encoding="utf-8") as map_file:
            try:
                fcntl.flock(map_file.fileno(), fcntl.LOCK_SH)
            except OSError:
                pass  # Continue without lock if not supported
            try:
                for raw_line in map_file:
                    text = raw_line.strip()
                    if text and not text.startswith("#"):
                        fields = text.split()
                        if len(fields) >= 2:
                            pairs.append((fields[0], fields[1]))
            finally:
                try:
                    fcntl.flock(map_file.fileno(), fcntl.LOCK_UN)
                except OSError:
                    pass
    except FileNotFoundError:
        pass
    return pairs
def find_available_pool() -> Optional[str]:
    """Return the lowest-numbered pool that no map entry references.

    Pools are named pool_1 .. pool_{POOL_COUNT}; a pool counts as used when
    any entry in the map file points at it.

    Returns:
        Pool name (e.g., 'pool_1') if available, None if all pools are used
    """
    taken: Set[str] = {
        backend
        for _, backend in get_map_contents()
        if backend.startswith("pool_")
    }
    candidates = (f"pool_{number}" for number in range(1, POOL_COUNT + 1))
    return next((name for name in candidates if name not in taken), None)
def get_domain_backend(domain: str) -> Optional[str]:
    """Find the backend that the map file assigns to a domain.

    Args:
        domain: The domain to look up (compared for exact equality)

    Returns:
        Backend name of the first matching entry, None when there is none
    """
    matches = (
        backend
        for mapped_domain, backend in get_map_contents()
        if mapped_domain == domain
    )
    return next(matches, None)
def is_legacy_backend(backend: str) -> bool:
    """Tell whether a backend is a legacy static backend rather than a pool.

    Args:
        backend: Backend name to check

    Returns:
        False when the name starts with "pool_", True otherwise
    """
    if backend.startswith("pool_"):
        return False
    return True
def get_legacy_backend_name(domain: str) -> str:
    """Build the legacy backend name for a domain.

    Args:
        domain: Domain name

    Returns:
        The converted domain followed by "_backend"
        (e.g., 'api_example_com_backend')

    Raises:
        ValueError: Propagated from domain_to_backend() for invalid names
    """
    base_name = domain_to_backend(domain)
    return f"{base_name}_backend"
def save_map_file(entries: List[Tuple[str, str]]) -> None:
    """Write the map file atomically.

    The content is written to a temporary file in the same directory as
    MAP_FILE and then renamed over it, so readers never see a partial file.

    Args:
        entries: List of (domain, backend) tuples to write

    Raises:
        IOError: If the file cannot be written
    """
    # NOTE(review): the file produced by mkstemp replaces MAP_FILE, so the map
    # file ends up with the temp file's permissions - confirm HAProxy can
    # still read it in this deployment.
    header = (
        "# Domain to Backend mapping\n"
        "# Format: domain backend_name\n"
        "# Wildcard: .domain.com matches *.domain.com\n\n"
    )
    target_dir = os.path.dirname(MAP_FILE)
    raw_fd = None
    staging_path = None
    try:
        raw_fd, staging_path = tempfile.mkstemp(dir=target_dir, prefix='.domains.map.')
        with os.fdopen(raw_fd, 'w', encoding='utf-8') as out:
            raw_fd = None  # the file object now owns the descriptor
            out.write(header)
            for domain, backend in entries:
                out.write(f"{domain} {backend}\n")
        os.rename(staging_path, MAP_FILE)
        staging_path = None  # renamed into place; nothing left to clean up
    except OSError as e:
        raise IOError(f"Failed to save map file: {e}") from e
    finally:
        # Only reached with live values when a step above failed part-way.
        if raw_fd is not None:
            try:
                os.close(raw_fd)
            except OSError:
                pass
        if staging_path is not None:
            try:
                os.unlink(staging_path)
            except OSError:
                pass
def load_servers_config() -> Dict[str, Any]:
    """Read SERVERS_FILE and return its parsed JSON content.

    A shared lock is requested while reading; if locking fails with OSError
    the file is read without it.

    Returns:
        The parsed configuration, or an empty dictionary when the file is
        missing or does not contain valid JSON
    """
    try:
        with open(SERVERS_FILE, "r", encoding="utf-8") as handle:
            try:
                fcntl.flock(handle.fileno(), fcntl.LOCK_SH)
            except OSError:
                logger.debug("File locking not supported for %s", SERVERS_FILE)
            try:
                loaded = json.load(handle)
            finally:
                try:
                    fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
                except OSError:
                    pass
            return loaded
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as e:
        logger.warning("Corrupt config file %s: %s", SERVERS_FILE, e)
        return {}
def save_servers_config(config: Dict[str, Any]) -> None:
    """Write the servers configuration to SERVERS_FILE atomically.

    The JSON is written to a temporary file in the same directory and then
    renamed over SERVERS_FILE, so readers never see a partial file.

    Args:
        config: Dictionary with server configurations

    Raises:
        IOError: If the file cannot be written
    """
    target_dir = os.path.dirname(SERVERS_FILE)
    raw_fd = None
    staging_path = None
    try:
        raw_fd, staging_path = tempfile.mkstemp(dir=target_dir, prefix='.servers.json.')
        with os.fdopen(raw_fd, 'w', encoding='utf-8') as out:
            raw_fd = None  # the file object now owns the descriptor
            json.dump(config, out, indent=2)
        os.rename(staging_path, SERVERS_FILE)
        staging_path = None  # renamed into place; nothing left to clean up
    except OSError as e:
        raise IOError(f"Failed to save servers config: {e}") from e
    finally:
        # Only reached with live values when a step above failed part-way.
        if raw_fd is not None:
            try:
                os.close(raw_fd)
            except OSError:
                pass
        if staging_path is not None:
            try:
                os.unlink(staging_path)
            except OSError:
                pass
def add_server_to_config(domain: str, slot: int, ip: str, http_port: int) -> None:
    """Record a server for a domain slot in the persistent configuration.

    Loads the current configuration, stores the entry under
    config[domain][str(slot)] (replacing any previous one) and saves it back.

    Args:
        domain: Domain name
        slot: Server slot (1 to MAX_SLOTS)
        ip: Server IP address
        http_port: HTTP port
    """
    config = load_servers_config()
    domain_slots = config.setdefault(domain, {})
    domain_slots[str(slot)] = {"ip": ip, "http_port": http_port}
    save_servers_config(config)
def remove_server_from_config(domain: str, slot: int) -> None:
    """Delete one slot entry of a domain from the persistent configuration.

    When the domain has no slots left afterwards, the domain entry itself is
    removed as well. The file is rewritten only if something was deleted.

    Args:
        domain: Domain name
        slot: Server slot to remove
    """
    config = load_servers_config()
    if domain not in config:
        return
    slot_key = str(slot)
    domain_slots = config[domain]
    if slot_key not in domain_slots:
        return
    del domain_slots[slot_key]
    if not domain_slots:
        del config[domain]
    save_servers_config(config)
def remove_domain_from_config(domain: str) -> None:
    """Drop every stored server entry of a domain.

    The file is rewritten only if the domain was present.

    Args:
        domain: Domain name to remove
    """
    config = load_servers_config()
    if domain not in config:
        return
    config.pop(domain)
    save_servers_config(config)
def get_server_suffixes(http_port: int) -> List[Tuple[str, int]]:
    """Return the (server-name suffix, port) pairs to configure for a slot.

    Args:
        http_port: HTTP port for backend

    Returns:
        A single-element list holding an empty suffix paired with http_port
        (HTTP only)
    """
    http_only: Tuple[str, int] = ("", http_port)
    return [http_only]
def get_backend_and_prefix(domain: str) -> Tuple[str, str]:
    """Resolve a domain to its backend and the prefix of its server names.

    The backend comes from the map file; when the domain is not mapped, the
    legacy backend name derived from the domain is used instead. Pool backends
    use the backend name as server prefix, all others use the converted
    domain name.

    Args:
        domain: The domain name to look up

    Returns:
        Tuple of (backend_name, server_prefix)

    Raises:
        ValueError: If domain cannot be mapped to a valid backend
    """
    mapped = get_domain_backend(domain)
    backend = mapped if mapped else get_legacy_backend_name(domain)
    prefix = backend if backend.startswith("pool_") else domain_to_backend(domain)
    return backend, prefix
def restore_servers_from_config() -> int:
    """Re-apply every server stored in the configuration file to HAProxy.

    For each domain that is present in the map file, every stored slot is
    pushed to the Runtime API (address/port, then state ready). Entries that
    are malformed or fail validation are logged and skipped instead of being
    sent, because their values are interpolated into Runtime API commands.

    Returns:
        Number of server slots for which the commands were sent without a
        connection-level error
    """
    config = load_servers_config()
    if not isinstance(config, dict):
        # Valid JSON, but not the expected {domain: {slot: {...}}} object.
        logger.warning("Unexpected top-level structure in %s, nothing restored", SERVERS_FILE)
        return 0

    restored = 0
    for domain, slots in config.items():
        backend = get_domain_backend(domain)
        if not backend:
            # Domain is no longer mapped; leave its stored servers alone.
            continue
        try:
            _, server_prefix = get_backend_and_prefix(domain)
        except ValueError as e:
            logger.warning("Invalid domain '%s': %s", domain, e)
            continue
        if not isinstance(slots, dict):
            logger.warning("Invalid slot table for %s, skipping", domain)
            continue

        for slot_str, server_info in slots.items():
            try:
                slot = int(slot_str)
            except ValueError:
                logger.warning("Invalid slot '%s' for %s, skipping", slot_str, domain)
                continue
            if not isinstance(server_info, dict):
                logger.warning("Invalid entry for %s slot %d, skipping", domain, slot)
                continue

            ip = server_info.get("ip", "")
            if not ip:
                continue
            if not isinstance(ip, str) or not validate_ip(ip):
                logger.warning("Invalid IP for %s slot %d, skipping", domain, slot)
                continue

            try:
                http_port = int(server_info.get("http_port", 80))
            except (ValueError, TypeError):
                logger.warning("Invalid port for %s slot %d, skipping", domain, slot)
                continue
            if not (1 <= http_port <= 65535):
                logger.warning("Invalid port for %s slot %d, skipping", domain, slot)
                continue

            try:
                for suffix, port in get_server_suffixes(http_port):
                    server = f"{server_prefix}{suffix}_{slot}"
                    haproxy_cmd(f"set server {backend}/{server} addr {ip} port {port}")
                    haproxy_cmd(f"set server {backend}/{server} state ready")
                restored += 1
            except HaproxyError as e:
                logger.warning("Failed to restore %s slot %d: %s", domain, slot, e)
    return restored
@mcp.tool()
def haproxy_list_domains() -> str:
    """List all configured domains with their backend servers.

    Shows every non-wildcard domain from domains.map with its backend and the
    servers HAProxy currently reports for that backend (servers whose address
    is 0.0.0.0 are left out).

    Returns:
        One line per domain in format:
        domain -> backend (pool|static): server=ip:port, ...
        "(none)" is shown when the backend has no configured servers.

    Example:
        # Output:
        # api.example.com -> pool_1 (pool): pool_1_1=10.0.0.1:8080, pool_1_2=10.0.0.2:8080
        # web.example.com -> pool_2 (pool): pool_2_1=10.0.0.3:80
    """
    try:
        state = haproxy_cmd("show servers state")
    except HaproxyError as e:
        return f"Error: {e}"

    # Group "name=addr:port" labels by backend name.
    servers_by_backend: Dict[str, list] = {}
    for row in state.split("\n"):
        cols = row.split()
        if len(cols) < STATE_MIN_COLUMNS or cols[StateField.SRV_ADDR] == "0.0.0.0":
            continue
        label = f"{cols[StateField.SRV_NAME]}={cols[StateField.SRV_ADDR]}:{cols[StateField.SRV_PORT]}"
        servers_by_backend.setdefault(cols[StateField.BE_NAME], []).append(label)

    # Walk the map file, skipping wildcard entries (leading ".") and repeats.
    lines = []
    listed: Set[str] = set()
    for domain, backend in get_map_contents():
        if domain.startswith(".") or domain in listed:
            continue
        listed.add(domain)
        servers = servers_by_backend.get(backend, ["(none)"])
        backend_type = "pool" if backend.startswith("pool_") else "static"
        lines.append(f"{domain} -> {backend} ({backend_type}): {', '.join(servers)}")

    if not lines:
        return "No domains configured"
    return "\n".join(lines)
@mcp.tool()
def haproxy_add_domain(domain: str, ip: str = "", http_port: int = 80) -> str:
    """Add a new domain to HAProxy using map-based routing (no reload required).

    Maps the domain (and its wildcard form .domain) to the first free pool
    backend. Each domain can have up to 10 backend servers for load balancing.
    Use haproxy_add_server to add more servers.

    Args:
        domain: The domain name to add (e.g., api.example.com)
        ip: Optional IP address for initial server; if provided, adds to slot 1
        http_port: HTTP port for the backend server (default: 80)

    Returns:
        Success message or error description

    Example:
        # Add domain without server (add servers later)
        haproxy_add_domain("api.example.com")
        # Add domain with initial server
        haproxy_add_domain("api.example.com", ip="10.0.0.1", http_port=8080)
    """
    # Input validation
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    if not validate_ip(ip, allow_empty=True):
        return "Error: Invalid IP address format"
    if not (1 <= http_port <= 65535):
        return "Error: Port must be between 1 and 65535"

    # Refuse duplicates
    current_backend = get_domain_backend(domain)
    if current_backend:
        return f"Error: Domain {domain} already exists (mapped to {current_backend})"

    pool = find_available_pool()
    if not pool:
        return f"Error: No available pools (all {POOL_COUNT} pools are in use)"

    wildcard = f".{domain}"
    try:
        # Runtime map first, so routing takes effect immediately
        haproxy_cmd(f"add map {MAP_FILE_CONTAINER} {domain} {pool}")
        haproxy_cmd(f"add map {MAP_FILE_CONTAINER} {wildcard} {pool}")

        # Then persist both entries to the map file
        entries = get_map_contents()
        entries.extend([(domain, pool), (wildcard, pool)])
        save_map_file(entries)

        if not ip:
            return f"Domain {domain} added to {pool} (no servers configured)"

        # Initial server goes to slot 1
        for suffix, port in get_server_suffixes(http_port):
            server = f"{pool}{suffix}_1"
            haproxy_cmd(f"set server {pool}/{server} addr {ip} port {port}")
            haproxy_cmd(f"set server {pool}/{server} state ready")
        add_server_to_config(domain, 1, ip, http_port)
        return f"Domain {domain} added to {pool} with server {ip}:{http_port}"
    except HaproxyError as e:
        return f"Error: {e}"
    except IOError as e:
        return f"Error: Failed to update map file: {e}"
@mcp.tool()
def haproxy_remove_domain(domain: str) -> str:
    """Remove a domain from HAProxy (no reload required for pool-based domains).

    Deletes the domain and its wildcard entry from the runtime map and the map
    file, puts every slot of the pool into maintenance with address 0.0.0.0:0,
    and drops the domain from the persistent servers configuration. Domains on
    legacy (non-pool) backends are rejected.

    Args:
        domain: The domain name to remove

    Returns:
        Success message or error description
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"

    backend = get_domain_backend(domain)
    if not backend:
        return f"Error: Domain {domain} not found"
    if is_legacy_backend(backend):
        return f"Error: Cannot remove legacy domain {domain} (uses static backend {backend})"

    wildcard = f".{domain}"
    try:
        # Runtime map first, so routing stops immediately
        haproxy_cmd(f"del map {MAP_FILE_CONTAINER} {domain}")
        haproxy_cmd(f"del map {MAP_FILE_CONTAINER} {wildcard}")

        # Then persist the removal to the map file
        remaining = [
            (mapped, target)
            for mapped, target in get_map_contents()
            if mapped != domain and mapped != wildcard
        ]
        save_map_file(remaining)

        # Reset every slot of the pool; failures on single slots are ignored
        for slot in range(1, MAX_SLOTS + 1):
            server = f"{backend}_{slot}"
            try:
                haproxy_cmd(f"set server {backend}/{server} state maint")
                haproxy_cmd(f"set server {backend}/{server} addr 0.0.0.0 port 0")
            except HaproxyError:
                pass

        remove_domain_from_config(domain)
        return f"Domain {domain} removed from {backend}"
    except HaproxyError as e:
        return f"Error: {e}"
    except IOError as e:
        return f"Error: Failed to update map file: {e}"
@mcp.tool()
def haproxy_list_servers(domain: str) -> str:
    """List all servers for a specific domain.

    Shows every server slot of the domain's backend with its address and
    whether it is in use. Use this to see which slots are in use before adding
    or removing servers.

    Args:
        domain: The domain name to list servers for

    Returns:
        List of servers as "name: ip:port (status)", where status is "active"
        for a configured slot and "disabled" for a slot at 0.0.0.0

    Example:
        haproxy_list_servers("api.example.com")
        # Output: pool_1_1: 10.0.0.1:8080 (active)
        #         pool_1_2: 0.0.0.0:0 (disabled)
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    try:
        backend, _ = get_backend_and_prefix(domain)
        state = haproxy_cmd("show servers state")
        rows = []
        for line in state.split("\n"):
            cols = line.split()
            if len(cols) < STATE_MIN_COLUMNS or cols[StateField.BE_NAME] != backend:
                continue
            addr = cols[StateField.SRV_ADDR]
            status = "disabled" if addr == "0.0.0.0" else "active"
            rows.append(
                f"{cols[StateField.SRV_NAME]}: {addr}:{cols[StateField.SRV_PORT]} ({status})"
            )
        if rows:
            return f"Servers for {domain} ({backend}):\n" + "\n".join(rows)
        return f"Backend {backend} not found"
    except (HaproxyError, ValueError) as e:
        return f"Error: {e}"
@mcp.tool()
def haproxy_add_server(domain: str, slot: int, ip: str, http_port: int = 80) -> str:
    """Add a server to a domain's backend at specified slot.

    Each domain can have up to 10 servers (slots 1-10) for load balancing.
    To add multiple servers, call this with different slot numbers.
    HAProxy distributes traffic across all configured servers using round-robin.

    Args:
        domain: The domain name to add the server to
        slot: Server slot number (1-10). Use different slots for multiple servers
        ip: IP address of the server (required)
        http_port: HTTP port (default: 80)

    Returns:
        Success message listing each configured server as server=ip:port,
        or error description

    Example:
        # Add two servers for load balancing
        haproxy_add_server("api.example.com", slot=1, ip="10.0.0.1", http_port=8080)
        haproxy_add_server("api.example.com", slot=2, ip="10.0.0.2", http_port=8080)
        # Add a third server
        haproxy_add_server("api.example.com", slot=3, ip="10.0.0.3", http_port=8080)
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    if not ip:
        return "Error: IP address is required"
    if not validate_ip(ip):
        return "Error: Invalid IP address format"
    if not (1 <= slot <= MAX_SLOTS):
        return f"Error: Slot must be between 1 and {MAX_SLOTS}"
    if not (1 <= http_port <= 65535):
        return "Error: Port must be between 1 and 65535"
    try:
        backend, server_prefix = get_backend_and_prefix(domain)
        results = []
        for suffix, port in get_server_suffixes(http_port):
            server = f"{server_prefix}{suffix}_{slot}"
            haproxy_cmd(f"set server {backend}/{server} addr {ip} port {port}")
            haproxy_cmd(f"set server {backend}/{server} state ready")
            # Keep a separator between name and address; without it the two
            # run together (e.g. "pool_1_110.0.0.1:8080"). The "=" form
            # matches the server=ip:port notation of haproxy_list_domains.
            results.append(f"{server}={ip}:{port}")
        # Save to persistent config
        add_server_to_config(domain, slot, ip, http_port)
        return f"Added to {domain} ({backend}) slot {slot}:\n" + "\n".join(results)
    except (HaproxyError, ValueError, IOError) as e:
        return f"Error: {e}"
@mcp.tool()
def haproxy_remove_server(domain: str, slot: int) -> str:
    """Remove a server from a domain's backend at specified slot.

    Puts the slot's server into maintenance, resets its address to 0.0.0.0:0
    and deletes the slot from the persistent configuration. Use
    haproxy_list_servers to see which slots are in use before removing.

    Args:
        domain: The domain name to remove the server from
        slot: Server slot number (1-10) to remove

    Returns:
        Success message or error description

    Example:
        # Remove server at slot 2
        haproxy_remove_server("api.example.com", slot=2)
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    if not (1 <= slot <= MAX_SLOTS):
        return f"Error: Slot must be between 1 and {MAX_SLOTS}"
    try:
        backend, server_prefix = get_backend_and_prefix(domain)
        # HTTP only, so each slot has exactly one server
        target = f"{backend}/{server_prefix}_{slot}"
        haproxy_cmd(f"set server {target} state maint")
        haproxy_cmd(f"set server {target} addr 0.0.0.0 port 0")
        remove_server_from_config(domain, slot)
        return f"Removed server at slot {slot} from {domain} ({backend})"
    except (HaproxyError, ValueError, IOError) as e:
        return f"Error: {e}"
@mcp.tool()
def haproxy_stats() -> str:
    """Get HAProxy status and statistics.

    Returns:
        Key HAProxy metrics (name, version, uptime, connections, etc.) as
        "key: value" lines; the raw "show info" output when none of the
        expected keys is present
    """
    try:
        raw_info = haproxy_cmd("show info")
    except HaproxyError as e:
        return f"Error: {e}"

    parsed: Dict[str, str] = {}
    for line in raw_info.split("\n"):
        key, separator, value = line.partition(":")
        if separator:
            parsed[key.strip()] = value.strip()

    wanted = ("Name", "Version", "Uptime_sec", "CurrConns", "MaxConn", "Run_queue", "Tasks")
    summary = [f"{key}: {parsed[key]}" for key in wanted if key in parsed]
    if summary:
        return "\n".join(summary)
    return raw_info
@mcp.tool()
def haproxy_health() -> str:
    """Check overall system health (MCP server, HAProxy, config files).

    Use this for monitoring integration. Returns "healthy" if all components are OK.

    Returns:
        JSON with:
        - status: "healthy", or "degraded" when HAProxy is unreachable or a
          config file is missing
        - timestamp: time of the check (seconds since the epoch)
        - components.mcp: MCP server status
        - components.haproxy: HAProxy connectivity, version, uptime
        - components.config_files: map_file and servers_file accessibility

    Example:
        # Returns: {"status": "healthy", "components": {"mcp": {"status": "ok"}, ...}}
    """
    result: Dict[str, Any] = {
        "status": "healthy",
        "timestamp": time.time(),
        "components": {
            "mcp": {"status": "ok"},
            "haproxy": {"status": "unknown"},
            "config_files": {"status": "unknown"}
        }
    }
    haproxy_component = result["components"]["haproxy"]

    # Check HAProxy connectivity
    try:
        info = haproxy_cmd("show info")
        for line in info.split("\n"):
            if line.startswith("Version:"):
                haproxy_component["version"] = line.split(":", 1)[1].strip()
            elif line.startswith("Uptime_sec:"):
                raw_uptime = line.split(":", 1)[1].strip()
                try:
                    haproxy_component["uptime_sec"] = int(raw_uptime)
                except ValueError:
                    # A malformed value must not crash the health check;
                    # report connectivity and simply omit the uptime.
                    logger.warning("Unexpected Uptime_sec value from HAProxy: %r", raw_uptime)
        haproxy_component["status"] = "ok"
    except HaproxyError as e:
        haproxy_component["status"] = "error"
        haproxy_component["error"] = str(e)
        result["status"] = "degraded"

    # Check configuration files (existence only)
    files_ok = True
    file_status: Dict[str, str] = {}
    for name, path in [("map_file", MAP_FILE), ("servers_file", SERVERS_FILE)]:
        if os.path.exists(path):
            file_status[name] = "ok"
        else:
            file_status[name] = "missing"
            files_ok = False
    result["components"]["config_files"]["files"] = file_status
    result["components"]["config_files"]["status"] = "ok" if files_ok else "warning"
    if not files_ok:
        result["status"] = "degraded"
    return json.dumps(result, indent=2)
@mcp.tool()
def haproxy_domain_health(domain: str) -> str:
    """Check health status of backend servers for a specific domain.

    Returns detailed health information for all servers in the domain's
    backend. Slots whose address is 0.0.0.0 are not counted.

    Args:
        domain: The domain name to check health for

    Returns:
        JSON with:
        - status: "healthy" (all UP), "degraded" (partial UP), "down" (all DOWN), "no_servers"
        - servers: list with name, addr, status (UP/DOWN), check_status (L4OK/L4TOUT/L4CON)
        - healthy_count/total_count: server counts

    Example:
        haproxy_domain_health("api.example.com")
        # Returns: {"status": "healthy", "servers": [...], "healthy_count": 2, "total_count": 2}
    """
    if not validate_domain(domain):
        return json.dumps({"error": "Invalid domain format"})
    try:
        backend, _ = get_backend_and_prefix(domain)
    except ValueError as e:
        return json.dumps({"error": str(e)})

    try:
        state_output = haproxy_cmd("show servers state")
        stat_output = haproxy_cmd("show stat")
    except HaproxyError as e:
        return json.dumps({"error": str(e)})

    # Status details per server name, taken from the stat output
    status_by_server: Dict[str, Dict[str, str]] = {
        stat["svname"]: {
            "status": stat["status"],
            "check_status": stat["check_status"],
            "weight": stat["weight"]
        }
        for stat in parse_stat_csv(stat_output)
        if stat["pxname"] == backend and stat["svname"] not in ("FRONTEND", "BACKEND")
    }

    # Address details come from the server state output
    servers: List[Dict[str, Any]] = []
    for row in state_output.split("\n"):
        cols = row.split()
        if len(cols) < STATE_MIN_COLUMNS or cols[StateField.BE_NAME] != backend:
            continue
        addr = cols[StateField.SRV_ADDR]
        if addr == "0.0.0.0":
            continue  # disabled slot
        name = cols[StateField.SRV_NAME]
        entry: Dict[str, Any] = {
            "name": name,
            "addr": f"{addr}:{cols[StateField.SRV_PORT]}",
            "status": "unknown"
        }
        # Overrides "status" in place and appends check_status and weight
        entry.update(status_by_server.get(name, {}))
        servers.append(entry)

    total = len(servers)
    healthy = sum(1 for entry in servers if entry["status"] == "UP")
    if total == 0:
        overall = "no_servers"
    elif healthy == total:
        overall = "healthy"
    elif healthy > 0:
        overall = "degraded"
    else:
        overall = "down"

    result: Dict[str, Any] = {
        "domain": domain,
        "backend": backend,
        "status": overall,
        "servers": servers,
        "healthy_count": healthy,
        "total_count": total
    }
    return json.dumps(result, indent=2)
@mcp.tool()
def haproxy_backends() -> str:
    """List all HAProxy backends.

    Returns:
        "Backends:" followed by one backend name per line, as reported by
        "show backend" (empty lines and "#" lines removed)
    """
    try:
        listing = haproxy_cmd("show backend")
    except HaproxyError as e:
        return f"Error: {e}"
    names = [
        line for line in listing.split("\n")
        if line and not line.startswith("#")
    ]
    return "Backends:\n" + "\n".join(names)
@mcp.tool()
def haproxy_list_frontends() -> str:
    """List all HAProxy frontends with their status.

    Returns:
        List of frontends with status and current session counts, or
        "No frontends found"
    """
    try:
        stat_output = haproxy_cmd("show stat")
    except HaproxyError as e:
        return f"Error: {e}"
    frontends = [
        f"{stat['pxname']}: {stat['status']} (sessions: {stat['scur']})"
        for stat in parse_stat_csv(stat_output)
        if stat["svname"] == "FRONTEND"
    ]
    if frontends:
        return "Frontends:\n" + "\n".join(frontends)
    return "No frontends found"
@mcp.tool()
def haproxy_set_server_state(backend: str, server: str, state: str) -> str:
    """Set server state for maintenance or traffic control.

    Use haproxy_list_servers to get the backend and server names.

    Args:
        backend: Backend name (e.g., "pool_1" from haproxy_list_servers)
        server: Server name (e.g., "pool_1_1" from haproxy_list_servers)
        state: Target state:
            - ready: Enable server for traffic
            - drain: Stop new connections, finish existing (graceful shutdown)
            - maint: Disable completely (maintenance mode)

    Returns:
        HAProxy's reply when it sends one, otherwise a success message;
        an error description on failure

    Example:
        # Put server in maintenance
        haproxy_set_server_state("pool_1", "pool_1_2", "maint")
        # Re-enable server
        haproxy_set_server_state("pool_1", "pool_1_2", "ready")
    """
    if not validate_backend_name(backend):
        return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
    if not validate_backend_name(server):
        return "Error: Invalid server name (use alphanumeric, underscore, hyphen only)"
    allowed_states = ("ready", "drain", "maint")
    if state not in allowed_states:
        return "Error: state must be 'ready', 'drain', or 'maint'"
    try:
        reply = haproxy_cmd(f"set server {backend}/{server} state {state}")
    except HaproxyError as e:
        return f"Error: {e}"
    if reply:
        return reply
    return f"Server {backend}/{server} set to {state}"
@mcp.tool()
def haproxy_get_server_health(backend: str = "") -> str:
    """Show the health of individual servers (low-level view across backends).

    For domain-specific health, use haproxy_domain_health instead.

    Args:
        backend: Optional backend name to filter on (e.g., "pool_1").
            When empty, servers of all backends are listed.

    Returns:
        One line per server in the form
        "<backend>/<server>: <status> (weight: <w>, check: <check_status>)"
        with the status as reported by HAProxy (e.g. UP, DOWN, MAINT),
        "No servers found" when nothing matches, or an "Error: ..." string.

    Example:
        haproxy_get_server_health()          # All servers
        haproxy_get_server_health("pool_1")  # Only pool_1 backend
    """
    if backend and not validate_backend_name(backend):
        return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"

    # Aggregate rows (and rows without a server name) are not real servers.
    non_server_rows = ["FRONTEND", "BACKEND", ""]
    try:
        stats_csv = haproxy_cmd("show stat")
        report = [
            f"{row['pxname']}/{row['svname']}: {row['status']} "
            f"(weight: {row['weight']}, check: {row['check_status']})"
            for row in parse_stat_csv(stats_csv)
            if row["svname"] not in non_server_rows
            and (not backend or row["pxname"] == backend)
        ]
    except HaproxyError as e:
        return f"Error: {e}"

    if not report:
        return "No servers found"
    return "\n".join(report)
@mcp.tool()
def haproxy_set_server_weight(backend: str, server: str, weight: int) -> str:
    """Adjust a server's weight to control its share of load-balanced traffic.

    Higher weight = more traffic. Default is 1. Use 0 to disable without
    maintenance mode.

    Args:
        backend: Backend name (e.g., "pool_1")
        server: Server name (e.g., "pool_1_1")
        weight: Weight 0-256 (0 disables, higher = more traffic)

    Returns:
        HAProxy's response if it sent one, otherwise a confirmation message;
        an "Error: ..." string for invalid arguments or a failed command.

    Example:
        # Send 2x traffic to server 1 vs server 2
        haproxy_set_server_weight("pool_1", "pool_1_1", 2)
        haproxy_set_server_weight("pool_1", "pool_1_2", 1)

        # Temporarily disable server (soft disable, not maintenance)
        haproxy_set_server_weight("pool_1", "pool_1_3", 0)
    """
    # Names are interpolated into a Runtime API command, so validate them first.
    backend_ok = validate_backend_name(backend)
    if not backend_ok:
        return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
    server_ok = validate_backend_name(server)
    if not server_ok:
        return "Error: Invalid server name (use alphanumeric, underscore, hyphen only)"

    in_range = 0 <= weight <= 256
    if not in_range:
        return "Error: weight must be between 0 and 256"

    try:
        response = haproxy_cmd(f"set server {backend}/{server} weight {weight}")
    except HaproxyError as e:
        return f"Error: {e}"

    # An empty response is reported as success with a generated message.
    return response or f"Server {backend}/{server} weight set to {weight}"
@mcp.tool()
def haproxy_get_connections(backend: str = "") -> str:
    """Report current connection counts per proxy and per server.

    Useful for monitoring traffic distribution and identifying hot spots.

    Args:
        backend: Optional backend name to filter on (e.g., "pool_1").
            When empty, all proxies are listed.

    Returns:
        One line per stats row, "No connection data" when nothing matches,
        or an "Error: ..." string.

    Example:
        haproxy_get_connections()          # All backends
        haproxy_get_connections("pool_1")  # Only pool_1
        # Output:
        # pool_1 (BACKEND): 5 current, 12 max
        #  - pool_1_1: 5 connections
    """
    if backend and not validate_backend_name(backend):
        return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"

    try:
        stats_csv = haproxy_cmd("show stat")
        lines = []
        for row in parse_stat_csv(stats_csv):
            if backend and row["pxname"] != backend:
                continue
            row_kind = row["svname"]
            if row_kind in ["FRONTEND", "BACKEND"]:
                # Aggregate row: show the proxy name with current and peak sessions.
                lines.append(
                    f"{row['pxname']} ({row_kind}): {row['scur']} current, {row['smax']} max"
                )
            elif row_kind:
                # Individual server row, listed as an indented bullet.
                lines.append(f" - {row_kind}: {row['scur']} connections")
    except HaproxyError as e:
        return f"Error: {e}"

    if not lines:
        return "No connection data"
    return "\n".join(lines)
@mcp.tool()
def haproxy_reload() -> str:
    """Reload HAProxy configuration (validates config first).

    After a successful reload, server configurations are automatically
    restored from servers.json.

    Returns:
        Success message with the restored server count, the reload failure
        message, or a note that the reload worked but the restore failed.
    """
    reloaded, reload_message = reload_haproxy()
    if not reloaded:
        return reload_message

    # Re-apply the servers from config once the reload has succeeded.
    try:
        restored_count = restore_servers_from_config()
        return (
            f"HAProxy configuration reloaded successfully ({restored_count} servers restored)"
        )
    except Exception as e:
        # The reload itself succeeded, so report the restore failure separately.
        logger.error("Failed to restore servers after reload: %s", e)
        return f"HAProxy reloaded but server restore failed: {e}"
@mcp.tool()
def haproxy_check_config() -> str:
    """Validate the HAProxy configuration file syntax.

    Runs "haproxy -c" against /usr/local/etc/haproxy/haproxy.cfg inside the
    "haproxy" container via podman.

    Returns:
        "Configuration is valid" when the check exits with status 0,
        otherwise the check's stderr output or an "Error: ..." string
        (timeout, podman missing, or another OS error).
    """
    check_command = [
        "podman", "exec", "haproxy",
        "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg",
    ]
    try:
        completed = subprocess.run(
            check_command,
            capture_output=True,
            text=True,
            timeout=SUBPROCESS_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        return f"Error: Command timed out after {SUBPROCESS_TIMEOUT} seconds"
    except FileNotFoundError:
        return "Error: podman command not found"
    except OSError as e:
        return f"Error: OS error: {e}"

    if completed.returncode != 0:
        return f"Configuration errors:\n{completed.stderr}"
    return "Configuration is valid"
@mcp.tool()
def haproxy_save_state() -> str:
    """Save current server state to disk atomically and durably.

    The output of "show servers state" is written to a temporary file in the
    same directory as STATE_FILE, flushed and fsynced, and only then moved
    over STATE_FILE. Readers therefore never observe a partially written
    file, and a crash right after the move cannot leave an empty state file.

    Returns:
        "Server state saved" on success, or an "Error: ..." description
    """
    try:
        state = haproxy_cmd("show servers state")
    except HaproxyError as e:
        return f"Error: {e}"

    # The temp file must live on the same filesystem as STATE_FILE so the
    # final move is a rename and not a copy.
    dir_path = os.path.dirname(STATE_FILE)
    fd = None
    temp_path = None
    try:
        # NOTE(review): mkstemp creates the file with mode 0600, so the saved
        # state file ends up owner-only - confirm every consumer can read it.
        fd, temp_path = tempfile.mkstemp(dir=dir_path, prefix='.servers.state.')
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            fd = None  # fd is now owned by the file object
            f.write(state)
            # Push the data to disk before the rename makes it visible;
            # otherwise a crash could persist the rename but not the content.
            f.flush()
            os.fsync(f.fileno())
        os.replace(temp_path, STATE_FILE)
        temp_path = None  # Move succeeded, don't unlink
    except OSError as e:
        return f"Error: Failed to save state: {e}"
    finally:
        # Best-effort cleanup of whatever was left behind by a failure.
        if fd is not None:
            try:
                os.close(fd)
            except OSError:
                pass
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass
    return "Server state saved"
@mcp.tool()
def haproxy_restore_state() -> str:
    """Restore server state from disk.

    Reads STATE_FILE and, for every server entry with a usable address,
    re-applies its address and port through the HAProxy Runtime API and sets
    the server to "ready". Entries with address 0.0.0.0 are treated as
    disabled and ignored; entries that fail name, IP or port validation are
    skipped and counted.

    Returns:
        Summary of restored (and skipped) servers, or an "Error: ..."
        description if the state file is missing or unreadable, or if a
        HAProxy command fails.
    """
    # Read the file in its own try block so file errors are reported as such
    # and never confused with Runtime API errors raised further down.
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            try:
                fcntl.flock(f.fileno(), fcntl.LOCK_SH)
            except OSError:
                pass  # Continue without lock if not supported
            try:
                state = f.read()
            finally:
                try:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
                except OSError:
                    pass
    except FileNotFoundError:
        return "Error: No saved state found"
    except (OSError, UnicodeDecodeError) as e:
        # e.g. permission denied, path is a directory, or corrupt contents
        return f"Error: Failed to read saved state: {e}"

    restored = 0
    skipped = 0
    try:
        for line in state.split("\n"):
            if line.startswith("#"):
                continue  # Header/comment line
            parts = line.split()
            if len(parts) < STATE_MIN_COLUMNS:
                continue  # Blank, version, or otherwise incomplete line

            backend = parts[StateField.BE_NAME]
            server = parts[StateField.SRV_NAME]
            addr = parts[StateField.SRV_ADDR]
            port = parts[StateField.SRV_PORT]

            # Skip disabled servers
            if addr == "0.0.0.0":
                continue

            # Validate names from state file to prevent injection
            if not validate_backend_name(backend) or not validate_backend_name(server):
                skipped += 1
                continue

            # Validate IP and port
            if not validate_ip(addr) or not validate_port(port):
                skipped += 1
                continue

            haproxy_cmd(f"set server {backend}/{server} addr {addr} port {port}")
            haproxy_cmd(f"set server {backend}/{server} state ready")
            restored += 1
    except HaproxyError as e:
        return f"Error: {e}"

    result = f"Server state restored ({restored} servers)"
    if skipped:
        result += f", {skipped} entries skipped due to validation"
    return result
def startup_restore() -> None:
    """Wait for HAProxy to respond, then restore servers from the config file.

    HAProxy is probed with "show info" up to STARTUP_RETRY_COUNT times, with
    a one-second sleep after each failed attempt. If it never answers, a
    warning is logged and nothing is restored. HaproxyError, OSError and
    ValueError (including JSON decode errors) raised during the restore are
    logged as warnings instead of propagating.
    """
    attempts_left = STARTUP_RETRY_COUNT
    haproxy_ready = False
    while attempts_left > 0 and not haproxy_ready:
        attempts_left -= 1
        try:
            haproxy_cmd("show info")
        except HaproxyError:
            time.sleep(1)
        else:
            haproxy_ready = True

    if not haproxy_ready:
        logger.warning("HAProxy not ready, skipping restore")
        return

    try:
        restored_count = restore_servers_from_config()
        # Stay quiet when there was nothing to restore.
        if restored_count > 0:
            logger.info("Restored %d servers from config", restored_count)
    except (HaproxyError, OSError, ValueError, json.JSONDecodeError) as e:
        logger.warning("Failed to restore servers: %s", e)
# Script entry point: only runs when the module is executed directly.
if __name__ == "__main__":
    # Re-apply servers from the config file before the MCP server starts.
    startup_restore()
    # Start the MCP server using the streamable HTTP transport.
    mcp.run(transport="streamable-http")