#!/usr/bin/env python3 """HAProxy MCP Server - Direct Runtime API Integration This module provides an MCP (Model Context Protocol) server for managing HAProxy configuration and runtime state. It supports dynamic domain/server management with HTTP backends (SSL termination at HAProxy frontend). Environment Variables: MCP_HOST: Host to bind MCP server (default: 0.0.0.0) MCP_PORT: Port for MCP server (default: 8000) HAPROXY_HOST: HAProxy Runtime API host (default: localhost) HAPROXY_PORT: HAProxy Runtime API port (default: 9999) HAPROXY_STATE_FILE: Path to server state file HAPROXY_MAP_FILE: Path to domains.map file HAPROXY_MAP_FILE_CONTAINER: Container path for domains.map HAPROXY_SERVERS_FILE: Path to servers.json file HAPROXY_POOL_COUNT: Number of pool backends (default: 100) HAPROXY_MAX_SLOTS: Max servers per pool (default: 10) """ import socket import subprocess import re import json import logging import os import select import tempfile import time import fcntl import ipaddress from typing import Any, Dict, Generator, List, Optional, Set, Tuple from mcp.server.fastmcp import FastMCP # Configure structured logging log_level = os.getenv("LOG_LEVEL", "INFO").upper() logging.basicConfig( level=getattr(logging, log_level, logging.INFO), format='%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # MCP Server configuration MCP_HOST: str = os.getenv("MCP_HOST", "0.0.0.0") MCP_PORT: int = int(os.getenv("MCP_PORT", "8000")) mcp = FastMCP("haproxy", host=MCP_HOST, port=MCP_PORT) # HAProxy Runtime API configuration HAPROXY_HOST: str = os.getenv("HAPROXY_HOST", "localhost") HAPROXY_PORT: int = int(os.getenv("HAPROXY_PORT", "9999")) HAPROXY_SOCKET: Tuple[str, int] = (HAPROXY_HOST, HAPROXY_PORT) # File paths (configurable via environment) STATE_FILE: str = os.getenv("HAPROXY_STATE_FILE", "/opt/haproxy/data/servers.state") MAP_FILE: str = os.getenv("HAPROXY_MAP_FILE", "/opt/haproxy/conf/domains.map") MAP_FILE_CONTAINER: str = os.getenv("HAPROXY_MAP_FILE_CONTAINER", "/usr/local/etc/haproxy/domains.map") SERVERS_FILE: str = os.getenv("HAPROXY_SERVERS_FILE", "/opt/haproxy/conf/servers.json") # Pool configuration POOL_COUNT: int = int(os.getenv("HAPROXY_POOL_COUNT", "100")) MAX_SLOTS: int = int(os.getenv("HAPROXY_MAX_SLOTS", "10")) # Container configuration HAPROXY_CONTAINER: str = os.getenv("HAPROXY_CONTAINER", "haproxy") # Validation patterns - compiled once for performance DOMAIN_PATTERN = re.compile( r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?' r'(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$' ) # Backend/server names: alphanumeric, underscore, hyphen only BACKEND_NAME_PATTERN = re.compile(r'^[a-zA-Z0-9_-]+$') # Pattern for converting domain to backend name NON_ALNUM_PATTERN = re.compile(r'[^a-zA-Z0-9]') # Limits MAX_RESPONSE_SIZE = 10 * 1024 * 1024 # 10 MB max response from HAProxy SUBPROCESS_TIMEOUT = 30 # seconds STARTUP_RETRY_COUNT = 10 # HAProxy ready check retries STATE_MIN_COLUMNS = 19 # Minimum columns in HAProxy server state output SOCKET_TIMEOUT = 5 # seconds for HAProxy socket connection SOCKET_RECV_TIMEOUT = 30 # seconds for HAProxy socket recv loop MAX_BULK_SERVERS = 10 # Max servers per bulk add call class HaproxyError(Exception): """HAProxy operation error""" pass class NoAvailablePoolError(HaproxyError): """All pool backends are in use.""" pass # CSV field indices for HAProxy stats (show stat command) class StatField: """HAProxy CSV stat field indices.""" PXNAME = 0 # Proxy name (frontend/backend) SVNAME = 1 # Server name (or FRONTEND/BACKEND) SCUR = 4 # Current sessions SMAX = 6 # Max sessions STATUS = 17 # Status (UP/DOWN/MAINT/etc) WEIGHT = 18 # Server weight CHECK_STATUS = 36 # Check status # Field indices for HAProxy server state (show servers state command) class StateField: """HAProxy server state field indices.""" BE_ID = 0 # Backend ID BE_NAME = 1 # Backend name SRV_ID = 2 # Server ID SRV_NAME = 3 # Server name SRV_ADDR = 4 # Server address SRV_OP_STATE = 5 # Operational state SRV_ADMIN_STATE = 6 # Admin state SRV_PORT = 18 # Server port def haproxy_cmd(command: str) -> str: """Send command to HAProxy Runtime API. Args: command: The HAProxy runtime API command to execute Returns: The response from HAProxy Raises: HaproxyError: If connection fails, times out, or response exceeds size limit """ try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(SOCKET_TIMEOUT) s.connect(HAPROXY_SOCKET) s.sendall(f"{command}\n".encode()) s.shutdown(socket.SHUT_WR) # Set socket to non-blocking for select-based recv loop s.setblocking(False) response = b"" start_time = time.time() while True: # Check for overall timeout elapsed = time.time() - start_time if elapsed >= SOCKET_RECV_TIMEOUT: raise HaproxyError(f"Response timeout after {SOCKET_RECV_TIMEOUT} seconds") # Wait for data with timeout (remaining time) remaining = SOCKET_RECV_TIMEOUT - elapsed ready, _, _ = select.select([s], [], [], min(remaining, 1.0)) if ready: data = s.recv(8192) if not data: break response += data if len(response) > MAX_RESPONSE_SIZE: raise HaproxyError(f"Response exceeded {MAX_RESPONSE_SIZE} bytes limit") return response.decode().strip() except socket.timeout: raise HaproxyError("Connection timeout") except ConnectionRefusedError: raise HaproxyError("Connection refused - HAProxy not running?") except UnicodeDecodeError: raise HaproxyError("Invalid UTF-8 in response") except HaproxyError: raise except Exception as e: raise HaproxyError(str(e)) from e def haproxy_cmd_checked(command: str) -> str: """Send command to HAProxy and raise on error response. Args: command: HAProxy command to execute Returns: Command response Raises: HaproxyError: If HAProxy returns an error message """ result = haproxy_cmd(command) # HAProxy returns empty string on success, error messages on failure error_indicators = ["No such", "not found", "error", "failed", "invalid", "unknown"] if result: result_lower = result.lower() for indicator in error_indicators: if indicator.lower() in result_lower: raise HaproxyError(f"HAProxy command failed: {result.strip()}") return result def reload_haproxy() -> Tuple[bool, str]: """Validate and reload HAProxy configuration. Returns: Tuple of (success, message) """ try: validate = subprocess.run( ["podman", "exec", HAPROXY_CONTAINER, "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"], capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT ) if validate.returncode != 0: return False, f"Config validation failed:\n{validate.stderr}" result = subprocess.run( ["podman", "kill", "--signal", "USR2", HAPROXY_CONTAINER], capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT ) if result.returncode != 0: return False, f"Reload failed: {result.stderr}" return True, "OK" except subprocess.TimeoutExpired: return False, f"Command timed out after {SUBPROCESS_TIMEOUT} seconds" except FileNotFoundError: return False, "podman command not found" except OSError as e: return False, f"OS error: {e}" def validate_domain(domain: str) -> bool: """Validate domain format. Args: domain: The domain name to validate Returns: True if domain is valid, False otherwise """ if not domain or len(domain) > 253: return False return bool(DOMAIN_PATTERN.match(domain)) def validate_ip(ip: str, allow_empty: bool = False) -> bool: """Validate IPv4 or IPv6 address format. Args: ip: The IP address to validate allow_empty: If True, empty string is considered valid Returns: True if IP is valid, False otherwise """ if not ip: return allow_empty try: ipaddress.ip_address(ip) return True except ValueError: return False def validate_backend_name(name: str) -> bool: """Validate backend or server name to prevent command injection. Args: name: The backend or server name to validate Returns: True if name contains only safe characters """ if not name or len(name) > 255: return False return bool(BACKEND_NAME_PATTERN.match(name)) def domain_to_backend(domain: str) -> str: """Convert domain to backend name (alphanumeric + underscore only). Args: domain: The domain name to convert Returns: Backend name with non-alphanumeric characters replaced by underscores Raises: ValueError: If resulting name is invalid """ result = NON_ALNUM_PATTERN.sub('_', domain) if not validate_backend_name(result): raise ValueError(f"Invalid backend name after conversion: {result}") return result def parse_stat_csv(stat_output: str) -> Generator[Dict[str, str], None, None]: """Parse HAProxy stat CSV output into structured data. Args: stat_output: Raw output from 'show stat' command Yields: Dictionaries with parsed stat fields for each row """ for line in stat_output.split("\n"): if not line or line.startswith("#"): continue parts = line.split(",") if len(parts) > StatField.STATUS: yield { "pxname": parts[StatField.PXNAME], "svname": parts[StatField.SVNAME], "scur": parts[StatField.SCUR] if len(parts) > StatField.SCUR else "0", "smax": parts[StatField.SMAX] if len(parts) > StatField.SMAX else "0", "status": parts[StatField.STATUS], "weight": parts[StatField.WEIGHT] if len(parts) > StatField.WEIGHT else "0", "check_status": parts[StatField.CHECK_STATUS] if len(parts) > StatField.CHECK_STATUS else "", } def validate_port(port: str) -> bool: """Validate port number is in valid range. Args: port: Port number as string Returns: True if port is valid (1-65535), False otherwise """ if not port or not port.isdigit(): return False port_num = int(port) return 1 <= port_num <= 65535 def get_map_contents() -> List[Tuple[str, str]]: """Read domains.map file and return list of (domain, backend) tuples. Returns: List of (domain, backend) tuples from the map file """ entries = [] try: with open(MAP_FILE, "r", encoding="utf-8") as f: try: fcntl.flock(f.fileno(), fcntl.LOCK_SH) except OSError: pass # Continue without lock if not supported try: for line in f: line = line.strip() if not line or line.startswith("#"): continue parts = line.split() if len(parts) >= 2: entries.append((parts[0], parts[1])) finally: try: fcntl.flock(f.fileno(), fcntl.LOCK_UN) except OSError: pass except FileNotFoundError: pass return entries def find_available_pool() -> str: """Find first unused pool from pool_1 to pool_{POOL_COUNT}. Returns: Pool name (e.g., 'pool_1') if available Raises: NoAvailablePoolError: If all pools are in use """ used_pools: Set[str] = set() for domain, backend in get_map_contents(): if backend.startswith("pool_"): used_pools.add(backend) for i in range(1, POOL_COUNT + 1): pool_name = f"pool_{i}" if pool_name not in used_pools: return pool_name raise NoAvailablePoolError(f"All {POOL_COUNT} pool backends are in use") def get_domain_backend(domain: str) -> Optional[str]: """Look up the backend for a domain from domains.map. Args: domain: The domain to look up Returns: Backend name if found, None otherwise """ for map_domain, backend in get_map_contents(): if map_domain == domain: return backend return None def is_legacy_backend(backend: str) -> bool: """Check if backend is a legacy static backend (not a pool). Args: backend: Backend name to check Returns: True if this is a legacy backend, False if it's a pool """ return not backend.startswith("pool_") def get_legacy_backend_name(domain: str) -> str: """Convert domain to legacy backend name format. Args: domain: Domain name Returns: Legacy backend name (e.g., 'api_example_com_backend') """ return f"{domain_to_backend(domain)}_backend" def atomic_write_file(file_path: str, content: str) -> None: """Write content to file atomically using temp file + rename. Args: file_path: Target file path content: Content to write Raises: IOError: If write fails """ dir_path = os.path.dirname(file_path) fd = None temp_path = None try: fd, temp_path = tempfile.mkstemp(dir=dir_path, prefix='.tmp.') with os.fdopen(fd, 'w', encoding='utf-8') as f: fd = None # fd is now owned by the file object f.write(content) os.rename(temp_path, file_path) temp_path = None # Rename succeeded except OSError as e: raise IOError(f"Failed to write {file_path}: {e}") from e finally: if fd is not None: try: os.close(fd) except OSError: pass if temp_path is not None: try: os.unlink(temp_path) except OSError: pass def save_map_file(entries: List[Tuple[str, str]]) -> None: """Save entries to domains.map file atomically. Uses temp file + rename for atomic write to prevent race conditions. Args: entries: List of (domain, backend) tuples to write Raises: IOError: If the file cannot be written """ lines = [ "# Domain to Backend mapping\n", "# Format: domain backend_name\n", "# Wildcard: .domain.com matches *.domain.com\n\n", ] for domain, backend in entries: lines.append(f"{domain} {backend}\n") atomic_write_file(MAP_FILE, "".join(lines)) def load_servers_config() -> Dict[str, Any]: """Load servers configuration from JSON file with file locking. Returns: Dictionary with server configurations """ try: with open(SERVERS_FILE, "r", encoding="utf-8") as f: try: fcntl.flock(f.fileno(), fcntl.LOCK_SH) except OSError: logger.debug("File locking not supported for %s", SERVERS_FILE) try: return json.load(f) finally: try: fcntl.flock(f.fileno(), fcntl.LOCK_UN) except OSError: pass except FileNotFoundError: return {} except json.JSONDecodeError as e: logger.warning("Corrupt config file %s: %s", SERVERS_FILE, e) return {} def save_servers_config(config: Dict[str, Any]) -> None: """Save servers configuration to JSON file atomically. Uses temp file + rename for atomic write to prevent race conditions. Args: config: Dictionary with server configurations """ atomic_write_file(SERVERS_FILE, json.dumps(config, indent=2)) def add_server_to_config(domain: str, slot: int, ip: str, http_port: int) -> None: """Add server configuration to persistent storage with file locking. Args: domain: Domain name slot: Server slot (1 to MAX_SLOTS) ip: Server IP address http_port: HTTP port """ lock_path = f"{SERVERS_FILE}.lock" with open(lock_path, 'w') as lock_file: fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) try: config = load_servers_config() if domain not in config: config[domain] = {} config[domain][str(slot)] = {"ip": ip, "http_port": http_port} save_servers_config(config) finally: fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) def remove_server_from_config(domain: str, slot: int) -> None: """Remove server configuration from persistent storage with file locking. Args: domain: Domain name slot: Server slot to remove """ lock_path = f"{SERVERS_FILE}.lock" with open(lock_path, 'w') as lock_file: fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) try: config = load_servers_config() if domain in config and str(slot) in config[domain]: del config[domain][str(slot)] if not config[domain]: del config[domain] save_servers_config(config) finally: fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) def remove_domain_from_config(domain: str) -> None: """Remove domain from persistent config with file locking. Args: domain: Domain name to remove """ lock_path = f"{SERVERS_FILE}.lock" with open(lock_path, 'w') as lock_file: fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) try: config = load_servers_config() if domain in config: del config[domain] save_servers_config(config) finally: fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) def get_server_suffixes(http_port: int) -> List[Tuple[str, int]]: """Get server suffixes and ports based on port configuration. Args: http_port: HTTP port for backend Returns: List of (suffix, port) tuples - always HTTP only """ return [("", http_port)] def configure_server_slot(backend: str, server_prefix: str, slot: int, ip: str, http_port: int) -> str: """Configure a server slot in HAProxy. Args: backend: Backend name (e.g., 'pool_1') server_prefix: Server name prefix (e.g., 'pool_1') slot: Slot number (1-10) ip: Server IP address http_port: HTTP port Returns: Server name that was configured Raises: HaproxyError: If HAProxy command fails """ server = f"{server_prefix}_{slot}" haproxy_cmd_checked(f"set server {backend}/{server} addr {ip} port {http_port}") haproxy_cmd_checked(f"set server {backend}/{server} state ready") return server def get_backend_and_prefix(domain: str) -> Tuple[str, str]: """Look up backend and determine server name prefix for a domain. Args: domain: The domain name to look up Returns: Tuple of (backend_name, server_prefix) Raises: ValueError: If domain cannot be mapped to a valid backend """ backend = get_domain_backend(domain) if not backend: backend = get_legacy_backend_name(domain) if backend.startswith("pool_"): server_prefix = backend else: server_prefix = domain_to_backend(domain) return backend, server_prefix def restore_servers_from_config() -> int: """Restore all servers from configuration file. Returns: Number of servers restored """ config = load_servers_config() restored = 0 for domain, slots in config.items(): backend = get_domain_backend(domain) if not backend: continue try: _, server_prefix = get_backend_and_prefix(domain) except ValueError as e: logger.warning("Invalid domain '%s': %s", domain, e) continue for slot_str, server_info in slots.items(): try: slot = int(slot_str) except ValueError: logger.warning("Invalid slot '%s' for %s, skipping", slot_str, domain) continue ip = server_info.get("ip", "") if not ip: continue try: http_port = int(server_info.get("http_port", 80)) except (ValueError, TypeError): logger.warning("Invalid port for %s slot %d, skipping", domain, slot) continue for suffix, port in get_server_suffixes(http_port): server = f"{server_prefix}{suffix}_{slot}" try: haproxy_cmd_checked(f"set server {backend}/{server} addr {ip} port {port}") haproxy_cmd_checked(f"set server {backend}/{server} state ready") restored += 1 except HaproxyError as e: logger.warning("Failed to restore %s/%s: %s", backend, server, e) return restored @mcp.tool() def haproxy_list_domains(include_wildcards: bool = False) -> str: """List all configured domains with their backend servers. Shows all domains mapped in HAProxy with their pool backend and configured servers. Args: include_wildcards: If True, also show wildcard domain mappings (entries starting with '.', e.g., '.example.com' which matches '*.example.com'). Default is False to show only explicit domain mappings. Returns: List of domains in format: domain -> pool_N (pool): server=ip:port Example: # Output: # • api.example.com -> pool_1 (pool): pool_1_1=10.0.0.1:8080, pool_1_2=10.0.0.2:8080 # • web.example.com -> pool_2 (pool): pool_2_1=10.0.0.3:80 # With include_wildcards=True: # • api.example.com -> pool_1 (pool): pool_1_1=10.0.0.1:8080 # • .api.example.com -> pool_1 (wildcard): pool_1_1=10.0.0.1:8080 """ try: domains = [] state = haproxy_cmd("show servers state") # Build server map from HAProxy state server_map: Dict[str, list] = {} for line in state.split("\n"): parts = line.split() if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.SRV_ADDR] != "0.0.0.0": backend = parts[StateField.BE_NAME] if backend not in server_map: server_map[backend] = [] server_map[backend].append( f"{parts[StateField.SRV_NAME]}={parts[StateField.SRV_ADDR]}:{parts[StateField.SRV_PORT]}" ) # Read from domains.map seen_domains: Set[str] = set() for domain, backend in get_map_contents(): # Skip wildcard entries unless explicitly requested if domain.startswith(".") and not include_wildcards: continue if domain in seen_domains: continue seen_domains.add(domain) servers = server_map.get(backend, ["(none)"]) if domain.startswith("."): backend_type = "wildcard" elif backend.startswith("pool_"): backend_type = "pool" else: backend_type = "static" domains.append(f"• {domain} -> {backend} ({backend_type}): {', '.join(servers)}") return "\n".join(domains) if domains else "No domains configured" except HaproxyError as e: return f"Error: {e}" @mcp.tool() def haproxy_add_domain(domain: str, ip: str = "", http_port: int = 80) -> str: """Add a new domain to HAProxy using map-based routing (no reload required). Creates a domain mapping to a pool backend. Each domain can have up to 10 backend servers for load balancing. Use haproxy_add_server to add more servers. Args: domain: The domain name to add (e.g., api.example.com) ip: Optional IP address for initial server; if provided, adds to slot 1 http_port: HTTP port for the backend server (default: 80) Returns: Success message or error description Example: # Add domain without server (add servers later) haproxy_add_domain("api.example.com") # Add domain with initial server haproxy_add_domain("api.example.com", ip="10.0.0.1", http_port=8080) """ # Validate inputs if not validate_domain(domain): return "Error: Invalid domain format" if not validate_ip(ip, allow_empty=True): return "Error: Invalid IP address format" if not (1 <= http_port <= 65535): return "Error: Port must be between 1 and 65535" # Read map contents once for both existence check and pool lookup entries = get_map_contents() # Check if domain already exists (using cached entries) for domain_entry, backend in entries: if domain_entry == domain: return f"Error: Domain {domain} already exists (mapped to {backend})" # Find available pool (using cached entries) used_pools: Set[str] = set() for _, backend in entries: if backend.startswith("pool_"): used_pools.add(backend) pool = None for i in range(1, POOL_COUNT + 1): pool_name = f"pool_{i}" if pool_name not in used_pools: pool = pool_name break if not pool: return f"Error: All {POOL_COUNT} pool backends are in use" try: # Save to disk first (atomic write for persistence) # If HAProxy update fails after this, state will be correct on restart # Note: We already have 'entries' from the map contents read above entries.append((domain, pool)) entries.append((f".{domain}", pool)) try: save_map_file(entries) except IOError as e: return f"Error: Failed to save map file: {e}" # Then update HAProxy map via Runtime API try: haproxy_cmd(f"add map {MAP_FILE_CONTAINER} {domain} {pool}") haproxy_cmd(f"add map {MAP_FILE_CONTAINER} .{domain} {pool}") except HaproxyError as e: # Rollback: remove the domain we just added from entries and re-save rollback_entries = [(d, b) for d, b in entries if d != domain and d != f".{domain}"] try: save_map_file(rollback_entries) except IOError: logger.error("Failed to rollback map file after HAProxy error") return f"Error: Failed to update HAProxy map: {e}" # If IP provided, add server to slot 1 if ip: # Save server config to disk first add_server_to_config(domain, 1, ip, http_port) try: for suffix, port in get_server_suffixes(http_port): server = f"{pool}{suffix}_1" haproxy_cmd(f"set server {pool}/{server} addr {ip} port {port}") haproxy_cmd(f"set server {pool}/{server} state ready") except HaproxyError as e: # Rollback server config on failure remove_server_from_config(domain, 1) return f"Domain {domain} added to {pool} but server config failed: {e}" return f"Domain {domain} added to {pool} with server {ip}:{http_port}" return f"Domain {domain} added to {pool} (no servers configured)" except HaproxyError as e: return f"Error: {e}" @mcp.tool() def haproxy_remove_domain(domain: str) -> str: """Remove a domain from HAProxy (no reload required for pool-based domains). Args: domain: The domain name to remove Returns: Success message or error description """ if not validate_domain(domain): return "Error: Invalid domain format" # Look up the domain in the map backend = get_domain_backend(domain) if not backend: return f"Error: Domain {domain} not found" # Check if this is a legacy backend (not a pool) if is_legacy_backend(backend): return f"Error: Cannot remove legacy domain {domain} (uses static backend {backend})" try: # Save to disk first (atomic write for persistence) # If HAProxy update fails after this, state will be correct on restart entries = get_map_contents() new_entries = [(d, b) for d, b in entries if d != domain and d != f".{domain}"] save_map_file(new_entries) # Remove from persistent server config remove_domain_from_config(domain) # Clear map entries via Runtime API (immediate effect) haproxy_cmd(f"del map {MAP_FILE_CONTAINER} {domain}") try: haproxy_cmd(f"del map {MAP_FILE_CONTAINER} .{domain}") except HaproxyError as e: logger.warning("Failed to remove wildcard entry for %s: %s", domain, e) # Disable all servers in the pool (reset to 0.0.0.0:0) for slot in range(1, MAX_SLOTS + 1): server = f"{backend}_{slot}" try: haproxy_cmd(f"set server {backend}/{server} state maint") haproxy_cmd(f"set server {backend}/{server} addr 0.0.0.0 port 0") except HaproxyError as e: logger.warning( "Failed to clear server %s/%s for domain %s: %s", backend, server, domain, e ) # Continue with remaining cleanup return f"Domain {domain} removed from {backend}" except IOError as e: return f"Error: Failed to update map file: {e}" except HaproxyError as e: return f"Error: {e}" @mcp.tool() def haproxy_list_servers(domain: str) -> str: """List all servers for a specific domain. Shows all configured servers with their slot numbers, addresses, and status. Use this to see which slots are in use before adding or removing servers. Args: domain: The domain name to list servers for Returns: List of servers with slot number, address (ip:port), and status (UP/DOWN/MAINT) Example: haproxy_list_servers("api.example.com") # Output: pool_1_1: 10.0.0.1:8080 (UP) # pool_1_2: 10.0.0.2:8080 (UP) """ if not validate_domain(domain): return "Error: Invalid domain format" try: backend, _ = get_backend_and_prefix(domain) servers = [] state = haproxy_cmd("show servers state") for line in state.split("\n"): parts = line.split() if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend: addr = parts[StateField.SRV_ADDR] status = "active" if addr != "0.0.0.0" else "disabled" servers.append( f"• {parts[StateField.SRV_NAME]}: {addr}:{parts[StateField.SRV_PORT]} ({status})" ) if not servers: return f"Backend {backend} not found" return f"Servers for {domain} ({backend}):\n" + "\n".join(servers) except (HaproxyError, ValueError) as e: return f"Error: {e}" @mcp.tool() def haproxy_add_server(domain: str, slot: int, ip: str, http_port: int = 80) -> str: """Add a server to a domain's backend at specified slot. Each domain can have up to 10 servers (slots 1-10) for load balancing. To add multiple servers, call this with different slot numbers. HAProxy distributes traffic across all configured servers using round-robin. Args: domain: The domain name to add the server to slot: Server slot number (1-10), or 0/-1 for auto-select first available slot ip: IP address of the server (required) http_port: HTTP port (default: 80) Returns: Success message or error description Example: # Add two servers for load balancing haproxy_add_server("api.example.com", slot=1, ip="10.0.0.1", http_port=8080) haproxy_add_server("api.example.com", slot=2, ip="10.0.0.2", http_port=8080) # Add a third server haproxy_add_server("api.example.com", slot=3, ip="10.0.0.3", http_port=8080) # Auto-select next available slot haproxy_add_server("api.example.com", slot=0, ip="10.0.0.4", http_port=8080) """ if not validate_domain(domain): return "Error: Invalid domain format" if not ip: return "Error: IP address is required" if not validate_ip(ip): return "Error: Invalid IP address format" if not (1 <= http_port <= 65535): return "Error: Port must be between 1 and 65535" try: backend, server_prefix = get_backend_and_prefix(domain) # Auto-select slot if slot <= 0 if slot <= 0: state = haproxy_cmd("show servers state") used_slots: Set[int] = set() for line in state.split("\n"): parts = line.split() if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend: if parts[StateField.SRV_ADDR] != "0.0.0.0": # Extract slot number from server name (e.g., pool_1_3 -> 3) server_name = parts[StateField.SRV_NAME] try: used_slots.add(int(server_name.rsplit("_", 1)[1])) except (ValueError, IndexError): pass for s in range(1, MAX_SLOTS + 1): if s not in used_slots: slot = s break else: return f"Error: No available slots (all {MAX_SLOTS} slots in use)" elif not (1 <= slot <= MAX_SLOTS): return f"Error: Slot must be between 1 and {MAX_SLOTS}, or 0/-1 for auto-select" # Save to persistent config FIRST (disk-first pattern) add_server_to_config(domain, slot, ip, http_port) try: server = configure_server_slot(backend, server_prefix, slot, ip, http_port) return f"Added to {domain} ({backend}) slot {slot}:\n{server} → {ip}:{http_port}" except HaproxyError as e: # Rollback config on HAProxy failure remove_server_from_config(domain, slot) return f"Error: {e}" except (ValueError, IOError) as e: return f"Error: {e}" @mcp.tool() def haproxy_add_servers(domain: str, servers: str) -> str: """Add multiple servers to a domain's backend at once. More efficient than calling haproxy_add_server multiple times. All servers are validated before any are added. Args: domain: The domain name to add servers to servers: JSON array of server configs. Each object can have: - slot (required): Server slot number (1-10) - ip (required): IP address of the server - http_port (optional): HTTP port (default: 80) Example: '[{"slot":1,"ip":"10.0.0.1","http_port":80},{"slot":2,"ip":"10.0.0.2"}]' Returns: Summary of added servers or errors for each failed server Example: haproxy_add_servers("api.example.com", '[{"slot":1,"ip":"10.0.0.1"},{"slot":2,"ip":"10.0.0.2"}]') # Output: Added 2 servers to api.example.com (pool_1): # • slot 1: 10.0.0.1:80 # • slot 2: 10.0.0.2:80 """ if not validate_domain(domain): return "Error: Invalid domain format" # Parse JSON array try: server_list = json.loads(servers) except json.JSONDecodeError as e: return f"Error: Invalid JSON - {e}" if not isinstance(server_list, list): return "Error: servers must be a JSON array" if not server_list: return "Error: servers array is empty" if len(server_list) > MAX_BULK_SERVERS: return f"Error: Cannot add more than {MAX_BULK_SERVERS} servers at once" # Validate all servers first before adding any validated_servers = [] validation_errors = [] for i, srv in enumerate(server_list): if not isinstance(srv, dict): validation_errors.append(f"Server {i+1}: must be an object") continue # Extract and validate slot slot = srv.get("slot") if slot is None: validation_errors.append(f"Server {i+1}: missing 'slot' field") continue try: slot = int(slot) except (ValueError, TypeError): validation_errors.append(f"Server {i+1}: slot must be an integer") continue if not (1 <= slot <= MAX_SLOTS): validation_errors.append(f"Server {i+1}: slot must be between 1 and {MAX_SLOTS}") continue # Extract and validate IP ip = srv.get("ip") if not ip: validation_errors.append(f"Server {i+1}: missing 'ip' field") continue if not validate_ip(ip): validation_errors.append(f"Server {i+1}: invalid IP address '{ip}'") continue # Extract and validate port http_port = srv.get("http_port", 80) try: http_port = int(http_port) except (ValueError, TypeError): validation_errors.append(f"Server {i+1}: http_port must be an integer") continue if not (1 <= http_port <= 65535): validation_errors.append(f"Server {i+1}: port must be between 1 and 65535") continue validated_servers.append({"slot": slot, "ip": ip, "http_port": http_port}) # Return validation errors if any if validation_errors: return "Validation errors:\n" + "\n".join(f" • {e}" for e in validation_errors) # Check for duplicate slots slots = [s["slot"] for s in validated_servers] if len(slots) != len(set(slots)): return "Error: Duplicate slot numbers in servers array" # Get backend info try: backend, server_prefix = get_backend_and_prefix(domain) except ValueError as e: return f"Error: {e}" # Save ALL servers to config FIRST (disk-first pattern) for server_config in validated_servers: slot = server_config["slot"] ip = server_config["ip"] http_port = server_config["http_port"] add_server_to_config(domain, slot, ip, http_port) # Then update HAProxy added = [] errors = [] failed_slots = [] try: for server_config in validated_servers: slot = server_config["slot"] ip = server_config["ip"] http_port = server_config["http_port"] try: configure_server_slot(backend, server_prefix, slot, ip, http_port) added.append(f"slot {slot}: {ip}:{http_port}") except HaproxyError as e: failed_slots.append(slot) errors.append(f"slot {slot}: {e}") except Exception as e: # Rollback all saved configs on unexpected error for server_config in validated_servers: remove_server_from_config(domain, server_config["slot"]) return f"Error: {e}" # Rollback failed slots from config for slot in failed_slots: remove_server_from_config(domain, slot) # Build result message result_parts = [] if added: result_parts.append(f"Added {len(added)} servers to {domain} ({backend}):") result_parts.extend(f" • {s}" for s in added) if errors: result_parts.append(f"Failed to add {len(errors)} servers:") result_parts.extend(f" • {e}" for e in errors) return "\n".join(result_parts) if result_parts else "No servers added" @mcp.tool() def haproxy_remove_server(domain: str, slot: int) -> str: """Remove a server from a domain's backend at specified slot. Removes the server from load balancing rotation. Use haproxy_list_servers to see which slots are in use before removing. Args: domain: The domain name to remove the server from slot: Server slot number (1-10) to remove Returns: Success message or error description Example: # Remove server at slot 2 haproxy_remove_server("api.example.com", slot=2) """ if not validate_domain(domain): return "Error: Invalid domain format" if not (1 <= slot <= MAX_SLOTS): return f"Error: Slot must be between 1 and {MAX_SLOTS}" try: backend, server_prefix = get_backend_and_prefix(domain) # HTTP only - single server per slot server = f"{server_prefix}_{slot}" haproxy_cmd_checked(f"set server {backend}/{server} state maint") haproxy_cmd_checked(f"set server {backend}/{server} addr 0.0.0.0 port 0") # Remove from persistent config remove_server_from_config(domain, slot) return f"Removed server at slot {slot} from {domain} ({backend})" except (HaproxyError, ValueError, IOError) as e: return f"Error: {e}" @mcp.tool() def haproxy_set_domain_state(domain: str, state: str) -> str: """Set state for all servers of a domain at once. Useful for maintenance windows or deployments - set all servers to drain/maint before updates, then back to ready after. Args: domain: The domain name state: Target state - ready, drain, or maint Returns: Summary of state changes or error Example: # Put all servers in maintenance for deployment haproxy_set_domain_state("api.example.com", "maint") # Re-enable all servers after deployment haproxy_set_domain_state("api.example.com", "ready") """ if not validate_domain(domain): return "Error: Invalid domain format" if state not in ["ready", "drain", "maint"]: return "Error: State must be 'ready', 'drain', or 'maint'" try: backend, server_prefix = get_backend_and_prefix(domain) except ValueError as e: return f"Error: {e}" # Get active servers for this domain try: state_output = haproxy_cmd("show servers state") except HaproxyError as e: return f"Error: {e}" changed = [] errors = [] for line in state_output.split("\n"): parts = line.split() if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend: server_name = parts[StateField.SRV_NAME] addr = parts[StateField.SRV_ADDR] # Only change state for configured servers (not 0.0.0.0) if addr != "0.0.0.0": try: haproxy_cmd_checked(f"set server {backend}/{server_name} state {state}") changed.append(server_name) except HaproxyError as e: errors.append(f"{server_name}: {e}") if not changed and not errors: return f"No active servers found for {domain}" result = f"Set {len(changed)} servers to '{state}' for {domain}" if changed: result += ":\n" + "\n".join(f" • {s}" for s in changed) if errors: result += f"\n\nErrors ({len(errors)}):\n" + "\n".join(f" • {e}" for e in errors) return result @mcp.tool() def haproxy_wait_drain(domain: str, timeout: int = 30) -> str: """Wait for all active connections to drain from a domain's servers. Useful after setting servers to 'drain' state before maintenance. Polls every second until all servers have 0 current connections or timeout. Args: domain: The domain name to wait for timeout: Maximum seconds to wait (default: 30, max: 300) Returns: Success message or timeout error Example: haproxy_set_domain_state("api.example.com", "drain") haproxy_wait_drain("api.example.com", timeout=60) # Now safe to perform maintenance """ if not validate_domain(domain): return "Error: Invalid domain format" if not (1 <= timeout <= 300): return "Error: Timeout must be between 1 and 300 seconds" try: backend, _ = get_backend_and_prefix(domain) except ValueError as e: return f"Error: {e}" start_time = time.time() while time.time() - start_time < timeout: try: stats = haproxy_cmd("show stat") total_connections = 0 for line in stats.split("\n"): parts = line.split(",") if len(parts) > StatField.SCUR and parts[0] == backend and parts[1] not in ["FRONTEND", "BACKEND", ""]: try: scur = int(parts[StatField.SCUR]) if parts[StatField.SCUR] else 0 total_connections += scur except ValueError: pass if total_connections == 0: elapsed = int(time.time() - start_time) return f"All connections drained for {domain} ({elapsed}s)" time.sleep(1) except HaproxyError as e: return f"Error checking connections: {e}" return f"Timeout: Connections still active after {timeout}s" @mcp.tool() def haproxy_stats() -> str: """Get HAProxy status and statistics. Returns: Key HAProxy metrics (name, version, uptime, connections, etc.) """ try: result = haproxy_cmd("show info") stats = {} for line in result.split("\n"): if ":" in line: key, value = line.split(":", 1) stats[key.strip()] = value.strip() important = ["Name", "Version", "Uptime_sec", "CurrConns", "MaxConn", "Run_queue", "Tasks"] output = [] for key in important: if key in stats: output.append(f"• {key}: {stats[key]}") return "\n".join(output) if output else result except HaproxyError as e: return f"Error: {e}" @mcp.tool() def haproxy_health() -> str: """Check overall system health (MCP server, HAProxy, config files). Use this for monitoring integration. Returns "healthy" if all components are OK. Returns: JSON with: - status: "healthy" or "unhealthy" - components.mcp: MCP server status - components.haproxy: HAProxy connectivity, version, uptime - components.config_files: map_file and servers_file accessibility Example: # Returns: {"status": "healthy", "components": {"mcp": {"status": "ok"}, ...}} """ result: Dict[str, Any] = { "status": "healthy", "timestamp": time.time(), "components": { "mcp": {"status": "ok"}, "haproxy": {"status": "unknown"}, "config_files": {"status": "unknown"} } } # Check HAProxy connectivity try: info = haproxy_cmd("show info") for line in info.split("\n"): if line.startswith("Version:"): result["components"]["haproxy"]["version"] = line.split(":", 1)[1].strip() elif line.startswith("Uptime_sec:"): result["components"]["haproxy"]["uptime_sec"] = int(line.split(":", 1)[1].strip()) result["components"]["haproxy"]["status"] = "ok" except HaproxyError as e: result["components"]["haproxy"]["status"] = "error" result["components"]["haproxy"]["error"] = str(e) result["status"] = "degraded" # Check container status try: container_result = subprocess.run( ["podman", "inspect", "--format", "{{.State.Status}}", HAPROXY_CONTAINER], capture_output=True, text=True, timeout=5 ) if container_result.returncode == 0: container_status = container_result.stdout.strip() result["components"]["container"] = { "status": "ok" if container_status == "running" else container_status, "state": container_status } else: result["components"]["container"] = {"status": "error", "error": container_result.stderr.strip()} result["status"] = "unhealthy" except subprocess.TimeoutExpired: result["components"]["container"] = {"status": "timeout"} result["status"] = "unhealthy" except Exception as e: logger.warning("Container health check failed: %s", e) result["components"]["container"] = {"status": "error", "error": str(e)} # Check configuration files files_ok = True file_status: Dict[str, str] = {} for name, path in [("map_file", MAP_FILE), ("servers_file", SERVERS_FILE)]: if os.path.exists(path): file_status[name] = "ok" else: file_status[name] = "missing" files_ok = False result["components"]["config_files"]["files"] = file_status result["components"]["config_files"]["status"] = "ok" if files_ok else "warning" if not files_ok: result["status"] = "degraded" return json.dumps(result, indent=2) @mcp.tool() def haproxy_domain_health(domain: str) -> str: """Check health status of backend servers for a specific domain. Returns detailed health information for all servers in the domain's backend. Args: domain: The domain name to check health for Returns: JSON with: - status: "healthy" (all UP), "degraded" (partial UP), "down" (all DOWN), "no_servers" - servers: list with name, addr, status (UP/DOWN), check_status (L4OK/L4TOUT/L4CON) - healthy_count/total_count: server counts Example: haproxy_domain_health("api.example.com") # Returns: {"status": "healthy", "servers": [...], "healthy_count": 2, "total_count": 2} """ if not validate_domain(domain): return json.dumps({"error": "Invalid domain format"}) try: backend, _ = get_backend_and_prefix(domain) except ValueError as e: return json.dumps({"error": str(e)}) result: Dict[str, Any] = { "domain": domain, "backend": backend, "status": "unknown", "servers": [], "healthy_count": 0, "total_count": 0 } try: # Get server states state_output = haproxy_cmd("show servers state") stat_output = haproxy_cmd("show stat") # Build status map from stat output (has UP/DOWN/MAINT status) status_map: Dict[str, Dict[str, str]] = {} for stat in parse_stat_csv(stat_output): if stat["pxname"] == backend and stat["svname"] not in ["FRONTEND", "BACKEND"]: status_map[stat["svname"]] = { "status": stat["status"], "check_status": stat["check_status"], "weight": stat["weight"] } # Parse server state for address info for line in state_output.split("\n"): parts = line.split() if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend: server_name = parts[StateField.SRV_NAME] addr = parts[StateField.SRV_ADDR] port = parts[StateField.SRV_PORT] # Skip disabled servers (0.0.0.0) if addr == "0.0.0.0": continue server_info: Dict[str, Any] = { "name": server_name, "addr": f"{addr}:{port}", "status": "unknown" } # Get status from stat output if server_name in status_map: server_info["status"] = status_map[server_name]["status"] server_info["check_status"] = status_map[server_name]["check_status"] server_info["weight"] = status_map[server_name]["weight"] result["servers"].append(server_info) result["total_count"] += 1 if server_info["status"] == "UP": result["healthy_count"] += 1 # Determine overall status if result["total_count"] == 0: result["status"] = "no_servers" elif result["healthy_count"] == result["total_count"]: result["status"] = "healthy" elif result["healthy_count"] > 0: result["status"] = "degraded" else: result["status"] = "down" return json.dumps(result, indent=2) except HaproxyError as e: return json.dumps({"error": str(e)}) @mcp.tool() def haproxy_backends() -> str: """List all HAProxy backends. Returns: List of all configured backend names """ try: result = haproxy_cmd("show backend") backends = [line for line in result.split("\n") if line and not line.startswith("#")] return "Backends:\n" + "\n".join(f"• {b}" for b in backends) except HaproxyError as e: return f"Error: {e}" @mcp.tool() def haproxy_list_frontends() -> str: """List all HAProxy frontends with their status. Returns: List of frontends with status and session counts """ try: result = haproxy_cmd("show stat") frontends = [] for stat in parse_stat_csv(result): if stat["svname"] == "FRONTEND": frontends.append( f"• {stat['pxname']}: {stat['status']} (sessions: {stat['scur']})" ) if not frontends: return "No frontends found" return "Frontends:\n" + "\n".join(frontends) except HaproxyError as e: return f"Error: {e}" @mcp.tool() def haproxy_set_server_state(backend: str, server: str, state: str) -> str: """Set server state for maintenance or traffic control. Use haproxy_list_servers to get the backend and server names. Args: backend: Backend name (e.g., "pool_1" from haproxy_list_servers) server: Server name (e.g., "pool_1_1" from haproxy_list_servers) state: Target state: - ready: Enable server for traffic - drain: Stop new connections, finish existing (graceful shutdown) - maint: Disable completely (maintenance mode) Returns: Success message or error description Example: # Put server in maintenance haproxy_set_server_state("pool_1", "pool_1_2", "maint") # Re-enable server haproxy_set_server_state("pool_1", "pool_1_2", "ready") """ if not validate_backend_name(backend): return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)" if not validate_backend_name(server): return "Error: Invalid server name (use alphanumeric, underscore, hyphen only)" if state not in ["ready", "drain", "maint"]: return "Error: state must be 'ready', 'drain', or 'maint'" try: haproxy_cmd_checked(f"set server {backend}/{server} state {state}") return f"Server {backend}/{server} set to {state}" except HaproxyError as e: return f"Error: {e}" @mcp.tool() def haproxy_get_server_health(backend: str = "") -> str: """Get health status of all servers (low-level view across all backends). For domain-specific health, use haproxy_domain_health instead. Args: backend: Optional backend name to filter (e.g., "pool_1"). If empty, shows all. Returns: List of servers with status: UP (healthy), DOWN (failed), MAINT (maintenance) Example: haproxy_get_server_health() # All servers haproxy_get_server_health("pool_1") # Only pool_1 backend """ if backend and not validate_backend_name(backend): return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)" try: result = haproxy_cmd("show stat") servers = [] for stat in parse_stat_csv(result): if stat["svname"] not in ["FRONTEND", "BACKEND", ""]: if backend and stat["pxname"] != backend: continue servers.append( f"• {stat['pxname']}/{stat['svname']}: {stat['status']} " f"(weight: {stat['weight']}, check: {stat['check_status']})" ) return "\n".join(servers) if servers else "No servers found" except HaproxyError as e: return f"Error: {e}" @mcp.tool() def haproxy_set_server_weight(backend: str, server: str, weight: int) -> str: """Set server weight for load balancing ratio control. Higher weight = more traffic. Default is 1. Use 0 to disable without maintenance mode. Args: backend: Backend name (e.g., "pool_1") server: Server name (e.g., "pool_1_1") weight: Weight 0-256 (0 disables, higher = more traffic) Returns: Success message or error description Example: # Send 2x traffic to server 1 vs server 2 haproxy_set_server_weight("pool_1", "pool_1_1", 2) haproxy_set_server_weight("pool_1", "pool_1_2", 1) # Temporarily disable server (soft disable, not maintenance) haproxy_set_server_weight("pool_1", "pool_1_3", 0) """ if not validate_backend_name(backend): return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)" if not validate_backend_name(server): return "Error: Invalid server name (use alphanumeric, underscore, hyphen only)" if not (0 <= weight <= 256): return "Error: weight must be between 0 and 256" try: haproxy_cmd_checked(f"set server {backend}/{server} weight {weight}") return f"Server {backend}/{server} weight set to {weight}" except HaproxyError as e: return f"Error: {e}" @mcp.tool() def haproxy_get_connections(backend: str = "") -> str: """Get active connection counts per server. Useful for monitoring traffic distribution and identifying hot spots. Args: backend: Optional backend name to filter (e.g., "pool_1"). If empty, shows all. Returns: List of servers with current active connections Example: haproxy_get_connections() # All backends haproxy_get_connections("pool_1") # Only pool_1 # Output: pool_1/pool_1_1: 5 active connections """ if backend and not validate_backend_name(backend): return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)" try: result = haproxy_cmd("show stat") connections = [] for stat in parse_stat_csv(result): if backend and stat["pxname"] != backend: continue if stat["svname"] in ["FRONTEND", "BACKEND"]: connections.append( f"• {stat['pxname']} ({stat['svname']}): {stat['scur']} current, {stat['smax']} max" ) elif stat["svname"]: connections.append(f" - {stat['svname']}: {stat['scur']} connections") return "\n".join(connections) if connections else "No connection data" except HaproxyError as e: return f"Error: {e}" @mcp.tool() def haproxy_reload() -> str: """Reload HAProxy configuration (validates config first). After reload, automatically restores server configurations from servers.json. Returns: Success message with restored server count, or error details if failed """ success, msg = reload_haproxy() if not success: return msg # Restore servers from config after reload try: restored = restore_servers_from_config() return f"HAProxy configuration reloaded successfully ({restored} servers restored)" except Exception as e: logger.error("Failed to restore servers after reload: %s", e) return f"HAProxy reloaded but server restore failed: {e}" @mcp.tool() def haproxy_check_config() -> str: """Validate HAProxy configuration file syntax. Returns: Validation result or error details """ try: result = subprocess.run( ["podman", "exec", HAPROXY_CONTAINER, "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"], capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT ) if result.returncode == 0: return "Configuration is valid" return f"Configuration errors:\n{result.stderr}" except subprocess.TimeoutExpired: return f"Error: Command timed out after {SUBPROCESS_TIMEOUT} seconds" except FileNotFoundError: return "Error: podman command not found" except OSError as e: return f"Error: OS error: {e}" @mcp.tool() def haproxy_save_state() -> str: """Save current server state to disk atomically. Returns: Success message or error description """ try: state = haproxy_cmd("show servers state") atomic_write_file(STATE_FILE, state) return "Server state saved" except HaproxyError as e: return f"Error: {e}" except IOError as e: return f"Error: {e}" @mcp.tool() def haproxy_restore_state() -> str: """Restore server state from disk. Returns: Summary of restored servers or error description """ try: with open(STATE_FILE, "r", encoding="utf-8") as f: try: fcntl.flock(f.fileno(), fcntl.LOCK_SH) except OSError: pass # Continue without lock if not supported try: state = f.read() finally: try: fcntl.flock(f.fileno(), fcntl.LOCK_UN) except OSError: pass restored = 0 skipped = 0 for line in state.split("\n"): parts = line.split() if len(parts) >= STATE_MIN_COLUMNS and not line.startswith("#"): backend = parts[StateField.BE_NAME] server = parts[StateField.SRV_NAME] addr = parts[StateField.SRV_ADDR] port = parts[StateField.SRV_PORT] # Skip disabled servers if addr == "0.0.0.0": continue # Validate names from state file to prevent injection if not validate_backend_name(backend) or not validate_backend_name(server): skipped += 1 continue # Validate IP and port if not validate_ip(addr) or not validate_port(port): skipped += 1 continue try: haproxy_cmd_checked(f"set server {backend}/{server} addr {addr} port {port}") haproxy_cmd_checked(f"set server {backend}/{server} state ready") restored += 1 except HaproxyError as e: logger.warning("Failed to restore %s/%s: %s", backend, server, e) result = f"Server state restored ({restored} servers)" if skipped: result += f", {skipped} entries skipped due to validation" return result except FileNotFoundError: return "Error: No saved state found" except HaproxyError as e: return f"Error: {e}" def startup_restore() -> None: """Restore servers from config file on startup.""" # Wait for HAProxy to be ready for _ in range(STARTUP_RETRY_COUNT): try: haproxy_cmd("show info") break except HaproxyError: time.sleep(1) else: logger.warning("HAProxy not ready, skipping restore") return try: count = restore_servers_from_config() if count > 0: logger.info("Restored %d servers from config", count) except (HaproxyError, OSError, ValueError, json.JSONDecodeError) as e: logger.warning("Failed to restore servers: %s", e) if __name__ == "__main__": startup_restore() mcp.run(transport="streamable-http")