from __future__ import annotations
import datetime
import os
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
)
from uuid import uuid4
import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import maximal_marginal_relevance
if TYPE_CHECKING:
import weaviate
def _default_schema(index_name: str, text_key: str) -> Dict:
return {
"class": index_name,
"properties": [
{
"name": text_key,
"dataType": ["text"],
}
],
}
def _create_weaviate_client(
url: Optional[str] = None,
api_key: Optional[str] = None,
**kwargs: Any,
) -> weaviate.Client:
try:
import weaviate
except ImportError:
raise ImportError(
"Could not import weaviate python package. "
"Please install it with `pip install weaviate-client`"
)
url = url or os.environ.get("WEAVIATE_URL")
api_key = api_key or os.environ.get("WEAVIATE_API_KEY")
auth = weaviate.auth.AuthApiKey(api_key=api_key) if api_key else None
return weaviate.Client(url=url, auth_client_secret=auth, **kwargs)
def _default_score_normalizer(val: float) -> float:
return 1 - 1 / (1 + np.exp(val))
def _json_serializable(value: Any) -> Any:
if isinstance(value, datetime.datetime):
return value.isoformat()
return value
class Weaviate(VectorStore):
    """`Weaviate` vector store.

    To use, you should have the ``weaviate-client`` python package installed.

    Example:
        .. code-block:: python

            import weaviate
            from langchain_community.vectorstores import Weaviate

            client = weaviate.Client(url=os.environ["WEAVIATE_URL"], ...)
            weaviate = Weaviate(client, index_name, text_key)
    """

    def __init__(
        self,
        client: Any,
        index_name: str,
        text_key: str,
        embedding: Optional[Embeddings] = None,
        attributes: Optional[List[str]] = None,
        relevance_score_fn: Optional[
            Callable[[float], float]
        ] = _default_score_normalizer,
        by_text: bool = True,
    ):
        """Initialize with a Weaviate client.

        Args:
            client: A ``weaviate.Client`` instance.
            index_name: Name of the Weaviate class that is queried and
                written to.
            text_key: Property that holds the document text.
            embedding: Optional embedding model. Required by the vector-based
                and scored search methods.
            attributes: Extra properties to request on every query, in
                addition to ``text_key``.
            relevance_score_fn: Function used to normalize scores; when falsy,
                ``_default_score_normalizer`` is used instead.
            by_text: If True, ``similarity_search`` uses a near-text query;
                otherwise the query is embedded and a near-vector query is
                used.

        Raises:
            ImportError: If the ``weaviate`` package cannot be imported.
            ValueError: If ``client`` is not a ``weaviate.Client``.
        """
        try:
            import weaviate
        except ImportError as e:
            # Chain the original error so the underlying cause is preserved.
            raise ImportError(
                "Could not import weaviate python package. "
                "Please install it with `pip install weaviate-client`."
            ) from e
        if not isinstance(client, weaviate.Client):
            raise ValueError(
                f"client should be an instance of weaviate.Client, got {type(client)}"
            )
        self._client = client
        self._index_name = index_name
        self._embedding = embedding
        self._text_key = text_key
        # The text property is always requested; extra attributes follow it.
        self._query_attrs = [self._text_key]
        self.relevance_score_fn = relevance_score_fn
        self._by_text = by_text
        if attributes is not None:
            self._query_attrs.extend(attributes)

    @property
    def embeddings(self) -> Optional[Embeddings]:
        """The embedding model given at construction time, if any."""
        return self._embedding

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """Return the configured relevance score function or the default."""
        return (
            self.relevance_score_fn
            if self.relevance_score_fn
            else _default_score_normalizer
        )

    def _build_query(self, **kwargs: Any) -> Any:
        """Start a ``Get`` query and apply the optional filter and tenant.

        Honors the ``where_filter`` and ``tenant`` keyword arguments; all
        other keyword arguments are ignored here.
        """
        query_obj = self._client.query.get(self._index_name, self._query_attrs)
        if kwargs.get("where_filter"):
            query_obj = query_obj.with_where(kwargs.get("where_filter"))
        if kwargs.get("tenant"):
            query_obj = query_obj.with_tenant(kwargs.get("tenant"))
        return query_obj

    def _hits_from_result(self, result: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the raw objects of a query result, raising on query errors.

        Raises:
            ValueError: If the result contains an ``"errors"`` entry.
        """
        if "errors" in result:
            raise ValueError(f"Error during query: {result['errors']}")
        return result["data"]["Get"][self._index_name]

    def _to_documents(self, hits: List[Dict[str, Any]]) -> List[Document]:
        """Convert raw result objects into ``Document`` instances.

        The text property is removed from each object and used as the page
        content; whatever remains becomes the document metadata.
        """
        docs = []
        for res in hits:
            text = res.pop(self._text_key)
            docs.append(Document(page_content=text, metadata=res))
        return docs

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Upload texts with metadata (properties) to Weaviate.

        Args:
            texts: Texts to add.
            metadatas: Optional per-text metadata; each dict's entries are
                stored as properties next to the text (``datetime`` values
                are converted to ISO strings).
            **kwargs: ``uuids`` or ``ids`` (list of object ids, ``uuids``
                takes precedence) and ``tenant`` are honored.

        Returns:
            The ids of the added objects, in input order.
        """
        from weaviate.util import get_valid_uuid

        ids = []
        embeddings: Optional[List[List[float]]] = None
        if self._embedding:
            # embed_documents needs a concrete list, and the texts are
            # iterated a second time below.
            if not isinstance(texts, list):
                texts = list(texts)
            embeddings = self._embedding.embed_documents(texts)

        with self._client.batch as batch:
            for i, text in enumerate(texts):
                data_properties = {self._text_key: text}
                if metadatas is not None:
                    for key, val in metadatas[i].items():
                        data_properties[key] = _json_serializable(val)

                # Allow for ids (consistent w/ other methods)
                # or uuids (backwards compatible w/ existing arg).
                # If the UUID of one of the objects already exists
                # then the existing object will be replaced by the new object.
                if "uuids" in kwargs:
                    _id = kwargs["uuids"][i]
                elif "ids" in kwargs:
                    _id = kwargs["ids"][i]
                else:
                    _id = get_valid_uuid(uuid4())

                batch.add_data_object(
                    data_object=data_properties,
                    class_name=self._index_name,
                    uuid=_id,
                    vector=embeddings[i] if embeddings else None,
                    tenant=kwargs.get("tenant"),
                )
                ids.append(_id)
        return ids

    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            **kwargs: Forwarded to the underlying text or vector search.

        Returns:
            List of Documents most similar to the query.

        Raises:
            ValueError: If the store was created with ``by_text=False`` and no
                embedding model is available.
        """
        if self._by_text:
            return self.similarity_search_by_text(query, k, **kwargs)
        if self._embedding is None:
            raise ValueError(
                "_embedding cannot be None for similarity_search when "
                "_by_text=False"
            )
        embedding = self._embedding.embed_query(query)
        return self.similarity_search_by_vector(embedding, k, **kwargs)

    def similarity_search_by_text(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Return docs most similar to query, using a near-text query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            **kwargs: ``search_distance`` (sent as the near-text
                ``certainty``), ``where_filter``, ``tenant`` and
                ``additional`` are honored.

        Returns:
            List of Documents most similar to the query.

        Raises:
            ValueError: If the query result reports errors.
        """
        content: Dict[str, Any] = {"concepts": [query]}
        if kwargs.get("search_distance"):
            content["certainty"] = kwargs.get("search_distance")
        query_obj = self._build_query(**kwargs)
        if kwargs.get("additional"):
            query_obj = query_obj.with_additional(kwargs.get("additional"))
        result = query_obj.with_near_text(content).with_limit(k).do()
        return self._to_documents(self._hits_from_result(result))

    def similarity_search_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[Document]:
        """Look up similar documents by embedding vector in Weaviate.

        Args:
            embedding: Query vector.
            k: Number of Documents to return. Defaults to 4.
            **kwargs: ``where_filter``, ``tenant`` and ``additional`` are
                honored.

        Returns:
            List of Documents most similar to the vector.

        Raises:
            ValueError: If the query result reports errors.
        """
        vector = {"vector": embedding}
        query_obj = self._build_query(**kwargs)
        if kwargs.get("additional"):
            query_obj = query_obj.with_additional(kwargs.get("additional"))
        result = query_obj.with_near_vector(vector).with_limit(k).do()
        return self._to_documents(self._hits_from_result(result))

    def max_marginal_relevance_search(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND
        diversity among selected documents.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.

        Returns:
            List of Documents selected by maximal marginal relevance.

        Raises:
            ValueError: If no embedding model is available.
        """
        if self._embedding is None:
            raise ValueError(
                "max_marginal_relevance_search requires a suitable Embeddings object"
            )
        embedding = self._embedding.embed_query(query)
        return self.max_marginal_relevance_search_by_vector(
            embedding, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult, **kwargs
        )

    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND
        diversity among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch to pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                of diversity among the results with 0 corresponding
                to maximum diversity and 1 to minimum diversity.
                Defaults to 0.5.
            **kwargs: ``where_filter`` and ``tenant`` are honored.

        Returns:
            List of Documents selected by maximal marginal relevance.

        Raises:
            ValueError: If the query result reports errors.
        """
        vector = {"vector": embedding}
        query_obj = self._build_query(**kwargs)
        results = (
            query_obj.with_additional("vector")
            .with_near_vector(vector)
            .with_limit(fetch_k)
            .do()
        )
        # Surface query errors the same way the other search methods do.
        payload = self._hits_from_result(results)
        embeddings = [result["_additional"]["vector"] for result in payload]
        mmr_selected = maximal_marginal_relevance(
            np.array(embedding), embeddings, k=k, lambda_mult=lambda_mult
        )

        docs = []
        for idx in mmr_selected:
            text = payload[idx].pop(self._text_key)
            # The raw vector was only needed for MMR; keep it out of metadata.
            payload[idx].pop("_additional")
            meta = payload[idx]
            docs.append(Document(page_content=text, metadata=meta))
        return docs

    def similarity_search_with_score(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        """Return the docs most similar to the query text, each with a score.

        The score is the dot product of the stored object vector and the
        embedded query, so a higher score means a closer match. It equals
        cosine similarity only when both vectors are unit-normalized.

        The ``_additional`` entry (including the raw vector) is left in each
        returned document's metadata.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            **kwargs: ``search_distance`` (sent as the near-text
                ``certainty``), ``where_filter`` and ``tenant`` are honored.

        Returns:
            List of ``(Document, score)`` tuples.

        Raises:
            ValueError: If no embedding model is available, or if the query
                result reports errors.
        """
        if self._embedding is None:
            raise ValueError(
                "_embedding cannot be None for similarity_search_with_score"
            )
        content: Dict[str, Any] = {"concepts": [query]}
        if kwargs.get("search_distance"):
            content["certainty"] = kwargs.get("search_distance")
        query_obj = self._build_query(**kwargs)

        # The query is always embedded because the score is computed locally.
        embedded_query = self._embedding.embed_query(query)
        if not self._by_text:
            vector = {"vector": embedded_query}
            result = (
                query_obj.with_near_vector(vector)
                .with_limit(k)
                .with_additional("vector")
                .do()
            )
        else:
            result = (
                query_obj.with_near_text(content)
                .with_limit(k)
                .with_additional("vector")
                .do()
            )

        docs_and_scores = []
        for res in self._hits_from_result(result):
            text = res.pop(self._text_key)
            score = np.dot(res["_additional"]["vector"], embedded_query)
            docs_and_scores.append((Document(page_content=text, metadata=res), score))
        return docs_and_scores

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        *,
        client: Optional[weaviate.Client] = None,
        weaviate_url: Optional[str] = None,
        weaviate_api_key: Optional[str] = None,
        batch_size: Optional[int] = None,
        index_name: Optional[str] = None,
        text_key: str = "text",
        by_text: bool = False,
        relevance_score_fn: Optional[
            Callable[[float], float]
        ] = _default_score_normalizer,
        **kwargs: Any,
    ) -> Weaviate:
        """Construct Weaviate wrapper from raw documents.

        This is a user-friendly interface that:
            1. Embeds documents.
            2. Creates a new index for the embeddings in the Weaviate instance.
            3. Adds the documents to the newly created Weaviate index.

        This is intended to be a quick way to get started.

        Args:
            texts: Texts to add to the vector store.
            embedding: Text embedding model to use.
            metadatas: Metadata associated with each text.
            client: weaviate.Client to use.
            weaviate_url: The Weaviate URL. If using Weaviate Cloud Services
                get it from the ``Details`` tab. Can be passed in as a named
                param or by setting the environment variable
                ``WEAVIATE_URL``. Should not be specified if client is
                provided.
            weaviate_api_key: The Weaviate API key. If enabled and using
                Weaviate Cloud Services, get it from the ``Details`` tab. Can
                be passed in as a named param or by setting the environment
                variable ``WEAVIATE_API_KEY``. Should not be specified if
                client is provided.
            batch_size: Size of batch operations.
            index_name: Index name. A random ``LangChain_<hex>`` name is
                generated when omitted.
            text_key: Key to use for uploading/retrieving text to/from the
                vector store.
            by_text: Whether to search by text or by embedding.
            relevance_score_fn: Function for converting whatever distance
                function the vector store uses to a relevance score, which is
                a normalized similarity score (0 means dissimilar, 1 means
                similar).
            **kwargs: ``uuids`` (list of object ids) is consumed here; the
                remaining named parameters are passed to
                ``Weaviate.__init__()``.

        Returns:
            A ``Weaviate`` store wrapping the populated index.

        Raises:
            ImportError: If the ``weaviate`` package cannot be imported.

        Example:
            .. code-block:: python

                from langchain_community.embeddings import OpenAIEmbeddings
                from langchain_community.vectorstores import Weaviate

                embeddings = OpenAIEmbeddings()
                weaviate = Weaviate.from_texts(
                    texts,
                    embeddings,
                    weaviate_url="http://localhost:8080"
                )
        """
        try:
            from weaviate.util import get_valid_uuid
        except ImportError as e:
            raise ImportError(
                "Could not import weaviate python package. "
                "Please install it with `pip install weaviate-client`"
            ) from e

        client = client or _create_weaviate_client(
            url=weaviate_url,
            api_key=weaviate_api_key,
        )
        if batch_size:
            client.batch.configure(batch_size=batch_size)

        index_name = index_name or f"LangChain_{uuid4().hex}"
        schema = _default_schema(index_name, text_key)
        # check whether the index already exists
        if not client.schema.exists(index_name):
            client.schema.create_class(schema)

        embeddings = embedding.embed_documents(texts) if embedding else None
        attributes = list(metadatas[0].keys()) if metadatas else None

        # If the UUID of one of the objects already exists
        # then the existing object will be replaced by the new object.
        if "uuids" in kwargs:
            uuids = kwargs.pop("uuids")
        else:
            uuids = [get_valid_uuid(uuid4()) for _ in texts]

        with client.batch as batch:
            for i, text in enumerate(texts):
                data_properties = {
                    text_key: text,
                }
                if metadatas is not None:
                    # Serialize values the same way add_texts does so that
                    # e.g. datetime metadata is stored consistently.
                    for key, val in metadatas[i].items():
                        data_properties[key] = _json_serializable(val)

                # if an embedding strategy is not provided, we let
                # weaviate create the embedding. Note that this will only
                # work if weaviate has been installed with a vectorizer module
                # like text2vec-contextionary for example
                params = {
                    "uuid": uuids[i],
                    "data_object": data_properties,
                    "class_name": index_name,
                }
                if embeddings is not None:
                    params["vector"] = embeddings[i]
                batch.add_data_object(**params)
            batch.flush()

        return cls(
            client,
            index_name,
            text_key,
            embedding=embedding,
            attributes=attributes,
            relevance_score_fn=relevance_score_fn,
            by_text=by_text,
            **kwargs,
        )

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> None:
        """Delete by vector IDs.

        Args:
            ids: List of ids to delete.

        Raises:
            ValueError: If ``ids`` is None.
        """
        if ids is None:
            raise ValueError("No ids provided to delete.")
        # TODO: Check if this can be done in bulk
        for object_id in ids:
            self._client.data_object.delete(uuid=object_id)