"""Load secrets from Vault (hvac) with environment variable fallback.""" from __future__ import annotations import os import hvac def _mcp_vault_env() -> tuple[str, str]: """Read Vault ADDR/TOKEN from ~/.mcp.json (MCP server config).""" mcp_path = os.path.expanduser("~/.mcp.json") if not os.path.exists(mcp_path): return "", "" try: import json data = json.loads(open(mcp_path).read()) vault_cfg = data.get("mcpServers", {}).get("vault", {}) env = vault_cfg.get("env", {}) return env.get("VAULT_ADDR", ""), env.get("VAULT_TOKEN", "") except Exception: return "", "" def _vault_client(): addr = os.environ.get("VAULT_ADDR", "") token = os.environ.get("VAULT_TOKEN", "") # fallback: ~/.vault-token if not token: token_file = os.path.expanduser("~/.vault-token") if os.path.exists(token_file): token = open(token_file).read().strip() # try env/file credentials first if addr and token: try: client = hvac.Client(url=addr, token=token) if client.is_authenticated(): return client except Exception: pass # fallback: read from .mcp.json mcp_addr, mcp_token = _mcp_vault_env() if mcp_addr and mcp_token: try: client = hvac.Client(url=mcp_addr, token=mcp_token) if client.is_authenticated(): return client except Exception: pass return None def _read_vault(path: str, field: str) -> str | None: client = _vault_client() if not client: return None try: resp = client.secrets.kv.v2.read_secret_version(path=path, raise_on_deleted_version=True) return resp["data"]["data"].get(field) except Exception: return None # --- public accessors --- NOCODB_URL = "https://nocodb.inouter.com" NOCODB_BASE_ID = "p2i2d68fk5kohnu" # Table IDs (created in Infra Registry base) TABLE_IDS = { "infra_services": "m50co33pnnq5q9g", "infra_routes": "mhts7j7wqltlf6a", "infra_cdn_zones": "mac43joyrncoowh", "infra_containers": "mm9wqmc91ufu81i", } APISIX_ADMIN_URL = "http://100.108.39.107:9180" # --- Incus REST API --- INCUS_REMOTES = { "jp1": { "url": "https://100.109.123.1:8443", "projects": ["monitoring", "db", "default"], }, "kr1": { "url": "https://100.84.111.28:8443", "projects": ["default", "inbest", "karakeep", "security"], }, } INCUS_CERT_DIR = os.environ.get("INCUS_CERT_DIR", "/etc/infra-tool") def incus_cert() -> str: return os.path.join(INCUS_CERT_DIR, "incus-client.crt") def incus_key() -> str: return os.path.join(INCUS_CERT_DIR, "incus-client.key") def incus_certs_available() -> bool: return os.path.exists(incus_cert()) and os.path.exists(incus_key()) # --- K8s API --- K8S_API_SERVER = os.environ.get("K8S_API_SERVER", "") K8S_CA_CERT = os.environ.get("K8S_CA_CERT", "") def k8s_token() -> str: token = os.environ.get("K8S_TOKEN", "") if token: return token token_path = os.environ.get("K8S_TOKEN_PATH", "/etc/infra-tool/k8s-token") try: with open(token_path) as f: return f.read().strip() except FileNotFoundError: return "" # --- secrets --- def nocodb_token() -> str: return ( _read_vault("nocodb/api-token", "token") or os.environ.get("NOCODB_TOKEN", "") ) def bunny_api_key() -> str: return ( _read_vault("bunnycdn", "api_key") or os.environ.get("BUNNY_API_KEY", "") ) def apisix_admin_key() -> str: return os.environ.get("APISIX_ADMIN_KEY", "edd1c9f034335f136f87ad84b625c8f1")