## Large function extraction

- servers.py: Extract 8 _impl functions from register_server_tools (449 lines)
- certificates.py: Extract 7 _impl functions from register_certificate_tools (386 lines)
- MCP tool wrappers now delegate to module-level implementation functions

## Exception handling improvements

- Replace 11 broad `except Exception` with specific types
- health.py: (OSError, subprocess.SubprocessError)
- configuration.py: (HaproxyError, IOError, OSError, ValueError)
- servers.py: (IOError, OSError, ValueError)
- certificates.py: FileNotFoundError, (subprocess.SubprocessError, OSError)

## Duplicate code extraction

- Add parse_servers_state() to utils.py (replaces 4 duplicate parsers)
- Add disable_server_slot() to utils.py (replaces duplicate patterns)
- Update health.py, servers.py, domains.py to use new helpers

## Other improvements

- Add TypedDict types in file_ops.py and health.py
- Set file permissions (0o600) for sensitive files
- Update tests to use specific exception types

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
97 lines
2.3 KiB
Python
97 lines
2.3 KiB
Python
"""Input validation functions for HAProxy MCP Server."""
|
|
|
|
import ipaddress
|
|
from .config import DOMAIN_PATTERN, BACKEND_NAME_PATTERN, NON_ALNUM_PATTERN
|
|
|
|
|
|
def validate_domain(domain: str) -> bool:
    """Validate domain format.

    Args:
        domain: The domain name to validate

    Returns:
        True if domain is non-empty, at most 253 characters long and
        matches DOMAIN_PATTERN over its entire length, False otherwise
    """
    if not domain or len(domain) > 253:
        return False
    # fullmatch() instead of match(): with match(), a pattern that ends in
    # "$" also accepts one trailing newline ("example.com\n"), and a pattern
    # with no end anchor accepts arbitrary trailing text.
    # NOTE(review): assumes DOMAIN_PATTERN is a compiled re.Pattern - confirm
    # in config.py.
    return DOMAIN_PATTERN.fullmatch(domain) is not None
|
|
|
|
|
|
def validate_ip(ip: str, allow_empty: bool = False) -> bool:
    """Validate IPv4 or IPv6 address format.

    Args:
        ip: The IP address to validate
        allow_empty: If True, empty string is considered valid

    Returns:
        True if IP is valid, False otherwise. A falsy value (empty string,
        None) yields ``allow_empty``. Non-string values are rejected, as are
        IPv6 zone identifiers containing anything other than ASCII letters,
        digits, ``_``, ``.`` or ``-``.
    """
    if not ip:
        return allow_empty
    # ipaddress.ip_address() also accepts integers and packed bytes; only a
    # textual address is meaningful for this validator.
    if not isinstance(ip, str):
        return False
    # On Python 3.9+ the ipaddress module accepts a scoped IPv6 address
    # ("fe80::1%eth0") and allows any character except "%" in the zone id,
    # including whitespace, newlines and ";". Restrict the zone id to a
    # conservative character set before delegating.
    _, has_zone, zone = ip.partition("%")
    if has_zone and not (
        zone.isascii() and all(ch.isalnum() or ch in "_.-" for ch in zone)
    ):
        return False
    try:
        ipaddress.ip_address(ip)
    except ValueError:
        return False
    return True
|
|
|
|
|
|
def validate_port(port: str) -> bool:
    """Validate port number is in valid range.

    Args:
        port: Port number as string

    Returns:
        True if port is a string of ASCII digits whose value is in the
        range 1-65535, False otherwise
    """
    # str.isdigit() alone is too permissive: it is True for characters such
    # as superscript two, for which int() raises ValueError, and for
    # non-ASCII decimal digits, which int() would silently convert.
    # Requiring ASCII keeps the accepted text to "0"-"9".
    if not isinstance(port, str) or not (port.isascii() and port.isdigit()):
        return False
    try:
        port_num = int(port)
    except ValueError:
        # Extremely long digit strings exceed the interpreter's int
        # conversion limit on newer Python versions.
        return False
    return 1 <= port_num <= 65535
|
|
|
|
|
|
def validate_port_int(port: int) -> bool:
    """Validate port number as integer is in valid range.

    Args:
        port: Port number as integer

    Returns:
        True if port is an int (not a bool) in the range 1-65535,
        False otherwise
    """
    # bool is a subclass of int, so True would otherwise pass as port 1.
    if isinstance(port, bool) or not isinstance(port, int):
        return False
    return 1 <= port <= 65535
|
|
|
|
|
|
def validate_backend_name(name: str) -> bool:
    """Validate backend or server name to prevent command injection.

    Args:
        name: The backend or server name to validate

    Returns:
        True if name is non-empty, at most 255 characters long and matches
        BACKEND_NAME_PATTERN over its entire length, False otherwise
    """
    if not name or len(name) > 255:
        return False
    # fullmatch() instead of match(): with match(), a pattern that ends in
    # "$" still accepts a name followed by one trailing newline, which would
    # defeat the purpose of this check.
    # NOTE(review): assumes BACKEND_NAME_PATTERN is a compiled re.Pattern -
    # confirm in config.py.
    return BACKEND_NAME_PATTERN.fullmatch(name) is not None
|
|
|
|
|
|
def domain_to_backend(domain: str) -> str:
    """Derive a backend name from a domain name.

    Every match of NON_ALNUM_PATTERN in the domain is replaced with an
    underscore, and the outcome is checked with validate_backend_name().

    Args:
        domain: The domain name to convert

    Returns:
        The converted backend name

    Raises:
        ValueError: If the converted name fails validate_backend_name()
    """
    backend = NON_ALNUM_PATTERN.sub('_', domain)
    if validate_backend_name(backend):
        return backend
    raise ValueError(f"Invalid backend name after conversion: {backend}")
|