#!/usr/bin/env python3
"""HAProxy MCP Server - Direct Runtime API Integration

This module provides an MCP (Model Context Protocol) server for managing
HAProxy configuration and runtime state. It supports dynamic domain/server
management with HTTP backends (SSL termination at HAProxy frontend).
"""

import socket
import subprocess
import re
import json
import sys
import time
import fcntl
from typing import Dict, Generator, List, Optional, Set, TextIO, Tuple

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("haproxy", host="0.0.0.0", port=8000)

# Constants
HAPROXY_SOCKET: Tuple[str, int] = ("localhost", 9999)
STATE_FILE: str = "/opt/haproxy/data/servers.state"
MAP_FILE: str = "/opt/haproxy/conf/domains.map"
MAP_FILE_CONTAINER: str = "/usr/local/etc/haproxy/domains.map"
SERVERS_FILE: str = "/opt/haproxy/conf/servers.json"
POOL_COUNT: int = 100
MAX_SLOTS: int = 10

# Validation patterns - compiled once for performance.
# NOTE: validators apply these with fullmatch(); with match() a trailing "$"
# would still accept an input that ends in a newline.
DOMAIN_PATTERN = re.compile(
    r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?'
    r'(\.[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?)*$'
)
# re.ASCII restricts \d to 0-9 (otherwise any Unicode decimal digit matches)
IP_PATTERN = re.compile(r'^(\d{1,3}\.){3}\d{1,3}$', re.ASCII)
# Backend/server names: alphanumeric, underscore, hyphen only
BACKEND_NAME_PATTERN = re.compile(r'^[a-zA-Z0-9_-]+$')
# Pattern for converting domain to backend name
NON_ALNUM_PATTERN = re.compile(r'[^a-zA-Z0-9]')

# Limits
MAX_RESPONSE_SIZE = 10 * 1024 * 1024  # 10 MB max response from HAProxy
SUBPROCESS_TIMEOUT = 30  # seconds
STARTUP_RETRY_COUNT = 10  # HAProxy ready check retries
STATE_MIN_COLUMNS = 19  # Minimum columns in HAProxy server state output
SOCKET_TIMEOUT = 5  # seconds for HAProxy socket connection


class HaproxyError(Exception):
    """HAProxy operation error"""
    pass


# CSV field indices for HAProxy stats (show stat command)
class StatField:
    """HAProxy CSV stat field indices."""
    PXNAME = 0  # Proxy name (frontend/backend)
    SVNAME = 1  # Server name (or FRONTEND/BACKEND)
    SCUR = 4  # Current sessions
    SMAX = 5  # Max sessions (column 6 is slim, the configured session limit)
    STATUS = 17  # Status (UP/DOWN/MAINT/etc)
    WEIGHT = 18  # Server weight
    CHECK_STATUS = 36  # Check status


def _flock(f: TextIO, operation: int) -> None:
    """Apply an advisory flock operation, ignoring failures.

    Locking is best-effort: on filesystems that do not support flock the
    caller simply proceeds without a lock.

    Args:
        f: Open file object to lock or unlock
        operation: One of the fcntl.LOCK_* constants
    """
    try:
        fcntl.flock(f.fileno(), operation)
    except OSError:
        pass  # Continue without lock if not supported


def _write_text_locked(path: str, text: str) -> None:
    """Replace the contents of a file while holding an exclusive lock.

    The file is opened in append mode so that it is NOT truncated before the
    lock is acquired (mode "w" truncates on open, which lets a concurrent
    reader observe an empty file). The data is flushed before the lock is
    released so the write is complete while the lock is still held. The file
    is rewritten in place, so its inode does not change.

    Args:
        path: File to overwrite (created if missing)
        text: Complete new contents

    Raises:
        OSError: If the file cannot be opened or written
    """
    with open(path, "a", encoding="utf-8") as f:
        _flock(f, fcntl.LOCK_EX)
        try:
            f.truncate(0)
            f.write(text)
            f.flush()
        finally:
            _flock(f, fcntl.LOCK_UN)


def haproxy_cmd(command: str) -> str:
    """Send command to HAProxy Runtime API.

    Args:
        command: The HAProxy runtime API command to execute

    Returns:
        The response from HAProxy

    Raises:
        HaproxyError: If the command contains line breaks, the connection
            fails, or the response exceeds the size limit
    """
    # Single choke point for all runtime commands: the command is sent as one
    # newline-terminated line, so it must not contain line breaks itself.
    if "\n" in command or "\r" in command:
        raise HaproxyError("Command must not contain line breaks")

    chunks: List[bytes] = []
    received = 0
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(SOCKET_TIMEOUT)
            s.connect(HAPROXY_SOCKET)
            s.sendall(f"{command}\n".encode())
            s.shutdown(socket.SHUT_WR)
            while True:
                data = s.recv(8192)
                if not data:
                    break
                received += len(data)
                if received > MAX_RESPONSE_SIZE:
                    raise HaproxyError(f"Response exceeded {MAX_RESPONSE_SIZE} bytes limit")
                chunks.append(data)
        return b"".join(chunks).decode().strip()
    except HaproxyError:
        raise
    except socket.timeout as e:
        raise HaproxyError("Connection timeout") from e
    except ConnectionRefusedError as e:
        raise HaproxyError("Connection refused - HAProxy not running?") from e
    except UnicodeDecodeError as e:
        raise HaproxyError("Invalid UTF-8 in response") from e
    except Exception as e:
        raise HaproxyError(str(e)) from e


def _run_config_check() -> "subprocess.CompletedProcess[str]":
    """Run 'haproxy -c' inside the haproxy container via podman.

    Returns:
        The completed process (returncode 0 means the config is valid)

    Raises:
        subprocess.TimeoutExpired: If the command exceeds SUBPROCESS_TIMEOUT
        OSError: If podman cannot be executed (FileNotFoundError if missing)
    """
    return subprocess.run(
        ["podman", "exec", "haproxy", "haproxy", "-c", "-f",
         "/usr/local/etc/haproxy/haproxy.cfg"],
        capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
    )


def reload_haproxy() -> Tuple[bool, str]:
    """Validate and reload HAProxy configuration.

    Returns:
        Tuple of (success, message)
    """
    try:
        validate = _run_config_check()
        if validate.returncode != 0:
            return False, f"Config validation failed:\n{validate.stderr}"

        result = subprocess.run(
            ["podman", "kill", "--signal", "USR2", "haproxy"],
            capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT
        )
        if result.returncode != 0:
            return False, f"Reload failed: {result.stderr}"
        return True, "OK"
    except subprocess.TimeoutExpired:
        return False, f"Command timed out after {SUBPROCESS_TIMEOUT} seconds"
    except FileNotFoundError:
        return False, "podman command not found"
    except OSError as e:
        return False, f"OS error: {e}"


def validate_domain(domain: str) -> bool:
    """Validate domain format.

    Args:
        domain: The domain name to validate

    Returns:
        True if domain is valid, False otherwise
    """
    if not domain or len(domain) > 253:
        return False
    # fullmatch: reject a trailing newline that "$" alone would tolerate
    return bool(DOMAIN_PATTERN.fullmatch(domain))


def validate_ip(ip: str, allow_empty: bool = False) -> bool:
    """Validate IPv4 address format.

    Args:
        ip: The IP address to validate
        allow_empty: If True, empty string is considered valid

    Returns:
        True if IP is valid, False otherwise
    """
    if not ip:
        return allow_empty
    if not IP_PATTERN.fullmatch(ip):
        return False
    return all(0 <= int(octet) <= 255 for octet in ip.split('.'))


def validate_backend_name(name: str) -> bool:
    """Validate backend or server name to prevent command injection.

    Args:
        name: The backend or server name to validate

    Returns:
        True if name contains only safe characters
    """
    if not name or len(name) > 255:
        return False
    return bool(BACKEND_NAME_PATTERN.fullmatch(name))


def domain_to_backend(domain: str) -> str:
    """Convert domain to backend name (alphanumeric + underscore only).

    Args:
        domain: The domain name to convert

    Returns:
        Backend name with non-alphanumeric characters replaced by underscores

    Raises:
        ValueError: If resulting name is invalid
    """
    result = NON_ALNUM_PATTERN.sub('_', domain)
    if not validate_backend_name(result):
        raise ValueError(f"Invalid backend name after conversion: {result}")
    return result


def parse_stat_csv(stat_output: str) -> Generator[Dict[str, str], None, None]:
    """Parse HAProxy stat CSV output into structured data.

    Args:
        stat_output: Raw output from 'show stat' command

    Yields:
        Dictionaries with parsed stat fields for each row
    """
    for line in stat_output.split("\n"):
        if not line or line.startswith("#"):
            continue
        parts = line.split(",")
        # Rows too short to contain the status column are ignored. Every
        # field up to STATUS is then guaranteed to exist; only the columns
        # after it need a length guard.
        if len(parts) <= StatField.STATUS:
            continue
        yield {
            "pxname": parts[StatField.PXNAME],
            "svname": parts[StatField.SVNAME],
            "scur": parts[StatField.SCUR],
            "smax": parts[StatField.SMAX],
            "status": parts[StatField.STATUS],
            "weight": parts[StatField.WEIGHT] if len(parts) > StatField.WEIGHT else "0",
            "check_status": (
                parts[StatField.CHECK_STATUS]
                if len(parts) > StatField.CHECK_STATUS else ""
            ),
        }


def _iter_server_state(state: str) -> Generator[Tuple[str, str, str, str], None, None]:
    """Iterate over the data rows of 'show servers state' output.

    Comment lines (including the '# be_id be_name ...' column header, which
    has enough columns to look like a data row) and rows with fewer than
    STATE_MIN_COLUMNS columns are skipped.

    Args:
        state: Raw 'show servers state' output or saved state file contents

    Yields:
        Tuples of (backend name, server name, address, port) as strings
    """
    for line in state.split("\n"):
        if line.startswith("#"):
            continue
        parts = line.split()
        if len(parts) >= STATE_MIN_COLUMNS:
            yield parts[1], parts[3], parts[4], parts[18]


def validate_port(port: str) -> bool:
    """Validate port number is in valid range.

    Args:
        port: Port number as string

    Returns:
        True if port is valid (1-65535), False otherwise
    """
    # isascii() guard: str.isdigit() alone accepts characters such as "²"
    # that int() cannot parse.
    if not port or not (port.isascii() and port.isdigit()):
        return False
    return 1 <= int(port) <= 65535


def get_map_contents() -> List[Tuple[str, str]]:
    """Read domains.map file and return list of (domain, backend) tuples.

    Returns:
        List of (domain, backend) tuples from the map file; empty if the
        file does not exist
    """
    entries: List[Tuple[str, str]] = []
    try:
        with open(MAP_FILE, "r", encoding="utf-8") as f:
            _flock(f, fcntl.LOCK_SH)
            try:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith("#"):
                        continue
                    parts = line.split()
                    if len(parts) >= 2:
                        entries.append((parts[0], parts[1]))
            finally:
                _flock(f, fcntl.LOCK_UN)
    except FileNotFoundError:
        pass
    return entries


def find_available_pool() -> Optional[str]:
    """Find first unused pool from pool_1 to pool_{POOL_COUNT}.

    Returns:
        Pool name (e.g., 'pool_1') if available, None if all pools are used
    """
    used_pools: Set[str] = {
        backend for _, backend in get_map_contents() if backend.startswith("pool_")
    }
    for i in range(1, POOL_COUNT + 1):
        pool_name = f"pool_{i}"
        if pool_name not in used_pools:
            return pool_name
    return None


def get_domain_backend(domain: str) -> Optional[str]:
    """Look up the backend for a domain from domains.map.

    Args:
        domain: The domain to look up

    Returns:
        Backend name if found, None otherwise
    """
    for map_domain, backend in get_map_contents():
        if map_domain == domain:
            return backend
    return None


def is_legacy_backend(backend: str) -> bool:
    """Check if backend is a legacy static backend (not a pool).

    Args:
        backend: Backend name to check

    Returns:
        True if this is a legacy backend, False if it's a pool
    """
    return not backend.startswith("pool_")


def get_legacy_backend_name(domain: str) -> str:
    """Convert domain to legacy backend name format.

    Args:
        domain: Domain name

    Returns:
        Legacy backend name (e.g., 'api_example_com_backend')

    Raises:
        ValueError: If the domain cannot be converted to a valid backend name
    """
    return f"{domain_to_backend(domain)}_backend"


def save_map_file(entries: List[Tuple[str, str]]) -> None:
    """Save entries to domains.map file with file locking.

    Args:
        entries: List of (domain, backend) tuples to write

    Raises:
        IOError: If the file cannot be written
    """
    lines = [
        "# Domain to Backend mapping\n",
        "# Format: domain backend_name\n",
        "# Wildcard: .domain.com matches *.domain.com\n\n",
    ]
    lines.extend(f"{domain} {backend}\n" for domain, backend in entries)
    _write_text_locked(MAP_FILE, "".join(lines))


def load_servers_config() -> Dict:
    """Load servers configuration from JSON file with file locking.

    Returns:
        Dictionary with server configurations; empty if the file is missing,
        is not valid JSON, or does not contain a JSON object
    """
    try:
        with open(SERVERS_FILE, "r", encoding="utf-8") as f:
            _flock(f, fcntl.LOCK_SH)
            try:
                config = json.load(f)
            finally:
                _flock(f, fcntl.LOCK_UN)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as e:
        print(f"Warning: Corrupt config file {SERVERS_FILE}: {e}", file=sys.stderr)
        return {}
    if not isinstance(config, dict):
        # Callers index and iterate the result as a mapping
        print(
            f"Warning: Unexpected content in {SERVERS_FILE} (expected a JSON object)",
            file=sys.stderr,
        )
        return {}
    return config


def save_servers_config(config: Dict) -> None:
    """Save servers configuration to JSON file with file locking.

    Args:
        config: Dictionary with server configurations

    Raises:
        OSError: If the file cannot be written
        TypeError: If config is not JSON-serializable
    """
    # Serialize before touching the file so a serialization error cannot
    # leave a truncated config behind.
    _write_text_locked(SERVERS_FILE, json.dumps(config, indent=2))


def add_server_to_config(domain: str, slot: int, ip: str, http_port: int) -> None:
    """Add server configuration to persistent storage.

    Args:
        domain: Domain name
        slot: Server slot (1 to MAX_SLOTS)
        ip: Server IP address
        http_port: HTTP port
    """
    config = load_servers_config()
    config.setdefault(domain, {})[str(slot)] = {
        "ip": ip,
        "http_port": http_port
    }
    save_servers_config(config)


def remove_server_from_config(domain: str, slot: int) -> None:
    """Remove server configuration from persistent storage.

    The file is only rewritten when an entry was actually removed.

    Args:
        domain: Domain name
        slot: Server slot to remove
    """
    config = load_servers_config()
    if domain in config and str(slot) in config[domain]:
        del config[domain][str(slot)]
        if not config[domain]:
            del config[domain]
        save_servers_config(config)


def remove_domain_from_config(domain: str) -> None:
    """Remove all server configurations for a domain.

    Args:
        domain: Domain name to remove
    """
    config = load_servers_config()
    if domain in config:
        del config[domain]
        save_servers_config(config)


def get_server_suffixes(http_port: int) -> List[Tuple[str, int]]:
    """Get server suffixes and ports based on port configuration.

    Args:
        http_port: HTTP port for backend

    Returns:
        List of (suffix, port) tuples - always HTTP only
    """
    return [("", http_port)]


def restore_servers_from_config() -> int:
    """Restore all servers from configuration file.

    Values read from the config and map files are validated before being
    placed into Runtime API commands; invalid entries are skipped with a
    warning on stderr.

    Returns:
        Number of servers restored
    """
    config = load_servers_config()

    # Read the map once instead of once per domain. setdefault keeps the
    # first mapping for a domain, matching get_domain_backend().
    backend_by_domain: Dict[str, str] = {}
    for map_domain, map_backend in get_map_contents():
        backend_by_domain.setdefault(map_domain, map_backend)

    restored = 0
    for domain, slots in config.items():
        backend = backend_by_domain.get(domain)
        if not backend:
            continue
        if not validate_backend_name(backend):
            print(f"Warning: Invalid backend '{backend}' for {domain}, skipping",
                  file=sys.stderr)
            continue
        if not isinstance(slots, dict):
            print(f"Warning: Invalid server list for {domain}, skipping", file=sys.stderr)
            continue

        try:
            if backend.startswith("pool_"):
                server_prefix = backend
            else:
                server_prefix = domain_to_backend(domain)
        except ValueError as e:
            print(f"Warning: Invalid domain '{domain}': {e}", file=sys.stderr)
            continue

        for slot_str, server_info in slots.items():
            try:
                slot = int(slot_str)
            except ValueError:
                print(f"Warning: Invalid slot '{slot_str}' for {domain}, skipping",
                      file=sys.stderr)
                continue

            if not isinstance(server_info, dict):
                print(f"Warning: Invalid entry for {domain} slot {slot}, skipping",
                      file=sys.stderr)
                continue

            ip = server_info.get("ip", "")
            if not ip:
                continue
            if not isinstance(ip, str) or not validate_ip(ip):
                print(f"Warning: Invalid IP for {domain} slot {slot}, skipping",
                      file=sys.stderr)
                continue

            try:
                http_port = int(server_info.get("http_port", 80))
            except (ValueError, TypeError):
                print(f"Warning: Invalid port for {domain} slot {slot}, skipping",
                      file=sys.stderr)
                continue
            if not (1 <= http_port <= 65535):
                print(f"Warning: Invalid port for {domain} slot {slot}, skipping",
                      file=sys.stderr)
                continue

            try:
                for suffix, port in get_server_suffixes(http_port):
                    server = f"{server_prefix}{suffix}_{slot}"
                    haproxy_cmd(f"set server {backend}/{server} addr {ip} port {port}")
                    haproxy_cmd(f"set server {backend}/{server} state ready")
                restored += 1
            except HaproxyError as e:
                print(f"Warning: Failed to restore {domain} slot {slot}: {e}",
                      file=sys.stderr)

    return restored


@mcp.tool()
def haproxy_list_domains() -> str:
    """List all configured domains with their backend servers.

    Returns:
        List of domains with their associated backend servers
    """
    try:
        domains = []
        state = haproxy_cmd("show servers state")

        # Build server map from HAProxy state (unconfigured slots sit at 0.0.0.0)
        server_map: Dict[str, list] = {}
        for backend, server, addr, port in _iter_server_state(state):
            if addr != "0.0.0.0":
                server_map.setdefault(backend, []).append(f"{server}={addr}:{port}")

        # Read from domains.map (skip wildcard entries starting with .)
        seen_domains: Set[str] = set()
        for domain, backend in get_map_contents():
            if domain.startswith(".") or domain in seen_domains:
                continue
            seen_domains.add(domain)
            servers = server_map.get(backend, ["(none)"])
            backend_type = "pool" if backend.startswith("pool_") else "static"
            domains.append(f"• {domain} -> {backend} ({backend_type}): {', '.join(servers)}")

        return "\n".join(domains) if domains else "No domains configured"
    except HaproxyError as e:
        return f"Error: {e}"


@mcp.tool()
def haproxy_add_domain(domain: str, ip: str = "", http_port: int = 80) -> str:
    """Add a new domain to HAProxy using map-based routing (no reload required).

    Args:
        domain: The domain name to add (e.g., api.example.com)
        ip: Optional IP address for initial server; if provided, adds to slot 1
        http_port: HTTP port for the backend server (default: 80)

    Returns:
        Success message or error description
    """
    # Validate inputs
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    if not validate_ip(ip, allow_empty=True):
        return "Error: Invalid IP address format"
    if not (1 <= http_port <= 65535):
        return "Error: Port must be between 1 and 65535"

    # Check if domain already exists
    existing_backend = get_domain_backend(domain)
    if existing_backend:
        return f"Error: Domain {domain} already exists (mapped to {existing_backend})"

    # Find available pool
    pool = find_available_pool()
    if not pool:
        return f"Error: No available pools (all {POOL_COUNT} pools are in use)"

    try:
        # Update HAProxy map via Runtime API first (immediate effect)
        haproxy_cmd(f"add map {MAP_FILE_CONTAINER} {domain} {pool}")
        haproxy_cmd(f"add map {MAP_FILE_CONTAINER} .{domain} {pool}")

        # Read current map entries and save to file (persistence)
        entries = get_map_contents()
        entries.append((domain, pool))
        entries.append((f".{domain}", pool))
        save_map_file(entries)

        # If IP provided, add server to slot 1
        if ip:
            for suffix, port in get_server_suffixes(http_port):
                server = f"{pool}{suffix}_1"
                haproxy_cmd(f"set server {pool}/{server} addr {ip} port {port}")
                haproxy_cmd(f"set server {pool}/{server} state ready")

            # Save to persistent config
            add_server_to_config(domain, 1, ip, http_port)
            return f"Domain {domain} added to {pool} with server {ip}:{http_port}"

        return f"Domain {domain} added to {pool} (no servers configured)"

    except HaproxyError as e:
        return f"Error: {e}"
    except IOError as e:
        return f"Error: Failed to update map file: {e}"


@mcp.tool()
def haproxy_remove_domain(domain: str) -> str:
    """Remove a domain from HAProxy (no reload required for pool-based domains).

    Args:
        domain: The domain name to remove

    Returns:
        Success message or error description
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"

    # Look up the domain in the map
    backend = get_domain_backend(domain)
    if not backend:
        return f"Error: Domain {domain} not found"

    # Check if this is a legacy backend (not a pool)
    if is_legacy_backend(backend):
        return f"Error: Cannot remove legacy domain {domain} (uses static backend {backend})"

    try:
        # Clear map entries via Runtime API first (immediate effect)
        haproxy_cmd(f"del map {MAP_FILE_CONTAINER} {domain}")
        haproxy_cmd(f"del map {MAP_FILE_CONTAINER} .{domain}")

        # Remove entries from map file (persistence)
        wildcard = f".{domain}"
        new_entries = [
            (d, b) for d, b in get_map_contents() if d != domain and d != wildcard
        ]
        save_map_file(new_entries)

        # Disable all servers in the pool (reset to 0.0.0.0:0)
        for slot in range(1, MAX_SLOTS + 1):
            server = f"{backend}_{slot}"
            try:
                haproxy_cmd(f"set server {backend}/{server} state maint")
                haproxy_cmd(f"set server {backend}/{server} addr 0.0.0.0 port 0")
            except HaproxyError:
                pass  # Ignore errors for individual servers

        # Remove from persistent config
        remove_domain_from_config(domain)

        return f"Domain {domain} removed from {backend}"

    except HaproxyError as e:
        return f"Error: {e}"
    except IOError as e:
        return f"Error: Failed to update map file: {e}"


@mcp.tool()
def haproxy_list_servers(domain: str) -> str:
    """List all servers for a specific domain.

    Args:
        domain: The domain name to list servers for

    Returns:
        List of servers with their addresses and status
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"

    try:
        # Look up backend from map
        backend = get_domain_backend(domain)
        if not backend:
            # Fall back to legacy naming convention
            backend = get_legacy_backend_name(domain)

        servers = []
        state = haproxy_cmd("show servers state")
        for be_name, server, addr, port in _iter_server_state(state):
            if be_name == backend:
                status = "active" if addr != "0.0.0.0" else "disabled"
                servers.append(f"• {server}: {addr}:{port} ({status})")

        if not servers:
            return f"Backend {backend} not found"
        return f"Servers for {domain} ({backend}):\n" + "\n".join(servers)
    except (HaproxyError, ValueError) as e:
        return f"Error: {e}"


@mcp.tool()
def haproxy_add_server(domain: str, slot: int, ip: str, http_port: int = 80) -> str:
    """Add a server to a domain's backend at specified slot.

    Args:
        domain: The domain name to add the server to
        slot: Server slot number (1 to MAX_SLOTS)
        ip: IP address of the server (required)
        http_port: HTTP port (default: 80)

    Returns:
        Success message or error description
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    if not ip:
        return "Error: IP address is required"
    if not validate_ip(ip):
        return "Error: Invalid IP address format"
    if not (1 <= slot <= MAX_SLOTS):
        return f"Error: Slot must be between 1 and {MAX_SLOTS}"
    if not (1 <= http_port <= 65535):
        return "Error: Port must be between 1 and 65535"

    try:
        # Look up backend from map
        backend = get_domain_backend(domain)
        if not backend:
            # Fall back to legacy naming convention
            backend = get_legacy_backend_name(domain)

        # Determine server name prefix based on backend type
        if backend.startswith("pool_"):
            # Pool backends use pool_N_slot naming
            server_prefix = backend
        else:
            # Legacy backends use domain-based naming
            server_prefix = domain_to_backend(domain)

        results = []
        for suffix, port in get_server_suffixes(http_port):
            server = f"{server_prefix}{suffix}_{slot}"
            haproxy_cmd(f"set server {backend}/{server} addr {ip} port {port}")
            haproxy_cmd(f"set server {backend}/{server} state ready")
            results.append(f"{server} → {ip}:{port}")

        # Save to persistent config
        add_server_to_config(domain, slot, ip, http_port)

        return f"Added to {domain} ({backend}) slot {slot}:\n" + "\n".join(results)
    except (HaproxyError, ValueError) as e:
        return f"Error: {e}"
    except OSError as e:
        # Report persistence failures as a tool result, as haproxy_add_domain does
        return f"Error: Failed to update server config: {e}"


@mcp.tool()
def haproxy_remove_server(domain: str, slot: int) -> str:
    """Remove a server from a domain's backend at specified slot.

    Args:
        domain: The domain name to remove the server from
        slot: Server slot number (1 to MAX_SLOTS) to remove

    Returns:
        Success message or error description
    """
    if not validate_domain(domain):
        return "Error: Invalid domain format"
    if not (1 <= slot <= MAX_SLOTS):
        return f"Error: Slot must be between 1 and {MAX_SLOTS}"

    try:
        # Look up backend from map
        backend = get_domain_backend(domain)
        if not backend:
            # Fall back to legacy naming convention
            backend = get_legacy_backend_name(domain)

        # Determine server name prefix based on backend type
        if backend.startswith("pool_"):
            # Pool backends use pool_N_slot naming
            server_prefix = backend
        else:
            # Legacy backends use domain-based naming
            server_prefix = domain_to_backend(domain)

        # HTTP only - single server per slot
        server = f"{server_prefix}_{slot}"
        haproxy_cmd(f"set server {backend}/{server} state maint")
        haproxy_cmd(f"set server {backend}/{server} addr 0.0.0.0 port 0")

        # Remove from persistent config
        remove_server_from_config(domain, slot)

        return f"Removed server at slot {slot} from {domain} ({backend})"
    except (HaproxyError, ValueError) as e:
        return f"Error: {e}"
    except OSError as e:
        # Report persistence failures as a tool result, as haproxy_add_domain does
        return f"Error: Failed to update server config: {e}"


@mcp.tool()
def haproxy_stats() -> str:
    """Get HAProxy status and statistics.

    Returns:
        Key HAProxy metrics (name, version, uptime, connections, etc.)
    """
    try:
        result = haproxy_cmd("show info")
        stats = {}
        for line in result.split("\n"):
            if ":" in line:
                key, value = line.split(":", 1)
                stats[key.strip()] = value.strip()

        important = ["Name", "Version", "Uptime_sec", "CurrConns", "MaxConn",
                     "Run_queue", "Tasks"]
        output = [f"• {key}: {stats[key]}" for key in important if key in stats]

        # Fall back to the raw response when none of the expected keys are present
        return "\n".join(output) if output else result
    except HaproxyError as e:
        return f"Error: {e}"


@mcp.tool()
def haproxy_backends() -> str:
    """List all HAProxy backends.

    Returns:
        List of all configured backend names
    """
    try:
        result = haproxy_cmd("show backend")
        backends = [line for line in result.split("\n") if line and not line.startswith("#")]
        return "Backends:\n" + "\n".join(f"• {b}" for b in backends)
    except HaproxyError as e:
        return f"Error: {e}"


@mcp.tool()
def haproxy_list_frontends() -> str:
    """List all HAProxy frontends with their status.

    Returns:
        List of frontends with status and session counts
    """
    try:
        result = haproxy_cmd("show stat")
        frontends = [
            f"• {stat['pxname']}: {stat['status']} (sessions: {stat['scur']})"
            for stat in parse_stat_csv(result)
            if stat["svname"] == "FRONTEND"
        ]
        if not frontends:
            return "No frontends found"
        return "Frontends:\n" + "\n".join(frontends)
    except HaproxyError as e:
        return f"Error: {e}"


@mcp.tool()
def haproxy_set_server_state(backend: str, server: str, state: str) -> str:
    """Set server state.

    States: ready (enable), drain (graceful shutdown), maint (maintenance/disable)

    Args:
        backend: Backend name (alphanumeric, underscore, hyphen only)
        server: Server name within the backend
        state: Target state - ready, drain, or maint

    Returns:
        Success message or error description
    """
    if not validate_backend_name(backend):
        return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
    if not validate_backend_name(server):
        return "Error: Invalid server name (use alphanumeric, underscore, hyphen only)"
    if state not in ["ready", "drain", "maint"]:
        return "Error: state must be 'ready', 'drain', or 'maint'"
    try:
        result = haproxy_cmd(f"set server {backend}/{server} state {state}")
        return result if result else f"Server {backend}/{server} set to {state}"
    except HaproxyError as e:
        return f"Error: {e}"


@mcp.tool()
def haproxy_get_server_health(backend: str = "") -> str:
    """Get health status of all servers or servers in a specific backend.

    Args:
        backend: Optional backend name to filter results

    Returns:
        Server health status or error description
    """
    if backend and not validate_backend_name(backend):
        return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
    try:
        result = haproxy_cmd("show stat")
        servers = []
        for stat in parse_stat_csv(result):
            # Only real server rows: skip the per-proxy FRONTEND/BACKEND rows
            if stat["svname"] in ["FRONTEND", "BACKEND", ""]:
                continue
            if backend and stat["pxname"] != backend:
                continue
            servers.append(
                f"• {stat['pxname']}/{stat['svname']}: {stat['status']} "
                f"(weight: {stat['weight']}, check: {stat['check_status']})"
            )
        return "\n".join(servers) if servers else "No servers found"
    except HaproxyError as e:
        return f"Error: {e}"


@mcp.tool()
def haproxy_set_server_weight(backend: str, server: str, weight: int) -> str:
    """Set server weight (0-256).

    Weight 0 disables the server for new connections.

    Args:
        backend: Backend name (alphanumeric, underscore, hyphen only)
        server: Server name within the backend
        weight: Server weight from 0 to 256

    Returns:
        Success message or error description
    """
    if not validate_backend_name(backend):
        return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
    if not validate_backend_name(server):
        return "Error: Invalid server name (use alphanumeric, underscore, hyphen only)"
    if not (0 <= weight <= 256):
        return "Error: weight must be between 0 and 256"
    try:
        result = haproxy_cmd(f"set server {backend}/{server} weight {weight}")
        return result if result else f"Server {backend}/{server} weight set to {weight}"
    except HaproxyError as e:
        return f"Error: {e}"


@mcp.tool()
def haproxy_get_connections(backend: str = "") -> str:
    """Get active connections per server.

    Args:
        backend: Optional backend name to filter results

    Returns:
        Connection statistics or error description
    """
    if backend and not validate_backend_name(backend):
        return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)"
    try:
        result = haproxy_cmd("show stat")
        connections = []
        for stat in parse_stat_csv(result):
            if backend and stat["pxname"] != backend:
                continue
            if stat["svname"] in ["FRONTEND", "BACKEND"]:
                connections.append(
                    f"• {stat['pxname']} ({stat['svname']}): "
                    f"{stat['scur']} current, {stat['smax']} max"
                )
            elif stat["svname"]:
                connections.append(f" - {stat['svname']}: {stat['scur']} connections")
        return "\n".join(connections) if connections else "No connection data"
    except HaproxyError as e:
        return f"Error: {e}"


@mcp.tool()
def haproxy_reload() -> str:
    """Reload HAProxy configuration (validates config first).

    Returns:
        Success message or error details if validation/reload failed
    """
    success, msg = reload_haproxy()
    if not success:
        return msg
    return "HAProxy configuration reloaded successfully"


@mcp.tool()
def haproxy_check_config() -> str:
    """Validate HAProxy configuration file syntax.

    Returns:
        Validation result or error details
    """
    try:
        result = _run_config_check()
        if result.returncode == 0:
            return "Configuration is valid"
        return f"Configuration errors:\n{result.stderr}"
    except subprocess.TimeoutExpired:
        return f"Error: Command timed out after {SUBPROCESS_TIMEOUT} seconds"
    except FileNotFoundError:
        return "Error: podman command not found"
    except OSError as e:
        return f"Error: OS error: {e}"


@mcp.tool()
def haproxy_save_state() -> str:
    """Save current server state to disk.

    Returns:
        Success message or error description
    """
    try:
        state = haproxy_cmd("show servers state")
        _write_text_locked(STATE_FILE, state)
        return "Server state saved"
    except HaproxyError as e:
        return f"Error: {e}"
    except IOError as e:
        return f"Error: Failed to save state: {e}"


@mcp.tool()
def haproxy_restore_state() -> str:
    """Restore server state from disk.

    Returns:
        Summary of restored servers or error description
    """
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            _flock(f, fcntl.LOCK_SH)
            try:
                state = f.read()
            finally:
                _flock(f, fcntl.LOCK_UN)

        restored = 0
        skipped = 0
        for backend, server, addr, port in _iter_server_state(state):
            # Skip disabled servers
            if addr == "0.0.0.0":
                continue

            # Validate names from state file to prevent injection
            if not validate_backend_name(backend) or not validate_backend_name(server):
                skipped += 1
                continue

            # Validate IP and port
            if not validate_ip(addr) or not validate_port(port):
                skipped += 1
                continue

            haproxy_cmd(f"set server {backend}/{server} addr {addr} port {port}")
            haproxy_cmd(f"enable server {backend}/{server}")
            restored += 1

        result = f"Server state restored ({restored} servers)"
        if skipped:
            result += f", {skipped} entries skipped due to validation"
        return result
    except FileNotFoundError:
        return "Error: No saved state found"
    except HaproxyError as e:
        return f"Error: {e}"
    except OSError as e:
        # e.g. permission denied; report it instead of raising out of the tool
        return f"Error: Failed to read state: {e}"


def startup_restore() -> None:
    """Restore servers from config file on startup."""
    # Wait for HAProxy to be ready
    for _ in range(STARTUP_RETRY_COUNT):
        try:
            haproxy_cmd("show info")
            break
        except HaproxyError:
            time.sleep(1)
    else:
        # Loop finished without a successful 'show info'
        print("Warning: HAProxy not ready, skipping restore", file=sys.stderr)
        return

    try:
        count = restore_servers_from_config()
        if count > 0:
            print(f"Restored {count} servers from config", file=sys.stderr)
    except (HaproxyError, OSError, ValueError, json.JSONDecodeError) as e:
        print(f"Warning: Failed to restore servers: {e}", file=sys.stderr)


if __name__ == "__main__":
    startup_restore()
    mcp.run(transport="streamable-http")