- Add UserKnownHostsFile=/dev/null to prevent write errors on read-only .ssh
- Wrap all SSH commands with 'bash -c' for fish shell compatibility on remote

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
150 lines
4.0 KiB
Python
150 lines
4.0 KiB
Python
"""SSH remote execution for HAProxy MCP Server.
|
|
|
|
When REMOTE_MODE is enabled (SSH_HOST is set), file I/O and subprocess
|
|
commands are executed on the remote HAProxy host via SSH.
|
|
"""
|
|
|
|
import shlex
import subprocess

from .config import (
    SSH_HOST,
    SSH_USER,
    SSH_KEY,
    SSH_PORT,
    REMOTE_MODE,
    SUBPROCESS_TIMEOUT,
    logger,
)
|
|
|
|
|
|
def _ssh_base_cmd() -> list[str]:
    """Return the ssh argv prefix shared by every remote invocation.

    The result contains the ``ssh`` executable, the fixed ``-o`` options,
    the port, the identity file (only when SSH_KEY is set) and finally the
    ``user@host`` destination. Callers append the remote command to it.
    """
    # Fixed client options, each emitted as a separate "-o <option>" pair.
    # UserKnownHostsFile=/dev/null avoids write errors when ~/.ssh is
    # read-only; BatchMode=yes makes ssh fail instead of prompting.
    fixed_options = (
        "StrictHostKeyChecking=no",
        "UserKnownHostsFile=/dev/null",
        "BatchMode=yes",
        "ConnectTimeout=10",
    )

    argv = ["ssh"]
    for option in fixed_options:
        argv += ["-o", option]
    argv += ["-p", str(SSH_PORT)]

    # The identity file is optional; without it ssh uses its defaults.
    if SSH_KEY:
        argv += ["-i", SSH_KEY]

    argv.append(f"{SSH_USER}@{SSH_HOST}")
    return argv
|
|
|
|
|
|
def remote_exec(command: str, timeout: int = SUBPROCESS_TIMEOUT) -> subprocess.CompletedProcess:
    """Execute a shell command on the remote host via SSH.

    The command is run through ``bash -c`` so that it is interpreted by bash
    regardless of the remote user's login shell (e.g. fish).

    Args:
        command: Shell command line (bash syntax) to execute remotely
        timeout: Command timeout in seconds

    Returns:
        CompletedProcess with stdout/stderr captured as text

    Raises:
        subprocess.TimeoutExpired: If command times out
        OSError: If SSH command fails to execute
    """
    # ssh joins all trailing arguments with spaces and hands the resulting
    # string to the remote login shell, which splits it again. Without
    # quoting, ``bash -c`` would receive only the first word of the command
    # as its script. Quote it so it arrives as a single argument.
    # NOTE(review): shlex.quote follows POSIX rules; fish treats a backslash
    # before ' or \ inside single quotes as an escape, so commands containing
    # those sequences may need extra care on fish login shells.
    ssh_cmd = _ssh_base_cmd() + ["bash", "-c", shlex.quote(command)]
    logger.debug("SSH exec: %s", command)
    return subprocess.run(
        ssh_cmd,
        capture_output=True,
        text=True,
        timeout=timeout,
    )
|
|
|
|
|
|
def remote_read_file(path: str) -> str:
    """Read a file from the remote host.

    Args:
        path: Absolute file path on remote host

    Returns:
        File contents as string

    Raises:
        FileNotFoundError: If file doesn't exist on remote
        IOError: If read fails
    """
    # Quote the path so spaces and shell metacharacters are passed to cat
    # literally instead of being interpreted by the remote shell.
    result = remote_exec(f"cat -- {shlex.quote(path)}")
    if result.returncode == 0:
        return result.stdout

    stderr = result.stderr.strip()
    # cat reports a missing file through its stderr message; map that case
    # to the exception a local open() would raise.
    if "No such file" in stderr:
        raise FileNotFoundError(f"Remote file not found: {path}")
    raise IOError(f"Failed to read remote file {path}: {stderr}")
|
|
|
|
|
|
def remote_write_file(path: str, content: str) -> None:
    """Write content to a file on the remote host atomically.

    The content is streamed over SSH stdin into a temp file created next to
    the target, which is then renamed over the target with ``mv``. If the
    write or the rename fails, the temp file is removed.

    Args:
        path: Absolute file path on remote host
        content: Content to write

    Raises:
        IOError: If write fails
        subprocess.TimeoutExpired: If the SSH command times out
    """
    # Quote everything derived from the path so spaces and shell
    # metacharacters cannot alter the remote script.
    target = shlex.quote(path)
    template = shlex.quote(f"{path}.tmp.XXXXXX")

    # Atomic write: write to temp file, then rename. On failure, clean up the
    # temp file so repeated errors do not litter the target directory.
    # NOTE(review): mktemp creates the file with restrictive permissions and
    # mv keeps them on the target - confirm this is acceptable.
    remote_script = (
        f"tmpf=$(mktemp {template}) || exit 1; "
        f'if cat > "$tmpf" && mv "$tmpf" {target}; then exit 0; fi; '
        'rm -f "$tmpf"; exit 1'
    )

    # ssh re-joins its trailing arguments and lets the remote login shell
    # parse them, so the whole script must be quoted as one word; otherwise
    # the substitution, redirect and && chain run outside ``bash -c``.
    ssh_cmd = _ssh_base_cmd() + ["bash", "-c", shlex.quote(remote_script)]

    logger.debug("SSH write: %s (%d bytes)", path, len(content))
    result = subprocess.run(
        ssh_cmd,
        input=content,
        capture_output=True,
        text=True,
        timeout=SUBPROCESS_TIMEOUT,
    )
    if result.returncode != 0:
        raise IOError(f"Failed to write remote file {path}: {result.stderr.strip()}")
|
|
|
|
|
|
def remote_file_exists(path: str) -> bool:
    """Check if a regular file exists on the remote host.

    Args:
        path: Absolute file path on remote host

    Returns:
        True if ``test -f`` succeeds for the path on the remote host. False
        otherwise, including when the remote command produces no output
        (for example because the SSH connection failed).
    """
    # Quote the path so spaces and shell metacharacters reach ``test`` as a
    # single literal argument.
    quoted_path = shlex.quote(path)
    # The answer is read from stdout rather than the exit status, so only an
    # explicit "yes" counts as existing.
    result = remote_exec(f"test -f {quoted_path} && echo yes || echo no")
    return result.stdout.strip() == "yes"
|
|
|
|
|
|
def run_command(args: list[str], timeout: int = SUBPROCESS_TIMEOUT) -> subprocess.CompletedProcess:
    """Execute a command locally or remotely based on REMOTE_MODE.

    Args:
        args: Command and arguments as list
        timeout: Command timeout in seconds

    Returns:
        CompletedProcess with stdout/stderr

    Raises:
        subprocess.TimeoutExpired: If command times out
        OSError: If the local process (command or ssh) fails to execute
    """
    if not REMOTE_MODE:
        return subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=timeout,
        )

    # The remote side runs a shell command line, so each argument must be
    # shell-quoted to arrive as exactly one word, as it would locally.
    # shlex.join escapes embedded quotes and every shell metacharacter, and
    # preserves empty arguments.
    return remote_exec(shlex.join(args), timeout=timeout)
|