Source code for langchain_community.vectorstores.cassandra

from __future__ import annotations

import asyncio
import typing
import uuid
from typing import (
    Any,
    Awaitable,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
    Union,
)

import numpy as np

if typing.TYPE_CHECKING:
    from cassandra.cluster import Session

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

from langchain_community.utilities.cassandra import SetupMode
from langchain_community.vectorstores.utils import maximal_marginal_relevance

CVST = TypeVar("CVST", bound="Cassandra")


[docs]class Cassandra(VectorStore): _embedding_dimension: Union[int, None] def _get_embedding_dimension(self) -> int: if self._embedding_dimension is None: self._embedding_dimension = len( self.embedding.embed_query("This is a sample sentence.") ) return self._embedding_dimension async def _aget_embedding_dimension(self) -> int: if self._embedding_dimension is None: self._embedding_dimension = len( await self.embedding.aembed_query("This is a sample sentence.") ) return self._embedding_dimension
[docs] def __init__( self, embedding: Embeddings, session: Optional[Session] = None, keyspace: Optional[str] = None, table_name: str = "", ttl_seconds: Optional[int] = None, *, body_index_options: Optional[List[Tuple[str, Any]]] = None, setup_mode: SetupMode = SetupMode.SYNC, ) -> None: """Apache Cassandra(R) 用于向量存储工作负载。 要使用它,您需要安装最新版本的 `cassio` 库,并且需要支持向量功能的Cassandra集群/ Astra DB实例。 访问 cassio.org 网站以获取详尽的快速入门和代码示例。 示例: .. code-block:: python from langchain_community.vectorstores import Cassandra from langchain_openai import OpenAIEmbeddings embeddings = OpenAIEmbeddings() session = ... # 创建您的Cassandra会话对象 keyspace = 'my_keyspace' # keyspace 应该已经存在 table_name = 'my_vector_store' vectorstore = Cassandra(embeddings, session, keyspace, table_name) 参数: embedding: 要使用的嵌入函数。 session: Cassandra 驱动程序会话。如果未提供,则从 cassio 中解析。 keyspace: Cassandra 键空间。如果未提供,则从 cassio 中解析。 table_name: Cassandra 表(必填)。 ttl_seconds: 添加文本的可选存活时间。 body_index_options: 用于创建 body 索引的可选选项。 例如 body_index_options = [cassio.table.cql.STANDARD_ANALYZER] setup_mode: 用于创建Cassandra表的模式(SYNC、ASYNC 或 OFF)。 """ try: from cassio.table import MetadataVectorCassandraTable except (ImportError, ModuleNotFoundError): raise ImportError( "Could not import cassio python package. " "Please install it with `pip install cassio`." ) if not table_name: raise ValueError("Missing required parameter 'table_name'.") self.embedding = embedding self.session = session self.keyspace = keyspace self.table_name = table_name self.ttl_seconds = ttl_seconds # self._embedding_dimension = None # kwargs: Dict[str, Any] = {} if body_index_options is not None: kwargs["body_index_options"] = body_index_options if setup_mode == SetupMode.ASYNC: kwargs["async_setup"] = True embedding_dimension: Union[int, Awaitable[int], None] = None if setup_mode == SetupMode.ASYNC: embedding_dimension = self._aget_embedding_dimension() elif setup_mode == SetupMode.SYNC: embedding_dimension = self._get_embedding_dimension() self.table = MetadataVectorCassandraTable( session=session, keyspace=keyspace, table=table_name, vector_dimension=embedding_dimension, metadata_indexing="all", primary_key_type="TEXT", skip_provisioning=setup_mode == SetupMode.OFF, **kwargs, )
@property def embeddings(self) -> Embeddings: return self.embedding def _select_relevance_score_fn(self) -> Callable[[float], float]: """基础的VectorTable已经返回一个“适当的分数”,即在[0, 1]范围内,数值越高表示更*相似*,因此这里的最终分数转换不会颠倒区间: """ return lambda score: score
[docs] def delete_collection(self) -> None: """只是`clear`的别名 (为了更好地与其他VectorStore实现对齐)。 """ self.clear()
[docs] async def adelete_collection(self) -> None: """只是对`aclear`的别名 (为了更好地与其他VectorStore实现对齐)。 """ await self.aclear()
[docs] def clear(self) -> None: """清空表格。""" self.table.clear()
[docs] async def aclear(self) -> None: """清空表格。""" await self.table.aclear()
[docs] def delete_by_document_id(self, document_id: str) -> None: """根据文档ID删除。 参数: document_id:要删除的文档ID。 """ return self.table.delete(row_id=document_id)
[docs] async def adelete_by_document_id(self, document_id: str) -> None: """根据文档ID删除。 参数: document_id:要删除的文档ID。 """ return await self.table.adelete(row_id=document_id)
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]: """根据向量ID删除。 参数: ids:要删除的ID列表。 返回: Optional[bool]:如果删除成功则为True,否则为False,如果未实现则为None。 """ if ids is None: raise ValueError("No ids provided to delete.") for document_id in ids: self.delete_by_document_id(document_id) return True
[docs] async def adelete( self, ids: Optional[List[str]] = None, **kwargs: Any ) -> Optional[bool]: """根据向量ID删除。 参数: ids:要删除的ID列表。 返回: Optional[bool]:如果删除成功则为True,否则为False,如果未实现则为None。 """ if ids is None: raise ValueError("No ids provided to delete.") for document_id in ids: await self.adelete_by_document_id(document_id) return True
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, batch_size: int = 16, ttl_seconds: Optional[int] = None, **kwargs: Any, ) -> List[str]: """运行更多的文本通过嵌入并添加到向量存储。 参数: texts: 要添加到向量存储的文本。 metadatas: 可选的元数据列表。 ids: 可选的ID列表。 batch_size: 发送到服务器的并发请求数量。 ttl_seconds: 添加文本的可选生存时间。 返回: List[str]: 添加文本的ID列表。 """ _texts = list(texts) ids = ids or [uuid.uuid4().hex for _ in _texts] metadatas = metadatas or [{}] * len(_texts) ttl_seconds = ttl_seconds or self.ttl_seconds embedding_vectors = self.embedding.embed_documents(_texts) for i in range(0, len(_texts), batch_size): batch_texts = _texts[i : i + batch_size] batch_embedding_vectors = embedding_vectors[i : i + batch_size] batch_ids = ids[i : i + batch_size] batch_metadatas = metadatas[i : i + batch_size] futures = [ self.table.put_async( row_id=text_id, body_blob=text, vector=embedding_vector, metadata=metadata or {}, ttl_seconds=ttl_seconds, ) for text, embedding_vector, text_id, metadata in zip( batch_texts, batch_embedding_vectors, batch_ids, batch_metadatas ) ] for future in futures: future.result() return ids
[docs] async def aadd_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, concurrency: int = 16, ttl_seconds: Optional[int] = None, **kwargs: Any, ) -> List[str]: """运行更多文本通过嵌入并添加到向量存储。 参数: texts:要添加到向量存储的文本。 metadatas:元数据的可选列表。 ids:ID的可选列表。 并发性:查询数据库的并发数。 默认为16。 ttl_seconds:添加的文本的可选生存时间。 返回: List[str]:已添加文本的ID列表。 """ _texts = list(texts) ids = ids or [uuid.uuid4().hex for _ in _texts] _metadatas: List[dict] = metadatas or [{}] * len(_texts) ttl_seconds = ttl_seconds or self.ttl_seconds embedding_vectors = await self.embedding.aembed_documents(_texts) sem = asyncio.Semaphore(concurrency) async def send_concurrently( row_id: str, text: str, embedding_vector: List[float], metadata: dict ) -> None: async with sem: await self.table.aput( row_id=row_id, body_blob=text, vector=embedding_vector, metadata=metadata or {}, ttl_seconds=ttl_seconds, ) for i in range(0, len(_texts)): tasks = [ asyncio.create_task( send_concurrently( ids[i], _texts[i], embedding_vectors[i], _metadatas[i] ) ) ] await asyncio.gather(*tasks) return ids
@staticmethod def _search_to_documents( hits: Iterable[Dict[str, Any]], ) -> List[Tuple[Document, float, str]]: # We stick to 'cos' distance as it can be normalized on a 0-1 axis # (1=most relevant), as required by this class' contract. return [ ( Document( page_content=hit["body_blob"], metadata=hit["metadata"], ), 0.5 + 0.5 * hit["distance"], hit["row_id"], ) for hit in hits ] # id-returning search facilities
[docs] def similarity_search_with_score_id_by_vector( self, embedding: List[float], k: int = 4, filter: Optional[Dict[str, str]] = None, body_search: Optional[Union[str, List[str]]] = None, ) -> List[Tuple[Document, float, str]]: """返回与嵌入向量最相似的文档。 参数: embedding: 要查找相似文档的嵌入。 k: 要返回的文档数量。默认为4。 filter: 要应用的元数据过滤器。 body_search: 要应用的文档文本搜索词。 目前仅由Astra DB支持。 返回: 与查询向量最相似的文档的列表(文档,分数,id)。 """ kwargs: Dict[str, Any] = {} if filter is not None: kwargs["metadata"] = filter if body_search is not None: kwargs["body_search"] = body_search hits = self.table.metric_ann_search( vector=embedding, n=k, metric="cos", **kwargs, ) return self._search_to_documents(hits)
[docs] async def asimilarity_search_with_score_id_by_vector( self, embedding: List[float], k: int = 4, filter: Optional[Dict[str, str]] = None, body_search: Optional[Union[str, List[str]]] = None, ) -> List[Tuple[Document, float, str]]: """返回与嵌入向量最相似的文档。 参数: embedding: 要查找相似文档的嵌入。 k: 要返回的文档数量。默认为4。 filter: 要应用的元数据过滤器。 body_search: 要应用的文档文本搜索词。 目前仅由Astra DB支持。 返回: 与查询向量最相似的文档的列表(文档,分数,id)。 """ kwargs: Dict[str, Any] = {} if filter is not None: kwargs["metadata"] = filter if body_search is not None: kwargs["body_search"] = body_search hits = await self.table.ametric_ann_search( vector=embedding, n=k, metric="cos", **kwargs, ) return self._search_to_documents(hits)
[docs] def similarity_search_with_score_id( self, query: str, k: int = 4, filter: Optional[Dict[str, str]] = None, body_search: Optional[Union[str, List[str]]] = None, ) -> List[Tuple[Document, float, str]]: """返回与查询最相似的文档。 参数: query:要查找类似文档的文本。 k:要返回的文档数量。默认为4。 filter:要应用的元数据过滤器。 body_search:要应用的文档文本搜索词语。 目前仅由Astra DB支持。 返回: 与查询向量最相似的文档的列表(文档,分数,id)。 """ embedding_vector = self.embedding.embed_query(query) return self.similarity_search_with_score_id_by_vector( embedding=embedding_vector, k=k, filter=filter, body_search=body_search, )
[docs] async def asimilarity_search_with_score_id( self, query: str, k: int = 4, filter: Optional[Dict[str, str]] = None, body_search: Optional[Union[str, List[str]]] = None, ) -> List[Tuple[Document, float, str]]: """返回与查询最相似的文档。 参数: query:要查找类似文档的文本。 k:要返回的文档数量。默认为4。 filter:要应用的元数据过滤器。 body_search:要应用的文档文本搜索词语。 目前仅由Astra DB支持。 返回: 与查询向量最相似的文档的列表(文档,分数,id)。 """ embedding_vector = await self.embedding.aembed_query(query) return await self.asimilarity_search_with_score_id_by_vector( embedding=embedding_vector, k=k, filter=filter, body_search=body_search, )
# id-unaware search facilities
[docs] def similarity_search_with_score_by_vector( self, embedding: List[float], k: int = 4, filter: Optional[Dict[str, str]] = None, body_search: Optional[Union[str, List[str]]] = None, ) -> List[Tuple[Document, float]]: """返回与嵌入向量最相似的文档。 参数: embedding: 要查找相似文档的嵌入。 k: 要返回的文档数量。默认为4。 filter: 要应用的元数据过滤器。 body_search: 要应用的文档文本搜索词。 目前仅由Astra DB支持。 返回: (文档,分数)列表,与查询向量最相似的文档。 """ return [ (doc, score) for (doc, score, docId) in self.similarity_search_with_score_id_by_vector( embedding=embedding, k=k, filter=filter, body_search=body_search, ) ]
[docs] async def asimilarity_search_with_score_by_vector( self, embedding: List[float], k: int = 4, filter: Optional[Dict[str, str]] = None, body_search: Optional[Union[str, List[str]]] = None, ) -> List[Tuple[Document, float]]: """返回与嵌入向量最相似的文档。 参数: embedding: 要查找相似文档的嵌入。 k: 要返回的文档数量。默认为4。 filter: 要应用的元数据过滤器。 body_search: 要应用的文档文本搜索词。 目前仅由Astra DB支持。 返回: (文档,分数)列表,与查询向量最相似的文档。 """ return [ (doc, score) for ( doc, score, _, ) in await self.asimilarity_search_with_score_id_by_vector( embedding=embedding, k=k, filter=filter, body_search=body_search, ) ]
[docs] def similarity_search_by_vector( self, embedding: List[float], k: int = 4, filter: Optional[Dict[str, str]] = None, body_search: Optional[Union[str, List[str]]] = None, **kwargs: Any, ) -> List[Document]: """返回与嵌入向量最相似的文档。 参数: embedding: 要查找相似文档的嵌入。 k: 要返回的文档数量。默认为4。 filter: 要应用的元数据过滤器。 body_search: 要应用的文档文本搜索词语。 目前仅由Astra DB支持。 返回: 文档列表,与查询向量最相似。 """ return [ doc for doc, _ in self.similarity_search_with_score_by_vector( embedding, k, filter=filter, body_search=body_search, ) ]
[docs] async def asimilarity_search_by_vector( self, embedding: List[float], k: int = 4, filter: Optional[Dict[str, str]] = None, body_search: Optional[Union[str, List[str]]] = None, **kwargs: Any, ) -> List[Document]: """返回与嵌入向量最相似的文档。 参数: embedding: 要查找相似文档的嵌入。 k: 要返回的文档数量。默认为4。 filter: 要应用的元数据过滤器。 body_search: 要应用的文档文本搜索词语。 目前仅由Astra DB支持。 返回: 文档列表,与查询向量最相似。 """ return [ doc for doc, _ in await self.asimilarity_search_with_score_by_vector( embedding, k, filter=filter, body_search=body_search, ) ]
[docs] def similarity_search_with_score( self, query: str, k: int = 4, filter: Optional[Dict[str, str]] = None, body_search: Optional[Union[str, List[str]]] = None, ) -> List[Tuple[Document, float]]: """返回与查询最相似的文档。 参数: query:要查找类似文档的文本。 k:要返回的文档数量。默认为4。 filter:要应用的元数据过滤器。 body_search:要应用的文档文本搜索词。 目前仅由Astra DB支持。 返回: (文档,分数)的列表,最接近查询向量的文档。 """ embedding_vector = self.embedding.embed_query(query) return self.similarity_search_with_score_by_vector( embedding_vector, k, filter=filter, body_search=body_search, )
[docs] async def asimilarity_search_with_score( self, query: str, k: int = 4, filter: Optional[Dict[str, str]] = None, body_search: Optional[Union[str, List[str]]] = None, ) -> List[Tuple[Document, float]]: """返回与查询最相似的文档。 参数: query:要查找类似文档的文本。 k:要返回的文档数量。默认为4。 filter:要应用的元数据过滤器。 body_search:要应用的文档文本搜索词。 目前仅由Astra DB支持。 返回: (文档,分数)的列表,最接近查询向量的文档。 """ embedding_vector = await self.embedding.aembed_query(query) return await self.asimilarity_search_with_score_by_vector( embedding_vector, k, filter=filter, body_search=body_search, )
@staticmethod def _mmr_search_to_documents( prefetch_hits: List[Dict[str, Any]], embedding: List[float], k: int, lambda_mult: float, ) -> List[Document]: # let the mmr utility pick the *indices* in the above array mmr_chosen_indices = maximal_marginal_relevance( np.array(embedding, dtype=np.float32), [pf_hit["vector"] for pf_hit in prefetch_hits], k=k, lambda_mult=lambda_mult, ) mmr_hits = [ pf_hit for pf_index, pf_hit in enumerate(prefetch_hits) if pf_index in mmr_chosen_indices ] return [ Document( page_content=hit["body_blob"], metadata=hit["metadata"], ) for hit in mmr_hits ]
[docs] def max_marginal_relevance_search_by_vector( self, embedding: List[float], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, filter: Optional[Dict[str, str]] = None, body_search: Optional[Union[str, List[str]]] = None, **kwargs: Any, ) -> List[Document]: """返回使用最大边际相关性选择的文档。 最大边际相关性优化了与查询的相似性和所选文档之间的多样性。 参数: embedding: 要查找相似文档的嵌入。 k: 要返回的文档数量。默认为4。 fetch_k: 要获取并传递给MMR算法的文档数量。默认为20。 lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。 filter: 要应用的元数据过滤器。 body_search: 要应用的文档文本搜索词语。目前仅由Astra DB支持。 返回: 通过最大边际相关性选择的文档列表。 """ _kwargs: Dict[str, Any] = {} if filter is not None: _kwargs["metadata"] = filter if body_search is not None: _kwargs["body_search"] = body_search prefetch_hits = list( self.table.metric_ann_search( vector=embedding, n=fetch_k, metric="cos", **_kwargs, ) ) return self._mmr_search_to_documents(prefetch_hits, embedding, k, lambda_mult)
[docs] async def amax_marginal_relevance_search_by_vector( self, embedding: List[float], k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, filter: Optional[Dict[str, str]] = None, body_search: Optional[Union[str, List[str]]] = None, **kwargs: Any, ) -> List[Document]: """返回使用最大边际相关性选择的文档。 最大边际相关性优化了与查询的相似性和所选文档之间的多样性。 参数: embedding: 要查找相似文档的嵌入。 k: 要返回的文档数量。默认为4。 fetch_k: 要获取并传递给MMR算法的文档数量。默认为20。 lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。 filter: 要应用的元数据过滤器。 body_search: 要应用的文档文本搜索词语。目前仅由Astra DB支持。 返回: 通过最大边际相关性选择的文档列表。 """ _kwargs: Dict[str, Any] = {} if filter is not None: _kwargs["metadata"] = filter if body_search is not None: _kwargs["body_search"] = body_search prefetch_hits = list( await self.table.ametric_ann_search( vector=embedding, n=fetch_k, metric="cos", **_kwargs, ) ) return self._mmr_search_to_documents(prefetch_hits, embedding, k, lambda_mult)
[docs] @classmethod def from_texts( cls: Type[CVST], texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, *, session: Optional[Session] = None, keyspace: Optional[str] = None, table_name: str = "", ids: Optional[List[str]] = None, batch_size: int = 16, ttl_seconds: Optional[int] = None, body_index_options: Optional[List[Tuple[str, Any]]] = None, **kwargs: Any, ) -> CVST: """从原始文本创建一个Cassandra向量存储。 参数: texts: 要添加到向量存储中的文本。 embedding: 要使用的嵌入函数。 metadatas: 与文本相关的元数据的可选列表。 session: Cassandra驱动程序会话。 如果未提供,则会从cassio中解析。 keyspace: Cassandra键空间。 如果未提供,则会从cassio中解析。 table_name: Cassandra表(必填)。 ids: 与文本相关的ID的可选列表。 batch_size: 发送到服务器的并发请求数量。 默认为16。 ttl_seconds: 添加的文本的可选存活时间。 body_index_options: 用于创建正文索引的可选选项。 例如,body_index_options = [cassio.table.cql.STANDARD_ANALYZER] 返回: 一个Cassandra向量存储。 """ store = cls( embedding=embedding, session=session, keyspace=keyspace, table_name=table_name, ttl_seconds=ttl_seconds, body_index_options=body_index_options, ) store.add_texts( texts=texts, metadatas=metadatas, ids=ids, batch_size=batch_size ) return store
[docs] @classmethod async def afrom_texts( cls: Type[CVST], texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, *, session: Optional[Session] = None, keyspace: Optional[str] = None, table_name: str = "", ids: Optional[List[str]] = None, concurrency: int = 16, ttl_seconds: Optional[int] = None, body_index_options: Optional[List[Tuple[str, Any]]] = None, **kwargs: Any, ) -> CVST: """从原始文本创建一个Cassandra向量存储。 参数: texts: 要添加到向量存储中的文本。 embedding: 要使用的嵌入函数。 metadatas: 与文本相关的元数据的可选列表。 session: Cassandra驱动程序会话。 如果未提供,则会从cassio中解析。 keyspace: Cassandra键空间。 如果未提供,则会从cassio中解析。 table_name: Cassandra表(必填)。 ids: 与文本相关的ID的可选列表。 concurrency: 发送到数据库的并发查询数。 默认为16。 ttl_seconds: 添加的文本的可选存活时间。 body_index_options: 用于创建正文索引的可选选项。 例如:body_index_options = [cassio.table.cql.STANDARD_ANALYZER] 返回: 一个Cassandra向量存储。 """ store = cls( embedding=embedding, session=session, keyspace=keyspace, table_name=table_name, ttl_seconds=ttl_seconds, setup_mode=SetupMode.ASYNC, body_index_options=body_index_options, ) await store.aadd_texts( texts=texts, metadatas=metadatas, ids=ids, concurrency=concurrency ) return store
[docs] @classmethod def from_documents( cls: Type[CVST], documents: List[Document], embedding: Embeddings, *, session: Optional[Session] = None, keyspace: Optional[str] = None, table_name: str = "", ids: Optional[List[str]] = None, batch_size: int = 16, ttl_seconds: Optional[int] = None, body_index_options: Optional[List[Tuple[str, Any]]] = None, **kwargs: Any, ) -> CVST: """从文档列表创建一个Cassandra向量存储。 参数: documents: 要添加到向量存储中的文档。 embedding: 要使用的嵌入函数。 session: Cassandra驱动程序会话。 如果未提供,则会从cassio中解析。 keyspace: Cassandra键空间。 如果未提供,则会从cassio中解析。 table_name: Cassandra表(必填)。 ids: 与文档关联的可选ID列表。 batch_size: 发送到服务器的并发请求数量。 默认为16。 ttl_seconds: 添加文档的可选存活时间。 body_index_options: 用于创建正文索引的可选选项。 例如,body_index_options = [cassio.table.cql.STANDARD_ANALYZER] 返回: 一个Cassandra向量存储。 """ texts = [doc.page_content for doc in documents] metadatas = [doc.metadata for doc in documents] return cls.from_texts( texts=texts, embedding=embedding, metadatas=metadatas, session=session, keyspace=keyspace, table_name=table_name, ids=ids, batch_size=batch_size, ttl_seconds=ttl_seconds, body_index_options=body_index_options, **kwargs, )
[docs] @classmethod async def afrom_documents( cls: Type[CVST], documents: List[Document], embedding: Embeddings, *, session: Optional[Session] = None, keyspace: Optional[str] = None, table_name: str = "", ids: Optional[List[str]] = None, concurrency: int = 16, ttl_seconds: Optional[int] = None, body_index_options: Optional[List[Tuple[str, Any]]] = None, **kwargs: Any, ) -> CVST: """从文档列表创建一个Cassandra向量存储。 参数: documents: 要添加到向量存储的文档。 embedding: 要使用的嵌入函数。 session: Cassandra驱动程序会话。 如果未提供,则从cassio中解析。 keyspace: Cassandra键空间。 如果未提供,则从cassio中解析。 table_name: Cassandra表(必填)。 ids: 与文档关联的ID的可选列表。 concurrency: 发送到数据库的并发查询数。 默认为16。 ttl_seconds: 添加文档的可选存活时间。 body_index_options: 用于创建正文索引的可选选项。 例如 body_index_options = [cassio.table.cql.STANDARD_ANALYZER] 返回: 一个Cassandra向量存储。 """ texts = [doc.page_content for doc in documents] metadatas = [doc.metadata for doc in documents] return await cls.afrom_texts( texts=texts, embedding=embedding, metadatas=metadatas, session=session, keyspace=keyspace, table_name=table_name, ids=ids, concurrency=concurrency, ttl_seconds=ttl_seconds, body_index_options=body_index_options, **kwargs, )