Source code for langchain_community.vectorstores.kinetica

from __future__ import annotations

import asyncio
import enum
import json
import logging
import struct
import uuid
from collections import OrderedDict
from enum import Enum
from functools import partial
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseSettings
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import maximal_marginal_relevance


[docs]class DistanceStrategy(str, enum.Enum): """距离策略的枚举器。""" EUCLIDEAN = "l2" COSINE = "cosine" MAX_INNER_PRODUCT = "inner"
def _results_to_docs(docs_and_scores: Any) -> List[Document]: """从文档和分数中返回文档。""" return [doc for doc, _ in docs_and_scores]
[docs]class Dimension(int, Enum): """一些已知嵌入的默认维度。""" OPENAI = 1536
DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.EUCLIDEAN _LANGCHAIN_DEFAULT_SCHEMA_NAME = "langchain" ## Default Kinetica schema name _LANGCHAIN_DEFAULT_COLLECTION_NAME = ( "langchain_kinetica_embeddings" ## Default Kinetica table name )
[docs]class KineticaSettings(BaseSettings): """`Kinetica` 客户端配置。 属性: host (str) : 用于连接到 MyScale 后端的 URL。 默认为 'localhost'。 port (int) : 用于通过 HTTP 连接的 URL 端口。默认为 8443。 username (str) : 登录的用户名。默认为 None。 password (str) : 登录的密码。默认为 None。 database (str) : 要查找表的数据库名称。默认为 'default'。 table (str) : 要操作的表名称。 默认为 'vector_table'。 metric (str) : 用于计算距离的度量标准, 支持的有 ('angular', 'euclidean', 'manhattan', 'hamming', 'dot')。默认为 'angular'。 https://github.com/spotify/annoy/blob/main/src/annoymodule.cc#L149-L169 """ host: str = "http://127.0.0.1" port: int = 9191 username: Optional[str] = None password: Optional[str] = None database: str = _LANGCHAIN_DEFAULT_SCHEMA_NAME table: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME metric: str = DEFAULT_DISTANCE_STRATEGY.value def __getitem__(self, item: str) -> Any: return getattr(self, item) class Config: env_file = ".env" env_prefix = "kinetica_" env_file_encoding = "utf-8"
[docs]class Kinetica(VectorStore): """`Kinetica` 向量存储。 要使用,应安装``gpudb`` python包。 参数: kinetica_settings: Kinetica连接设置类。 embedding_function: 任何实现`langchain.embeddings.base.Embeddings`接口的嵌入函数。 collection_name: 要使用的集合的名称。 (默认值: langchain) 注意: 这不是表的名称,而是集合的名称。 在初始化存储时将创建表(如果不存在) 因此,请确保用户有权限创建表。 distance_strategy: 要使用的距离策略。 (默认值: COSINE) pre_delete_collection: 如果为True,则如果存在将删除集合。 (默认值: False)。 用于测试。 engine_args: SQLAlchemy的创建引擎参数。 示例: .. code-block:: python from langchain_community.vectorstores import Kinetica, KineticaSettings from langchain_community.embeddings.openai import OpenAIEmbeddings kinetica_settings = KineticaSettings( host="http://127.0.0.1", username="", password="" ) COLLECTION_NAME = "kinetica_store" embeddings = OpenAIEmbeddings() vectorstore = Kinetica.from_documents( documents=docs, embedding=embeddings, collection_name=COLLECTION_NAME, config=kinetica_settings, )"""
[docs] def __init__( self, config: KineticaSettings, embedding_function: Embeddings, collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME, schema_name: str = _LANGCHAIN_DEFAULT_SCHEMA_NAME, distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, pre_delete_collection: bool = False, logger: Optional[logging.Logger] = None, relevance_score_fn: Optional[Callable[[float], float]] = None, ) -> None: """Kinetica类的构造函数 参数: config (KineticaSettings): 一个`KineticaSettings`实例 embedding_function (Embeddings): 要使用的嵌入函数 collection_name (str, optional): Kinetica表名称。 默认为_LANGCHAIN_DEFAULT_COLLECTION_NAME。 schema_name (str, optional): Kinetica表名称。 默认为_LANGCHAIN_DEFAULT_SCHEMA_NAME。 distance_strategy (DistanceStrategy, optional): _description_。 默认为DEFAULT_DISTANCE_STRATEGY。 pre_delete_collection (bool, optional): _description_。默认为False。 logger (Optional[logging.Logger], optional): _description_。 默认为None。 """ self._config = config self.embedding_function = embedding_function self.collection_name = collection_name self.schema_name = schema_name self._distance_strategy = distance_strategy self.pre_delete_collection = pre_delete_collection self.logger = logger or logging.getLogger(__name__) self.override_relevance_score_fn = relevance_score_fn self._db = self.__get_db(self._config)
def __post_init__(self, dimensions: int) -> None: """ 初始化商店。 """ try: from gpudb import GPUdbTable except ImportError: raise ImportError( "Could not import Kinetica python API. " "Please install it with `pip install gpudb==7.2.0.1`." ) self.dimensions = dimensions dimension_field = f"vector({dimensions})" if self.pre_delete_collection: self.delete_schema() self.table_name = self.collection_name if self.schema_name is not None and len(self.schema_name) > 0: self.table_name = f"{self.schema_name}.{self.collection_name}" self.table_schema = [ ["text", "string"], ["embedding", "bytes", dimension_field], ["metadata", "string", "json"], ["id", "string", "uuid"], ] self.create_schema() self.EmbeddingStore: GPUdbTable = self.create_tables_if_not_exists() def __get_db(self, config: KineticaSettings) -> Any: try: from gpudb import GPUdb except ImportError: raise ImportError( "Could not import Kinetica python API. " "Please install it with `pip install gpudb==7.2.0.1`." ) options = GPUdb.Options() options.username = config.username options.password = config.password options.skip_ssl_cert_verification = True return GPUdb(host=config.host, options=options) @property def embeddings(self) -> Embeddings: return self.embedding_function @classmethod def __from( cls, config: KineticaSettings, texts: List[str], embeddings: List[List[float]], embedding: Embeddings, dimensions: int, metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME, distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, pre_delete_collection: bool = False, logger: Optional[logging.Logger] = None, **kwargs: Any, ) -> Kinetica: """用于辅助构建`Kinetica`存储实例的类方法,使用不同的参数组合 参数: config (KineticaSettings): 一个`KineticaSettings`实例 texts (List[str]): 要生成嵌入并存储的文本列表 embeddings (List[List[float]]): 嵌入列表 embedding (Embeddings): 嵌入函数 dimensions (int): 嵌入的维度数量 metadatas (Optional[List[dict]], 可选): 与每个文本相关联的JSON数据列表。默认为None。 ids (Optional[List[str]], 可选): 与每个文本相关联的唯一ID列表(默认为UUID)。默认为None。 collection_name (str, 可选): Kinetica模式名称。默认为_LANGCHAIN_DEFAULT_COLLECTION_NAME。 distance_strategy (DistanceStrategy, 可选): 目前未使用。默认为DEFAULT_DISTANCE_STRATEGY。 pre_delete_collection (bool, 可选): 是否删除Kinetica模式。默认为False。 logger (Optional[logging.Logger], 可选): 用于在不同级别记录日志的记录器。默认为None。 返回: Kinetica: Kinetica类的一个实例 """ if ids is None: ids = [str(uuid.uuid4()) for _ in texts] if not metadatas: metadatas = [{} for _ in texts] store = cls( config=config, collection_name=collection_name, embedding_function=embedding, # dimensions=dimensions, distance_strategy=distance_strategy, pre_delete_collection=pre_delete_collection, logger=logger, **kwargs, ) store.__post_init__(dimensions) store.add_embeddings( texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs ) return store
[docs] def create_tables_if_not_exists(self) -> Any: """创建用于存储文本和嵌入向量的表格""" try: from gpudb import GPUdbTable except ImportError: raise ImportError( "Could not import Kinetica python API. " "Please install it with `pip install gpudb==7.2.0.1`." ) return GPUdbTable( _type=self.table_schema, name=self.table_name, db=self._db, options={"is_replicated": "true"}, )
[docs] def drop_tables(self) -> None: """删除表格""" self._db.clear_table( f"{self.table_name}", options={"no_error_if_not_exists": "true"} )
[docs] def create_schema(self) -> None: """创建一个新的Kinetica模式""" self._db.create_schema(self.schema_name)
[docs] def delete_schema(self) -> None: """删除一个Kinetica模式,级联设置为`true` 此方法将删除一个包含所有表的模式。 """ self.logger.debug("Trying to delete collection") self._db.drop_schema( self.schema_name, {"no_error_if_not_exists": "true", "cascade": "true"} )
[docs] def add_embeddings( self, texts: Iterable[str], embeddings: List[List[float]], metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, **kwargs: Any, ) -> List[str]: """将嵌入添加到向量存储中。 参数: texts:要添加到向量存储中的字符串的可迭代对象。 embeddings:嵌入向量的列表的列表。 metadatas:与文本相关联的元数据列表。 ids:文本嵌入对的id列表 kwargs:向量存储特定参数 """ if ids is None: ids = [str(uuid.uuid4()) for _ in texts] if not metadatas: metadatas = [{} for _ in texts] records = [] for text, embedding, metadata, id in zip(texts, embeddings, metadatas, ids): buf = struct.pack("%sf" % self.dimensions, *embedding) records.append([text, buf, json.dumps(metadata), id]) self.EmbeddingStore.insert_records(records) return ids
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, **kwargs: Any, ) -> List[str]: """运行更多的文本通过嵌入并添加到向量存储中。 参数: texts: 要添加到向量存储中的字符串的可迭代对象。 metadatas: 与文本相关联的元数据(JSON数据)的可选列表。 ids: 提供的文本的ID(UUID)列表;如果为None,则将生成ID。 kwargs: 向量存储特定参数 返回: 将文本添加到向量存储中后的ID列表。 """ embeddings = self.embedding_function.embed_documents(list(texts)) self.dimensions = len(embeddings[0]) if not hasattr(self, "EmbeddingStore"): self.__post_init__(self.dimensions) return self.add_embeddings( texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs )
[docs] def similarity_search_with_score( self, query: str, k: int = 4, filter: Optional[dict] = None, ) -> List[Tuple[Document, float]]: """返回与查询最相似的文档。 参数: query:要查找相似文档的文本。 k:要返回的文档数量。默认为4。 filter(可选[Dict[str,str]]):按元数据过滤。默认为无。 返回: 返回与查询最相似的文档列表,以及每个文档的得分。 """ embedding = self.embedding_function.embed_query(query) docs = self.similarity_search_with_score_by_vector( embedding=embedding, k=k, filter=filter ) return docs
[docs] def similarity_search_with_score_by_vector( self, embedding: List[float], k: int = 4, filter: Optional[dict] = None, ) -> List[Tuple[Document, float]]: resp: Dict = self.__query_collection(embedding, k, filter) records: OrderedDict = resp["records"] results = list(zip(*list(records.values()))) return self._results_to_docs_and_scores(results)
[docs] def similarity_search_by_vector( self, embedding: List[float], k: int = 4, filter: Optional[dict] = None, **kwargs: Any, ) -> List[Document]: """返回与嵌入向量最相似的文档。 参数: embedding: 要查找与之相似的文档的嵌入。 k: 要返回的文档数量。默认为4。 filter (Optional[Dict[str, str]]): 按元数据筛选。默认为None。 返回: 返回与查询向量最相似的文档列表。 """ docs_and_scores = self.similarity_search_with_score_by_vector( embedding=embedding, k=k, filter=filter ) return [doc for doc, _ in docs_and_scores]
def _results_to_docs_and_scores(self, results: Any) -> List[Tuple[Document, float]]: """返回结果中的文档和分数。""" docs = [ ( Document( page_content=result[0], metadata=json.loads(result[1]), ), result[2] if self.embedding_function is not None else None, ) for result in results ] return docs def _select_relevance_score_fn(self) -> Callable[[float], float]: """“正确”的相关性函数可能会有所不同,取决于一些因素,包括: - 向量存储中使用的距离/相似度度量 - 嵌入的规模(OpenAI的是单位规范化的。许多其他嵌入不是!) - 嵌入的维度 - 等等。 """ if self.override_relevance_score_fn is not None: return self.override_relevance_score_fn # Default strategy is to rely on distance strategy provided # in vectorstore constructor if self._distance_strategy == DistanceStrategy.COSINE: return self._cosine_relevance_score_fn elif self._distance_strategy == DistanceStrategy.EUCLIDEAN: return self._euclidean_relevance_score_fn elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT: return self._max_inner_product_relevance_score_fn else: raise ValueError( "No supported normalization function" f" for distance_strategy of {self._distance_strategy}." "Consider providing relevance_score_fn to Kinetica constructor." ) @property def distance_strategy(self) -> str: if self._distance_strategy == DistanceStrategy.EUCLIDEAN: return "l2_distance" elif self._distance_strategy == DistanceStrategy.COSINE: return "cosine_distance" elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT: return "dot_product" else: raise ValueError( f"Got unexpected value for distance: {self._distance_strategy}. " f"Should be one of {', '.join([ds.value for ds in DistanceStrategy])}." ) def __query_collection( self, embedding: List[float], k: int = 4, filter: Optional[Dict[str, str]] = None, ) -> Dict: """查询集合。""" # if filter is not None: # filter_clauses = [] # for key, value in filter.items(): # IN = "in" # if isinstance(value, dict) and IN in map(str.lower, value): # value_case_insensitive = { # k.lower(): v for k, v in value.items() # } # filter_by_metadata = self.EmbeddingStore.cmetadata[ # key # ].astext.in_(value_case_insensitive[IN]) # filter_clauses.append(filter_by_metadata) # else: # filter_by_metadata = self.EmbeddingStore.cmetadata[ # key # ].astext == str(value) # filter_clauses.append(filter_by_metadata) json_filter = json.dumps(filter) if filter is not None else None where_clause = ( f" where '{json_filter}' = JSON(metadata) " if json_filter is not None else "" ) embedding_str = "[" + ",".join([str(x) for x in embedding]) + "]" dist_strategy = self.distance_strategy query_string = f""" SELECT text, metadata, {dist_strategy}(embedding, '{embedding_str}') as distance, embedding FROM {self.table_name} {where_clause} ORDER BY distance asc NULLS LAST LIMIT {k} """ self.logger.debug(query_string) resp = self._db.execute_sql_and_decode(query_string) self.logger.debug(resp) return resp
[docs] def max_marginal_relevance_search_with_score_by_vector( self, embedding: List[float], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, filter: Optional[Dict[str, str]] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """使用最大边际相关性和分数返回所选文档的文档。 最大边际相关性优化了与查询的相似性和所选文档之间的多样性。 参数: embedding: 要查找相似文档的嵌入。 k (int): 要返回的文档数量。默认为4。 fetch_k (int): 要获取以传递给MMR算法的文档数量。默认为20。 lambda_mult (float): 0到1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。 filter (Optional[Dict[str, str]]): 按元数据筛选。默认为None。 返回: List[Tuple[Document, float]]: 通过最大边际相关性选择的文档列表,以及每个文档的得分。 """ resp = self.__query_collection(embedding=embedding, k=fetch_k, filter=filter) records: OrderedDict = resp["records"] results = list(zip(*list(records.values()))) embedding_list = [ struct.unpack("%sf" % self.dimensions, embedding) for embedding in records["embedding"] ] mmr_selected = maximal_marginal_relevance( np.array(embedding, dtype=np.float32), embedding_list, k=k, lambda_mult=lambda_mult, ) candidates = self._results_to_docs_and_scores(results) return [r for i, r in enumerate(candidates) if i in mmr_selected]
[docs] def max_marginal_relevance_search_with_score( self, query: str, k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, filter: Optional[dict] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """返回使用最大边际相关性和分数选择的文档。 最大边际相关性优化了与查询的相似性和所选文档之间的多样性。 参数: query (str): 要查找类似文档的文本。 k (int): 要返回的文档数量。默认为4。 fetch_k (int): 要获取以传递给MMR算法的文档数量。 默认为20。 lambda_mult (float): 0到1之间的数字,确定结果之间多样性的程度, 0对应最大多样性,1对应最小多样性。 默认为0.5。 filter (Optional[Dict[str, str]]): 按元数据过滤。默认为None。 返回: List[Tuple[Document, float]]: 通过最大边际相关性选择的文档列表, 以及每个文档的得分。 """ embedding = self.embedding_function.embed_query(query) docs = self.max_marginal_relevance_search_with_score_by_vector( embedding=embedding, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult, filter=filter, **kwargs, ) return docs
[docs] def max_marginal_relevance_search_by_vector( self, embedding: List[float], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, filter: Optional[Dict[str, str]] = None, **kwargs: Any, ) -> List[Document]: """返回使用最大边际相关性选择的文档到嵌入向量。 最大边际相关性优化了与查询的相似性和所选文档之间的多样性。 参数: embedding (str): 要查找类似文档的文本。 k (int): 要返回的文档数量。默认为4。 fetch_k (int): 要获取以传递给MMR算法的文档数量。 默认为20。 lambda_mult (float): 0到1之间的数字,确定结果之间多样性的程度, 0对应最大多样性,1对应最小多样性。 默认为0.5。 filter (Optional[Dict[str, str]]): 按元数据筛选。默认为None。 返回: List[Document]: 通过最大边际相关性选择的文档列表。 """ docs_and_scores = self.max_marginal_relevance_search_with_score_by_vector( embedding, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult, filter=filter, **kwargs, ) return _results_to_docs(docs_and_scores)
[docs] async def amax_marginal_relevance_search_by_vector( self, embedding: List[float], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, filter: Optional[Dict[str, str]] = None, **kwargs: Any, ) -> List[Document]: """返回使用最大边际相关性选择的文档。""" # This is a temporary workaround to make the similarity search # asynchronous. The proper solution is to make the similarity search # asynchronous in the vector store implementations. func = partial( self.max_marginal_relevance_search_by_vector, embedding, k=k, fetch_k=fetch_k, lambda_mult=lambda_mult, filter=filter, **kwargs, ) return await asyncio.get_event_loop().run_in_executor(None, func)
[docs] @classmethod def from_texts( cls: Type[Kinetica], texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, config: KineticaSettings = KineticaSettings(), collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME, distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, ids: Optional[List[str]] = None, pre_delete_collection: bool = False, **kwargs: Any, ) -> Kinetica: """将传入的文本添加到向量存储中并返回 参数: cls (Type[Kinetica]): Kinetica类 texts (List[str]): 生成嵌入的文本列表 embedding (Embeddings): 嵌入列表 metadatas (Optional[List[dict]], optional): 描述文本/文档的JSON字典列表。默认为None。 config (KineticaSettings): `KineticaSettings`实例 collection_name (str, optional): Kinetica模式名称。默认为_LANGCHAIN_DEFAULT_COLLECTION_NAME。 distance_strategy (DistanceStrategy, optional): 距离策略,例如l2、cosine等。默认为DEFAULT_DISTANCE_STRATEGY。 ids (Optional[List[str]], optional): 每个文本/文档的UUID列表。默认为None。 pre_delete_collection (bool, optional): 指示是否删除Kinetica模式。默认为False。 返回: Kinetica: `Kinetica`实例 """ if len(texts) == 0: raise ValueError("texts is empty") try: first_embedding = embedding.embed_documents(texts[0:1]) except NotImplementedError: first_embedding = [embedding.embed_query(texts[0])] dimensions = len(first_embedding[0]) embeddings = embedding.embed_documents(list(texts)) kinetica_store = cls.__from( texts=texts, embeddings=embeddings, embedding=embedding, dimensions=dimensions, config=config, metadatas=metadatas, ids=ids, collection_name=collection_name, distance_strategy=distance_strategy, pre_delete_collection=pre_delete_collection, **kwargs, ) return kinetica_store
[docs] @classmethod def from_embeddings( cls: Type[Kinetica], text_embeddings: List[Tuple[str, List[float]]], embedding: Embeddings, metadatas: Optional[List[dict]] = None, config: KineticaSettings = KineticaSettings(), dimensions: int = Dimension.OPENAI, collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME, distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, ids: Optional[List[str]] = None, pre_delete_collection: bool = False, **kwargs: Any, ) -> Kinetica: """将传入的嵌入添加到向量存储中并返回 参数: cls (Type[Kinetica]): Kinetica类 text_embeddings (List[Tuple[str, List[float]]]): 一组文本和嵌入 embedding (Embeddings): 嵌入列表 metadatas (Optional[List[dict]], optional): 字典列表,描述文本/文档。默认为None。 config (KineticaSettings): `KineticaSettings`实例 dimensions (int, optional): 向量数据的维度,如果未传入,则使用默认值。默认为Dimension.OPENAI。 collection_name (str, optional): Kinetica模式名称。默认为_LANGCHAIN_DEFAULT_COLLECTION_NAME。 distance_strategy (DistanceStrategy, optional): 距离策略,例如l2、cosine等。默认为DEFAULT_DISTANCE_STRATEGY。 ids (Optional[List[str]], optional): 每个文本/文档的UUID列表。默认为None。 pre_delete_collection (bool, optional): 指示是否删除Kinetica模式。默认为False。 返回: Kinetica: 一个`Kinetica`实例 """ texts = [t[0] for t in text_embeddings] embeddings = [t[1] for t in text_embeddings] dimensions = len(embeddings[0]) return cls.__from( texts=texts, embeddings=embeddings, embedding=embedding, dimensions=dimensions, config=config, metadatas=metadatas, ids=ids, collection_name=collection_name, distance_strategy=distance_strategy, pre_delete_collection=pre_delete_collection, **kwargs, )
[docs] @classmethod def from_documents( cls: Type[Kinetica], documents: List[Document], embedding: Embeddings, config: KineticaSettings = KineticaSettings(), metadatas: Optional[List[dict]] = None, collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME, distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, ids: Optional[List[str]] = None, pre_delete_collection: bool = False, **kwargs: Any, ) -> Kinetica: """将传入的`Document`列表添加到向量存储中并返回 参数: cls (Type[Kinetica]): Kinetica类 texts (List[str]): 生成嵌入的文本列表 embedding (Embeddings): 嵌入列表 config (KineticaSettings): `KineticaSettings`实例 metadatas (Optional[List[dict]], optional): 描述文本/文档的JSON字典列表。默认为None。 collection_name (str, optional): Kinetica模式名称。默认为_LANGCHAIN_DEFAULT_COLLECTION_NAME。 distance_strategy (DistanceStrategy, optional): 距离策略,例如l2、cosine等。默认为DEFAULT_DISTANCE_STRATEGY。 ids (Optional[List[str]], optional): 每个文本/文档的UUID列表。默认为None。 pre_delete_collection (bool, optional): 指示是否删除Kinetica模式。默认为False。 返回: Kinetica: 一个`Kinetica`实例 """ texts = [d.page_content for d in documents] metadatas = [d.metadata for d in documents] return cls.from_texts( texts=texts, embedding=embedding, metadatas=metadatas, config=config, collection_name=collection_name, distance_strategy=distance_strategy, ids=ids, pre_delete_collection=pre_delete_collection, **kwargs, )