Files
infra-tool/api.py
kappa 5e59261f63 Add infra-tool: infrastructure registry with Incus container deployment
Service registry & discovery system that aggregates infrastructure metadata
from Incus, K8s, APISIX, and BunnyCDN into NocoDB. Includes FastAPI HTTP API,
systemd timer for 15-min auto-sync, and dual-mode collectors (REST API for
container deployment, CLI/SSH fallback for local use). Deployed to jp1:infra-tool
with Tailscale socket proxy for host network visibility.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-03 09:13:43 +09:00

201 lines
6.1 KiB
Python

"""FastAPI HTTP API for infra-tool (internal Tailscale network, no auth)."""
from __future__ import annotations

import io
import threading
from enum import Enum

import click
import requests
from fastapi import FastAPI, Query

import config
import nocodb_client
import sync as sync_mod
import lookup as lookup_mod
# Application object; the @app.get / @app.post decorators in this module
# register their handlers on it.
app = FastAPI(
    title="infra-tool API",
    version="0.1.0",
)
# ---------------------------------------------------------------------------
# Table name mapping
# ---------------------------------------------------------------------------
class TableName(str, Enum):
    # Closed set of short table aliases accepted as the `{table}` path
    # parameter of GET /list/{table}. Mixing in `str` makes each member
    # compare equal to (and behave as) its plain string value.
    services = "services"
    routes = "routes"
    containers = "containers"
    zones = "zones"
# Short alias (the values of TableName) -> table name passed to
# nocodb_client.list_rows. Keys must stay in step with the TableName members,
# because list_table indexes this mapping with `table.value`.
_TABLE_MAP: dict[str, str] = {
    "services": "infra_services",
    "routes": "infra_routes",
    "containers": "infra_containers",
    "zones": "infra_cdn_zones",
}
# ---------------------------------------------------------------------------
# GET /lookup/{domain}
# ---------------------------------------------------------------------------
# Serializes the echo capture in lookup_domain. The capture swaps an attribute
# that every thread sees, and FastAPI runs plain `def` endpoints in a
# threadpool: without the lock, overlapping requests would interleave their
# traces, and the request that finishes last would "restore" the other
# request's capture function, leaving click.echo patched for good.
_LOOKUP_ECHO_LOCK = threading.Lock()


@app.get("/lookup/{domain}")
def lookup_domain(domain: str) -> dict:
    """Trace the full infrastructure path for a domain across CDN, gateway, and backend layers."""
    # NOTE: FastAPI publishes the docstring above as the endpoint description,
    # so implementation notes live in comments instead.
    #
    # Returns {"domain": <input>, "found": <return value of lookup_mod.lookup>,
    # "trace": <one string per click.echo call made during the lookup>}.
    captured_lines: list[str] = []

    def capturing_echo(message=None, file=None, nl=True, err=False, color=None):
        # Same parameters as click.echo so existing call sites work unchanged.
        # Everything is captured regardless of `err` / `file`.
        # Only None means "no message"; falsy values such as 0 or "" must be
        # recorded as-is rather than collapsed by a truthiness test.
        captured_lines.append("" if message is None else str(message))

    # NOTE(review): assuming `lookup` does `import click`, `lookup_mod.click`
    # is the shared click module, so this swap is process-wide: while the lock
    # is held, click.echo calls from any other thread land in this trace too.
    # Lookups are therefore serialized for the duration of lookup_mod.lookup.
    with _LOOKUP_ECHO_LOCK:
        previous_echo = lookup_mod.click.echo
        lookup_mod.click.echo = capturing_echo
        try:
            found = lookup_mod.lookup(domain)
        finally:
            # Always restore, even if the lookup raises.
            lookup_mod.click.echo = previous_echo

    return {
        "domain": domain,
        "found": found,
        "trace": captured_lines,
    }
# ---------------------------------------------------------------------------
# GET /list/{table}
# ---------------------------------------------------------------------------
@app.get("/list/{table}")
def list_table(
    table: TableName,
    where: str = Query(default="", description="NocoDB where filter"),
) -> dict:
    """List rows from an infra registry table with an optional NocoDB where filter."""
    # `table` is validated against TableName by FastAPI before this runs, so
    # the _TABLE_MAP lookup is expected to hit as long as the two stay in sync.
    alias = table.value
    backing_table = _TABLE_MAP[alias]

    # `where` is forwarded untouched; an empty string is the default.
    records = nocodb_client.list_rows(backing_table, where=where)

    return {
        "table": alias,
        "count": len(records),
        "rows": records,
    }
# ---------------------------------------------------------------------------
# GET /status
# ---------------------------------------------------------------------------
def _status_http_result(resp) -> dict[str, object]:
    # Summarize an HTTP response in the shape shared by all HTTP probes.
    return {"ok": resp.ok, "status_code": resp.status_code}


def _status_check_nocodb() -> dict[str, object]:
    # Probe the NocoDB meta API with the configured token.
    token = config.nocodb_token()
    resp = requests.get(
        f"{config.NOCODB_URL}/api/v2/meta/bases/",
        headers={"xc-token": token},
        timeout=5,
    )
    return _status_http_result(resp)


def _status_check_apisix() -> dict[str, object]:
    # Probe the APISIX Admin API route listing with the configured admin key.
    resp = requests.get(
        f"{config.APISIX_ADMIN_URL}/apisix/admin/routes",
        headers={"X-API-KEY": config.apisix_admin_key()},
        timeout=5,
    )
    return _status_http_result(resp)


def _status_check_vault() -> dict[str, object]:
    # OK only when a Vault client exists and reports itself authenticated.
    vault = config._vault_client()
    return {"ok": vault is not None and vault.is_authenticated()}


def _status_check_bunnycdn() -> dict[str, object]:
    # Probe the BunnyCDN pull-zone listing with the configured API key.
    api_key = config.bunny_api_key()
    resp = requests.get(
        "https://api.bunny.net/pullzone",
        headers={"AccessKey": api_key},
        timeout=5,
    )
    return _status_http_result(resp)


def _status_check_incus_certs() -> dict[str, object]:
    # Report whether config considers the Incus client certificates available.
    return {"ok": config.incus_certs_available()}


def _status_check_k8s() -> dict[str, object]:
    # Probe the Kubernetes API server's /healthz endpoint, if one is configured.
    k8s_server = config.K8S_API_SERVER
    if not k8s_server:
        return {"ok": False, "error": "K8S_API_SERVER not configured"}
    token = config.k8s_token()
    resp = requests.get(
        f"{k8s_server}/healthz",
        headers={"Authorization": f"Bearer {token}"},
        timeout=5,
        # NOTE(review): TLS verification is disabled, so the bearer token is
        # sent without authenticating the server. Prefer passing the cluster
        # CA bundle via `verify=` if it is available to this process.
        verify=False,
    )
    return _status_http_result(resp)


def _status_check_tailscale() -> dict[str, object]:
    # Ask the tailscale CLI for its status and count the reported peers.
    import json
    import subprocess

    out = subprocess.run(
        ["tailscale", "status", "--json"],
        capture_output=True, text=True, timeout=5,
    )
    if out.returncode != 0:
        return {"ok": False, "error": "CLI unavailable"}
    # `.get("Peer", {})` would return None when the key is present with a JSON
    # null, and len(None) would then be misreported as a failed check; treat a
    # null/missing peer map as zero peers.
    peers = json.loads(out.stdout).get("Peer") or {}
    return {"ok": True, "peers": len(peers)}


def _status_run(check) -> dict[str, object]:
    # Run one zero-argument probe. A health endpoint must report failures, not
    # raise them, so any exception becomes an {"ok": False, "error": ...} entry.
    try:
        return check()
    except Exception as exc:
        return {"ok": False, "error": str(exc)}


# (result key, probe) pairs, in the order the keys appear in the response.
_STATUS_CHECKS = (
    ("nocodb", _status_check_nocodb),
    ("apisix", _status_check_apisix),
    ("vault", _status_check_vault),
    ("bunnycdn", _status_check_bunnycdn),
    ("incus_certs", _status_check_incus_certs),
    ("k8s", _status_check_k8s),
    ("tailscale", _status_check_tailscale),
)


@app.get("/status")
def status() -> dict:
    """Check connectivity to all infrastructure services."""
    # NOTE: FastAPI publishes the docstring above as the endpoint description,
    # so implementation notes live in comments instead.
    #
    # Probes run sequentially, each with its own 5-second timeout. Every probe,
    # including the Incus certificate check, is guarded by _status_run so one
    # failing dependency can never turn the whole endpoint into an error.
    results: dict[str, object] = {
        name: _status_run(check) for name, check in _STATUS_CHECKS
    }
    return results
# ---------------------------------------------------------------------------
# POST /sync
# ---------------------------------------------------------------------------
@app.post("/sync")
def sync(
    source: str | None = Query(
        default=None,
        description=f"One of: {', '.join(sync_mod.SYNC_ORDER)}. Omit to sync all.",
    ),
) -> dict:
    """Run infra-tool sync and return per-source stats."""
    # This endpoint always performs a real sync (dry_run=False). `source` is
    # handed to sync_all unchanged; None is the "omit to sync all" case from
    # the parameter description.
    # NOTE(review): the shape of sync_all's return value is defined in the
    # sync module and is passed through here as-is under "results".
    return {"results": sync_mod.sync_all(dry_run=False, source=source)}