from __future__ import annotations
import logging
import uuid
from typing import Any, Iterable, List, Optional, Tuple
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import DistanceStrategy
logger = logging.getLogger(__name__)
class KDBAI(VectorStore):
    """`KDB.AI` vector store.

    See [https://kdb.ai](https://kdb.ai)

    To use, you should have the `kdbai_client` python package installed.

    Args:
        table: kdbai_client.Table object to use as storage.
        embedding: Any embedding function implementing the
            `langchain.embeddings.base.Embeddings` interface.
        distance_strategy: One of DistanceStrategy.EUCLIDEAN_DISTANCE,
            DistanceStrategy.DOT_PRODUCT or DistanceStrategy.COSINE.

    See the example
    [notebook](https://github.com/KxSystems/langchain/blob/KDB.AI/docs/docs/integrations/vectorstores/kdbai.ipynb).
    """

    def __init__(
        self,
        table: Any,
        embedding: Embeddings,
        distance_strategy: Optional[
            DistanceStrategy
        ] = DistanceStrategy.EUCLIDEAN_DISTANCE,
    ):
        try:
            import kdbai_client  # noqa
        except ImportError:
            raise ImportError(
                "Could not import kdbai_client python package. "
                "Please install it with `pip install kdbai_client`."
            )
        self._table = table
        self._embedding = embedding
        self.distance_strategy = distance_strategy

    @property
    def embeddings(self) -> Optional[Embeddings]:
        """Return the embedding object if it implements the Embeddings
        interface, else None (a bare callable is also accepted elsewhere)."""
        if isinstance(self._embedding, Embeddings):
            return self._embedding
        return None

    def _embed_documents(self, texts: Iterable[str]) -> List[List[float]]:
        """Embed a batch of texts via either an Embeddings object or a
        plain callable ``str -> List[float]``."""
        if isinstance(self._embedding, Embeddings):
            return self._embedding.embed_documents(list(texts))
        return [self._embedding(t) for t in texts]

    def _embed_query(self, text: str) -> List[float]:
        """Embed a single query string (Embeddings object or plain callable)."""
        if isinstance(self._embedding, Embeddings):
            return self._embedding.embed_query(text)
        return self._embedding(text)

    def _insert(
        self,
        texts: List[str],
        ids: Optional[List[str]],
        metadata: Optional[Any] = None,
    ) -> None:
        """Embed one batch of texts and insert it into the KDB.AI table.

        Args:
            texts: Batch of raw texts.
            ids: Row ids aligned with ``texts``.
            metadata: Optional pandas DataFrame of per-row metadata columns,
                concatenated onto the insert frame.
        """
        try:
            import numpy as np
        except ImportError:
            raise ImportError(
                "Could not import numpy python package. "
                "Please install it with `pip install numpy`."
            )
        try:
            import pandas as pd
        except ImportError:
            raise ImportError(
                "Could not import pandas python package. "
                "Please install it with `pip install pandas`."
            )
        # Use the dispatching helper so plain-callable embedding functions
        # work too (the original called embed_documents directly, which
        # breaks when ``self._embedding`` is not an Embeddings instance).
        embeds = self._embed_documents(texts)
        df = pd.DataFrame()
        df["id"] = ids
        # KDB.AI stores text as bytes.
        df["text"] = [t.encode("utf-8") for t in texts]
        df["embeddings"] = [np.array(e, dtype="float32") for e in embeds]
        if metadata is not None:
            df = pd.concat([df, metadata], axis=1)
        self._table.insert(df, warn=False)

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        batch_size: int = 32,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add them to the
        vectorstore.

        Args:
            texts (Iterable[str]): Texts to add to the vectorstore.
            metadatas (Optional[List[dict]]): List of metadata corresponding
                to each chunk of text.
            ids (Optional[List[str]]): List of IDs corresponding to each
                chunk of text.
            batch_size (Optional[int]): Size of batch of chunks of text to
                insert at once.

        Returns:
            List[str]: List of IDs of the added texts.
        """
        try:
            import pandas as pd
        except ImportError:
            raise ImportError(
                "Could not import pandas python package. "
                "Please install it with `pip install pandas`."
            )
        texts = list(texts)
        metadf: Optional[pd.DataFrame] = None
        if metadatas is not None:
            # Accept either a ready-made DataFrame or a list of dicts.
            if isinstance(metadatas, pd.DataFrame):
                metadf = metadatas
            else:
                metadf = pd.DataFrame(metadatas)
        out_ids: List[str] = []
        nbatches = (len(texts) - 1) // batch_size + 1
        for i in range(nbatches):
            istart = i * batch_size
            iend = (i + 1) * batch_size
            batch = texts[istart:iend]
            if ids:
                batch_ids = ids[istart:iend]
            else:
                # Generate ids when the caller did not supply any.
                batch_ids = [str(uuid.uuid4()) for _ in range(len(batch))]
            if metadf is not None:
                batch_meta = metadf.iloc[istart:iend].reset_index(drop=True)
            else:
                batch_meta = None
            self._insert(batch, batch_ids, batch_meta)
            out_ids = out_ids + batch_ids
        return out_ids

    def add_documents(
        self, documents: List[Document], batch_size: int = 32, **kwargs: Any
    ) -> List[str]:
        """Run more documents through the embeddings and add them to the
        vectorstore.

        Args:
            documents (List[Document]): Documents to add to the vectorstore.
            batch_size (Optional[int]): Size of batch of documents to insert
                at once.

        Returns:
            List[str]: List of IDs of the added texts.
        """
        try:
            import pandas as pd
        except ImportError:
            raise ImportError(
                "Could not import pandas python package. "
                "Please install it with `pip install pandas`."
            )
        texts = [x.page_content for x in documents]
        metadata = pd.DataFrame([x.metadata for x in documents])
        # Keyword must be ``metadatas`` — the original passed ``metadata=``,
        # which fell into **kwargs and silently dropped all metadata.
        return self.add_texts(texts, metadatas=metadata, batch_size=batch_size)

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 1,
        filter: Optional[List] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Run similarity search with distance from a query string.

        Args:
            query (str): Query string.
            k (Optional[int]): Number of neighbors to retrieve.
            filter (Optional[List]): KDB.AI metadata filter clause:
                https://code.kx.com/kdbai/use/filter.html

        Returns:
            List[Document]: List of similar documents.
        """
        return self.similarity_search_by_vector_with_score(
            self._embed_query(query), k=k, filter=filter, **kwargs
        )

    def similarity_search_by_vector_with_score(
        self,
        embedding: List[float],
        *,
        k: int = 1,
        filter: Optional[List] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return documents most similar to embedding, along with scores.

        Args:
            embedding (List[float]): query vector.
            k (Optional[int]): Number of neighbors to retrieve.
            filter (Optional[List]): KDB.AI metadata filter clause:
                https://code.kx.com/kdbai/use/filter.html

        Returns:
            List[Document]: List of similar documents.
        """
        # Avoid a mutable default argument; an empty list means "no filter".
        if filter is None:
            filter = []
        # Allow the KDB.AI native keyword ``n`` to override ``k``.
        if "n" in kwargs:
            k = kwargs.pop("n")
        matches = self._table.search(vectors=[embedding], n=k, filter=filter, **kwargs)
        docs: list = []
        if isinstance(matches, list):
            matches = matches[0]
        else:
            return docs
        for row in matches.to_dict(orient="records"):
            text = row.pop("text")
            score = row.pop("__nn_distance")
            # ``text`` and ``__nn_distance`` were popped above, so every
            # remaining column is metadata.
            docs.append(
                (
                    Document(
                        page_content=text,
                        metadata=dict(row),
                    ),
                    score,
                )
            )
        return docs

    def similarity_search(
        self,
        query: str,
        k: int = 1,
        filter: Optional[List] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Run similarity search from a query string.

        Args:
            query (str): Query string.
            k (Optional[int]): Number of neighbors to retrieve.
            filter (Optional[List]): KDB.AI metadata filter clause:
                https://code.kx.com/kdbai/use/filter.html

        Returns:
            List[Document]: List of similar documents.
        """
        docs_and_scores = self.similarity_search_with_score(
            query, k=k, filter=filter, **kwargs
        )
        return [doc for doc, _ in docs_and_scores]

    @classmethod
    def from_texts(
        cls: Any,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        **kwargs: Any,
    ) -> Any:
        """Not implemented: construct a KDBAI store from a kdbai_client.Table
        and use ``add_texts`` instead."""
        # NotImplementedError subclasses Exception, so existing handlers
        # catching Exception keep working.
        raise NotImplementedError("Not implemented.")