"""Vertex AI API client with HTTP session pooling."""

import json
import requests
from typing import List

from config import (
    PROJECT_ID, LOCATION, VERTEX_API_KEY, REQUEST_TIMEOUT
)
from utils.logging import get_logger
from utils.validation import sanitize_for_prompt

logger = get_logger(__name__)

# HTTP session for connection pooling (created lazily by _get_session).
_session = None

# Publisher model IDs used by this module.
_EMBEDDING_MODEL = "text-embedding-004"
_GEMINI_MODEL = "gemini-2.0-flash"

# Relation labels kept from the LLM's answer. "none" is offered in the prompt
# but deliberately excluded here so "no relation" entries are dropped;
# "related" is not offered in the prompt but is still accepted if emitted.
_VALID_RELATION_TYPES = frozenset(
    {"depends_on", "part_of", "extends", "see_also", "updates", "related"}
)


def _get_session() -> requests.Session:
    """Get or create HTTP session for connection pooling."""
    global _session
    if _session is None:
        _session = requests.Session()
        _session.headers.update({
            "Content-Type": "application/json"
        })
    return _session


def _model_url(model: str, method: str) -> str:
    """Build the Vertex AI publisher-model endpoint URL for ``model:method``."""
    return (
        f"https://{LOCATION}-aiplatform.googleapis.com/v1/"
        f"projects/{PROJECT_ID}/locations/{LOCATION}/"
        f"publishers/google/models/{model}:{method}"
    )


def _redact(message: str) -> str:
    """Mask the API key wherever it appears verbatim in ``message``.

    The key travels as the ``key`` query parameter, and ``requests`` error
    messages (e.g. ConnectionError / ConnectTimeout) can embed the request
    URL, so raw exception text must not reach logs or callers unredacted.
    Only a literal (non-percent-encoded) occurrence of the key is masked.
    """
    if VERTEX_API_KEY:
        return message.replace(VERTEX_API_KEY, "***")
    return message


def _post(url: str, payload: dict) -> requests.Response:
    """POST ``payload`` as JSON to ``url`` on the pooled session with the API key."""
    return _get_session().post(
        url,
        params={"key": VERTEX_API_KEY},
        json=payload,
        timeout=REQUEST_TIMEOUT,
    )


def _gemini_text(response: requests.Response) -> str:
    """Return the stripped text of the first candidate in a Gemini response.

    Raises:
        KeyError / IndexError: If the payload lacks the expected fields.
        requests.exceptions.JSONDecodeError: If the body is not JSON.
    """
    result = response.json()
    return result["candidates"][0]["content"]["parts"][0]["text"].strip()


def _strip_code_fence(text: str) -> str:
    """Remove a surrounding Markdown code fence (```json ... ```) if present.

    The opening fence line is dropped, and a trailing ``` is removed whether
    it sits on its own line or is attached to the last content line.
    """
    if not text.startswith("```"):
        return text
    body = "\n".join(text.split("\n")[1:]).strip()
    if body.endswith("```"):
        body = body[:-3]
    return body.strip()


def get_embedding(text: str) -> List[float]:
    """
    Get text embedding from Vertex AI.

    Args:
        text: Text to embed

    Returns:
        Embedding vector (``predictions[0].embeddings.values`` of the response)

    Raises:
        Exception: If the API returns a non-200 status, times out, or the
            request fails. Messages never contain the API key.
    """
    url = _model_url(_EMBEDDING_MODEL, "predict")

    try:
        response = _post(url, {"instances": [{"content": text}]})

        if response.status_code != 200:
            body = _redact(response.text)
            logger.error(f"Vertex AI embedding error: {response.status_code} - {body}")
            raise Exception(f"Vertex AI API Error: {body}")

        result = response.json()
        return result["predictions"][0]["embeddings"]["values"]

    except requests.exceptions.Timeout:
        logger.error("Vertex AI embedding request timeout")
        # `from None`: the underlying exception text may include the request
        # URL, which carries the API key as a query parameter.
        raise Exception("Vertex AI request timeout") from None
    except requests.exceptions.RequestException as e:
        detail = _redact(str(e))
        logger.error(f"Vertex AI embedding request failed: {detail}")
        raise Exception(f"Vertex AI request failed: {detail}") from None


def merge_with_llm(old: str, new: str) -> str:
    """
    Merge two texts intelligently using Vertex AI Gemini.

    The prompt (in Korean) instructs the model to prefer the new text on
    conflicts, remove duplicates, and output only the merged result.

    Args:
        old: Existing text
        new: New text to merge

    Returns:
        Merged text, stripped of surrounding whitespace

    Raises:
        Exception: If the API returns a non-200 status, times out, or the
            request fails. Messages never contain the API key.
    """
    url = _model_url(_GEMINI_MODEL, "generateContent")

    # Sanitize inputs before interpolating them into the prompt.
    old_sanitized = sanitize_for_prompt(old)
    new_sanitized = sanitize_for_prompt(new)

    prompt = f"""기존 정보와 새 정보를 병합하세요.

규칙:
1. 충돌하는 내용은 새 정보를 우선합니다.
2. 중복은 제거하고 간결하게 정리합니다.
3. 보완되는 정보는 자연스럽게 통합합니다.
4. 결과만 출력하세요. 설명은 불필요합니다.

[기존 정보]
{old_sanitized}

[새 정보]
{new_sanitized}

[병합 결과]"""

    try:
        response = _post(
            url, {"contents": [{"role": "user", "parts": [{"text": prompt}]}]}
        )

        if response.status_code != 200:
            body = _redact(response.text)
            logger.error(f"Gemini merge error: {response.status_code} - {body}")
            raise Exception(f"Gemini API Error: {body}")

        merged = _gemini_text(response)
        logger.info(f"Successfully merged texts (old: {len(old)} chars, new: {len(new)} chars)")
        return merged

    except requests.exceptions.Timeout:
        logger.error("Gemini merge request timeout")
        # `from None`: avoid chaining an exception whose text may hold the key.
        raise Exception("Gemini request timeout") from None
    except requests.exceptions.RequestException as e:
        detail = _redact(str(e))
        logger.error(f"Gemini merge request failed: {detail}")
        raise Exception(f"Gemini request failed: {detail}") from None


def analyze_relations_with_llm(new_doc: str, new_tag: str, similar_docs: list) -> list:
    """
    Analyze relations between new document and existing documents using Gemini.

    Best-effort: any API, transport, or parsing failure is logged as a
    warning and yields an empty list instead of raising.

    Args:
        new_doc: New document content (only the first 500 chars are sent)
        new_tag: New document tag
        similar_docs: List of similar documents; each item is indexed by
            ``"id"``, ``"text"`` and ``"tag"`` (a ``"score"`` key may also be
            present but is not read here). Only the first 300 chars of each
            text are sent.

    Returns:
        List of relations: [{"id": doc_id, "relation": relation_type}, ...],
        keeping only dict entries whose ``relation`` is in
        ``_VALID_RELATION_TYPES``.
    """
    if not similar_docs:
        return []

    url = _model_url(_GEMINI_MODEL, "generateContent")

    # Build a numbered, truncated summary of each candidate document.
    docs_summary = []
    for i, doc in enumerate(similar_docs, start=1):
        text = doc["text"]
        preview = text[:300] + "..." if len(text) > 300 else text
        preview_sanitized = sanitize_for_prompt(preview)
        docs_summary.append(
            f"[{i}] ID: {doc['id']}, Tag: {doc['tag']}\n내용: {preview_sanitized}"
        )
    docs_block = "\n".join(docs_summary)

    new_doc_sanitized = sanitize_for_prompt(new_doc[:500])

    prompt = f"""새 문서와 기존 문서들 간의 관계를 분석하세요.

[새 문서]
Tag: {new_tag}
내용: {new_doc_sanitized}

[기존 문서들]
{docs_block}

[관계 유형]
- depends_on: 새 문서가 기존 문서에 의존 (API 사용, 라이브러리 참조 등)
- part_of: 새 문서가 기존 문서의 일부 (같은 프로젝트, 하위 기능 등)
- extends: 새 문서가 기존 문서를 확장 (기능 추가, 버전 업 등)
- see_also: 관련 참고 문서 (비슷한 주제, 참고할 만한 내용)
- updates: 새 문서가 기존 문서의 업데이트/수정 버전
- none: 관계 없음 (유사도가 높아도 실제 관계가 없는 경우)

[출력 형식]
JSON 배열로만 출력하세요. 설명 없이 JSON만 출력.
관계가 있는 문서만 포함하세요.

예시:
[{{"id": "문서ID", "relation": "depends_on"}}, {{"id": "문서ID", "relation": "see_also"}}]

관계가 없으면 빈 배열 출력: []

[분석 결과]"""

    try:
        response = _post(
            url, {"contents": [{"role": "user", "parts": [{"text": prompt}]}]}
        )

        if response.status_code != 200:
            logger.warning(f"Gemini relation analysis error: {response.status_code}")
            return []

        # The model sometimes wraps its answer in a Markdown code fence.
        result_text = _strip_code_fence(_gemini_text(response))
        relations = json.loads(result_text)

        # A JSON scalar/object is not a usable answer; previously a scalar
        # raised an uncaught TypeError when iterated.
        if not isinstance(relations, list):
            logger.warning(
                f"Failed to parse relation analysis: expected a JSON array, "
                f"got {type(relations).__name__}"
            )
            return []

        # Keep only well-formed entries with a recognised relation label;
        # the str check also guards the set lookup against unhashable values.
        valid_relations = [
            rel for rel in relations
            if isinstance(rel, dict)
            and "id" in rel
            and isinstance(rel.get("relation"), str)
            and rel["relation"] in _VALID_RELATION_TYPES
        ]

        logger.info(f"Analyzed relations: found {len(valid_relations)} valid relations")
        return valid_relations

    except (json.JSONDecodeError, KeyError, IndexError, TypeError) as e:
        logger.warning(f"Failed to parse relation analysis: {str(e)}")
        return []
    except requests.exceptions.Timeout:
        logger.warning("Gemini relation analysis timeout")
        return []
    except requests.exceptions.RequestException as e:
        logger.warning(f"Gemini relation analysis request failed: {_redact(str(e))}")
        return []