from __future__ import annotations
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore, VectorStoreRetriever
[docs]class VespaStore(VectorStore):
"""`Vespa` 向量存储。
要使用,您应该安装python客户端库``pyvespa``。
示例:
.. code-block:: python
from langchain_community.vectorstores import VespaStore
from langchain_community.embeddings.openai import OpenAIEmbeddings
from vespa.application import Vespa
# 创建一个依赖于您的应用程序的 Vespa 客户端,
# 例如连接到 Vespa Cloud 或本地部署
# 例如 Docker。请参考 PyVespa 文档
# 如何初始化客户端。
vespa_app = Vespa(url="...", port=..., application_package=...)
# 您需要指示 LangChain 使用哪些字段进行嵌入
vespa_config = dict(
page_content_field="text",
embedding_field="embedding",
input_field="query_embedding",
metadata_fields=["date", "rating", "author"]
)
embedding_function = OpenAIEmbeddings()
vectorstore = VespaStore(vespa_app, embedding_function, **vespa_config)"""
[docs] def __init__(
self,
app: Any,
embedding_function: Optional[Embeddings] = None,
page_content_field: Optional[str] = None,
embedding_field: Optional[str] = None,
input_field: Optional[str] = None,
metadata_fields: Optional[List[str]] = None,
) -> None:
"""
使用PyVespa客户端进行初始化。
"""
try:
from vespa.application import Vespa
except ImportError:
raise ImportError(
"Could not import Vespa python package. "
"Please install it with `pip install pyvespa`."
)
if not isinstance(app, Vespa):
raise ValueError(
f"app should be an instance of vespa.application.Vespa, got {type(app)}"
)
self._vespa_app = app
self._embedding_function = embedding_function
self._page_content_field = page_content_field
self._embedding_field = embedding_field
self._input_field = input_field
self._metadata_fields = metadata_fields
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""将文本添加到向量存储中。
参数:
texts:要添加到向量存储中的字符串的可迭代对象。
metadatas:与文本相关联的元数据的可选列表。
ids:与文本相关联的可选id列表。
kwargs:向量存储特定参数
返回:
将文本添加到向量存储中后的id列表。
"""
embeddings = None
if self._embedding_function is not None:
embeddings = self._embedding_function.embed_documents(list(texts))
if ids is None:
ids = [str(f"{i+1}") for i, _ in enumerate(texts)]
batch = []
for i, text in enumerate(texts):
fields: Dict[str, Union[str, List[float]]] = {}
if self._page_content_field is not None:
fields[self._page_content_field] = text
if self._embedding_field is not None and embeddings is not None:
fields[self._embedding_field] = embeddings[i]
if metadatas is not None and self._metadata_fields is not None:
for metadata_field in self._metadata_fields:
if metadata_field in metadatas[i]:
fields[metadata_field] = metadatas[i][metadata_field]
batch.append({"id": ids[i], "fields": fields})
results = self._vespa_app.feed_batch(batch)
for result in results:
if not (str(result.status_code).startswith("2")):
raise RuntimeError(
f"Could not add document to Vespa. "
f"Error code: {result.status_code}. "
f"Message: {result.json['message']}"
)
return ids
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
if ids is None:
return False
batch = [{"id": id} for id in ids]
result = self._vespa_app.delete_batch(batch)
return sum([0 if r.status_code == 200 else 1 for r in result]) == 0
def _create_query(
self, query_embedding: List[float], k: int = 4, **kwargs: Any
) -> Dict:
hits = k
doc_embedding_field = self._embedding_field
input_embedding_field = self._input_field
ranking_function = kwargs["ranking"] if "ranking" in kwargs else "default"
filter = kwargs["filter"] if "filter" in kwargs else None
approximate = kwargs["approximate"] if "approximate" in kwargs else False
approximate = "true" if approximate else "false"
yql = "select * from sources * where "
yql += f"{{targetHits: {hits}, approximate: {approximate}}}"
yql += f"nearestNeighbor({doc_embedding_field}, {input_embedding_field})"
if filter is not None:
yql += f" and {filter}"
query = {
"yql": yql,
f"input.query({input_embedding_field})": query_embedding,
"ranking": ranking_function,
"hits": hits,
}
return query
[docs] def similarity_search_by_vector_with_score(
self, query_embedding: List[float], k: int = 4, **kwargs: Any
) -> List[Tuple[Document, float]]:
"""执行从嵌入向量进行相似性搜索。
参数:
query_embedding: 要搜索的嵌入向量。
k: 要返回的结果数量。
custom_query: 使用此自定义查询而不是默认查询(kwargs)。
kwargs: 其他向量存储特定参数。
返回:
将文本添加到向量存储中的ID列表。
"""
if "custom_query" in kwargs:
query = kwargs["custom_query"]
else:
query = self._create_query(query_embedding, k, **kwargs)
try:
response = self._vespa_app.query(body=query)
except Exception as e:
raise RuntimeError(
f"Could not retrieve data from Vespa: "
f"{e.args[0][0]['summary']}. "
f"Error: {e.args[0][0]['message']}"
)
if not str(response.status_code).startswith("2"):
raise RuntimeError(
f"Could not retrieve data from Vespa. "
f"Error code: {response.status_code}. "
f"Message: {response.json['message']}"
)
root = response.json["root"]
if "errors" in root:
import json
raise RuntimeError(json.dumps(root["errors"]))
if response is None or response.hits is None:
return []
docs = []
for child in response.hits:
page_content = child["fields"][self._page_content_field]
score = child["relevance"]
metadata = {"id": child["id"]}
if self._metadata_fields is not None:
for field in self._metadata_fields:
metadata[field] = child["fields"].get(field)
doc = Document(page_content=page_content, metadata=metadata)
docs.append((doc, score))
return docs
[docs] def similarity_search_by_vector(
self, embedding: List[float], k: int = 4, **kwargs: Any
) -> List[Document]:
results = self.similarity_search_by_vector_with_score(embedding, k, **kwargs)
return [r[0] for r in results]
[docs] def similarity_search_with_score(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Tuple[Document, float]]:
query_emb = []
if self._embedding_function is not None:
query_emb = self._embedding_function.embed_query(query)
return self.similarity_search_by_vector_with_score(query_emb, k, **kwargs)
[docs] def similarity_search(
self, query: str, k: int = 4, **kwargs: Any
) -> List[Document]:
results = self.similarity_search_with_score(query, k, **kwargs)
return [r[0] for r in results]
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
raise NotImplementedError("MMR search not implemented")
[docs] def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> List[Document]:
raise NotImplementedError("MMR search by vector not implemented")
[docs] @classmethod
def from_texts(
cls: Type[VespaStore],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> VespaStore:
vespa = cls(embedding_function=embedding, **kwargs)
vespa.add_texts(texts=texts, metadatas=metadatas, ids=ids)
return vespa
[docs] def as_retriever(self, **kwargs: Any) -> VectorStoreRetriever:
return super().as_retriever(**kwargs)