Files
rag-mcp/tools/core.py
kappa 2858e0a344 Initial commit: RAG MCP Server with relationship graph
Features:
- Vector search with Pinecone + Vertex AI embeddings
- Document relationships (link, unlink, related, graph)
- Auto-link with LLM analysis
- Intelligent merge with Gemini

Modular structure:
- clients/: Pinecone, Vertex AI
- tools/: core, relations, stats
- utils/: validation, logging

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 11:05:45 +09:00

305 lines
9.7 KiB
Python

"""Core RAG tools: save, retrieve, update, delete."""
import uuid
from typing import Optional
from clients import get_index, get_embedding, merge_with_llm, analyze_relations_with_llm
from utils.logging import get_logger
from utils.validation import validate_content, validate_tag, validate_document_id
from config import AUTO_LINK_THRESHOLD, AUTO_LINK_TOP_K
from .relations import (
parse_relation,
format_relation,
_add_reverse_relation,
_remove_reverse_relation
)
import time
import threading
logger = get_logger(__name__)  # module-level logger shared by every tool below
# Rate limiting (in-memory counter)
# Sliding-window state: "requests" holds timestamps of accepted calls within the
# last 60 seconds; "lock" serializes access so concurrent tool calls do not race
# on the list. NOTE(review): state is per-process only — multiple server
# instances would each enforce the limit independently.
_rate_limiter = {
"requests": [],
"lock": threading.Lock()
}
# Maximum accepted requests per sliding 60-second window (shared by all tools).
MAX_REQUESTS_PER_MINUTE = 60
def rate_limit_check() -> bool:
    """
    Check if rate limit is exceeded.

    Implements a sliding one-minute window over the in-memory timestamp list in
    ``_rate_limiter``. Uses ``time.monotonic()`` instead of ``time.time()`` so
    the window is immune to wall-clock jumps (NTP adjustments, manual clock
    changes) that could otherwise prematurely expire or pin entries. The
    timestamps are written and read only inside this function, so the clock
    change is self-contained.

    Returns:
        True if allowed, False if rate limit exceeded
    """
    with _rate_limiter["lock"]:
        now = time.monotonic()
        # Drop timestamps that have fallen out of the 60-second window.
        _rate_limiter["requests"] = [
            req_time for req_time in _rate_limiter["requests"]
            if now - req_time < 60
        ]
        if len(_rate_limiter["requests"]) >= MAX_REQUESTS_PER_MINUTE:
            return False
        # Record this accepted request inside the lock to avoid double-counting races.
        _rate_limiter["requests"].append(now)
        return True
def _parse_manual_relations(relations: Optional[str]) -> list:
    """Split a comma-separated relation spec into stripped, non-empty entries.

    Args:
        relations: Raw spec such as "id1:depends_on,id2:see_also", or None.

    Returns:
        List of relation strings (empty when the spec is None/blank).
    """
    if not relations:
        return []
    return [r.strip() for r in relations.split(',') if r.strip()]


def _find_similar_docs(index, vector) -> list:
    """Query the index for neighbours scoring at or above AUTO_LINK_THRESHOLD.

    Args:
        index: Pinecone index handle.
        vector: Embedding of the document being saved.

    Returns:
        List of dicts with id/text/tag/score for each sufficiently similar match.
    """
    similar_results = index.query(
        vector=vector,
        top_k=AUTO_LINK_TOP_K,
        include_metadata=True
    )
    similar_docs = []
    for match in similar_results.get("matches", []):
        if match.get("score", 0) >= AUTO_LINK_THRESHOLD:
            similar_docs.append({
                "id": match["id"],
                "text": match["metadata"].get("text", ""),
                "tag": match["metadata"].get("tag", ""),
                "score": match["score"]
            })
    return similar_docs


def rag_save(content: str, tag: Optional[str] = "general", relations: Optional[str] = None, auto_link: bool = False) -> str:
    """
    Save important information to vector database.

    Args:
        content: Text content to save
        tag: Tag for filtering (default: general)
        relations: Comma-separated relations (e.g., "id1:depends_on,id2:see_also")
        auto_link: Auto-create relations using LLM analysis

    Returns:
        Success message with document ID or error message
    """
    if not rate_limit_check():
        logger.warning("Rate limit exceeded for rag_save")
        return "Error: Rate limit exceeded. Please wait before retrying."
    # Validate inputs before touching any external service.
    is_valid, error_msg = validate_content(content)
    if not is_valid:
        logger.warning(f"Content validation failed: {error_msg}")
        return f"Error: {error_msg}"
    is_valid, error_msg = validate_tag(tag)
    if not is_valid:
        logger.warning(f"Tag validation failed: {error_msg}")
        return f"Error: {error_msg}"
    try:
        index = get_index()
        vector = get_embedding(content)
        doc_id = str(uuid.uuid4())
        rel_list = _parse_manual_relations(relations)
        # Optionally let the LLM propose relations to similar existing documents.
        auto_relations = []
        if auto_link:
            logger.info(f"Auto-linking document with threshold={AUTO_LINK_THRESHOLD}, top_k={AUTO_LINK_TOP_K}")
            similar_docs = _find_similar_docs(index, vector)
            if similar_docs:
                analyzed = analyze_relations_with_llm(content, tag, similar_docs)
                for rel in analyzed:
                    rel_str = format_relation(rel["id"], rel["relation"])
                    # Don't duplicate a relation the caller already supplied manually.
                    if rel_str not in rel_list:
                        rel_list.append(rel_str)
                        auto_relations.append(rel_str)
        metadata = {
            "text": content,
            "tag": tag,
            "relations": rel_list
        }
        index.upsert(vectors=[{
            "id": doc_id,
            "values": vector,
            "metadata": metadata
        }])
        # Mirror every relation on the target document (bidirectional links).
        for rel_str in rel_list:
            target_id, rel_type = parse_relation(rel_str)
            _add_reverse_relation(target_id, doc_id, rel_type)
        # Format result
        result = f"Saved with ID: {doc_id}"
        if auto_relations:
            result += f"\n\nAuto-linked ({len(auto_relations)}):"
            for rel in auto_relations:
                target_id, rel_type = parse_relation(rel)
                result += f"\n --[{rel_type}]--> {target_id[:8]}..."
        if relations:
            result += f"\nManual relations: {relations}"
        logger.info(f"Document saved: {doc_id}, tag={tag}, relations={len(rel_list)}")
        return result
    except Exception as e:
        logger.error(f"rag_save failed: {str(e)}", exc_info=True)
        return f"Error: {str(e)}"
def rag_retrieve(query: str, top_k: int = 3, tag: Optional[str] = None) -> str:
    """
    Retrieve relevant information from vector database.

    Args:
        query: Search query
        top_k: Number of results to return (default: 3)
        tag: Filter by specific tag (default: None, search all)

    Returns:
        Formatted search results or error message
    """
    if not rate_limit_check():
        logger.warning("Rate limit exceeded for rag_retrieve")
        return "Error: Rate limit exceeded. Please wait before retrying."
    # Validate query
    is_valid, error_msg = validate_content(query)
    if not is_valid:
        logger.warning(f"Query validation failed: {error_msg}")
        return f"Error: {error_msg}"
    try:
        index = get_index()
        query_vector = get_embedding(query)
        # Pinecone metadata filter; None means search across all tags.
        filter_dict = {"tag": {"$eq": tag}} if tag else None
        results = index.query(
            vector=query_vector,
            top_k=top_k,
            include_metadata=True,
            filter=filter_dict
        )
        # .get for consistency with rag_save's handling of query responses.
        matches = results.get("matches", [])
        if not matches:
            logger.info("No matching documents found")
            return "관련된 정보를 찾지 못했습니다."
        formatted = []
        for res in matches:
            # Defensive: skip matches that came back without metadata.
            if "metadata" not in res:
                continue
            meta = res["metadata"]
            # .get with a default — a document saved without "text" must not
            # crash retrieval (previously a bare KeyError-prone lookup).
            text = meta.get("text", "")
            tag_val = meta.get("tag", "")
            relations = meta.get("relations", [])
            doc_id = res["id"]
            score = res.get("score", 0)
            # Number entries by what we actually emit, so skipped matches
            # don't leave gaps in the [1], [2], ... sequence.
            i = len(formatted) + 1
            entry = f"[{i}] ID: {doc_id} (score: {score:.3f}, tag: {tag_val})"
            if relations:
                entry += f"\n Relations: {relations}"
            entry += f"\n {text}"
            formatted.append(entry)
        logger.info(f"Retrieved {len(formatted)} documents for query")
        return "검색 결과:\n" + "\n---\n".join(formatted)
    except Exception as e:
        logger.error(f"rag_retrieve failed: {str(e)}", exc_info=True)
        return f"Error: {str(e)}"
def rag_update(id: str, new_info: str) -> str:
    """
    Intelligently merge existing and new information.

    Fetches the stored document, asks the LLM to merge its text with the new
    information, re-embeds the merged text, and upserts it back under the same
    ID while preserving the original tag and relations.

    Args:
        id: Document ID to update
        new_info: New information to add or merge

    Returns:
        Update summary or error message
    """
    if not rate_limit_check():
        logger.warning("Rate limit exceeded for rag_update")
        return "Error: Rate limit exceeded. Please wait before retrying."
    # Reject bad inputs before any network round-trip.
    ok, error_msg = validate_document_id(id)
    if not ok:
        logger.warning(f"Document ID validation failed: {error_msg}")
        return f"Error: {error_msg}"
    ok, error_msg = validate_content(new_info)
    if not ok:
        logger.warning(f"New info validation failed: {error_msg}")
        return f"Error: {error_msg}"
    try:
        index = get_index()
        fetched = index.fetch(ids=[id])
        stored = fetched["vectors"]
        if id not in stored:
            logger.warning(f"Document not found: {id}")
            return f"Error: Not found: {id}"
        existing_meta = stored[id]["metadata"]
        previous_text = existing_meta.get("text", "")
        doc_tag = existing_meta.get("tag", "general")
        doc_relations = existing_meta.get("relations", [])
        # Merge via LLM, then re-embed so the vector matches the new text.
        merged_text = merge_with_llm(previous_text, new_info)
        new_vector = get_embedding(merged_text)
        index.upsert(vectors=[{
            "id": id,
            "values": new_vector,
            "metadata": {"text": merged_text, "tag": doc_tag, "relations": doc_relations}
        }])
        logger.info(f"Document updated: {id}")
        return f"Updated: {id}\n\n[기존]\n{previous_text}\n\n[새 정보]\n{new_info}\n\n[병합 결과]\n{merged_text}"
    except Exception as e:
        logger.error(f"rag_update failed: {str(e)}", exc_info=True)
        return f"Error: {str(e)}"
def rag_delete(id: str) -> str:
    """
    Delete document by ID.

    Before deleting, removes the reverse links this document created on its
    related documents, keeping the relationship graph consistent. The delete
    itself is idempotent: an unknown ID still reports success.

    Args:
        id: Document ID to delete

    Returns:
        Success message or error message
    """
    if not rate_limit_check():
        logger.warning("Rate limit exceeded for rag_delete")
        return "Error: Rate limit exceeded. Please wait before retrying."
    # Validate the ID before contacting the index.
    ok, error_msg = validate_document_id(id)
    if not ok:
        logger.warning(f"Document ID validation failed: {error_msg}")
        return f"Error: {error_msg}"
    try:
        index = get_index()
        # Clean up reverse links on related documents before removing this one.
        fetched = index.fetch(ids=[id])
        stored = fetched["vectors"]
        if id in stored:
            for rel_str in stored[id]["metadata"].get("relations", []):
                target_id, _ = parse_relation(rel_str)
                _remove_reverse_relation(target_id, id)
        index.delete(ids=[id])
        logger.info(f"Document deleted: {id}")
        return f"Deleted: {id}"
    except Exception as e:
        logger.error(f"rag_delete failed: {str(e)}", exc_info=True)
        return f"Error: {str(e)}"