"""HAProxy Runtime API client functions.""" import socket import subprocess import select import time from .config import ( HAPROXY_SOCKET, HAPROXY_CONTAINER, SOCKET_TIMEOUT, SOCKET_RECV_TIMEOUT, MAX_RESPONSE_SIZE, SUBPROCESS_TIMEOUT, ) from .exceptions import HaproxyError def haproxy_cmd(command: str) -> str: """Send command to HAProxy Runtime API. Args: command: The HAProxy runtime API command to execute Returns: The response from HAProxy Raises: HaproxyError: If connection fails, times out, or response exceeds size limit """ try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(SOCKET_TIMEOUT) s.connect(HAPROXY_SOCKET) s.sendall(f"{command}\n".encode()) s.shutdown(socket.SHUT_WR) # Set socket to non-blocking for select-based recv loop s.setblocking(False) response = b"" start_time = time.time() while True: # Check for overall timeout elapsed = time.time() - start_time if elapsed >= SOCKET_RECV_TIMEOUT: raise HaproxyError(f"Response timeout after {SOCKET_RECV_TIMEOUT} seconds") # Wait for data with timeout (remaining time) remaining = SOCKET_RECV_TIMEOUT - elapsed ready, _, _ = select.select([s], [], [], min(remaining, 1.0)) if ready: data = s.recv(8192) if not data: break response += data if len(response) > MAX_RESPONSE_SIZE: raise HaproxyError(f"Response exceeded {MAX_RESPONSE_SIZE} bytes limit") return response.decode().strip() except socket.timeout: raise HaproxyError("Connection timeout") except ConnectionRefusedError: raise HaproxyError("Connection refused - HAProxy not running?") except UnicodeDecodeError: raise HaproxyError("Invalid UTF-8 in response") except HaproxyError: raise except Exception as e: raise HaproxyError(str(e)) from e def haproxy_cmd_checked(command: str) -> str: """Send command to HAProxy and raise on error response. Args: command: HAProxy command to execute Returns: Command response Raises: HaproxyError: If HAProxy returns an error message """ result = haproxy_cmd(command) _check_response_for_errors(result) return result def _check_response_for_errors(response: str) -> None: """Check HAProxy response for error indicators. Args: response: Response string from HAProxy Raises: HaproxyError: If response contains error indicators """ error_indicators = ["No such", "not found", "error", "failed", "invalid", "unknown"] if response: response_lower = response.lower() for indicator in error_indicators: if indicator.lower() in response_lower: raise HaproxyError(f"HAProxy command failed: {response.strip()}") def haproxy_cmd_batch(commands: list[str]) -> list[str]: """Send multiple commands to HAProxy. Note: HAProxy Runtime API only processes the first command when multiple commands are sent on a single connection. This function sends each command on a separate connection to ensure all commands are executed. Args: commands: List of HAProxy commands to execute Returns: List of responses for each command Raises: HaproxyError: If connection fails or any command returns an error """ if not commands: return [] if len(commands) == 1: return [haproxy_cmd_checked(commands[0])] # Send each command on separate connection (HAProxy limitation) responses = [] for cmd in commands: try: resp = haproxy_cmd_checked(cmd) responses.append(resp) except HaproxyError: raise return responses def reload_haproxy() -> tuple[bool, str]: """Validate and reload HAProxy configuration. Returns: Tuple of (success, message) """ try: validate = subprocess.run( ["podman", "exec", HAPROXY_CONTAINER, "haproxy", "-c", "-f", "/usr/local/etc/haproxy/haproxy.cfg"], capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT ) if validate.returncode != 0: return False, f"Config validation failed:\n{validate.stderr}" result = subprocess.run( ["podman", "kill", "--signal", "USR2", HAPROXY_CONTAINER], capture_output=True, text=True, timeout=SUBPROCESS_TIMEOUT ) if result.returncode != 0: return False, f"Reload failed: {result.stderr}" return True, "OK" except subprocess.TimeoutExpired: return False, f"Command timed out after {SUBPROCESS_TIMEOUT} seconds" except FileNotFoundError: return False, "podman command not found" except OSError as e: return False, f"OS error: {e}"