# Source code for langchain_community.vectorstores.couchbase

from __future__ import annotations

import uuid
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

if TYPE_CHECKING:
    from couchbase.cluster import Cluster


class CouchbaseVectorStore(VectorStore):
    """`Couchbase Vector Store` vector store.

    To use it, you need
    - a recent installation of the `couchbase` library
    - a Couchbase database with a pre-defined Search index that supports
      vector fields

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import CouchbaseVectorStore
            from langchain_openai import OpenAIEmbeddings

            from couchbase.cluster import Cluster
            from couchbase.auth import PasswordAuthenticator
            from couchbase.options import ClusterOptions
            from datetime import timedelta

            auth = PasswordAuthenticator(username, password)
            options = ClusterOptions(auth)
            connect_string = "couchbases://localhost"
            cluster = Cluster(connect_string, options)

            # Wait until the cluster is ready for use.
            cluster.wait_until_ready(timedelta(seconds=5))

            embeddings = OpenAIEmbeddings()

            vectorstore = CouchbaseVectorStore(
                cluster=cluster,
                bucket_name="",
                scope_name="",
                collection_name="",
                embedding=embeddings,
                index_name="vector-index",
            )

            vectorstore.add_texts(["hello", "world"])
            results = vectorstore.similarity_search("ola", k=1)
    """

    # Default number of documents per insert/delete batch.
    DEFAULT_BATCH_SIZE = 100
    # Document key under which the metadata dict is stored.
    _metadata_key = "metadata"
    # Default document key holding the raw text.
    _default_text_key = "text"
    # Default document key holding the embedding vector.
    _default_embedding_key = "embedding"

    def _check_bucket_exists(self) -> bool:
        """Check if the bucket exists in the linked Couchbase cluster.

        Returns:
            True if the bucket exists, False otherwise.
        """
        bucket_manager = self._cluster.buckets()
        try:
            bucket_manager.get_bucket(self._bucket_name)
            return True
        except Exception:
            # Any SDK error (bucket missing, permissions, ...) is treated
            # as "bucket does not exist".
            return False

    def _check_scope_and_collection_exists(self) -> bool:
        """Check if the scope and collection exist in the linked bucket.

        Raises:
            ValueError: If either the scope or the collection is not found.
        """
        scope_collection_map: Dict[str, Any] = {}

        # Get a list of all scopes in the bucket
        for scope in self._bucket.collections().get_all_scopes():
            scope_collection_map[scope.name] = []

            # Get a list of all the collections in the scope
            for collection in scope.collections:
                scope_collection_map[scope.name].append(collection.name)

        # Check if the scope exists
        if self._scope_name not in scope_collection_map.keys():
            raise ValueError(
                f"Scope {self._scope_name} not found in Couchbase "
                f"bucket {self._bucket_name}"
            )

        # Check if the collection exists in the scope
        if self._collection_name not in scope_collection_map[self._scope_name]:
            raise ValueError(
                f"Collection {self._collection_name} not found in scope "
                f"{self._scope_name} in Couchbase bucket {self._bucket_name}"
            )

        return True

    def _check_index_exists(self) -> bool:
        """Check if the Search index exists in the linked Couchbase cluster.

        Raises:
            ValueError: If the index does not exist.
        """
        # A scoped index lives on the scope's index manager, a global one on
        # the cluster's; the existence check itself is identical, so the
        # previously duplicated branches are consolidated here.
        if self._scoped_index:
            index_manager = self._scope.search_indexes()
        else:
            index_manager = self._cluster.search_indexes()

        all_indexes = [index.name for index in index_manager.get_all_indexes()]
        if self._index_name not in all_indexes:
            raise ValueError(
                f"Index {self._index_name} does not exist. "
                " Please create the index before searching."
            )

        return True
[docs] def __init__( self, cluster: Cluster, bucket_name: str, scope_name: str, collection_name: str, embedding: Embeddings, index_name: str, *, text_key: Optional[str] = _default_text_key, embedding_key: Optional[str] = _default_embedding_key, scoped_index: bool = True, ) -> None: """初始化Couchbase向量存储。 参数: cluster (Cluster): 具有活动连接的Couchbase集群对象。 bucket_name (str): 存储文档的桶名称。 scope_name (str): 存储文档的作用域中的作用域名称。 collection_name (str): 存储文档的作用域中的集合名称。 embedding (Embeddings): 要使用的嵌入函数。 index_name (str): 要使用的搜索索引的名称。 text_key (optional[str]): 用作文本的文档中的键。 默认设置为text。 embedding_key (optional[str]): 用于嵌入的文档中的键。 默认设置为embedding。 scoped_index (optional[bool]): 指定索引是否为作用域索引。 默认设置为True。 """ try: from couchbase.cluster import Cluster except ImportError as e: raise ImportError( "Could not import couchbase python package. " "Please install couchbase SDK with `pip install couchbase`." ) from e if not isinstance(cluster, Cluster): raise ValueError( f"cluster should be an instance of couchbase.Cluster, " f"got {type(cluster)}" ) self._cluster = cluster if not embedding: raise ValueError("Embeddings instance must be provided.") if not bucket_name: raise ValueError("bucket_name must be provided.") if not scope_name: raise ValueError("scope_name must be provided.") if not collection_name: raise ValueError("collection_name must be provided.") if not index_name: raise ValueError("index_name must be provided.") self._bucket_name = bucket_name self._scope_name = scope_name self._collection_name = collection_name self._embedding_function = embedding self._text_key = text_key self._embedding_key = embedding_key self._index_name = index_name self._scoped_index = scoped_index # Check if the bucket exists if not self._check_bucket_exists(): raise ValueError( f"Bucket {self._bucket_name} does not exist. " " Please create the bucket before searching." 
) try: self._bucket = self._cluster.bucket(self._bucket_name) self._scope = self._bucket.scope(self._scope_name) self._collection = self._scope.collection(self._collection_name) except Exception as e: raise ValueError( "Error connecting to couchbase. " "Please check the connection and credentials." ) from e # Check if the scope and collection exists. Throws ValueError if they don't try: self._check_scope_and_collection_exists() except Exception as e: raise e # Check if the index exists. Throws ValueError if it doesn't try: self._check_index_exists() except Exception as e: raise e
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[Dict[str, Any]]] = None, ids: Optional[List[str]] = None, batch_size: Optional[int] = None, **kwargs: Any, ) -> List[str]: """通过嵌入将文本传递并持久化在向量存储中。 如果传递了文档ID,则现有文档(如果有)将被新文档覆盖。 参数: texts(Iterable[str]):要添加到向量存储中的字符串的可迭代对象。 metadatas(Optional[List[Dict]]):与文本相关的元数据的可选列表。 ids(Optional[List[str]):与文本相关的ID的可选列表。 ID必须是整个集合中的唯一字符串。 如果未指定,将生成并使用uuid作为ID。 batch_size(Optional[int]):用于批量插入的可选批量大小。 默认值为100。 返回: List[str]:将文本添加到向量存储中的ID列表。 """ from couchbase.exceptions import DocumentExistsException if not batch_size: batch_size = self.DEFAULT_BATCH_SIZE doc_ids: List[str] = [] if ids is None: ids = [uuid.uuid4().hex for _ in texts] if metadatas is None: metadatas = [{} for _ in texts] embedded_texts = self._embedding_function.embed_documents(list(texts)) documents_to_insert = [ { id: { self._text_key: text, self._embedding_key: vector, self._metadata_key: metadata, } for id, text, vector, metadata in zip( ids, texts, embedded_texts, metadatas ) } ] # Insert in batches for i in range(0, len(documents_to_insert), batch_size): batch = documents_to_insert[i : i + batch_size] try: result = self._collection.upsert_multi(batch[0]) if result.all_ok: doc_ids.extend(batch[0].keys()) except DocumentExistsException as e: raise ValueError(f"Document already exists: {e}") return doc_ids
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]: """通过ids从向量存储中删除文档。 参数: ids (List[str]): 要删除的文档的ID列表。 batch_size (Optional[int]): 批量删除的可选批量大小。 返回: bool: 如果所有文档都成功删除,则为True,否则为False。 """ from couchbase.exceptions import DocumentNotFoundException if ids is None: raise ValueError("No document ids provided to delete.") batch_size = kwargs.get("batch_size", self.DEFAULT_BATCH_SIZE) deletion_status = True # Delete in batches for i in range(0, len(ids), batch_size): batch = ids[i : i + batch_size] try: result = self._collection.remove_multi(batch) except DocumentNotFoundException as e: deletion_status = False raise ValueError(f"Document not found: {e}") deletion_status &= result.all_ok return deletion_status
@property def embeddings(self) -> Embeddings: """返回查询嵌入对象。""" return self._embedding_function def _format_metadata(self, row_fields: Dict[str, Any]) -> Dict[str, Any]: """辅助方法,用于格式化来自Couchbase Search API的元数据。 参数: row_fields(Dict[str, Any]):需要格式化的字段。 返回: Dict[str, Any]:格式化后的元数据。 """ metadata = {} for key, value in row_fields.items(): # Couchbase Search returns the metadata key with a prefix # `metadata.` We remove it to get the original metadata key if key.startswith(self._metadata_key): new_key = key.split(self._metadata_key + ".")[-1] metadata[new_key] = value else: metadata[key] = value return metadata
[docs] def similarity_search_with_score_by_vector( self, embedding: List[float], k: int = 4, search_options: Optional[Dict[str, Any]] = {}, **kwargs: Any, ) -> List[Tuple[Document, float]]: """返回与嵌入向量最相似的文档及其分数。 参数: embedding (List[float]): 要查找与之相似文档的嵌入向量。 k (int): 要返回的文档数量。 默认为4。 search_options (Optional[Dict[str, Any]]): 可选的搜索选项,将其传递给Couchbase搜索。 默认为空字典。 fields (Optional[List[str]]): 要包含在结果元数据中的字段的可选列表。请注意,这些字段需要存储在索引中。 如果未指定任何内容,则默认为索引中存储的所有字段。 返回: 与查询向量最相似的文档及其分数的列表。 """ import couchbase.search as search from couchbase.options import SearchOptions from couchbase.vector_search import VectorQuery, VectorSearch fields = kwargs.get("fields", ["*"]) # Document text field needs to be returned from the search if fields != ["*"] and self._text_key not in fields: fields.append(self._text_key) search_req = search.SearchRequest.create( VectorSearch.from_vector_query( VectorQuery( self._embedding_key, embedding, k, ) ) ) try: if self._scoped_index: search_iter = self._scope.search( self._index_name, search_req, SearchOptions( limit=k, fields=fields, raw=search_options, ), ) else: search_iter = self._cluster.search( index=self._index_name, request=search_req, options=SearchOptions(limit=k, fields=fields, raw=search_options), ) docs_with_score = [] # Parse the results for row in search_iter.rows(): text = row.fields.pop(self._text_key, "") # Format the metadata from Couchbase metadata = self._format_metadata(row.fields) score = row.score doc = Document(page_content=text, metadata=metadata) docs_with_score.append((doc, score)) except Exception as e: raise ValueError(f"Search failed with error: {e}") return docs_with_score
[docs] def similarity_search_with_score( self, query: str, k: int = 4, search_options: Optional[Dict[str, Any]] = {}, **kwargs: Any, ) -> List[Tuple[Document, float]]: """返回与查询最相似的文档及其分数。 参数: query(str):要查找相似文档的查询 k(int):要返回的文档数量。 默认为4。 search_options(Optional[Dict[str, Any]]):传递给Couchbase搜索的可选搜索选项。 默认为空字典。 fields(Optional[List[str]]):要包含在结果元数据中的可选字段列表。请注意,这些字段需要存储在索引中。 如果未指定任何内容,则默认为文本和元数据字段。 返回: 与查询最相似的文档及其分数的列表。 """ query_embedding = self.embeddings.embed_query(query) docs_with_score = self.similarity_search_with_score_by_vector( query_embedding, k, search_options, **kwargs ) return docs_with_score
[docs] def similarity_search_by_vector( self, embedding: List[float], k: int = 4, search_options: Optional[Dict[str, Any]] = {}, **kwargs: Any, ) -> List[Document]: """返回与向量嵌入最相似的文档。 参数: embedding (List[float]): 要查找相似文档的嵌入。 k (int): 要返回的文档数量。 默认为4。 search_options (Optional[Dict[str, Any]]): 可选的搜索选项,会传递给Couchbase搜索。 默认为空字典。 fields (Optional[List[str]]): 要包含在结果元数据中的可选字段列表。请注意,这些字段需要在索引中存储。 如果未指定任何内容,则默认为文档文本和元数据字段。 返回: 与查询最相似的文档列表。 """ docs_with_score = self.similarity_search_with_score_by_vector( embedding, k, search_options, **kwargs ) return [doc for doc, _ in docs_with_score]
@classmethod def _from_kwargs( cls: Type[CouchbaseVectorStore], embedding: Embeddings, **kwargs: Any, ) -> CouchbaseVectorStore: """从关键字参数初始化Couchbase向量存储以用于向量存储。 参数: embedding: 用于嵌入文本的嵌入对象。 **kwargs: 用于初始化向量存储的关键字参数。 接受的参数包括: - cluster - bucket_name - scope_name - collection_name - index_name - text_key - embedding_key - scoped_index """ cluster = kwargs.get("cluster", None) bucket_name = kwargs.get("bucket_name", None) scope_name = kwargs.get("scope_name", None) collection_name = kwargs.get("collection_name", None) index_name = kwargs.get("index_name", None) text_key = kwargs.get("text_key", cls._default_text_key) embedding_key = kwargs.get("embedding_key", cls._default_embedding_key) scoped_index = kwargs.get("scoped_index", True) return cls( embedding=embedding, cluster=cluster, bucket_name=bucket_name, scope_name=scope_name, collection_name=collection_name, index_name=index_name, text_key=text_key, embedding_key=embedding_key, scoped_index=scoped_index, )
[docs] @classmethod def from_texts( cls: Type[CouchbaseVectorStore], texts: List[str], embedding: Embeddings, metadatas: Optional[List[Dict[Any, Any]]] = None, **kwargs: Any, ) -> CouchbaseVectorStore: """从文本列表构建一个Couchbase向量存储。 示例: .. code-block:: python from langchain_community.vectorstores import CouchbaseVectorStore from langchain_openai import OpenAIEmbeddings from couchbase.cluster import Cluster from couchbase.auth import PasswordAuthenticator from couchbase.options import ClusterOptions from datetime import timedelta auth = PasswordAuthenticator(username, password) options = ClusterOptions(auth) connect_string = "couchbases://localhost" cluster = Cluster(connect_string, options) # 等待集群准备就绪。 cluster.wait_until_ready(timedelta(seconds=5)) embeddings = OpenAIEmbeddings() texts = ["hello", "world"] vectorstore = CouchbaseVectorStore.from_texts( texts, embedding=embeddings, cluster=cluster, bucket_name="", scope_name="", collection_name="", index_name="vector-index", ) 参数: texts (List[str]): 要添加到向量存储的文本列表。 embedding (Embeddings): 要使用的嵌入函数。 metadatas (optional[List[Dict]): 要添加到文档的元数据列表。 **kwargs: 用于初始化向量存储和/或传递给`add_texts`方法的关键字参数。检查构造函数和/或`add_texts`以获取接受的参数列表。 返回: 一个Couchbase向量存储。 """ vector_store = cls._from_kwargs(embedding, **kwargs) batch_size = kwargs.get("batch_size", vector_store.DEFAULT_BATCH_SIZE) ids = kwargs.get("ids", None) vector_store.add_texts( texts, metadatas=metadatas, ids=ids, batch_size=batch_size ) return vector_store