"""Collect K8s services via K8s REST API or SSH → incus exec → kubectl.

Mode selection (auto-detected at call time):
  - K8s REST API: config.K8S_API_SERVER is set (non-empty)
  - SSH fallback: config.K8S_API_SERVER is empty (original behavior)
"""
from __future__ import annotations

import json
import subprocess
from datetime import datetime, timezone

import urllib3

import config


# ---------------------------------------------------------------------------
# Internal data fetchers
# ---------------------------------------------------------------------------

def _get_services_data() -> dict | None:
    """Return parsed JSON from `kubectl get svc -A -o json`, via API or SSH.

    The transport is chosen on every call: the REST API is used when
    config.K8S_API_SERVER is non-empty, the SSH fallback otherwise.
    Returns None when the chosen transport fails.
    """
    if config.K8S_API_SERVER:
        return _kubectl_json_api("/api/v1/services")
    return _kubectl_json_ssh("kubectl get svc -A -o json")


def _get_pods_data() -> dict | None:
    """Return parsed JSON from `kubectl get pods -A -o json`, via API or SSH.

    The transport is chosen on every call: the REST API is used when
    config.K8S_API_SERVER is non-empty, the SSH fallback otherwise.
    Returns None when the chosen transport fails.
    """
    if config.K8S_API_SERVER:
        return _kubectl_json_api("/api/v1/pods")
    return _kubectl_json_ssh("kubectl get pods -A -o json")


# ---------------------------------------------------------------------------
# Transport implementations
# ---------------------------------------------------------------------------

def _kubectl_json_api(path: str) -> dict | None:
    """Call the K8s REST API and return parsed JSON.

    Uses Bearer token auth (the header is omitted when config.k8s_token()
    returns a falsy value). TLS verification uses config.K8S_CA_CERT when
    provided; disables verification (with suppressed warnings) otherwise.

    Args:
        path: API path appended to config.K8S_API_SERVER, e.g. "/api/v1/pods".

    Returns:
        The decoded JSON body, or None when `requests` is not installed, the
        request fails, the status code is not 200, or the body is not JSON.
    """
    try:
        import requests  # imported lazily — only needed in API mode
    except ImportError:
        print("[k8s] 'requests' package not installed; cannot use K8s REST API mode")
        return None

    verify: str | bool = config.K8S_CA_CERT if config.K8S_CA_CERT else False
    if not verify:
        # Suppress InsecureRequestWarning for self-signed cluster certs
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    url = f"{config.K8S_API_SERVER.rstrip('/')}{path}"
    token = config.k8s_token()
    headers = {"Authorization": f"Bearer {token}"} if token else {}

    try:
        resp = requests.get(url, headers=headers, verify=verify, timeout=30)
        if resp.status_code != 200:
            print(f"[k8s] API returned {resp.status_code} for {path}")
            return None
        return resp.json()
    except requests.exceptions.RequestException as exc:
        print(f"[k8s] REST API request failed for {path}: {exc}")
        return None
    except ValueError as exc:
        print(f"[k8s] Failed to parse JSON from {path}: {exc}")
        return None


def _kubectl_json_ssh(
    cmd: str,
    host: str = "incus-jp1",
    container: str = "k8s",
) -> dict | None:
    """Run kubectl via SSH → incus exec and return parsed JSON.

    This is the original transport, preserved for backward compatibility.
    Every failure path prints a `[k8s]` diagnostic, mirroring the REST
    transport, and returns None instead of raising.

    Args:
        cmd: Command executed inside the container (expected to print JSON).
        host: SSH host on which `incus exec` is run.
        container: Name of the incus container to execute in.

    Returns:
        The decoded JSON output, or None when ssh cannot be started, the
        command times out (30 s), exits non-zero, or prints invalid JSON.
    """
    full_cmd = f"incus exec {container} -- {cmd}"
    try:
        out = subprocess.run(
            ["ssh", host, full_cmd],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        print(f"[k8s] SSH command timed out on {host}: {cmd}")
        return None
    except OSError as exc:
        # Covers a missing ssh binary (FileNotFoundError) as well as other
        # launch failures such as a non-executable binary.
        print(f"[k8s] Failed to launch ssh for '{cmd}': {exc}")
        return None

    if out.returncode != 0:
        print(
            f"[k8s] SSH command on {host} exited with {out.returncode} "
            f"for '{cmd}': {out.stderr.strip()}"
        )
        return None

    try:
        return json.loads(out.stdout)
    except json.JSONDecodeError as exc:
        print(f"[k8s] Failed to parse JSON from '{cmd}': {exc}")
        return None


# ---------------------------------------------------------------------------
# Public collectors
# ---------------------------------------------------------------------------

def collect_services() -> list[dict]:
    """Return list of K8s service records for NocoDB infra_services.

    The service named "kubernetes" and everything in the "kube-system"
    namespace are skipped. `upstream_ip` is "<clusterIP>:<first port>" when
    the service has both a cluster IP and at least one port, otherwise just
    the cluster IP (possibly empty). Headless services, whose clusterIP is
    the literal string "None", are reported with an empty upstream.

    Returns:
        One dict per service; an empty list when no data could be fetched.
    """
    data = _get_services_data()
    if not data:
        return []

    now = datetime.now(timezone.utc).isoformat()
    results = []
    # `or` fallbacks guard against keys that are present but null.
    for item in data.get("items") or []:
        meta = item.get("metadata") or {}
        spec = item.get("spec") or {}
        name = meta.get("name", "")
        ns = meta.get("namespace", "")
        # skip kubernetes internal services
        if name == "kubernetes" or ns == "kube-system":
            continue

        cluster_ip = spec.get("clusterIP") or ""
        if cluster_ip == "None":
            # Headless service: "None" is a marker, not an address.
            cluster_ip = ""
        ports = spec.get("ports") or []
        upstream = (
            f"{cluster_ip}:{ports[0]['port']}" if ports and cluster_ip else cluster_ip
        )

        results.append({
            "Title": f"k8s:{ns}/{name}",
            "display_name": name,
            "domain": "",
            "source": "k8s",
            "layer": "backend",
            "status": "up",
            "upstream_ip": upstream,
            "upstream_host": name,
            "namespace": ns,
            "cluster": "jp1/k8s",
            "last_seen": now,
        })
    return results


def collect_pods_status() -> dict[str, str]:
    """Return {svc_name: phase} derived from pod names.

    Pod suffix (the last two hyphen-separated segments) is stripped to
    approximate the owning service name. Names containing fewer than two
    hyphens are used unchanged. When several pods reduce to the same base
    name, the phase of the last one in the listing wins. Pods without a
    reported phase are recorded as "Unknown".

    Returns:
        Mapping of approximate service name to pod phase; an empty dict
        when no data could be fetched.
    """
    data = _get_pods_data()
    if not data:
        return {}

    result: dict[str, str] = {}
    # `or` fallbacks guard against keys that are present but null.
    for pod in data.get("items") or []:
        name = (pod.get("metadata") or {}).get("name") or ""
        phase = (pod.get("status") or {}).get("phase", "Unknown")
        # strip pod suffix to approximate svc name
        base = name.rsplit("-", 2)[0] if name.count("-") >= 2 else name
        if base:
            result[base] = phase
    return result