refactor: Extract large functions, improve exception handling, remove duplicates

## Large function extraction
- servers.py: Extract 8 _impl functions from register_server_tools (449 lines)
- certificates.py: Extract 7 _impl functions from register_certificate_tools (386 lines)
- MCP tool wrappers now delegate to module-level implementation functions

## Exception handling improvements
- Replace 11 broad `except Exception` with specific types
- health.py: (OSError, subprocess.SubprocessError)
- configuration.py: (HaproxyError, IOError, OSError, ValueError)
- servers.py: (IOError, OSError, ValueError)
- certificates.py: FileNotFoundError, (subprocess.SubprocessError, OSError)

## Duplicate code extraction
- Add parse_servers_state() to utils.py (replaces 4 duplicate parsers)
- Add disable_server_slot() to utils.py (replaces duplicate patterns)
- Update health.py, servers.py, domains.py to use new helpers

## Other improvements
- Add TypedDict types in file_ops.py and health.py
- Set file permissions (0o600) for sensitive files
- Update tests to use specific exception types

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
kaffa
2026-02-03 13:23:51 +09:00
parent e66c5ddc7f
commit 06ab47aca8
12 changed files with 891 additions and 723 deletions

View File

@@ -12,14 +12,12 @@ from ..config import (
MAP_FILE,
SERVERS_FILE,
HAPROXY_CONTAINER,
StateField,
STATE_MIN_COLUMNS,
)
from ..exceptions import HaproxyError
from ..validation import validate_domain, validate_backend_name
from ..haproxy_client import haproxy_cmd
from ..file_ops import get_backend_and_prefix
from ..utils import parse_stat_csv
from ..utils import parse_stat_csv, parse_servers_state
def register_health_tools(mcp):
@@ -83,7 +81,7 @@ def register_health_tools(mcp):
except subprocess.TimeoutExpired:
result["components"]["container"] = {"status": "timeout"}
result["status"] = "unhealthy"
except Exception as e:
except (OSError, subprocess.SubprocessError) as e:
result["components"]["container"] = {"status": "error", "error": str(e)}
# Check configuration files
@@ -144,34 +142,34 @@ def register_health_tools(mcp):
}
# Parse server state for address info
for line in state_output.split("\n"):
parts = line.split()
if len(parts) >= STATE_MIN_COLUMNS and parts[StateField.BE_NAME] == backend:
server_name = parts[StateField.SRV_NAME]
addr = parts[StateField.SRV_ADDR]
port = parts[StateField.SRV_PORT]
parsed_state = parse_servers_state(state_output)
backend_servers = parsed_state.get(backend, {})
# Skip disabled servers (0.0.0.0)
if addr == "0.0.0.0":
continue
for server_name, srv_info in backend_servers.items():
addr = srv_info["addr"]
port = srv_info["port"]
server_info: dict[str, Any] = {
"name": server_name,
"addr": f"{addr}:{port}",
"status": "unknown"
}
# Skip disabled servers (0.0.0.0)
if addr == "0.0.0.0":
continue
# Get status from stat output
if server_name in status_map:
server_info["status"] = status_map[server_name]["status"]
server_info["check_status"] = status_map[server_name]["check_status"]
server_info["weight"] = status_map[server_name]["weight"]
server_info: dict[str, Any] = {
"name": server_name,
"addr": f"{addr}:{port}",
"status": "unknown"
}
result["servers"].append(server_info)
result["total_count"] += 1
# Get status from stat output
if server_name in status_map:
server_info["status"] = status_map[server_name]["status"]
server_info["check_status"] = status_map[server_name]["check_status"]
server_info["weight"] = status_map[server_name]["weight"]
if server_info["status"] == "UP":
result["healthy_count"] += 1
result["servers"].append(server_info)
result["total_count"] += 1
if server_info["status"] == "UP":
result["healthy_count"] += 1
# Determine overall status
if result["total_count"] == 0: