"""Domain path tracing: CDN → Gateway → Backend.""" from __future__ import annotations import json import subprocess import click import nocodb_client def _tailscale_map() -> dict[str, str]: """Return {ip: hostname} from tailscale status.""" try: out = subprocess.run( ["tailscale", "status", "--json"], capture_output=True, text=True, timeout=5, ) if out.returncode != 0: return {} data = json.loads(out.stdout) result = {} for peer in data.get("Peer", {}).values(): name = _ts_display_name(peer) for addr in peer.get("TailscaleIPs", []): result[addr] = name self_node = data.get("Self", {}) name = _ts_display_name(self_node) for addr in self_node.get("TailscaleIPs", []): result[addr] = name return result except Exception: return {} def _ts_display_name(node: dict) -> str: """Pick the best display name: DNSName (sans tailnet suffix) > HostName.""" dns = node.get("DNSName", "") if dns: return dns.split(".")[0] return node.get("HostName", "") def _is_tailscale_ip(ip: str) -> bool: return ip.startswith("100.") def lookup(domain: str) -> bool: """Print the full infra path for a domain. Returns True if anything found.""" found = False # 1) CDN layer — check infra_cdn_zones hostnames cdn_zones = nocodb_client.list_rows("infra_cdn_zones") for zone in cdn_zones: hostnames = [] raw = zone.get("hostnames", "") if raw: try: hostnames = json.loads(raw) except json.JSONDecodeError: hostnames = [raw] if domain in hostnames: click.echo( f' [CDN] BunnyCDN zone "{zone.get("Title", "?")}" ' f'→ origin {zone.get("origin_url", "?")}' ) found = True # 2) Gateway layer — check infra_routes host routes = nocodb_client.list_rows("infra_routes") for route in routes: hosts = [h.strip() for h in route.get("host", "").split(",")] if domain in hosts: nodes = route.get("upstream_nodes", "") plugins = route.get("plugins", "") click.echo( f' [GATEWAY] APISIX route "{route.get("Title", "?")}" → {nodes}' ) if plugins: click.echo(f" plugins: {plugins}") found = True # try to resolve upstream IP to a K8s/Incus backend _resolve_backend(nodes) # 3) Direct service lookup services = nocodb_client.list_rows( "infra_services", where=f"(domain,eq,{domain})" ) for svc in services: if svc.get("source") in ("apisix", "bunnycdn"): continue # already shown above _print_service(svc) found = True if not found: click.echo(f" Not found: {domain}") return found def _resolve_backend(upstream_nodes_json: str): """Try to match upstream IP to a known K8s service, Incus container, or Tailscale host.""" if not upstream_nodes_json: return try: nodes = json.loads(upstream_nodes_json) except json.JSONDecodeError: return for addr in nodes: ip = addr.split(":")[0] if ":" in addr else addr port = addr.split(":")[1] if ":" in addr else "" # check K8s services k8s_services = nocodb_client.list_rows( "infra_services", where="(source,eq,k8s)" ) for svc in k8s_services: if svc.get("upstream_ip", "").startswith(ip): _print_service(svc) return # check Incus containers containers = nocodb_client.list_rows( "infra_containers", where=f"(ipv4,eq,{ip})" ) for c in containers: click.echo( f' [BACKEND] Incus container "{c.get("Title", "?")}" ' f'({c.get("ipv4", "?")})' ) click.echo(f' status: {c.get("status", "?")}') return # fallback: resolve Tailscale IP → hostname if _is_tailscale_ip(ip): ts_map = _tailscale_map() hostname = ts_map.get(ip) if hostname: dest = f"{hostname} ({ip}:{port})" if port else f"{hostname} ({ip})" click.echo(f" [BACKEND] Tailscale host: {dest}") return def _print_service(svc: dict): layer = svc.get("layer", "?").upper() source = svc.get("source", "?") click.echo( f' [{layer}] {source} "{svc.get("display_name", svc.get("Title", "?"))}" ' f'({svc.get("namespace", "")} ns, {svc.get("cluster", "")})' ) click.echo(f' upstream: {svc.get("upstream_ip", "?")}') click.echo(f' status: {svc.get("status", "?")}')