from __future__ import annotations
import asyncio
import typing
import uuid
from typing import (
Any,
Awaitable,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
TypeVar,
Union,
)
import numpy as np
if typing.TYPE_CHECKING:
from cassandra.cluster import Session
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_community.utilities.cassandra import SetupMode
from langchain_community.vectorstores.utils import maximal_marginal_relevance
CVST = TypeVar("CVST", bound="Cassandra")
[docs]class Cassandra(VectorStore):
_embedding_dimension: Union[int, None]
def _get_embedding_dimension(self) -> int:
if self._embedding_dimension is None:
self._embedding_dimension = len(
self.embedding.embed_query("This is a sample sentence.")
)
return self._embedding_dimension
async def _aget_embedding_dimension(self) -> int:
if self._embedding_dimension is None:
self._embedding_dimension = len(
await self.embedding.aembed_query("This is a sample sentence.")
)
return self._embedding_dimension
[docs] def __init__(
self,
embedding: Embeddings,
session: Optional[Session] = None,
keyspace: Optional[str] = None,
table_name: str = "",
ttl_seconds: Optional[int] = None,
*,
body_index_options: Optional[List[Tuple[str, Any]]] = None,
setup_mode: SetupMode = SetupMode.SYNC,
) -> None:
"""Apache Cassandra(R) 用于向量存储工作负载。
要使用它,您需要安装最新版本的 `cassio` 库,并且需要支持向量功能的Cassandra集群/ Astra DB实例。
访问 cassio.org 网站以获取详尽的快速入门和代码示例。
示例:
.. code-block:: python
from langchain_community.vectorstores import Cassandra
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
session = ... # 创建您的Cassandra会话对象
keyspace = 'my_keyspace' # keyspace 应该已经存在
table_name = 'my_vector_store'
vectorstore = Cassandra(embeddings, session, keyspace, table_name)
参数:
embedding: 要使用的嵌入函数。
session: Cassandra 驱动程序会话。如果未提供,则从 cassio 中解析。
keyspace: Cassandra 键空间。如果未提供,则从 cassio 中解析。
table_name: Cassandra 表(必填)。
ttl_seconds: 添加文本的可选存活时间。
body_index_options: 用于创建 body 索引的可选选项。
例如 body_index_options = [cassio.table.cql.STANDARD_ANALYZER]
setup_mode: 用于创建Cassandra表的模式(SYNC、ASYNC 或 OFF)。
"""
try:
from cassio.table import MetadataVectorCassandraTable
except (ImportError, ModuleNotFoundError):
raise ImportError(
"Could not import cassio python package. "
"Please install it with `pip install cassio`."
)
if not table_name:
raise ValueError("Missing required parameter 'table_name'.")
self.embedding = embedding
self.session = session
self.keyspace = keyspace
self.table_name = table_name
self.ttl_seconds = ttl_seconds
#
self._embedding_dimension = None
#
kwargs: Dict[str, Any] = {}
if body_index_options is not None:
kwargs["body_index_options"] = body_index_options
if setup_mode == SetupMode.ASYNC:
kwargs["async_setup"] = True
embedding_dimension: Union[int, Awaitable[int], None] = None
if setup_mode == SetupMode.ASYNC:
embedding_dimension = self._aget_embedding_dimension()
elif setup_mode == SetupMode.SYNC:
embedding_dimension = self._get_embedding_dimension()
self.table = MetadataVectorCassandraTable(
session=session,
keyspace=keyspace,
table=table_name,
vector_dimension=embedding_dimension,
metadata_indexing="all",
primary_key_type="TEXT",
skip_provisioning=setup_mode == SetupMode.OFF,
**kwargs,
)
@property
def embeddings(self) -> Embeddings:
return self.embedding
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""基础的VectorTable已经返回一个“适当的分数”,即在[0, 1]范围内,数值越高表示更*相似*,因此这里的最终分数转换不会颠倒区间:
"""
return lambda score: score
[docs] def delete_collection(self) -> None:
"""只是`clear`的别名
(为了更好地与其他VectorStore实现对齐)。
"""
self.clear()
[docs] async def adelete_collection(self) -> None:
"""只是对`aclear`的别名
(为了更好地与其他VectorStore实现对齐)。
"""
await self.aclear()
[docs] def clear(self) -> None:
"""清空表格。"""
self.table.clear()
[docs] async def aclear(self) -> None:
"""清空表格。"""
await self.table.aclear()
[docs] def delete_by_document_id(self, document_id: str) -> None:
"""根据文档ID删除。
参数:
document_id:要删除的文档ID。
"""
return self.table.delete(row_id=document_id)
[docs] async def adelete_by_document_id(self, document_id: str) -> None:
"""根据文档ID删除。
参数:
document_id:要删除的文档ID。
"""
return await self.table.adelete(row_id=document_id)
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
"""根据向量ID删除。
参数:
ids:要删除的ID列表。
返回:
Optional[bool]:如果删除成功则为True,否则为False,如果未实现则为None。
"""
if ids is None:
raise ValueError("No ids provided to delete.")
for document_id in ids:
self.delete_by_document_id(document_id)
return True
[docs] async def adelete(
self, ids: Optional[List[str]] = None, **kwargs: Any
) -> Optional[bool]:
"""根据向量ID删除。
参数:
ids:要删除的ID列表。
返回:
Optional[bool]:如果删除成功则为True,否则为False,如果未实现则为None。
"""
if ids is None:
raise ValueError("No ids provided to delete.")
for document_id in ids:
await self.adelete_by_document_id(document_id)
return True
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
batch_size: int = 16,
ttl_seconds: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多的文本通过嵌入并添加到向量存储。
参数:
texts: 要添加到向量存储的文本。
metadatas: 可选的元数据列表。
ids: 可选的ID列表。
batch_size: 发送到服务器的并发请求数量。
ttl_seconds: 添加文本的可选生存时间。
返回:
List[str]: 添加文本的ID列表。
"""
_texts = list(texts)
ids = ids or [uuid.uuid4().hex for _ in _texts]
metadatas = metadatas or [{}] * len(_texts)
ttl_seconds = ttl_seconds or self.ttl_seconds
embedding_vectors = self.embedding.embed_documents(_texts)
for i in range(0, len(_texts), batch_size):
batch_texts = _texts[i : i + batch_size]
batch_embedding_vectors = embedding_vectors[i : i + batch_size]
batch_ids = ids[i : i + batch_size]
batch_metadatas = metadatas[i : i + batch_size]
futures = [
self.table.put_async(
row_id=text_id,
body_blob=text,
vector=embedding_vector,
metadata=metadata or {},
ttl_seconds=ttl_seconds,
)
for text, embedding_vector, text_id, metadata in zip(
batch_texts, batch_embedding_vectors, batch_ids, batch_metadatas
)
]
for future in futures:
future.result()
return ids
[docs] async def aadd_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
concurrency: int = 16,
ttl_seconds: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多文本通过嵌入并添加到向量存储。
参数:
texts:要添加到向量存储的文本。
metadatas:元数据的可选列表。
ids:ID的可选列表。
并发性:查询数据库的并发数。
默认为16。
ttl_seconds:添加的文本的可选生存时间。
返回:
List[str]:已添加文本的ID列表。
"""
_texts = list(texts)
ids = ids or [uuid.uuid4().hex for _ in _texts]
_metadatas: List[dict] = metadatas or [{}] * len(_texts)
ttl_seconds = ttl_seconds or self.ttl_seconds
embedding_vectors = await self.embedding.aembed_documents(_texts)
sem = asyncio.Semaphore(concurrency)
async def send_concurrently(
row_id: str, text: str, embedding_vector: List[float], metadata: dict
) -> None:
async with sem:
await self.table.aput(
row_id=row_id,
body_blob=text,
vector=embedding_vector,
metadata=metadata or {},
ttl_seconds=ttl_seconds,
)
for i in range(0, len(_texts)):
tasks = [
asyncio.create_task(
send_concurrently(
ids[i], _texts[i], embedding_vectors[i], _metadatas[i]
)
)
]
await asyncio.gather(*tasks)
return ids
@staticmethod
def _search_to_documents(
hits: Iterable[Dict[str, Any]],
) -> List[Tuple[Document, float, str]]:
# We stick to 'cos' distance as it can be normalized on a 0-1 axis
# (1=most relevant), as required by this class' contract.
return [
(
Document(
page_content=hit["body_blob"],
metadata=hit["metadata"],
),
0.5 + 0.5 * hit["distance"],
hit["row_id"],
)
for hit in hits
]
# id-returning search facilities
[docs] def similarity_search_with_score_id_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
) -> List[Tuple[Document, float, str]]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
filter: 要应用的元数据过滤器。
body_search: 要应用的文档文本搜索词。
目前仅由Astra DB支持。
返回:
与查询向量最相似的文档的列表(文档,分数,id)。
"""
kwargs: Dict[str, Any] = {}
if filter is not None:
kwargs["metadata"] = filter
if body_search is not None:
kwargs["body_search"] = body_search
hits = self.table.metric_ann_search(
vector=embedding,
n=k,
metric="cos",
**kwargs,
)
return self._search_to_documents(hits)
[docs] async def asimilarity_search_with_score_id_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
) -> List[Tuple[Document, float, str]]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
filter: 要应用的元数据过滤器。
body_search: 要应用的文档文本搜索词。
目前仅由Astra DB支持。
返回:
与查询向量最相似的文档的列表(文档,分数,id)。
"""
kwargs: Dict[str, Any] = {}
if filter is not None:
kwargs["metadata"] = filter
if body_search is not None:
kwargs["body_search"] = body_search
hits = await self.table.ametric_ann_search(
vector=embedding,
n=k,
metric="cos",
**kwargs,
)
return self._search_to_documents(hits)
[docs] def similarity_search_with_score_id(
self,
query: str,
k: int = 4,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
) -> List[Tuple[Document, float, str]]:
"""返回与查询最相似的文档。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter:要应用的元数据过滤器。
body_search:要应用的文档文本搜索词语。
目前仅由Astra DB支持。
返回:
与查询向量最相似的文档的列表(文档,分数,id)。
"""
embedding_vector = self.embedding.embed_query(query)
return self.similarity_search_with_score_id_by_vector(
embedding=embedding_vector,
k=k,
filter=filter,
body_search=body_search,
)
[docs] async def asimilarity_search_with_score_id(
self,
query: str,
k: int = 4,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
) -> List[Tuple[Document, float, str]]:
"""返回与查询最相似的文档。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter:要应用的元数据过滤器。
body_search:要应用的文档文本搜索词语。
目前仅由Astra DB支持。
返回:
与查询向量最相似的文档的列表(文档,分数,id)。
"""
embedding_vector = await self.embedding.aembed_query(query)
return await self.asimilarity_search_with_score_id_by_vector(
embedding=embedding_vector,
k=k,
filter=filter,
body_search=body_search,
)
# id-unaware search facilities
[docs] def similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
) -> List[Tuple[Document, float]]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
filter: 要应用的元数据过滤器。
body_search: 要应用的文档文本搜索词。
目前仅由Astra DB支持。
返回:
(文档,分数)列表,与查询向量最相似的文档。
"""
return [
(doc, score)
for (doc, score, docId) in self.similarity_search_with_score_id_by_vector(
embedding=embedding,
k=k,
filter=filter,
body_search=body_search,
)
]
[docs] async def asimilarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
) -> List[Tuple[Document, float]]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
filter: 要应用的元数据过滤器。
body_search: 要应用的文档文本搜索词。
目前仅由Astra DB支持。
返回:
(文档,分数)列表,与查询向量最相似的文档。
"""
return [
(doc, score)
for (
doc,
score,
_,
) in await self.asimilarity_search_with_score_id_by_vector(
embedding=embedding,
k=k,
filter=filter,
body_search=body_search,
)
]
[docs] def similarity_search(
self,
query: str,
k: int = 4,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与查询最相似的文档。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter:要应用的元数据过滤器。
body_search:要应用的文档文本搜索词语。
目前仅由Astra DB支持。
返回:
文档列表,与查询向量最相似。
"""
embedding_vector = self.embedding.embed_query(query)
return self.similarity_search_by_vector(
embedding_vector,
k,
filter=filter,
body_search=body_search,
)
[docs] async def asimilarity_search(
self,
query: str,
k: int = 4,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与查询最相似的文档。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter:要应用的元数据过滤器。
body_search:要应用的文档文本搜索词语。
目前仅由Astra DB支持。
返回:
文档列表,与查询向量最相似。
"""
embedding_vector = await self.embedding.aembed_query(query)
return await self.asimilarity_search_by_vector(
embedding_vector,
k,
filter=filter,
body_search=body_search,
)
[docs] def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
filter: 要应用的元数据过滤器。
body_search: 要应用的文档文本搜索词语。
目前仅由Astra DB支持。
返回:
文档列表,与查询向量最相似。
"""
return [
doc
for doc, _ in self.similarity_search_with_score_by_vector(
embedding,
k,
filter=filter,
body_search=body_search,
)
]
[docs] async def asimilarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
filter: 要应用的元数据过滤器。
body_search: 要应用的文档文本搜索词语。
目前仅由Astra DB支持。
返回:
文档列表,与查询向量最相似。
"""
return [
doc
for doc, _ in await self.asimilarity_search_with_score_by_vector(
embedding,
k,
filter=filter,
body_search=body_search,
)
]
[docs] def similarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的文档。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter:要应用的元数据过滤器。
body_search:要应用的文档文本搜索词。
目前仅由Astra DB支持。
返回:
(文档,分数)的列表,最接近查询向量的文档。
"""
embedding_vector = self.embedding.embed_query(query)
return self.similarity_search_with_score_by_vector(
embedding_vector,
k,
filter=filter,
body_search=body_search,
)
[docs] async def asimilarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的文档。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。默认为4。
filter:要应用的元数据过滤器。
body_search:要应用的文档文本搜索词。
目前仅由Astra DB支持。
返回:
(文档,分数)的列表,最接近查询向量的文档。
"""
embedding_vector = await self.embedding.aembed_query(query)
return await self.asimilarity_search_with_score_by_vector(
embedding_vector,
k,
filter=filter,
body_search=body_search,
)
@staticmethod
def _mmr_search_to_documents(
prefetch_hits: List[Dict[str, Any]],
embedding: List[float],
k: int,
lambda_mult: float,
) -> List[Document]:
# let the mmr utility pick the *indices* in the above array
mmr_chosen_indices = maximal_marginal_relevance(
np.array(embedding, dtype=np.float32),
[pf_hit["vector"] for pf_hit in prefetch_hits],
k=k,
lambda_mult=lambda_mult,
)
mmr_hits = [
pf_hit
for pf_index, pf_hit in enumerate(prefetch_hits)
if pf_index in mmr_chosen_indices
]
return [
Document(
page_content=hit["body_blob"],
metadata=hit["metadata"],
)
for hit in mmr_hits
]
[docs] def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
fetch_k: 要获取并传递给MMR算法的文档数量。默认为20。
lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。
filter: 要应用的元数据过滤器。
body_search: 要应用的文档文本搜索词语。目前仅由Astra DB支持。
返回:
通过最大边际相关性选择的文档列表。
"""
_kwargs: Dict[str, Any] = {}
if filter is not None:
_kwargs["metadata"] = filter
if body_search is not None:
_kwargs["body_search"] = body_search
prefetch_hits = list(
self.table.metric_ann_search(
vector=embedding,
n=fetch_k,
metric="cos",
**_kwargs,
)
)
return self._mmr_search_to_documents(prefetch_hits, embedding, k, lambda_mult)
[docs] async def amax_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
fetch_k: 要获取并传递给MMR算法的文档数量。默认为20。
lambda_mult: 介于0和1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。
filter: 要应用的元数据过滤器。
body_search: 要应用的文档文本搜索词语。目前仅由Astra DB支持。
返回:
通过最大边际相关性选择的文档列表。
"""
_kwargs: Dict[str, Any] = {}
if filter is not None:
_kwargs["metadata"] = filter
if body_search is not None:
_kwargs["body_search"] = body_search
prefetch_hits = list(
await self.table.ametric_ann_search(
vector=embedding,
n=fetch_k,
metric="cos",
**_kwargs,
)
)
return self._mmr_search_to_documents(prefetch_hits, embedding, k, lambda_mult)
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 要查找相似文档的嵌入。
k: 要返回的文档数量。默认为4。
fetch_k: 要获取以传递给MMR算法的文档数量。
默认为20。
lambda_mult: 0到1之间的数字,确定结果之间多样性的程度
0对应最大多样性,1对应最小多样性。
默认为0.5。
filter: 要应用的元数据过滤器。
body_search: 要应用的文档文本搜索词。
目前仅由Astra DB支持。
返回:
通过最大边际相关性选择的文档列表。
"""
embedding_vector = self.embedding.embed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding_vector,
k,
fetch_k,
lambda_mult=lambda_mult,
filter=filter,
body_search=body_search,
)
[docs] async def amax_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, str]] = None,
body_search: Optional[Union[str, List[str]]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query:要查找类似文档的文本。
k:要返回的文档数量。
fetch_k:要获取的文档数量,以传递给MMR算法。
lambda_mult:介于0和1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。
默认为0.5。
filter:要应用的元数据过滤器。
body_search:要应用的文档文本搜索词语。
目前仅由Astra DB支持。
返回:
通过最大边际相关性选择的文档列表。
"""
embedding_vector = await self.embedding.aembed_query(query)
return await self.amax_marginal_relevance_search_by_vector(
embedding_vector,
k,
fetch_k,
lambda_mult=lambda_mult,
filter=filter,
body_search=body_search,
)
[docs] @classmethod
def from_texts(
cls: Type[CVST],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
*,
session: Optional[Session] = None,
keyspace: Optional[str] = None,
table_name: str = "",
ids: Optional[List[str]] = None,
batch_size: int = 16,
ttl_seconds: Optional[int] = None,
body_index_options: Optional[List[Tuple[str, Any]]] = None,
**kwargs: Any,
) -> CVST:
"""从原始文本创建一个Cassandra向量存储。
参数:
texts: 要添加到向量存储中的文本。
embedding: 要使用的嵌入函数。
metadatas: 与文本相关的元数据的可选列表。
session: Cassandra驱动程序会话。
如果未提供,则会从cassio中解析。
keyspace: Cassandra键空间。
如果未提供,则会从cassio中解析。
table_name: Cassandra表(必填)。
ids: 与文本相关的ID的可选列表。
batch_size: 发送到服务器的并发请求数量。
默认为16。
ttl_seconds: 添加的文本的可选存活时间。
body_index_options: 用于创建正文索引的可选选项。
例如,body_index_options = [cassio.table.cql.STANDARD_ANALYZER]
返回:
一个Cassandra向量存储。
"""
store = cls(
embedding=embedding,
session=session,
keyspace=keyspace,
table_name=table_name,
ttl_seconds=ttl_seconds,
body_index_options=body_index_options,
)
store.add_texts(
texts=texts, metadatas=metadatas, ids=ids, batch_size=batch_size
)
return store
[docs] @classmethod
async def afrom_texts(
cls: Type[CVST],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
*,
session: Optional[Session] = None,
keyspace: Optional[str] = None,
table_name: str = "",
ids: Optional[List[str]] = None,
concurrency: int = 16,
ttl_seconds: Optional[int] = None,
body_index_options: Optional[List[Tuple[str, Any]]] = None,
**kwargs: Any,
) -> CVST:
"""从原始文本创建一个Cassandra向量存储。
参数:
texts: 要添加到向量存储中的文本。
embedding: 要使用的嵌入函数。
metadatas: 与文本相关的元数据的可选列表。
session: Cassandra驱动程序会话。
如果未提供,则会从cassio中解析。
keyspace: Cassandra键空间。
如果未提供,则会从cassio中解析。
table_name: Cassandra表(必填)。
ids: 与文本相关的ID的可选列表。
concurrency: 发送到数据库的并发查询数。
默认为16。
ttl_seconds: 添加的文本的可选存活时间。
body_index_options: 用于创建正文索引的可选选项。
例如:body_index_options = [cassio.table.cql.STANDARD_ANALYZER]
返回:
一个Cassandra向量存储。
"""
store = cls(
embedding=embedding,
session=session,
keyspace=keyspace,
table_name=table_name,
ttl_seconds=ttl_seconds,
setup_mode=SetupMode.ASYNC,
body_index_options=body_index_options,
)
await store.aadd_texts(
texts=texts, metadatas=metadatas, ids=ids, concurrency=concurrency
)
return store
[docs] @classmethod
def from_documents(
cls: Type[CVST],
documents: List[Document],
embedding: Embeddings,
*,
session: Optional[Session] = None,
keyspace: Optional[str] = None,
table_name: str = "",
ids: Optional[List[str]] = None,
batch_size: int = 16,
ttl_seconds: Optional[int] = None,
body_index_options: Optional[List[Tuple[str, Any]]] = None,
**kwargs: Any,
) -> CVST:
"""从文档列表创建一个Cassandra向量存储。
参数:
documents: 要添加到向量存储中的文档。
embedding: 要使用的嵌入函数。
session: Cassandra驱动程序会话。
如果未提供,则会从cassio中解析。
keyspace: Cassandra键空间。
如果未提供,则会从cassio中解析。
table_name: Cassandra表(必填)。
ids: 与文档关联的可选ID列表。
batch_size: 发送到服务器的并发请求数量。
默认为16。
ttl_seconds: 添加文档的可选存活时间。
body_index_options: 用于创建正文索引的可选选项。
例如,body_index_options = [cassio.table.cql.STANDARD_ANALYZER]
返回:
一个Cassandra向量存储。
"""
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return cls.from_texts(
texts=texts,
embedding=embedding,
metadatas=metadatas,
session=session,
keyspace=keyspace,
table_name=table_name,
ids=ids,
batch_size=batch_size,
ttl_seconds=ttl_seconds,
body_index_options=body_index_options,
**kwargs,
)
[docs] @classmethod
async def afrom_documents(
cls: Type[CVST],
documents: List[Document],
embedding: Embeddings,
*,
session: Optional[Session] = None,
keyspace: Optional[str] = None,
table_name: str = "",
ids: Optional[List[str]] = None,
concurrency: int = 16,
ttl_seconds: Optional[int] = None,
body_index_options: Optional[List[Tuple[str, Any]]] = None,
**kwargs: Any,
) -> CVST:
"""从文档列表创建一个Cassandra向量存储。
参数:
documents: 要添加到向量存储的文档。
embedding: 要使用的嵌入函数。
session: Cassandra驱动程序会话。
如果未提供,则从cassio中解析。
keyspace: Cassandra键空间。
如果未提供,则从cassio中解析。
table_name: Cassandra表(必填)。
ids: 与文档关联的ID的可选列表。
concurrency: 发送到数据库的并发查询数。
默认为16。
ttl_seconds: 添加文档的可选存活时间。
body_index_options: 用于创建正文索引的可选选项。
例如 body_index_options = [cassio.table.cql.STANDARD_ANALYZER]
返回:
一个Cassandra向量存储。
"""
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return await cls.afrom_texts(
texts=texts,
embedding=embedding,
metadatas=metadatas,
session=session,
keyspace=keyspace,
table_name=table_name,
ids=ids,
concurrency=concurrency,
ttl_seconds=ttl_seconds,
body_index_options=body_index_options,
**kwargs,
)