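"""
Pathway Vector Store client.

The Pathway Vector Server is a pipeline written with the Pathway framework. It
indexes all files in a given folder, embeds them, and builds a vector index.
The pipeline reacts to changes in the source files and automatically updates
the affected index entries.

PathwayVectorClient implements the LangChain VectorStore interface and queries
the PathwayVectorServer to retrieve up-to-date documents.

You can use the client with a hosted instance of Pathway Vector Store, or run
your own instance as described at
https://pathway.com/developers/user-guide/llm-xpack/vectorstore_pipeline/
"""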
import json
import logging
from typing import Any, Callable, Iterable, List, Optional, Tuple
import requests
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
# Copied from https://github.com/pathwaycom/pathway/blob/main/python/pathway/xpacks/llm/vector_store.py
# to remove dependency on Pathway library.
class _VectorStoreClient:
    """Minimal HTTP client for a Pathway ``VectorStoreServer``.

    Every operation is a JSON ``POST`` to one of the server's ``/v1/...``
    endpoints; the decoded JSON body of the reply is handed back to the caller.
    """

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        url: Optional[str] = None,
        timeout: Optional[float] = 3,
    ):
        """Create a client that can be used to query a :py:class:`VectorStoreServer`.

        Provide either ``url``, or ``host`` (optionally with ``port``) - not both.

        Args:
            host: Host on which the :py:class:`VectorStoreServer` listens.
            port: Port on which the :py:class:`VectorStoreServer` listens.
                Falls back to 80 when only ``host`` is given.
            url: Base URL on which the :py:class:`VectorStoreServer` listens.
            timeout: Value passed to ``requests`` as the per-request timeout
                (in seconds). ``None`` disables the timeout.

        Raises:
            ValueError: If ``url`` is combined with ``host``/``port``, or if
                neither ``url`` nor ``host`` is provided.
        """
        err = "Either (`host` and `port`) or `url` must be provided, but not both."
        if url is not None:
            if host or port:
                raise ValueError(err)
            self.url = url
        else:
            if host is None:
                raise ValueError(err)
            port = port or 80
            self.url = f"http://{host}:{port}"
        self.timeout = timeout

    def _endpoint(self, path: str) -> str:
        """Join the base URL and an endpoint path without doubling the slash."""
        return self.url.rstrip("/") + path

    def _post(self, path: str, payload: dict) -> Any:
        """POST ``payload`` as JSON to ``path`` and return the decoded JSON reply.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status.
        """
        response = requests.post(
            self._endpoint(path),
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=self.timeout,
        )
        # Fail here on an HTTP error instead of later, when the error body
        # would be decoded and treated as if it were a normal result.
        response.raise_for_status()
        return response.json()

    def query(
        self, query: str, k: int = 3, metadata_filter: Optional[str] = None
    ) -> List[dict]:
        """Run a query against the vector store and fetch the results.

        Args:
            query: The query text.
            k: Number of documents to return.
            metadata_filter: Optional metadata filter query in JMESPath format.
                Only documents satisfying the filter are searched.

        Returns:
            The entries returned by the server, sorted by ascending ``"dist"``.
        """
        data: dict = {"query": query, "k": k}
        # The key is only sent when a filter was actually requested.
        if metadata_filter is not None:
            data["metadata_filter"] = metadata_filter
        responses = self._post("/v1/retrieve", data)
        return sorted(responses, key=lambda x: x["dist"])

    # Make an alias
    __call__ = query

    def get_vectorstore_statistics(self) -> dict:
        """Fetch basic statistics about the vector store."""
        return self._post("/v1/statistics", {})

    def get_input_files(
        self,
        metadata_filter: Optional[str] = None,
        filepath_globpattern: Optional[str] = None,
    ) -> list:
        """Fetch information about the documents held in the vector store.

        Args:
            metadata_filter: Optional metadata filter query in JMESPath format.
                Only documents satisfying the filter are considered.
            filepath_globpattern: Optional glob pattern selecting which
                documents this query covers.

        Returns:
            The decoded JSON reply of the ``/v1/inputs`` endpoint.
        """
        return self._post(
            "/v1/inputs",
            {
                "metadata_filter": metadata_filter,
                "filepath_globpattern": filepath_globpattern,
            },
        )
class PathwayVectorClient(VectorStore):
    """VectorStore that connects to a Pathway Vector Store.

    The store is read-only from this side: searches and statistics are
    forwarded to the server through an internal ``_VectorStoreClient``, while
    adding texts or building the store from texts is not supported.
    """

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        url: Optional[str] = None,
    ) -> None:
        """Create a client for querying a Pathway Vector Store.

        Provide either ``url``, or ``host`` and ``port``.

        Args:
            host: Host on which the Pathway Vector Store listens.
            port: Port on which the Pathway Vector Store listens.
            url: URL on which the Pathway Vector Store listens.
        """
        self.client = _VectorStoreClient(host, port, url)

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Not supported: texts cannot be added to Pathway from the client.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "Pathway vector store does not support adding or removing texts"
            " from client."
        )

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> "PathwayVectorClient":
        """Not supported: the store cannot be initialized from texts.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "Pathway vector store does not support initializing from_texts."
        )

    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return the documents the server ranks closest to ``query``.

        Args:
            query: Query text to search for.
            k: Number of results to return. Defaults to 4.
            **kwargs: ``metadata_filter`` (a JMESPath filter string) is
                forwarded to the server; any other keyword is ignored after a
                warning is logged.

        Returns:
            One ``Document`` per result, built from the ``"text"`` and
            ``"metadata"`` fields of each returned entry.
        """
        metadata_filter = kwargs.pop("metadata_filter", None)
        if kwargs:
            # Use a module-named logger: the module-level ``logging.warning``
            # helper would configure the root logger as a side effect.
            logging.getLogger(__name__).warning(
                "Unknown kwargs passed to PathwayVectorClient.similarity_search: %s",
                kwargs,
            )
        rets = self.client(query=query, k=k, metadata_filter=metadata_filter)
        return [
            Document(page_content=ret["text"], metadata=ret["metadata"])
            for ret in rets
        ]

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        metadata_filter: Optional[str] = None,
    ) -> List[Tuple[Document, float]]:
        """Run a Pathway similarity search and return distances as well.

        Args:
            query: Query text to search for.
            k: Number of results to return. Defaults to 4.
            metadata_filter: Filter by metadata. The filter query should be in
                JMESPath format. Defaults to None.

        Returns:
            A list of ``(document, distance)`` pairs for the documents most
            similar to the query text, where the distance is the cosine
            distance as a float. A lower score means more similar.
        """
        rets = self.client(query=query, k=k, metadata_filter=metadata_filter)
        return [
            (Document(page_content=ret["text"], metadata=ret["metadata"]), ret["dist"])
            for ret in rets
        ]

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """Return the function that converts a distance into a relevance score.

        NOTE(review): ``_cosine_relevance_score_fn`` is not defined in this
        class; presumably it is inherited from ``VectorStore`` - confirm.
        """
        return self._cosine_relevance_score_fn

    def get_vectorstore_statistics(self) -> dict:
        """Fetch basic statistics about the vector store."""
        return self.client.get_vectorstore_statistics()

    def get_input_files(
        self,
        metadata_filter: Optional[str] = None,
        filepath_globpattern: Optional[str] = None,
    ) -> list:
        """Fetch information about the documents held in the vector store.

        Args:
            metadata_filter: Optional metadata filter query in JMESPath format.
            filepath_globpattern: Optional glob pattern selecting which
                documents this query covers.

        Returns:
            Whatever the underlying client returns for the same call.
        """
        return self.client.get_input_files(metadata_filter, filepath_globpattern)