Files
haproxy-mcp/mcp/server.py
kaffa 913ba0fdca fix: Final round of improvements
HIGH PRIORITY:
1. Pool allocation race condition
   - Add file locking around entire pool allocation in haproxy_add_domain
   - Prevents concurrent calls from getting same pool

2. haproxy_remove_server - disk-first pattern
   - Remove from config FIRST, then update HAProxy
   - Rollback config on HAProxy failure

3. Wildcard domain prefix validation
   - Reject domains starting with '.'
   - Prevents double-prefix like '..domain.com'

MEDIUM PRIORITY:
4. Variable shadowing fix
   - Rename state_output to servers_state in haproxy_set_domain_state

5. JSON size limit
   - Add MAX_SERVERS_JSON_SIZE = 10000 limit for haproxy_add_servers

6. Remove get_server_suffixes
   - Delete unused abstraction layer
   - Inline logic in restore_servers_from_config and haproxy_add_domain

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 14:15:00 +00:00

1875 lines
64 KiB
Python

#!/usr/bin/env python3
"""HAProxy MCP Server - Direct Runtime API Integration
This module provides an MCP (Model Context Protocol) server for managing HAProxy
configuration and runtime state. It supports dynamic domain/server management
with HTTP backends (SSL termination at HAProxy frontend).
Environment Variables:
MCP_HOST: Host to bind MCP server (default: 0.0.0.0)
MCP_PORT: Port for MCP server (default: 8000)
HAPROXY_HOST: HAProxy Runtime API host (default: localhost)
HAPROXY_PORT: HAProxy Runtime API port (default: 9999)
HAPROXY_STATE_FILE: Path to server state file
HAPROXY_MAP_FILE: Path to domains.map file
HAPROXY_MAP_FILE_CONTAINER: Container path for domains.map
HAPROXY_SERVERS_FILE: Path to servers.json file
HAPROXY_POOL_COUNT: Number of pool backends (default: 100)
HAPROXY_MAX_SLOTS: Max servers per pool (default: 10)
"""
import socket
import subprocess
import re
import json
import logging
import os
import select
import tempfile
import time
import fcntl
import ipaddress
from typing import Any, Dict, Generator, List, Optional, Set, Tuple
from mcp.server.fastmcp import FastMCP
# Configure structured logging
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=getattr(logging, log_level, logging.INFO),
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# MCP Server configuration
MCP_HOST: str = os.getenv("MCP_HOST", "0.0.0.0")
MCP_PORT: int = int(os.getenv("MCP_PORT", "8000"))
mcp = FastMCP("haproxy", host=MCP_HOST, port=MCP_PORT)
# HAProxy Runtime API configuration
HAPROXY_HOST: str = os.getenv("HAPROXY_HOST", "localhost")
HAPROXY_PORT: int = int(os.getenv("HAPROXY_PORT", "9999"))
HAPROXY_SOCKET: Tuple[str, int] = (HAPROXY_HOST, HAPROXY_PORT)
# File paths (configurable via environment)
STATE_FILE: str = os.getenv("HAPROXY_STATE_FILE", "/opt/haproxy/data/servers.state")
MAP_FILE: str = os.getenv("HAPROXY_MAP_FILE", "/opt/haproxy/conf/domains.map")
MAP_FILE_CONTAINER: str = os.getenv("HAPROXY_MAP_FILE_CONTAINER", "/usr/local/etc/haproxy/domains.map")
SERVERS_FILE: str = os.getenv("HAPROXY_SERVERS_FILE", "/opt/haproxy/conf/servers.json")
# Pool configuration
POOL_COUNT: int = int(os.getenv("HAPROXY_POOL_COUNT", "100"))
MAX_SLOTS: int = int(os.getenv("HAPROXY_MAX_SLOTS", "10"))
# Container configuration
HAPROXY_CONTAINER: str = os.getenv("HAPROXY_CONTAINER", "haproxy")
# Validation patterns - compiled once for performance
DOMAIN_PATTERN = re.compile(
r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?'
r'(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$'
)
# Backend/server names: alphanumeric, underscore, hyphen only
BACKEND_NAME_PATTERN = re.compile(r'^[a-zA-Z0-9_-]+$')
# Pattern for converting domain to backend name
NON_ALNUM_PATTERN = re.compile(r'[^a-zA-Z0-9]')
# Limits
MAX_RESPONSE_SIZE = 10 * 1024 * 1024 # 10 MB max response from HAProxy
SUBPROCESS_TIMEOUT = 30 # seconds
STARTUP_RETRY_COUNT = 10 # HAProxy ready check retries
STATE_MIN_COLUMNS = 19 # Minimum columns in HAProxy server state output
SOCKET_TIMEOUT = 5 # seconds for HAProxy socket connection
SOCKET_RECV_TIMEOUT = 30 # seconds for HAProxy socket recv loop
MAX_BULK_SERVERS = 10 # Max servers per bulk add call
MAX_SERVERS_JSON_SIZE = 10000 # Max size of servers JSON in haproxy_add_servers
class HaproxyError(Exception):
"""HAProxy operation error"""
pass
class NoAvailablePoolError(HaproxyError):
"""All pool backends are in use."""
pass
# CSV field indices for HAProxy stats (show stat command)
class StatField:
"""HAProxy CSV stat field indices."""
PXNAME = 0 # Proxy name (frontend/backend)
SVNAME = 1 # Server name (or FRONTEND/BACKEND)
SCUR = 4 # Current sessions
SMAX = 6 # Max sessions
STATUS = 17 # Status (UP/DOWN/MAINT/etc)
WEIGHT = 18 # Server weight
CHECK_STATUS = 36 # Check status
# Field indices for HAProxy server state (show servers state command)
class StateField:
"""HAProxy server state field indices."""
BE_ID = 0 # Backend ID
BE_NAME = 1 # Backend name
SRV_ID = 2 # Server ID
SRV_NAME = 3 # Server name
SRV_ADDR = 4 # Server address
SRV_OP_STATE = 5 # Operational state
SRV_ADMIN_STATE = 6 # Admin state
SRV_PORT = 18 # Server port
def haproxy_cmd(command: str) -> str:
"""Send command to HAProxy Runtime API.
Args:
command: The HAProxy runtime API command to execute
Returns:
The response from HAProxy
Raises:
HaproxyError: If connection fails, times out, or response exceeds size limit
"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(SOCKET_TIMEOUT)
s.connect(HAPROXY_SOCKET)
s.sendall(f"{command}\n".encode())
s.shutdown(socket.SHUT_WR)
# Set socket to non-blocking for select-based recv loop
s.setblocking(False)
response = b""
start_time = time.time()
while True:
# Check for overall timeout
elapsed = time.time() - start_time
if elapsed >= SOCKET_RECV_TIMEOUT:
raise HaproxyError(f"Response timeout after {SOCKET_RECV_TIMEOUT} seconds")
# Wait for data with timeout (remaining time)
remaining = SOCKET_RECV_TIMEOUT - elapsed
ready, _, _ = select.select([s], [], [], min(remaining, 1.0))
if ready:
data = s.recv(8192)
if not data:
break
response += data
if len(response) > MAX_RESPONSE_SIZE:
raise HaproxyError(f"Response exceeded {MAX_RESPONSE_SIZE} bytes limit")
return response.decode().strip()
except socket.timeout:
raise HaproxyError("Connection timeout")
except ConnectionRefusedError:
raise HaproxyError("Connection refused - HAProxy not running?")
except UnicodeDecodeError:
raise HaproxyError("Invalid UTF-8 in response")
except HaproxyError:
raise
except Exception as e:
raise HaproxyError(str(e)) from e
def haproxy_cmd_checked(command: str) -> str:
"""Send command to HAProxy and raise on error response.
Args:
command: HAProxy command to execute
Returns:
Command response
Raises:
HaproxyError: If HAProxy returns an error message
"""
result = haproxy_cmd(command)
# HAProxy returns empty string on success, error messages on failure
error_indicators = ["No such", "not found", "error", "failed", "invalid", "unknown"]
if result:
result_lower = result.lower()
for indicator in error_indicators:
if indicator.lower() in result_lower:
raise HaproxyError(f"HAProxy command failed: {result.strip()}")
return result
def reload_haproxy() -> Tuple[bool, str]:
"""Validate and reload HAProxy configuration.
Returns:
Tuple of (success, message)
"""
try:
validate = subprocess.run(
["podman", "exec", HAPROXY_CONTAINER, "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"],
capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
)
if validate.returncode != 0:
return False, f"Config validation failed:\n{validate.stderr}"
result = subprocess.run(
["podman", "kill", "--signal", "USR2", HAPROXY_CONTAINER],
capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
)
if result.returncode != 0:
return False, f"Reload failed: {result.stderr}"
return True, "OK"
except subprocess.TimeoutExpired:
return False, f"Command timed out after {SUBPROCESS_TIMEOUT} seconds"
except FileNotFoundError:
return False, "podman command not found"
except OSError as e:
return False, f"OS error: {e}"
def validate_domain(domain: str) -> bool:
"""Validate domain format.
Args:
domain: The domain name to validate
Returns:
True if domain is valid, False otherwise
"""
if not domain or len(domain) > 253:
return False
return bool(DOMAIN_PATTERN.match(domain))
def validate_ip(ip: str, allow_empty: bool = False) -> bool:
"""Validate IPv4 or IPv6 address format.
Args:
ip: The IP address to validate
allow_empty: If True, empty string is considered valid
Returns:
True if IP is valid, False otherwise
"""
if not ip:
return allow_empty
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False
def validate_backend_name(name: str) -> bool:
"""Validate backend or server name to prevent command injection.
Args:
name: The backend or server name to validate
Returns:
True if name contains only safe characters
"""
if not name or len(name) > 255:
return False
return bool(BACKEND_NAME_PATTERN.match(name))
def domain_to_backend(domain: str) -> str:
"""Convert domain to backend name (alphanumeric + underscore only).
Args:
domain: The domain name to convert
Returns:
Backend name with non-alphanumeric characters replaced by underscores
Raises:
ValueError: If resulting name is invalid
"""
result = NON_ALNUM_PATTERN.sub('_', domain)
if not validate_backend_name(result):
raise ValueError(f"Invalid backend name after conversion: {result}")
return result
def parse_stat_csv(stat_output: str) -> Generator[Dict[str, str], None, None]:
"""Parse HAProxy stat CSV output into structured data.
Args:
stat_output: Raw output from 'show stat' command
Yields:
Dictionaries with parsed stat fields for each row
"""
for line in stat_output.split("\n"):
if not line or line.startswith("#"):
continue
parts = line.split(",")
if len(parts) > StatField.STATUS:
yield {
"pxname": parts[StatField.PXNAME],
"svname": parts[StatField.SVNAME],
"scur": parts[StatField.SCUR] if len(parts) > StatField.SCUR else "0",
"smax": parts[StatField.SMAX] if len(parts) > StatField.SMAX else "0",
"status": parts[StatField.STATUS],
"weight": parts[StatField.WEIGHT] if len(parts) > StatField.WEIGHT else "0",
"check_status": parts[StatField.CHECK_STATUS] if len(parts) > StatField.CHECK_STATUS else "",
}
def validate_port(port: str) -> bool:
"""Validate port number is in valid range.
Args:
port: Port number as string
Returns:
True if port is valid (1-65535), False otherwise
"""
if not port or not port.isdigit():
return False
port_num = int(port)
return 1 <= port_num <= 65535
def get_map_contents() -> List[Tuple[str, str]]:
"""Read domains.map file and return list of (domain, backend) tuples.
Returns:
List of (domain, backend) tuples from the map file
"""
entries = []
try:
with open(MAP_FILE, "r", encoding="utf-8") as f:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
except OSError:
pass # Continue without lock if not supported
try:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split()
if len(parts) >= 2:
entries.append((parts[0], parts[1]))
finally:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except OSError:
pass
except FileNotFoundError:
pass
return entries
def find_available_pool() -> str:
"""Find first unused pool from pool_1 to pool_{POOL_COUNT}.
Returns:
Pool name (e.g., 'pool_1') if available
Raises:
NoAvailablePoolError: If all pools are in use
"""
used_pools: Set[str] = set()
for domain, backend in get_map_contents():
if backend.startswith("pool_"):
used_pools.add(backend)
for i in range(1, POOL_COUNT + 1):
pool_name = f"pool_{i}"
if pool_name not in used_pools:
return pool_name
raise NoAvailablePoolError(f"All {POOL_COUNT} pool backends are in use")
def get_domain_backend(domain: str) -> Optional[str]:
"""Look up the backend for a domain from domains.map.
Args:
domain: The domain to look up
Returns:
Backend name if found, None otherwise
"""
for map_domain, backend in get_map_contents():
if map_domain == domain:
return backend
return None
def is_legacy_backend(backend: str) -> bool:
"""Check if backend is a legacy static backend (not a pool).
Args:
backend: Backend name to check
Returns:
True if this is a legacy backend, False if it's a pool
"""
return not backend.startswith("pool_")
def get_legacy_backend_name(domain: str) -> str:
"""Convert domain to legacy backend name format.
Args:
domain: Domain name
Returns:
Legacy backend name (e.g., 'api_example_com_backend')
"""
return f"{domain_to_backend(domain)}_backend"
def atomic_write_file(file_path: str, content: str) -> None:
"""Write content to file atomically using temp file + rename.
Args:
file_path: Target file path
content: Content to write
Raises:
IOError: If write fails
"""
dir_path = os.path.dirname(file_path)
fd = None
temp_path = None
try:
fd, temp_path = tempfile.mkstemp(dir=dir_path, prefix='.tmp.')
with os.fdopen(fd, 'w', encoding='utf-8') as f:
fd = None # fd is now owned by the file object
f.write(content)
os.rename(temp_path, file_path)
temp_path = None # Rename succeeded
except OSError as e:
raise IOError(f"Failed to write {file_path}: {e}") from e
finally:
if fd is not None:
try:
os.close(fd)
except OSError:
pass
if temp_path is not None:
try:
os.unlink(temp_path)
except OSError:
pass
def save_map_file(entries: List[Tuple[str, str]]) -> None:
"""Save entries to domains.map file atomically.
Uses temp file + rename for atomic write to prevent race conditions.
Args:
entries: List of (domain, backend) tuples to write
Raises:
IOError: If the file cannot be written
"""
lines = [
"# Domain to Backend mapping\n",
"# Format: domain backend_name\n",
"# Wildcard: .domain.com matches *.domain.com\n\n",
]
for domain, backend in entries:
lines.append(f"{domain} {backend}\n")
atomic_write_file(MAP_FILE, "".join(lines))
def load_servers_config() -> Dict[str, Any]:
"""Load servers configuration from JSON file with file locking.
Returns:
Dictionary with server configurations
"""
try:
with open(SERVERS_FILE, "r", encoding="utf-8") as f:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
except OSError:
logger.debug("File locking not supported for %s", SERVERS_FILE)
try:
return json.load(f)
finally:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except OSError:
pass
except FileNotFoundError:
return {}
except json.JSONDecodeError as e:
logger.warning("Corrupt config file %s: %s", SERVERS_FILE, e)
return {}
def save_servers_config(config: Dict[str, Any]) -> None:
"""Save servers configuration to JSON file atomically.
Uses temp file + rename for atomic write to prevent race conditions.
Args:
config: Dictionary with server configurations
"""
atomic_write_file(SERVERS_FILE, json.dumps(config, indent=2))
def add_server_to_config(domain: str, slot: int, ip: str, http_port: int) -> None:
"""Add server configuration to persistent storage with file locking.
Args:
domain: Domain name
slot: Server slot (1 to MAX_SLOTS)
ip: Server IP address
http_port: HTTP port
"""
lock_path = f"{SERVERS_FILE}.lock"
with open(lock_path, 'w') as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
try:
config = load_servers_config()
if domain not in config:
config[domain] = {}
config[domain][str(slot)] = {"ip": ip, "http_port": http_port}
save_servers_config(config)
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
def remove_server_from_config(domain: str, slot: int) -> None:
"""Remove server configuration from persistent storage with file locking.
Args:
domain: Domain name
slot: Server slot to remove
"""
lock_path = f"{SERVERS_FILE}.lock"
with open(lock_path, 'w') as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
try:
config = load_servers_config()
if domain in config and str(slot) in config[domain]:
del config[domain][str(slot)]
if not config[domain]:
del config[domain]
save_servers_config(config)
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
def remove_domain_from_config(domain: str) -> None:
"""Remove domain from persistent config with file locking.
Args:
domain: Domain name to remove
"""
lock_path = f"{SERVERS_FILE}.lock"
with open(lock_path, 'w') as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
try:
config = load_servers_config()
if domain in config:
del config[domain]
save_servers_config(config)
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
def configure_server_slot(backend: str, server_prefix: str, slot: int, ip: str, http_port: int) -> str:
"""Configure a server slot in HAProxy.
Args:
backend: Backend name (e.g., 'pool_1')
server_prefix: Server name prefix (e.g., 'pool_1')
slot: Slot number (1-10)
ip: Server IP address
http_port: HTTP port
Returns:
Server name that was configured
Raises:
HaproxyError: If HAProxy command fails
"""
server = f"{server_prefix}_{slot}"
haproxy_cmd_checked(f"set server {backend}/{server} addr {ip} port {http_port}")
haproxy_cmd_checked(f"set server {backend}/{server} state ready")
return server
def get_backend_and_prefix(domain: str) -> Tuple[str, str]:
"""Look up backend and determine server name prefix for a domain.
Args:
domain: The domain name to look up
Returns:
Tuple of (backend_name, server_prefix)
Raises:
ValueError: If domain cannot be mapped to a valid backend
"""
backend = get_domain_backend(domain)
if not backend:
backend = get_legacy_backend_name(domain)
if backend.startswith("pool_"):
server_prefix = backend
else:
server_prefix = domain_to_backend(domain)
return backend, server_prefix
def restore_servers_from_config() -> int:
"""Restore all servers from configuration file.
Returns:
Number of servers restored
"""
config = load_servers_config()
restored = 0
for domain, slots in config.items():
backend = get_domain_backend(domain)
if not backend:
continue
try:
_, server_prefix = get_backend_and_prefix(domain)
except ValueError as e:
logger.warning("Invalid domain '%s': %s", domain, e)
continue
for slot_str, server_info in slots.items():
try:
slot = int(slot_str)
except ValueError:
logger.warning("Invalid slot '%s' for %s, skipping", slot_str, domain)
continue
ip = server_info.get("ip", "")
if not ip:
continue
try:
port = int(server_info.get("http_port", 80))
except (ValueError, TypeError):
logger.warning("Invalid port for %s slot %d, skipping", domain, slot)
continue
server = f"{server_prefix}_{slot}"
try:
haproxy_cmd_checked(f"set server {backend}/{server} addr {ip} port {port}")
haproxy_cmd_checked(f"set server {backend}/{server} state ready")
restored += 1
except HaproxyError as e:
logger.warning("Failed to restore %s/%s: %s", backend, server, e)
return restored
@mcp.tool()
def haproxy_list_domains(include_wildcards: bool = False) -> str:
"""List all configured domains with their backend servers.
Shows all domains mapped in HAProxy with their pool backend and configured servers.
Args:
include_wildcards: If True, also show wildcard domain mappings (entries starting
with '.', e.g., '.example.com' which matches '*.example.com').
Default is False to show only explicit domain mappings.
Returns:
List of domains in format: domain -> pool_N (pool): server=ip:port
Example:
# Output:
# • api.example.com -> pool_1 (pool): pool_1_1=10.0.0.1:8080, pool_1_2=10.0.0.2:8080
# • web.example.com -> pool_2 (pool): pool_2_1=10.0.0.3:80
# With include_wildcards=True:
# • api.example.com -> pool_1 (pool): pool_1_1=10.0.0.1:8080
# • .api.example.com -> pool_1 (wildcard): pool_1_1=10.0.0.1:8080
"""
try:
domains = []
state = haproxy_cmd("show servers state")
# Build server map from HAProxy state
server_map: Dict[str, list] = {}
for line in state.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.SRV_ADDR] != "0.0.0.0":
backend = parts[StateField.BE_NAME]
if backend not in server_map:
server_map[backend] = []
server_map[backend].append(
f"{parts[StateField.SRV_NAME]}={parts[StateField.SRV_ADDR]}:{parts[StateField.SRV_PORT]}"
)
# Read from domains.map
seen_domains: Set[str] = set()
for domain, backend in get_map_contents():
# Skip wildcard entries unless explicitly requested
if domain.startswith(".") and not include_wildcards:
continue
if domain in seen_domains:
continue
seen_domains.add(domain)
servers = server_map.get(backend, ["(none)"])
if domain.startswith("."):
backend_type = "wildcard"
elif backend.startswith("pool_"):
backend_type = "pool"
else:
backend_type = "static"
domains.append(f"{domain} -> {backend} ({backend_type}): {', '.join(servers)}")
return "\n".join(domains) if domains else "No domains configured"
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_add_domain(domain: str, ip: str = "", http_port: int = 80) -> str:
"""Add a new domain to HAProxy using map-based routing (no reload required).
Creates a domain mapping to a pool backend. Each domain can have up to 10
backend servers for load balancing. Use haproxy_add_server to add more servers.
Args:
domain: The domain name to add (e.g., api.example.com)
ip: Optional IP address for initial server; if provided, adds to slot 1
http_port: HTTP port for the backend server (default: 80)
Returns:
Success message or error description
Example:
# Add domain without server (add servers later)
haproxy_add_domain("api.example.com")
# Add domain with initial server
haproxy_add_domain("api.example.com", ip="10.0.0.1", http_port=8080)
"""
# Validate inputs
if domain.startswith("."):
return "Error: Domain cannot start with '.' (wildcard entries are added automatically)"
if not validate_domain(domain):
return "Error: Invalid domain format"
if not validate_ip(ip, allow_empty=True):
return "Error: Invalid IP address format"
if not (1 <= http_port <= 65535):
return "Error: Port must be between 1 and 65535"
# Use file locking for the entire pool allocation operation
lock_path = f"{MAP_FILE}.lock"
with open(lock_path, 'w') as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
try:
# Read map contents once for both existence check and pool lookup
entries = get_map_contents()
# Check if domain already exists (using cached entries)
for domain_entry, backend in entries:
if domain_entry == domain:
return f"Error: Domain {domain} already exists (mapped to {backend})"
# Find available pool (using cached entries)
used_pools: Set[str] = set()
for _, backend in entries:
if backend.startswith("pool_"):
used_pools.add(backend)
pool = None
for i in range(1, POOL_COUNT + 1):
pool_name = f"pool_{i}"
if pool_name not in used_pools:
pool = pool_name
break
if not pool:
return f"Error: All {POOL_COUNT} pool backends are in use"
try:
# Save to disk first (atomic write for persistence)
# If HAProxy update fails after this, state will be correct on restart
# Note: We already have 'entries' from the map contents read above
entries.append((domain, pool))
entries.append((f".{domain}", pool))
try:
save_map_file(entries)
except IOError as e:
return f"Error: Failed to save map file: {e}"
# Then update HAProxy map via Runtime API
try:
haproxy_cmd(f"add map {MAP_FILE_CONTAINER} {domain} {pool}")
haproxy_cmd(f"add map {MAP_FILE_CONTAINER} .{domain} {pool}")
except HaproxyError as e:
# Rollback: remove the domain we just added from entries and re-save
rollback_entries = [(d, b) for d, b in entries if d != domain and d != f".{domain}"]
try:
save_map_file(rollback_entries)
except IOError:
logger.error("Failed to rollback map file after HAProxy error")
return f"Error: Failed to update HAProxy map: {e}"
# If IP provided, add server to slot 1
if ip:
# Save server config to disk first
add_server_to_config(domain, 1, ip, http_port)
try:
server = f"{pool}_1"
haproxy_cmd(f"set server {pool}/{server} addr {ip} port {http_port}")
haproxy_cmd(f"set server {pool}/{server} state ready")
except HaproxyError as e:
# Rollback server config on failure
remove_server_from_config(domain, 1)
return f"Domain {domain} added to {pool} but server config failed: {e}"
return f"Domain {domain} added to {pool} with server {ip}:{http_port}"
return f"Domain {domain} added to {pool} (no servers configured)"
except HaproxyError as e:
return f"Error: {e}"
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
@mcp.tool()
def haproxy_remove_domain(domain: str) -> str:
"""Remove a domain from HAProxy (no reload required for pool-based domains).
Args:
domain: The domain name to remove
Returns:
Success message or error description
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
# Look up the domain in the map
backend = get_domain_backend(domain)
if not backend:
return f"Error: Domain {domain} not found"
# Check if this is a legacy backend (not a pool)
if is_legacy_backend(backend):
return f"Error: Cannot remove legacy domain {domain} (uses static backend {backend})"
try:
# Save to disk first (atomic write for persistence)
# If HAProxy update fails after this, state will be correct on restart
entries = get_map_contents()
new_entries = [(d, b) for d, b in entries if d != domain and d != f".{domain}"]
save_map_file(new_entries)
# Remove from persistent server config
remove_domain_from_config(domain)
# Clear map entries via Runtime API (immediate effect)
haproxy_cmd(f"del map {MAP_FILE_CONTAINER} {domain}")
try:
haproxy_cmd(f"del map {MAP_FILE_CONTAINER} .{domain}")
except HaproxyError as e:
logger.warning("Failed to remove wildcard entry for %s: %s", domain, e)
# Disable all servers in the pool (reset to 0.0.0.0:0)
for slot in range(1, MAX_SLOTS + 1):
server = f"{backend}_{slot}"
try:
haproxy_cmd(f"set server {backend}/{server} state maint")
haproxy_cmd(f"set server {backend}/{server} addr 0.0.0.0 port 0")
except HaproxyError as e:
logger.warning(
"Failed to clear server %s/%s for domain %s: %s",
backend, server, domain, e
)
# Continue with remaining cleanup
return f"Domain {domain} removed from {backend}"
except IOError as e:
return f"Error: Failed to update map file: {e}"
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_list_servers(domain: str) -> str:
"""List all servers for a specific domain.
Shows all configured servers with their slot numbers, addresses, and status.
Use this to see which slots are in use before adding or removing servers.
Args:
domain: The domain name to list servers for
Returns:
List of servers with slot number, address (ip:port), and status (UP/DOWN/MAINT)
Example:
haproxy_list_servers("api.example.com")
# Output: pool_1_1: 10.0.0.1:8080 (UP)
# pool_1_2: 10.0.0.2:8080 (UP)
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
try:
backend, _ = get_backend_and_prefix(domain)
servers = []
state = haproxy_cmd("show servers state")
for line in state.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend:
addr = parts[StateField.SRV_ADDR]
status = "active" if addr != "0.0.0.0" else "disabled"
servers.append(
f"{parts[StateField.SRV_NAME]}: {addr}:{parts[StateField.SRV_PORT]} ({status})"
)
if not servers:
return f"Backend {backend} not found"
return f"Servers for {domain} ({backend}):\n" + "\n".join(servers)
except (HaproxyError, ValueError) as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_add_server(domain: str, slot: int, ip: str, http_port: int = 80) -> str:
"""Add a server to a domain's backend at specified slot.
Each domain can have up to 10 servers (slots 1-10) for load balancing.
To add multiple servers, call this with different slot numbers.
HAProxy distributes traffic across all configured servers using round-robin.
Args:
domain: The domain name to add the server to
slot: Server slot number (1-10), or 0/-1 for auto-select first available slot
ip: IP address of the server (required)
http_port: HTTP port (default: 80)
Returns:
Success message or error description
Example:
# Add two servers for load balancing
haproxy_add_server("api.example.com", slot=1, ip="10.0.0.1", http_port=8080)
haproxy_add_server("api.example.com", slot=2, ip="10.0.0.2", http_port=8080)
# Add a third server
haproxy_add_server("api.example.com", slot=3, ip="10.0.0.3", http_port=8080)
# Auto-select next available slot
haproxy_add_server("api.example.com", slot=0, ip="10.0.0.4", http_port=8080)
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
if not ip:
return "Error: IP address is required"
if not validate_ip(ip):
return "Error: Invalid IP address format"
if not (1 <= http_port <= 65535):
return "Error: Port must be between 1 and 65535"
try:
backend, server_prefix = get_backend_and_prefix(domain)
# Auto-select slot if slot <= 0
if slot <= 0:
state = haproxy_cmd("show servers state")
used_slots: Set[int] = set()
for line in state.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend:
if parts[StateField.SRV_ADDR] != "0.0.0.0":
# Extract slot number from server name (e.g., pool_1_3 -> 3)
server_name = parts[StateField.SRV_NAME]
try:
used_slots.add(int(server_name.rsplit("_", 1)[1]))
except (ValueError, IndexError):
pass
for s in range(1, MAX_SLOTS + 1):
if s not in used_slots:
slot = s
break
else:
return f"Error: No available slots (all {MAX_SLOTS} slots in use)"
elif not (1 <= slot <= MAX_SLOTS):
return f"Error: Slot must be between 1 and {MAX_SLOTS}, or 0/-1 for auto-select"
# Save to persistent config FIRST (disk-first pattern)
add_server_to_config(domain, slot, ip, http_port)
try:
server = configure_server_slot(backend, server_prefix, slot, ip, http_port)
return f"Added to {domain} ({backend}) slot {slot}:\n{server}{ip}:{http_port}"
except HaproxyError as e:
# Rollback config on HAProxy failure
remove_server_from_config(domain, slot)
return f"Error: {e}"
except (ValueError, IOError) as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_add_servers(domain: str, servers: str) -> str:
"""Add multiple servers to a domain's backend at once.
More efficient than calling haproxy_add_server multiple times.
All servers are validated before any are added.
Args:
domain: The domain name to add servers to
servers: JSON array of server configs. Each object can have:
- slot (required): Server slot number (1-10)
- ip (required): IP address of the server
- http_port (optional): HTTP port (default: 80)
Example: '[{"slot":1,"ip":"10.0.0.1","http_port":80},{"slot":2,"ip":"10.0.0.2"}]'
Returns:
Summary of added servers or errors for each failed server
Example:
haproxy_add_servers("api.example.com", '[{"slot":1,"ip":"10.0.0.1"},{"slot":2,"ip":"10.0.0.2"}]')
# Output: Added 2 servers to api.example.com (pool_1):
# • slot 1: 10.0.0.1:80
# • slot 2: 10.0.0.2:80
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
# Check JSON size before parsing
if len(servers) > MAX_SERVERS_JSON_SIZE:
return f"Error: servers JSON exceeds maximum size ({MAX_SERVERS_JSON_SIZE} bytes)"
# Parse JSON array
try:
server_list = json.loads(servers)
except json.JSONDecodeError as e:
return f"Error: Invalid JSON - {e}"
if not isinstance(server_list, list):
return "Error: servers must be a JSON array"
if not server_list:
return "Error: servers array is empty"
if len(server_list) > MAX_BULK_SERVERS:
return f"Error: Cannot add more than {MAX_BULK_SERVERS} servers at once"
# Validate all servers first before adding any
validated_servers = []
validation_errors = []
for i, srv in enumerate(server_list):
if not isinstance(srv, dict):
validation_errors.append(f"Server {i+1}: must be an object")
continue
# Extract and validate slot
slot = srv.get("slot")
if slot is None:
validation_errors.append(f"Server {i+1}: missing 'slot' field")
continue
try:
slot = int(slot)
except (ValueError, TypeError):
validation_errors.append(f"Server {i+1}: slot must be an integer")
continue
if not (1 <= slot <= MAX_SLOTS):
validation_errors.append(f"Server {i+1}: slot must be between 1 and {MAX_SLOTS}")
continue
# Extract and validate IP
ip = srv.get("ip")
if not ip:
validation_errors.append(f"Server {i+1}: missing 'ip' field")
continue
if not validate_ip(ip):
validation_errors.append(f"Server {i+1}: invalid IP address '{ip}'")
continue
# Extract and validate port
http_port = srv.get("http_port", 80)
try:
http_port = int(http_port)
except (ValueError, TypeError):
validation_errors.append(f"Server {i+1}: http_port must be an integer")
continue
if not (1 <= http_port <= 65535):
validation_errors.append(f"Server {i+1}: port must be between 1 and 65535")
continue
validated_servers.append({"slot": slot, "ip": ip, "http_port": http_port})
# Return validation errors if any
if validation_errors:
return "Validation errors:\n" + "\n".join(f"{e}" for e in validation_errors)
# Check for duplicate slots
slots = [s["slot"] for s in validated_servers]
if len(slots) != len(set(slots)):
return "Error: Duplicate slot numbers in servers array"
# Get backend info
try:
backend, server_prefix = get_backend_and_prefix(domain)
except ValueError as e:
return f"Error: {e}"
# Save ALL servers to config FIRST (disk-first pattern)
for server_config in validated_servers:
slot = server_config["slot"]
ip = server_config["ip"]
http_port = server_config["http_port"]
add_server_to_config(domain, slot, ip, http_port)
# Then update HAProxy
added = []
errors = []
failed_slots = []
try:
for server_config in validated_servers:
slot = server_config["slot"]
ip = server_config["ip"]
http_port = server_config["http_port"]
try:
configure_server_slot(backend, server_prefix, slot, ip, http_port)
added.append(f"slot {slot}: {ip}:{http_port}")
except HaproxyError as e:
failed_slots.append(slot)
errors.append(f"slot {slot}: {e}")
except Exception as e:
# Rollback all saved configs on unexpected error
for server_config in validated_servers:
remove_server_from_config(domain, server_config["slot"])
return f"Error: {e}"
# Rollback failed slots from config
for slot in failed_slots:
remove_server_from_config(domain, slot)
# Build result message
result_parts = []
if added:
result_parts.append(f"Added {len(added)} servers to {domain} ({backend}):")
result_parts.extend(f"{s}" for s in added)
if errors:
result_parts.append(f"Failed to add {len(errors)} servers:")
result_parts.extend(f"{e}" for e in errors)
return "\n".join(result_parts) if result_parts else "No servers added"
@mcp.tool()
def haproxy_remove_server(domain: str, slot: int) -> str:
"""Remove a server from a domain's backend at specified slot.
Removes the server from load balancing rotation. Use haproxy_list_servers
to see which slots are in use before removing.
Args:
domain: The domain name to remove the server from
slot: Server slot number (1-10) to remove
Returns:
Success message or error description
Example:
# Remove server at slot 2
haproxy_remove_server("api.example.com", slot=2)
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
if not (1 <= slot <= MAX_SLOTS):
return f"Error: Slot must be between 1 and {MAX_SLOTS}"
try:
backend, server_prefix = get_backend_and_prefix(domain)
# Get current server info for potential rollback
config = load_servers_config()
old_config = config.get(domain, {}).get(str(slot), {})
# Remove from persistent config FIRST (disk-first pattern)
remove_server_from_config(domain, slot)
try:
# HTTP only - single server per slot
server = f"{server_prefix}_{slot}"
haproxy_cmd_checked(f"set server {backend}/{server} state maint")
haproxy_cmd_checked(f"set server {backend}/{server} addr 0.0.0.0 port 0")
return f"Removed server at slot {slot} from {domain} ({backend})"
except HaproxyError as e:
# Rollback: re-add config if HAProxy command failed
if old_config:
add_server_to_config(domain, slot, old_config.get("ip", ""), old_config.get("http_port", 80))
return f"Error: {e}"
except (ValueError, IOError) as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_set_domain_state(domain: str, state: str) -> str:
"""Set state for all servers of a domain at once.
Useful for maintenance windows or deployments - set all servers to drain/maint
before updates, then back to ready after.
Args:
domain: The domain name
state: Target state - ready, drain, or maint
Returns:
Summary of state changes or error
Example:
# Put all servers in maintenance for deployment
haproxy_set_domain_state("api.example.com", "maint")
# Re-enable all servers after deployment
haproxy_set_domain_state("api.example.com", "ready")
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
if state not in ["ready", "drain", "maint"]:
return "Error: State must be 'ready', 'drain', or 'maint'"
try:
backend, server_prefix = get_backend_and_prefix(domain)
except ValueError as e:
return f"Error: {e}"
# Get active servers for this domain
try:
servers_state = haproxy_cmd("show servers state")
except HaproxyError as e:
return f"Error: {e}"
changed = []
errors = []
for line in servers_state.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend:
server_name = parts[StateField.SRV_NAME]
addr = parts[StateField.SRV_ADDR]
# Only change state for configured servers (not 0.0.0.0)
if addr != "0.0.0.0":
try:
haproxy_cmd_checked(f"set server {backend}/{server_name} state {state}")
changed.append(server_name)
except HaproxyError as e:
errors.append(f"{server_name}: {e}")
if not changed and not errors:
return f"No active servers found for {domain}"
result = f"Set {len(changed)} servers to '{state}' for {domain}"
if changed:
result += ":\n" + "\n".join(f"{s}" for s in changed)
if errors:
result += f"\n\nErrors ({len(errors)}):\n" + "\n".join(f"{e}" for e in errors)
return result
@mcp.tool()
def haproxy_wait_drain(domain: str, timeout: int = 30) -> str:
"""Wait for all active connections to drain from a domain's servers.
Useful after setting servers to 'drain' state before maintenance.
Polls every second until all servers have 0 current connections or timeout.
Args:
domain: The domain name to wait for
timeout: Maximum seconds to wait (default: 30, max: 300)
Returns:
Success message or timeout error
Example:
haproxy_set_domain_state("api.example.com", "drain")
haproxy_wait_drain("api.example.com", timeout=60)
# Now safe to perform maintenance
"""
if not validate_domain(domain):
return "Error: Invalid domain format"
if not (1 <= timeout <= 300):
return "Error: Timeout must be between 1 and 300 seconds"
try:
backend, _ = get_backend_and_prefix(domain)
except ValueError as e:
return f"Error: {e}"
start_time = time.time()
while time.time() - start_time < timeout:
try:
stats = haproxy_cmd("show stat")
total_connections = 0
for line in stats.split("\n"):
parts = line.split(",")
if len(parts) > StatField.SCUR and parts[0] == backend and parts[1] not in ["FRONTEND", "BACKEND", ""]:
try:
scur = int(parts[StatField.SCUR]) if parts[StatField.SCUR] else 0
total_connections += scur
except ValueError:
pass
if total_connections == 0:
elapsed = int(time.time() - start_time)
return f"All connections drained for {domain} ({elapsed}s)"
time.sleep(1)
except HaproxyError as e:
return f"Error checking connections: {e}"
return f"Timeout: Connections still active after {timeout}s"
@mcp.tool()
def haproxy_stats() -> str:
"""Get HAProxy status and statistics.
Returns:
Key HAProxy metrics (name, version, uptime, connections, etc.)
"""
try:
result = haproxy_cmd("show info")
stats = {}
for line in result.split("\n"):
if ":" in line:
key, value = line.split(":", 1)
stats[key.strip()] = value.strip()
important = ["Name", "Version", "Uptime_sec", "CurrConns", "MaxConn", "Run_queue", "Tasks"]
output = []
for key in important:
if key in stats:
output.append(f"{key}: {stats[key]}")
return "\n".join(output) if output else result
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_health() -> str:
"""Check overall system health (MCP server, HAProxy, config files).
Use this for monitoring integration. Returns "healthy" if all components are OK.
Returns:
JSON with:
- status: "healthy" or "unhealthy"
- components.mcp: MCP server status
- components.haproxy: HAProxy connectivity, version, uptime
- components.config_files: map_file and servers_file accessibility
Example:
# Returns: {"status": "healthy", "components": {"mcp": {"status": "ok"}, ...}}
"""
result: Dict[str, Any] = {
"status": "healthy",
"timestamp": time.time(),
"components": {
"mcp": {"status": "ok"},
"haproxy": {"status": "unknown"},
"config_files": {"status": "unknown"}
}
}
# Check HAProxy connectivity
try:
info = haproxy_cmd("show info")
for line in info.split("\n"):
if line.startswith("Version:"):
result["components"]["haproxy"]["version"] = line.split(":", 1)[1].strip()
elif line.startswith("Uptime_sec:"):
result["components"]["haproxy"]["uptime_sec"] = int(line.split(":", 1)[1].strip())
result["components"]["haproxy"]["status"] = "ok"
except HaproxyError as e:
result["components"]["haproxy"]["status"] = "error"
result["components"]["haproxy"]["error"] = str(e)
result["status"] = "degraded"
# Check container status
try:
container_result = subprocess.run(
["podman", "inspect", "--format", "{{.State.Status}}", HAPROXY_CONTAINER],
capture_output=True, text=True, timeout=5
)
if container_result.returncode == 0:
container_status = container_result.stdout.strip()
result["components"]["container"] = {
"status": "ok" if container_status == "running" else container_status,
"state": container_status
}
else:
result["components"]["container"] = {"status": "error", "error": container_result.stderr.strip()}
result["status"] = "unhealthy"
except subprocess.TimeoutExpired:
result["components"]["container"] = {"status": "timeout"}
result["status"] = "unhealthy"
except Exception as e:
logger.warning("Container health check failed: %s", e)
result["components"]["container"] = {"status": "error", "error": str(e)}
# Check configuration files
files_ok = True
file_status: Dict[str, str] = {}
for name, path in [("map_file", MAP_FILE), ("servers_file", SERVERS_FILE)]:
if os.path.exists(path):
file_status[name] = "ok"
else:
file_status[name] = "missing"
files_ok = False
result["components"]["config_files"]["files"] = file_status
result["components"]["config_files"]["status"] = "ok" if files_ok else "warning"
if not files_ok:
result["status"] = "degraded"
return json.dumps(result, indent=2)
@mcp.tool()
def haproxy_domain_health(domain: str) -> str:
"""Check health status of backend servers for a specific domain.
Returns detailed health information for all servers in the domain's backend.
Args:
domain: The domain name to check health for
Returns:
JSON with:
- status: "healthy" (all UP), "degraded" (partial UP), "down" (all DOWN), "no_servers"
- servers: list with name, addr, status (UP/DOWN), check_status (L4OK/L4TOUT/L4CON)
- healthy_count/total_count: server counts
Example:
haproxy_domain_health("api.example.com")
# Returns: {"status": "healthy", "servers": [...], "healthy_count": 2, "total_count": 2}
"""
if not validate_domain(domain):
return json.dumps({"error": "Invalid domain format"})
try:
backend, _ = get_backend_and_prefix(domain)
except ValueError as e:
return json.dumps({"error": str(e)})
result: Dict[str, Any] = {
"domain": domain,
"backend": backend,
"status": "unknown",
"servers": [],
"healthy_count": 0,
"total_count": 0
}
try:
# Get server states
state_output = haproxy_cmd("show servers state")
stat_output = haproxy_cmd("show stat")
# Build status map from stat output (has UP/DOWN/MAINT status)
status_map: Dict[str, Dict[str, str]] = {}
for stat in parse_stat_csv(stat_output):
if stat["pxname"] == backend and stat["svname"] not in ["FRONTEND", "BACKEND"]:
status_map[stat["svname"]] = {
"status": stat["status"],
"check_status": stat["check_status"],
"weight": stat["weight"]
}
# Parse server state for address info
for line in state_output.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend:
server_name = parts[StateField.SRV_NAME]
addr = parts[StateField.SRV_ADDR]
port = parts[StateField.SRV_PORT]
# Skip disabled servers (0.0.0.0)
if addr == "0.0.0.0":
continue
server_info: Dict[str, Any] = {
"name": server_name,
"addr": f"{addr}:{port}",
"status": "unknown"
}
# Get status from stat output
if server_name in status_map:
server_info["status"] = status_map[server_name]["status"]
server_info["check_status"] = status_map[server_name]["check_status"]
server_info["weight"] = status_map[server_name]["weight"]
result["servers"].append(server_info)
result["total_count"] += 1
if server_info["status"] == "UP":
result["healthy_count"] += 1
# Determine overall status
if result["total_count"] == 0:
result["status"] = "no_servers"
elif result["healthy_count"] == result["total_count"]:
result["status"] = "healthy"
elif result["healthy_count"] > 0:
result["status"] = "degraded"
else:
result["status"] = "down"
return json.dumps(result, indent=2)
except HaproxyError as e:
return json.dumps({"error": str(e)})
@mcp.tool()
def haproxy_backends() -> str:
"""List all HAProxy backends.
Returns:
List of all configured backend names
"""
try:
result = haproxy_cmd("show backend")
backends = [line for line in result.split("\n") if line and not line.startswith("#")]
return "Backends:\n" + "\n".join(f"{b}" for b in backends)
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_list_frontends() -> str:
"""List all HAProxy frontends with their status.
Returns:
List of frontends with status and session counts
"""
try:
result = haproxy_cmd("show stat")
frontends = []
for stat in parse_stat_csv(result):
if stat["svname"] == "FRONTEND":
frontends.append(
f"{stat['pxname']}: {stat['status']} (sessions: {stat['scur']})"
)
if not frontends:
return "No frontends found"
return "Frontends:\n" + "\n".join(frontends)
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_set_server_state(backend: str, server: str, state: str) -> str:
"""Set server state for maintenance or traffic control.
Use haproxy_list_servers to get the backend and server names.
Args:
backend: Backend name (e.g., "pool_1" from haproxy_list_servers)
server: Server name (e.g., "pool_1_1" from haproxy_list_servers)
state: Target state:
- ready: Enable server for traffic
- drain: Stop new connections, finish existing (graceful shutdown)
- maint: Disable completely (maintenance mode)
Returns:
Success message or error description
Example:
# Put server in maintenance
haproxy_set_server_state("pool_1", "pool_1_2", "maint")
# Re-enable server
haproxy_set_server_state("pool_1", "pool_1_2", "ready")
"""
if not validate_backend_name(backend):
return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
if not validate_backend_name(server):
return "Error: Invalid server name (use alphanumeric, underscore, hyphen only)"
if state not in ["ready", "drain", "maint"]:
return "Error: state must be 'ready', 'drain', or 'maint'"
try:
haproxy_cmd_checked(f"set server {backend}/{server} state {state}")
return f"Server {backend}/{server} set to {state}"
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_get_server_health(backend: str = "") -> str:
"""Get health status of all servers (low-level view across all backends).
For domain-specific health, use haproxy_domain_health instead.
Args:
backend: Optional backend name to filter (e.g., "pool_1"). If empty, shows all.
Returns:
List of servers with status: UP (healthy), DOWN (failed), MAINT (maintenance)
Example:
haproxy_get_server_health() # All servers
haproxy_get_server_health("pool_1") # Only pool_1 backend
"""
if backend and not validate_backend_name(backend):
return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
try:
result = haproxy_cmd("show stat")
servers = []
for stat in parse_stat_csv(result):
if stat["svname"] not in ["FRONTEND", "BACKEND", ""]:
if backend and stat["pxname"] != backend:
continue
servers.append(
f"{stat['pxname']}/{stat['svname']}: {stat['status']} "
f"(weight: {stat['weight']}, check: {stat['check_status']})"
)
return "\n".join(servers) if servers else "No servers found"
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_set_server_weight(backend: str, server: str, weight: int) -> str:
"""Set server weight for load balancing ratio control.
Higher weight = more traffic. Default is 1. Use 0 to disable without maintenance mode.
Args:
backend: Backend name (e.g., "pool_1")
server: Server name (e.g., "pool_1_1")
weight: Weight 0-256 (0 disables, higher = more traffic)
Returns:
Success message or error description
Example:
# Send 2x traffic to server 1 vs server 2
haproxy_set_server_weight("pool_1", "pool_1_1", 2)
haproxy_set_server_weight("pool_1", "pool_1_2", 1)
# Temporarily disable server (soft disable, not maintenance)
haproxy_set_server_weight("pool_1", "pool_1_3", 0)
"""
if not validate_backend_name(backend):
return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
if not validate_backend_name(server):
return "Error: Invalid server name (use alphanumeric, underscore, hyphen only)"
if not (0 <= weight <= 256):
return "Error: weight must be between 0 and 256"
try:
haproxy_cmd_checked(f"set server {backend}/{server} weight {weight}")
return f"Server {backend}/{server} weight set to {weight}"
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_get_connections(backend: str = "") -> str:
"""Get active connection counts per server.
Useful for monitoring traffic distribution and identifying hot spots.
Args:
backend: Optional backend name to filter (e.g., "pool_1"). If empty, shows all.
Returns:
List of servers with current active connections
Example:
haproxy_get_connections() # All backends
haproxy_get_connections("pool_1") # Only pool_1
# Output: pool_1/pool_1_1: 5 active connections
"""
if backend and not validate_backend_name(backend):
return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
try:
result = haproxy_cmd("show stat")
connections = []
for stat in parse_stat_csv(result):
if backend and stat["pxname"] != backend:
continue
if stat["svname"] in ["FRONTEND", "BACKEND"]:
connections.append(
f"{stat['pxname']} ({stat['svname']}): {stat['scur']} current, {stat['smax']} max"
)
elif stat["svname"]:
connections.append(f" - {stat['svname']}: {stat['scur']} connections")
return "\n".join(connections) if connections else "No connection data"
except HaproxyError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_reload() -> str:
"""Reload HAProxy configuration (validates config first).
After reload, automatically restores server configurations from servers.json.
Returns:
Success message with restored server count, or error details if failed
"""
success, msg = reload_haproxy()
if not success:
return msg
# Restore servers from config after reload
try:
restored = restore_servers_from_config()
return f"HAProxy configuration reloaded successfully ({restored} servers restored)"
except Exception as e:
logger.error("Failed to restore servers after reload: %s", e)
return f"HAProxy reloaded but server restore failed: {e}"
@mcp.tool()
def haproxy_check_config() -> str:
"""Validate HAProxy configuration file syntax.
Returns:
Validation result or error details
"""
try:
result = subprocess.run(
["podman", "exec", HAPROXY_CONTAINER, "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"],
capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
)
if result.returncode == 0:
return "Configuration is valid"
return f"Configuration errors:\n{result.stderr}"
except subprocess.TimeoutExpired:
return f"Error: Command timed out after {SUBPROCESS_TIMEOUT} seconds"
except FileNotFoundError:
return "Error: podman command not found"
except OSError as e:
return f"Error: OS error: {e}"
@mcp.tool()
def haproxy_save_state() -> str:
"""Save current server state to disk atomically.
Returns:
Success message or error description
"""
try:
state = haproxy_cmd("show servers state")
atomic_write_file(STATE_FILE, state)
return "Server state saved"
except HaproxyError as e:
return f"Error: {e}"
except IOError as e:
return f"Error: {e}"
@mcp.tool()
def haproxy_restore_state() -> str:
"""Restore server state from disk.
Returns:
Summary of restored servers or error description
"""
try:
with open(STATE_FILE, "r", encoding="utf-8") as f:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
except OSError:
pass # Continue without lock if not supported
try:
state = f.read()
finally:
try:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
except OSError:
pass
restored = 0
skipped = 0
for line in state.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and not line.startswith("#"):
backend = parts[StateField.BE_NAME]
server = parts[StateField.SRV_NAME]
addr = parts[StateField.SRV_ADDR]
port = parts[StateField.SRV_PORT]
# Skip disabled servers
if addr == "0.0.0.0":
continue
# Validate names from state file to prevent injection
if not validate_backend_name(backend) or not validate_backend_name(server):
skipped += 1
continue
# Validate IP and port
if not validate_ip(addr) or not validate_port(port):
skipped += 1
continue
try:
haproxy_cmd_checked(f"set server {backend}/{server} addr {addr} port {port}")
haproxy_cmd_checked(f"set server {backend}/{server} state ready")
restored += 1
except HaproxyError as e:
logger.warning("Failed to restore %s/%s: %s", backend, server, e)
result = f"Server state restored ({restored} servers)"
if skipped:
result += f", {skipped} entries skipped due to validation"
return result
except FileNotFoundError:
return "Error: No saved state found"
except HaproxyError as e:
return f"Error: {e}"
def startup_restore() -> None:
"""Restore servers from config file on startup."""
# Wait for HAProxy to be ready
for _ in range(STARTUP_RETRY_COUNT):
try:
haproxy_cmd("show info")
break
except HaproxyError:
time.sleep(1)
else:
logger.warning("HAProxy not ready, skipping restore")
return
try:
count = restore_servers_from_config()
if count > 0:
logger.info("Restored %d servers from config", count)
except (HaproxyError, OSError, ValueError, json.JSONDecodeError) as e:
logger.warning("Failed to restore servers: %s", e)
if __name__ == "__main__":
startup_restore()
mcp.run(transport="streamable-http")