WangDL fbdae9078f
Some checks failed
Deploy API Server / build-and-deploy (push) Failing after 22s
feat: Python RAG Worker + NestJS 内部 API(文档解析/切片/embedding/Qdrant/候选生成)
2026-05-19 22:35:12 +08:00

61 lines
1.9 KiB
Python

"""Qdrant 索引服务"""
import httpx
from config import QDRANT_URL, QDRANT_COLLECTION
async def upsert_points(points: list[dict]):
"""批量写入 Qdrant points"""
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.put(
f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points",
params={"wait": "true"},
json={"points": points},
)
if resp.status_code != 200:
raise RuntimeError(f"Qdrant upsert failed: {resp.text}")
async def search(
vector: list[float],
user_id: str,
knowledge_base_id: str,
top_k: int = 5,
) -> list[dict]:
"""语义检索"""
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(
f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points/search",
json={
"vector": vector,
"filter": {
"must": [
{"key": "userId", "match": {"value": user_id}},
{"key": "knowledgeBaseId", "match": {"value": knowledge_base_id}},
{"key": "deleted", "match": {"value": False}},
],
},
"limit": top_k,
"with_payload": True,
},
)
if resp.status_code != 200:
raise RuntimeError(f"Qdrant search failed: {resp.text}")
return resp.json()["result"]
async def mark_deleted(source_id: str):
"""将指定 source 的所有 points 标记为 deleted=true"""
async with httpx.AsyncClient(timeout=30) as client:
await client.post(
f"{QDRANT_URL}/collections/{QDRANT_COLLECTION}/points/update",
json={
"filter": {
"must": [
{"key": "sourceId", "match": {"value": source_id}},
]
},
"set": {"deleted": True},
},
)