"""Health check tools for HAProxy MCP Server.""" import json import os import subprocess import time from typing import Annotated, Any from pydantic import Field from ..config import ( MAP_FILE, SERVERS_FILE, HAPROXY_CONTAINER, ) from ..exceptions import HaproxyError from ..validation import validate_domain, validate_backend_name from ..haproxy_client import haproxy_cmd from ..file_ops import get_backend_and_prefix from ..utils import parse_stat_csv, parse_servers_state def register_health_tools(mcp): """Register health check tools with MCP server.""" @mcp.tool() def haproxy_health() -> str: """Check overall system health (MCP server, HAProxy, config files). Use this for monitoring integration. Returns "healthy" if all components are OK. Returns: JSON with: - status: "healthy" or "unhealthy" - components.mcp: MCP server status - components.haproxy: HAProxy connectivity, version, uptime - components.config_files: map_file and servers_file accessibility Example: # Returns: {"status": "healthy", "components": {"mcp": {"status": "ok"}, ...}} """ result: dict[str, Any] = { "status": "healthy", "timestamp": time.time(), "components": { "mcp": {"status": "ok"}, "haproxy": {"status": "unknown"}, "config_files": {"status": "unknown"} } } # Check HAProxy connectivity try: info = haproxy_cmd("show info") for line in info.split("\n"): if line.startswith("Version:"): result["components"]["haproxy"]["version"] = line.split(":", 1)[1].strip() elif line.startswith("Uptime_sec:"): result["components"]["haproxy"]["uptime_sec"] = int(line.split(":", 1)[1].strip()) result["components"]["haproxy"]["status"] = "ok" except HaproxyError as e: result["components"]["haproxy"]["status"] = "error" result["components"]["haproxy"]["error"] = str(e) result["status"] = "degraded" # Check container status try: container_result = subprocess.run( ["podman", "inspect", "--format", "{{.State.Status}}", HAPROXY_CONTAINER], capture_output=True, text=True, timeout=5 ) if container_result.returncode == 0: container_status = container_result.stdout.strip() result["components"]["container"] = { "status": "ok" if container_status == "running" else container_status, "state": container_status } else: result["components"]["container"] = {"status": "error", "error": container_result.stderr.strip()} result["status"] = "unhealthy" except subprocess.TimeoutExpired: result["components"]["container"] = {"status": "timeout"} result["status"] = "unhealthy" except (OSError, subprocess.SubprocessError) as e: result["components"]["container"] = {"status": "error", "error": str(e)} # Check configuration files files_ok = True file_status: dict[str, str] = {} for name, path in [("map_file", MAP_FILE), ("servers_file", SERVERS_FILE)]: if os.path.exists(path): file_status[name] = "ok" else: file_status[name] = "missing" files_ok = False result["components"]["config_files"]["files"] = file_status result["components"]["config_files"]["status"] = "ok" if files_ok else "warning" if not files_ok: result["status"] = "degraded" return json.dumps(result, indent=2) @mcp.tool() def haproxy_domain_health( domain: Annotated[str, Field(description="Domain name to check health for (e.g., api.example.com)")] ) -> str: """Check health status of backend servers for a specific domain. Returns JSON with status (healthy/degraded/down/no_servers), server list, and counts. """ if not validate_domain(domain): return json.dumps({"error": "Invalid domain format"}) try: backend, _ = get_backend_and_prefix(domain) except ValueError as e: return json.dumps({"error": str(e)}) result: dict[str, Any] = { "domain": domain, "backend": backend, "status": "unknown", "servers": [], "healthy_count": 0, "total_count": 0 } try: # Get server states state_output = haproxy_cmd("show servers state") stat_output = haproxy_cmd("show stat") # Build status map from stat output (has UP/DOWN/MAINT status) status_map: dict[str, dict[str, str]] = {} for stat in parse_stat_csv(stat_output): if stat["pxname"] == backend and stat["svname"] not in ["FRONTEND", "BACKEND"]: status_map[stat["svname"]] = { "status": stat["status"], "check_status": stat["check_status"], "weight": stat["weight"] } # Parse server state for address info parsed_state = parse_servers_state(state_output) backend_servers = parsed_state.get(backend, {}) for server_name, srv_info in backend_servers.items(): addr = srv_info["addr"] port = srv_info["port"] # Skip disabled servers (0.0.0.0) if addr == "0.0.0.0": continue server_info: dict[str, Any] = { "name": server_name, "addr": f"{addr}:{port}", "status": "unknown" } # Get status from stat output if server_name in status_map: server_info["status"] = status_map[server_name]["status"] server_info["check_status"] = status_map[server_name]["check_status"] server_info["weight"] = status_map[server_name]["weight"] result["servers"].append(server_info) result["total_count"] += 1 if server_info["status"] == "UP": result["healthy_count"] += 1 # Determine overall status if result["total_count"] == 0: result["status"] = "no_servers" elif result["healthy_count"] == result["total_count"]: result["status"] = "healthy" elif result["healthy_count"] > 0: result["status"] = "degraded" else: result["status"] = "down" return json.dumps(result, indent=2) except HaproxyError as e: return json.dumps({"error": str(e)}) @mcp.tool() def haproxy_get_server_health( backend: Annotated[str, Field(default="", description="Optional: Backend name to filter (e.g., 'pool_1'). Empty = all backends")] = "" ) -> str: """Get health status of all servers (low-level view). For domain-specific, use haproxy_domain_health.""" if backend and not validate_backend_name(backend): return "Error: Invalid backend name (use alphanumeric, underscore, hyphen only)" try: result = haproxy_cmd("show stat") servers = [] for stat in parse_stat_csv(result): if stat["svname"] not in ["FRONTEND", "BACKEND", ""]: if backend and stat["pxname"] != backend: continue servers.append( f"• {stat['pxname']}/{stat['svname']}: {stat['status']} " f"(weight: {stat['weight']}, check: {stat['check_status']})" ) return "\n".join(servers) if servers else "No servers found" except HaproxyError as e: return f"Error: {e}"