"""NocoDB v2 REST helper — list / upsert / mark-unknown.""" from __future__ import annotations import requests import config def _headers() -> dict: return { "xc-token": config.nocodb_token(), "Content-Type": "application/json", } def _api(table: str) -> str: tid = config.TABLE_IDS[table] return f"{config.NOCODB_URL}/api/v2/tables/{tid}/records" # ── read ──────────────────────────────────────────────────────────── def list_rows(table: str, *, where: str = "", limit: int = 1000) -> list[dict]: """Fetch all rows (paginated).""" rows: list[dict] = [] offset = 0 while True: params: dict = {"limit": limit, "offset": offset} if where: params["where"] = where resp = requests.get(_api(table), headers=_headers(), params=params) resp.raise_for_status() data = resp.json() batch = data.get("list", []) rows.extend(batch) pi = data.get("pageInfo", {}) if pi.get("isLastPage", True) or not batch: break offset += limit return rows def find_by_title(table: str, title: str) -> dict | None: # URL-encode special chars in title for NocoDB where clause safe_title = title.replace(",", "\\,") rows = list_rows(table, where=f"(Title,eq,{safe_title})", limit=1) return rows[0] if rows else None # ── write ─────────────────────────────────────────────────────────── def create_row(table: str, data: dict) -> dict: resp = requests.post(_api(table), headers=_headers(), json=data) resp.raise_for_status() return resp.json() def update_row(table: str, row_id: int, data: dict) -> dict: payload = {"Id": row_id, **data} resp = requests.patch(_api(table), headers=_headers(), json=payload) resp.raise_for_status() return resp.json() def upsert(table: str, title: str, data: dict) -> str: """Insert or update a row keyed by Title. Returns 'created' or 'updated'.""" existing = find_by_title(table, title) if existing: update_row(table, existing["Id"], data) return "updated" else: create_row(table, {**data, "Title": title}) return "created" def mark_unseen(table: str, seen_titles: set[str], *, source_filter: str = ""): """Set status=unknown for rows not in seen_titles (scoped by source).""" where = f"(source,eq,{source_filter})" if source_filter else "" all_rows = list_rows(table, where=where) for row in all_rows: if row.get("Title") not in seen_titles and row.get("status") != "unknown": update_row(table, row["Id"], {"status": "unknown"})