"""Source code for langchain_community.vectorstores.kdbai."""

from __future__ import annotations

import logging
import uuid
from typing import Any, Iterable, List, Optional, Tuple

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import DistanceStrategy

logger = logging.getLogger(__name__)


class KDBAI(VectorStore):
    """`KDB.AI` vector store.

    See https://kdb.ai

    To use, you should have the `kdbai_client` python package installed.

    Args:
        table: kdbai_client.Table object to use as storage.
        embedding: Any embedding function implementing the
            `langchain.embeddings.base.Embeddings` interface.
        distance_strategy: One of DistanceStrategy.EUCLIDEAN_DISTANCE,
            DistanceStrategy.DOT_PRODUCT or DistanceStrategy.COSINE.

    See the example
    [notebook](https://github.com/KxSystems/langchain/blob/KDB.AI/docs/docs/integrations/vectorstores/kdbai.ipynb).
    """
[docs] def __init__( self, table: Any, embedding: Embeddings, distance_strategy: Optional[ DistanceStrategy ] = DistanceStrategy.EUCLIDEAN_DISTANCE, ): try: import kdbai_client # noqa except ImportError: raise ImportError( "Could not import kdbai_client python package. " "Please install it with `pip install kdbai_client`." ) self._table = table self._embedding = embedding self.distance_strategy = distance_strategy
@property def embeddings(self) -> Optional[Embeddings]: if isinstance(self._embedding, Embeddings): return self._embedding return None def _embed_documents(self, texts: Iterable[str]) -> List[List[float]]: if isinstance(self._embedding, Embeddings): return self._embedding.embed_documents(list(texts)) return [self._embedding(t) for t in texts] def _embed_query(self, text: str) -> List[float]: if isinstance(self._embedding, Embeddings): return self._embedding.embed_query(text) return self._embedding(text) def _insert( self, texts: List[str], ids: Optional[List[str]], metadata: Optional[Any] = None, ) -> None: try: import numpy as np except ImportError: raise ImportError( "Could not import numpy python package. " "Please install it with `pip install numpy`." ) try: import pandas as pd except ImportError: raise ImportError( "Could not import pandas python package. " "Please install it with `pip install pandas`." ) embeds = self._embedding.embed_documents(texts) df = pd.DataFrame() df["id"] = ids df["text"] = [t.encode("utf-8") for t in texts] df["embeddings"] = [np.array(e, dtype="float32") for e in embeds] if metadata is not None: df = pd.concat([df, metadata], axis=1) self._table.insert(df, warn=False)
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, batch_size: int = 32, **kwargs: Any, ) -> List[str]: """运行更多文本通过嵌入并添加到向量存储。 参数: texts (Iterable[str]): 要添加到向量存储的文本。 metadatas (Optional[List[dict]]): 每个文本块对应的元数据列表。 ids (Optional[List[str]]): 每个文本块对应的ID列表。 batch_size (Optional[int]): 一次插入的文本块批次大小。 返回: List[str]: 添加的文本的ID列表。 """ try: import pandas as pd except ImportError: raise ImportError( "Could not import pandas python package. " "Please install it with `pip install pandas`." ) texts = list(texts) metadf: pd.DataFrame = None if metadatas is not None: if isinstance(metadatas, pd.DataFrame): metadf = metadatas else: metadf = pd.DataFrame(metadatas) out_ids: List[str] = [] nbatches = (len(texts) - 1) // batch_size + 1 for i in range(nbatches): istart = i * batch_size iend = (i + 1) * batch_size batch = texts[istart:iend] if ids: batch_ids = ids[istart:iend] else: batch_ids = [str(uuid.uuid4()) for _ in range(len(batch))] if metadf is not None: batch_meta = metadf.iloc[istart:iend].reset_index(drop=True) else: batch_meta = None self._insert(batch, batch_ids, batch_meta) out_ids = out_ids + batch_ids return out_ids
[docs] def add_documents( self, documents: List[Document], batch_size: int = 32, **kwargs: Any ) -> List[str]: """运行更多文档通过嵌入并添加到向量存储。 参数: documents(List[Document]):要添加到向量存储的文档。 batch_size(Optional[int]):一次插入的文档批量大小。 返回: List[str]:已添加文本的ID列表。 """ try: import pandas as pd except ImportError: raise ImportError( "Could not import pandas python package. " "Please install it with `pip install pandas`." ) texts = [x.page_content for x in documents] metadata = pd.DataFrame([x.metadata for x in documents]) return self.add_texts(texts, metadata=metadata, batch_size=batch_size)
[docs] def similarity_search_with_score( self, query: str, k: int = 1, filter: Optional[List] = [], **kwargs: Any, ) -> List[Tuple[Document, float]]: """运行与查询字符串距离相关的相似性搜索。 参数: query (str): 查询字符串。 k (Optional[int]): 要检索的邻居数量。 filter (Optional[List]): KDB.AI元数据过滤子句: https://code.kx.com/kdbai/use/filter.html 返回: List[Document]: 相似文档的列表。 """ return self.similarity_search_by_vector_with_score( self._embed_query(query), k=k, filter=filter, **kwargs )
[docs] def similarity_search_by_vector_with_score( self, embedding: List[float], *, k: int = 1, filter: Optional[List] = [], **kwargs: Any, ) -> List[Tuple[Document, float]]: """返回与嵌入最相似的文档,以及相似度分数。 参数: embedding(List[float]):查询向量。 k(Optional[int]):要检索的邻居数量。 filter(Optional[List]):KDB.AI元数据过滤子句:https://code.kx.com/kdbai/use/filter.html 返回: List[Document]:相似文档的列表。 """ if "n" in kwargs: k = kwargs.pop("n") matches = self._table.search(vectors=[embedding], n=k, filter=filter, **kwargs) docs: list = [] if isinstance(matches, list): matches = matches[0] else: return docs for row in matches.to_dict(orient="records"): text = row.pop("text") score = row.pop("__nn_distance") docs.append( ( Document( page_content=text, metadata={k: v for k, v in row.items() if k != "text"}, ), score, ) ) return docs
[docs] @classmethod def from_texts( cls: Any, texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> Any: """未实现。""" raise Exception("未实现。")