from __future__ import annotations
import uuid
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
if TYPE_CHECKING:
from couchbase.cluster import Cluster
[docs]class CouchbaseVectorStore(VectorStore):
"""`Couchbase Vector Store` 矢量存储。
要使用它,您需要
- 最新版本的 `couchbase` 库的安装
- 具有预定义搜索索引且支持矢量字段的 Couchbase 数据库
示例:
.. code-block:: python
from langchain_community.vectorstores import CouchbaseVectorStore
from langchain_openai import OpenAIEmbeddings
from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions
from datetime import timedelta
auth = PasswordAuthenticator(username, password)
options = ClusterOptions(auth)
connect_string = "couchbases://localhost"
cluster = Cluster(connect_string, options)
# 等待集群准备就绪。
cluster.wait_until_ready(timedelta(seconds=5))
embeddings = OpenAIEmbeddings()
vectorstore = CouchbaseVectorStore(
cluster=cluster,
bucket_name="",
scope_name="",
collection_name="",
embedding=embeddings,
index_name="vector-index",
)
vectorstore.add_texts(["hello", "world"])
results = vectorstore.similarity_search("ola", k=1)"""
# Default batch size
DEFAULT_BATCH_SIZE = 100
_metadata_key = "metadata"
_default_text_key = "text"
_default_embedding_key = "embedding"
def _check_bucket_exists(self) -> bool:
"""检查链接的Couchbase集群中是否存在存储桶。"""
bucket_manager = self._cluster.buckets()
try:
bucket_manager.get_bucket(self._bucket_name)
return True
except Exception:
return False
def _check_scope_and_collection_exists(self) -> bool:
"""检查链接的Couchbase存储桶中是否存在作用域和集合
如果未找到任何一个,则引发ValueError
"""
scope_collection_map: Dict[str, Any] = {}
# Get a list of all scopes in the bucket
for scope in self._bucket.collections().get_all_scopes():
scope_collection_map[scope.name] = []
# Get a list of all the collections in the scope
for collection in scope.collections:
scope_collection_map[scope.name].append(collection.name)
# Check if the scope exists
if self._scope_name not in scope_collection_map.keys():
raise ValueError(
f"Scope {self._scope_name} not found in Couchbase "
f"bucket {self._bucket_name}"
)
# Check if the collection exists in the scope
if self._collection_name not in scope_collection_map[self._scope_name]:
raise ValueError(
f"Collection {self._collection_name} not found in scope "
f"{self._scope_name} in Couchbase bucket {self._bucket_name}"
)
return True
def _check_index_exists(self) -> bool:
"""检查链接的Couchbase集群中是否存在搜索索引
如果索引不存在,则引发ValueError异常
"""
if self._scoped_index:
all_indexes = [
index.name for index in self._scope.search_indexes().get_all_indexes()
]
if self._index_name not in all_indexes:
raise ValueError(
f"Index {self._index_name} does not exist. "
" Please create the index before searching."
)
else:
all_indexes = [
index.name for index in self._cluster.search_indexes().get_all_indexes()
]
if self._index_name not in all_indexes:
raise ValueError(
f"Index {self._index_name} does not exist. "
" Please create the index before searching."
)
return True
[docs] def __init__(
self,
cluster: Cluster,
bucket_name: str,
scope_name: str,
collection_name: str,
embedding: Embeddings,
index_name: str,
*,
text_key: Optional[str] = _default_text_key,
embedding_key: Optional[str] = _default_embedding_key,
scoped_index: bool = True,
) -> None:
"""初始化Couchbase向量存储。
参数:
cluster (Cluster): 具有活动连接的Couchbase集群对象。
bucket_name (str): 存储文档的桶名称。
scope_name (str): 存储文档的作用域中的作用域名称。
collection_name (str): 存储文档的作用域中的集合名称。
embedding (Embeddings): 要使用的嵌入函数。
index_name (str): 要使用的搜索索引的名称。
text_key (optional[str]): 用作文本的文档中的键。
默认设置为text。
embedding_key (optional[str]): 用于嵌入的文档中的键。
默认设置为embedding。
scoped_index (optional[bool]): 指定索引是否为作用域索引。
默认设置为True。
"""
try:
from couchbase.cluster import Cluster
except ImportError as e:
raise ImportError(
"Could not import couchbase python package. "
"Please install couchbase SDK with `pip install couchbase`."
) from e
if not isinstance(cluster, Cluster):
raise ValueError(
f"cluster should be an instance of couchbase.Cluster, "
f"got {type(cluster)}"
)
self._cluster = cluster
if not embedding:
raise ValueError("Embeddings instance must be provided.")
if not bucket_name:
raise ValueError("bucket_name must be provided.")
if not scope_name:
raise ValueError("scope_name must be provided.")
if not collection_name:
raise ValueError("collection_name must be provided.")
if not index_name:
raise ValueError("index_name must be provided.")
self._bucket_name = bucket_name
self._scope_name = scope_name
self._collection_name = collection_name
self._embedding_function = embedding
self._text_key = text_key
self._embedding_key = embedding_key
self._index_name = index_name
self._scoped_index = scoped_index
# Check if the bucket exists
if not self._check_bucket_exists():
raise ValueError(
f"Bucket {self._bucket_name} does not exist. "
" Please create the bucket before searching."
)
try:
self._bucket = self._cluster.bucket(self._bucket_name)
self._scope = self._bucket.scope(self._scope_name)
self._collection = self._scope.collection(self._collection_name)
except Exception as e:
raise ValueError(
"Error connecting to couchbase. "
"Please check the connection and credentials."
) from e
# Check if the scope and collection exists. Throws ValueError if they don't
try:
self._check_scope_and_collection_exists()
except Exception as e:
raise e
# Check if the index exists. Throws ValueError if it doesn't
try:
self._check_index_exists()
except Exception as e:
raise e
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[Dict[str, Any]]] = None,
ids: Optional[List[str]] = None,
batch_size: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
"""通过嵌入将文本传递并持久化在向量存储中。
如果传递了文档ID,则现有文档(如果有)将被新文档覆盖。
参数:
texts(Iterable[str]):要添加到向量存储中的字符串的可迭代对象。
metadatas(Optional[List[Dict]]):与文本相关的元数据的可选列表。
ids(Optional[List[str]):与文本相关的ID的可选列表。
ID必须是整个集合中的唯一字符串。
如果未指定,将生成并使用uuid作为ID。
batch_size(Optional[int]):用于批量插入的可选批量大小。
默认值为100。
返回:
List[str]:将文本添加到向量存储中的ID列表。
"""
from couchbase.exceptions import DocumentExistsException
if not batch_size:
batch_size = self.DEFAULT_BATCH_SIZE
doc_ids: List[str] = []
if ids is None:
ids = [uuid.uuid4().hex for _ in texts]
if metadatas is None:
metadatas = [{} for _ in texts]
embedded_texts = self._embedding_function.embed_documents(list(texts))
documents_to_insert = [
{
id: {
self._text_key: text,
self._embedding_key: vector,
self._metadata_key: metadata,
}
for id, text, vector, metadata in zip(
ids, texts, embedded_texts, metadatas
)
}
]
# Insert in batches
for i in range(0, len(documents_to_insert), batch_size):
batch = documents_to_insert[i : i + batch_size]
try:
result = self._collection.upsert_multi(batch[0])
if result.all_ok:
doc_ids.extend(batch[0].keys())
except DocumentExistsException as e:
raise ValueError(f"Document already exists: {e}")
return doc_ids
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
"""通过ids从向量存储中删除文档。
参数:
ids (List[str]): 要删除的文档的ID列表。
batch_size (Optional[int]): 批量删除的可选批量大小。
返回:
bool: 如果所有文档都成功删除,则为True,否则为False。
"""
from couchbase.exceptions import DocumentNotFoundException
if ids is None:
raise ValueError("No document ids provided to delete.")
batch_size = kwargs.get("batch_size", self.DEFAULT_BATCH_SIZE)
deletion_status = True
# Delete in batches
for i in range(0, len(ids), batch_size):
batch = ids[i : i + batch_size]
try:
result = self._collection.remove_multi(batch)
except DocumentNotFoundException as e:
deletion_status = False
raise ValueError(f"Document not found: {e}")
deletion_status &= result.all_ok
return deletion_status
@property
def embeddings(self) -> Embeddings:
"""返回查询嵌入对象。"""
return self._embedding_function
def _format_metadata(self, row_fields: Dict[str, Any]) -> Dict[str, Any]:
"""辅助方法,用于格式化来自Couchbase Search API的元数据。
参数:
row_fields(Dict[str, Any]):需要格式化的字段。
返回:
Dict[str, Any]:格式化后的元数据。
"""
metadata = {}
for key, value in row_fields.items():
# Couchbase Search returns the metadata key with a prefix
# `metadata.` We remove it to get the original metadata key
if key.startswith(self._metadata_key):
new_key = key.split(self._metadata_key + ".")[-1]
metadata[new_key] = value
else:
metadata[key] = value
return metadata
[docs] def similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
search_options: Optional[Dict[str, Any]] = {},
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与嵌入向量最相似的文档及其分数。
参数:
embedding (List[float]): 要查找与之相似文档的嵌入向量。
k (int): 要返回的文档数量。
默认为4。
search_options (Optional[Dict[str, Any]]): 可选的搜索选项,将其传递给Couchbase搜索。
默认为空字典。
fields (Optional[List[str]]): 要包含在结果元数据中的字段的可选列表。请注意,这些字段需要存储在索引中。
如果未指定任何内容,则默认为索引中存储的所有字段。
返回:
与查询向量最相似的文档及其分数的列表。
"""
import couchbase.search as search
from couchbase.options import SearchOptions
from couchbase.vector_search import VectorQuery, VectorSearch
fields = kwargs.get("fields", ["*"])
# Document text field needs to be returned from the search
if fields != ["*"] and self._text_key not in fields:
fields.append(self._text_key)
search_req = search.SearchRequest.create(
VectorSearch.from_vector_query(
VectorQuery(
self._embedding_key,
embedding,
k,
)
)
)
try:
if self._scoped_index:
search_iter = self._scope.search(
self._index_name,
search_req,
SearchOptions(
limit=k,
fields=fields,
raw=search_options,
),
)
else:
search_iter = self._cluster.search(
index=self._index_name,
request=search_req,
options=SearchOptions(limit=k, fields=fields, raw=search_options),
)
docs_with_score = []
# Parse the results
for row in search_iter.rows():
text = row.fields.pop(self._text_key, "")
# Format the metadata from Couchbase
metadata = self._format_metadata(row.fields)
score = row.score
doc = Document(page_content=text, metadata=metadata)
docs_with_score.append((doc, score))
except Exception as e:
raise ValueError(f"Search failed with error: {e}")
return docs_with_score
[docs] def similarity_search(
self,
query: str,
k: int = 4,
search_options: Optional[Dict[str, Any]] = {},
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入向量最相似的文档及其分数。
参数:
query(str):要查找相似文档的查询
k(int):要返回的文档数量。
默认为4。
search_options(Optional[Dict[str, Any]]):可选的搜索选项,将其传递给Couchbase搜索。
默认为空字典
fields(Optional[List[str]]):要包含在结果元数据中的可选字段列表。请注意,这些字段需要存储在索引中。
如果未指定任何内容,则默认为索引中存储的所有字段。
返回:
与查询最相似的文档列表。
"""
query_embedding = self.embeddings.embed_query(query)
docs_with_scores = self.similarity_search_with_score_by_vector(
query_embedding, k, search_options, **kwargs
)
return [doc for doc, _ in docs_with_scores]
[docs] def similarity_search_with_score(
self,
query: str,
k: int = 4,
search_options: Optional[Dict[str, Any]] = {},
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的文档及其分数。
参数:
query(str):要查找相似文档的查询
k(int):要返回的文档数量。
默认为4。
search_options(Optional[Dict[str, Any]]):传递给Couchbase搜索的可选搜索选项。
默认为空字典。
fields(Optional[List[str]]):要包含在结果元数据中的可选字段列表。请注意,这些字段需要存储在索引中。
如果未指定任何内容,则默认为文本和元数据字段。
返回:
与查询最相似的文档及其分数的列表。
"""
query_embedding = self.embeddings.embed_query(query)
docs_with_score = self.similarity_search_with_score_by_vector(
query_embedding, k, search_options, **kwargs
)
return docs_with_score
[docs] def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
search_options: Optional[Dict[str, Any]] = {},
**kwargs: Any,
) -> List[Document]:
"""返回与向量嵌入最相似的文档。
参数:
embedding (List[float]): 要查找相似文档的嵌入。
k (int): 要返回的文档数量。
默认为4。
search_options (Optional[Dict[str, Any]]): 可选的搜索选项,会传递给Couchbase搜索。
默认为空字典。
fields (Optional[List[str]]): 要包含在结果元数据中的可选字段列表。请注意,这些字段需要在索引中存储。
如果未指定任何内容,则默认为文档文本和元数据字段。
返回:
与查询最相似的文档列表。
"""
docs_with_score = self.similarity_search_with_score_by_vector(
embedding, k, search_options, **kwargs
)
return [doc for doc, _ in docs_with_score]
@classmethod
def _from_kwargs(
cls: Type[CouchbaseVectorStore],
embedding: Embeddings,
**kwargs: Any,
) -> CouchbaseVectorStore:
"""从关键字参数初始化Couchbase向量存储以用于向量存储。
参数:
embedding: 用于嵌入文本的嵌入对象。
**kwargs: 用于初始化向量存储的关键字参数。
接受的参数包括:
- cluster
- bucket_name
- scope_name
- collection_name
- index_name
- text_key
- embedding_key
- scoped_index
"""
cluster = kwargs.get("cluster", None)
bucket_name = kwargs.get("bucket_name", None)
scope_name = kwargs.get("scope_name", None)
collection_name = kwargs.get("collection_name", None)
index_name = kwargs.get("index_name", None)
text_key = kwargs.get("text_key", cls._default_text_key)
embedding_key = kwargs.get("embedding_key", cls._default_embedding_key)
scoped_index = kwargs.get("scoped_index", True)
return cls(
embedding=embedding,
cluster=cluster,
bucket_name=bucket_name,
scope_name=scope_name,
collection_name=collection_name,
index_name=index_name,
text_key=text_key,
embedding_key=embedding_key,
scoped_index=scoped_index,
)
[docs] @classmethod
def from_texts(
cls: Type[CouchbaseVectorStore],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[Dict[Any, Any]]] = None,
**kwargs: Any,
) -> CouchbaseVectorStore:
"""从文本列表构建一个Couchbase向量存储。
示例:
.. code-block:: python
from langchain_community.vectorstores import CouchbaseVectorStore
from langchain_openai import OpenAIEmbeddings
from couchbase.cluster import Cluster
from couchbase.auth import PasswordAuthenticator
from couchbase.options import ClusterOptions
from datetime import timedelta
auth = PasswordAuthenticator(username, password)
options = ClusterOptions(auth)
connect_string = "couchbases://localhost"
cluster = Cluster(connect_string, options)
# 等待集群准备就绪。
cluster.wait_until_ready(timedelta(seconds=5))
embeddings = OpenAIEmbeddings()
texts = ["hello", "world"]
vectorstore = CouchbaseVectorStore.from_texts(
texts,
embedding=embeddings,
cluster=cluster,
bucket_name="",
scope_name="",
collection_name="",
index_name="vector-index",
)
参数:
texts (List[str]): 要添加到向量存储的文本列表。
embedding (Embeddings): 要使用的嵌入函数。
metadatas (optional[List[Dict]): 要添加到文档的元数据列表。
**kwargs: 用于初始化向量存储和/或传递给`add_texts`方法的关键字参数。检查构造函数和/或`add_texts`以获取接受的参数列表。
返回:
一个Couchbase向量存储。
"""
vector_store = cls._from_kwargs(embedding, **kwargs)
batch_size = kwargs.get("batch_size", vector_store.DEFAULT_BATCH_SIZE)
ids = kwargs.get("ids", None)
vector_store.add_texts(
texts, metadatas=metadatas, ids=ids, batch_size=batch_size
)
return vector_store