class CouchbaseVectorStore(CouchbaseSearchVectorStore):
    """
    Couchbase Vector Store (deprecated).

    Backwards-compatible alias of CouchbaseSearchVectorStore. Creating an
    instance emits a DeprecationWarning and then runs the
    CouchbaseSearchVectorStore constructor with the same arguments.
    New code should use CouchbaseSearchVectorStore directly.
    """

    def __init__(
        self,
        cluster: Any,
        bucket_name: str,
        scope_name: str,
        collection_name: str,
        index_name: str,
        text_key: Optional[str] = "text",
        embedding_key: Optional[str] = "embedding",
        metadata_key: Optional[str] = "metadata",
        scoped_index: bool = True,
    ) -> None:
        """
        Warn about the deprecation, then initialize via CouchbaseSearchVectorStore.

        All arguments are forwarded unchanged; see
        CouchbaseSearchVectorStore.__init__ for their meaning.
        """
        # stacklevel=2 attributes the warning to the code constructing the store.
        warnings.warn(
            "CouchbaseVectorStore is deprecated, please use "
            "CouchbaseSearchVectorStore instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__init__(
            cluster=cluster,
            bucket_name=bucket_name,
            scope_name=scope_name,
            collection_name=collection_name,
            index_name=index_name,
            text_key=text_key,
            embedding_key=embedding_key,
            metadata_key=metadata_key,
            scoped_index=scoped_index,
        )
class CouchbaseSearchVectorStore(BasePydanticVectorStore):
    """
    Couchbase Vector Store.

    To use, you should have the ``couchbase`` python package installed.
    """

    stores_text: bool = True
    flat_metadata: bool = True

    # Default batch size
    DEFAULT_BATCH_SIZE: int = 100

    _cluster: Any = PrivateAttr()
    _bucket: Any = PrivateAttr()
    _scope: Any = PrivateAttr()
    _collection: Any = PrivateAttr()
    _bucket_name: str = PrivateAttr()
    _scope_name: str = PrivateAttr()
    _collection_name: str = PrivateAttr()
    _index_name: str = PrivateAttr()
    _id_key: str = PrivateAttr()
    _text_key: str = PrivateAttr()
    _embedding_key: str = PrivateAttr()
    _metadata_key: str = PrivateAttr()
    _scoped_index: bool = PrivateAttr()

    def __init__(
        self,
        cluster: Any,
        bucket_name: str,
        scope_name: str,
        collection_name: str,
        index_name: str,
        text_key: Optional[str] = "text",
        embedding_key: Optional[str] = "embedding",
        metadata_key: Optional[str] = "metadata",
        scoped_index: bool = True,
    ) -> None:
        """
        Initializes a connection to a Couchbase Vector Store.

        Args:
            cluster (Cluster): Couchbase cluster object with active connection.
            bucket_name (str): Name of bucket to store documents in.
            scope_name (str): Name of scope in the bucket to store documents in.
            collection_name (str): Name of collection in the scope to store documents in.
            index_name (str): Name of the Search index.
            text_key (Optional[str], optional): The field for the document text.
                Defaults to "text".
            embedding_key (Optional[str], optional): The field for the document embedding.
                Defaults to "embedding".
            metadata_key (Optional[str], optional): The field for the document metadata.
                Defaults to "metadata".
            scoped_index (Optional[bool]): specify whether the index is a scoped index.
                Set to True by default.

        Returns:
            None

        Raises:
            ImportError: If the ``couchbase`` package cannot be imported.
            ValueError: If ``cluster`` is not a ``couchbase.cluster.Cluster``, a
                required name is empty, the bucket, scope, collection or
                Search index does not exist, or opening the keyspace fails.
        """
        try:
            from couchbase.cluster import Cluster
        except ImportError as e:
            raise ImportError(
                "Could not import couchbase python package. "
                "Please install couchbase SDK with `pip install couchbase`."
            ) from e

        if not isinstance(cluster, Cluster):
            raise ValueError(
                f"cluster should be an instance of couchbase.Cluster, "
                f"got {type(cluster)}"
            )

        super().__init__()
        self._cluster = cluster

        if not bucket_name:
            raise ValueError("bucket_name must be provided.")
        if not scope_name:
            raise ValueError("scope_name must be provided.")
        if not collection_name:
            raise ValueError("collection_name must be provided.")
        if not index_name:
            raise ValueError("index_name must be provided.")

        self._bucket_name = bucket_name
        self._scope_name = scope_name
        self._collection_name = collection_name
        self._text_key = text_key
        self._embedding_key = embedding_key
        self._index_name = index_name
        self._metadata_key = metadata_key
        self._scoped_index = scoped_index

        # Check if the bucket exists
        if not self._check_bucket_exists():
            raise ValueError(
                f"Bucket {self._bucket_name} does not exist. "
                " Please create the bucket before searching."
            )

        try:
            self._bucket = self._cluster.bucket(self._bucket_name)
            self._scope = self._bucket.scope(self._scope_name)
            self._collection = self._scope.collection(self._collection_name)
        except Exception as e:
            raise ValueError(
                "Error connecting to couchbase. "
                "Please check the connection and credentials."
            ) from e

        # Both checks raise ValueError when the resource is missing.
        self._check_scope_and_collection_exists()
        self._check_index_exists()

    def add(self, nodes: List[BaseNode], **kwargs: Any) -> List[str]:
        """
        Add nodes to the collection and return their document IDs.

        Nodes are upserted in batches. Within a batch, if some documents
        fail, the failures are logged as a warning. Only the IDs of documents
        that were written are returned.

        Args:
            nodes (List[BaseNode]): List of nodes to add.
            **kwargs (Any): Additional keyword arguments.
                batch_size (int): Size of the batch for batch insert.

        Returns:
            List[str]: List of document IDs for the added nodes.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        from couchbase.exceptions import DocumentExistsException

        batch_size = kwargs.get("batch_size", self.DEFAULT_BATCH_SIZE)
        if batch_size <= 0:
            # A non-positive step would otherwise raise (0) or silently insert nothing (<0).
            raise ValueError("batch_size must be a positive integer")

        # (document id, document body) pairs, in input order.
        documents = []
        for node in nodes:
            metadata = node_to_metadata_dict(
                node,
                remove_text=True,
                text_field=self._text_key,
                flat_metadata=self.flat_metadata,
            )
            doc = {
                self._text_key: node.get_content(metadata_mode=MetadataMode.NONE),
                self._embedding_key: node.embedding,
                self._metadata_key: metadata,
            }
            documents.append((node.node_id, doc))

        doc_ids: List[str] = []
        for start in range(0, len(documents), batch_size):
            # Duplicate IDs within a batch collapse, last one wins.
            insert_batch = dict(documents[start : start + batch_size])
            try:
                logger.debug(
                    "Inserting batch of documents to Couchbase: %d documents",
                    len(insert_batch),
                )
                # upsert the batch of documents into the collection
                result = self._collection.upsert_multi(insert_batch)
                logger.debug("Insert result: %s", result.all_ok)
                if result.all_ok:
                    doc_ids.extend(insert_batch.keys())
                else:
                    failed = result.exceptions
                    logger.warning(
                        "Failed to upsert %d of %d documents: %s",
                        len(failed),
                        len(insert_batch),
                        failed,
                    )
                    doc_ids.extend(key for key in insert_batch if key not in failed)
            except DocumentExistsException as e:
                logger.debug("Document already exists: %s", e)

        logger.debug("Inserted batch of documents to Couchbase")
        return doc_ids

    def delete(self, ref_doc_id: str, **kwargs: Any) -> None:
        """
        Delete a document by its reference document ID.

        Deletes every document in the collection whose
        ``<metadata_key>.ref_doc_id`` equals ``ref_doc_id``, using a
        parameterized SQL++ query on the scope.

        Args:
            ref_doc_id: The reference document ID to be deleted.

        Returns:
            None

        Raises:
            Exception: Any error from the query is logged and re-raised.
        """
        # Backtick-quote identifiers so keys containing special characters stay valid SQL++.
        document_field = f"`{self._metadata_key}`.ref_doc_id"
        query = (
            f"DELETE FROM `{self._collection_name}` "
            f"WHERE {document_field} = $ref_doc_id"
        )
        try:
            self._scope.query(query, ref_doc_id=ref_doc_id).execute()
        except Exception:
            logger.exception("Error deleting document %s", ref_doc_id)
            raise
        logger.debug("Deleted document %s", ref_doc_id)

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """
        Executes a query in the vector store and returns the result.

        Args:
            query (VectorStoreQuery): The query object containing the search parameters.
            **kwargs (Any): Additional keyword arguments.
                cb_search_options (Dict): Search options to pass to Couchbase Search

        Returns:
            VectorStoreQueryResult: The result of the query containing the top-k nodes,
                similarities, and ids.

        Raises:
            ValueError: If both ``query.filters`` and ``cb_search_options`` are
                given, or if issuing the search request fails.
        """
        import couchbase.search as search
        from couchbase.options import SearchOptions
        from couchbase.vector_search import VectorQuery, VectorSearch

        # Copy the requested fields so that adding the text field never mutates the caller's query.
        fields = list(query.output_fields) if query.output_fields else ["*"]
        # Document text field needs to be returned from the search
        if fields != ["*"] and self._text_key not in fields:
            fields.append(self._text_key)
        logger.debug("Output Fields: %s", fields)

        k = query.similarity_top_k

        # Get the search options
        search_options = kwargs.get("cb_search_options", {})
        if search_options and query.filters:
            raise ValueError("Cannot use both filters and cb_search_options")
        if query.filters:
            search_options = _to_couchbase_filter(query.filters)
            logger.debug("Filters transformed to Couchbase: %s", search_options)
        logger.debug("Filters: %s", search_options)

        # Create Search Request
        search_req = search.SearchRequest.create(
            VectorSearch.from_vector_query(
                VectorQuery(
                    self._embedding_key,
                    query.query_embedding,
                    k,
                )
            )
        )

        # Scope-level and cluster-level search share the same call signature.
        searcher = self._scope if self._scoped_index else self._cluster
        try:
            logger.debug("Querying Couchbase")
            search_iter = searcher.search(
                self._index_name,
                search_req,
                SearchOptions(limit=k, fields=fields, raw=search_options),
            )
        except Exception as e:
            logger.debug("Search failed with error %s", e)
            raise ValueError(f"Search failed with error: {e}") from e

        top_k_nodes = []
        top_k_scores = []
        top_k_ids = []

        # NOTE(review): the SDK may defer running the search until rows() is
        # iterated; errors raised there are not wrapped in ValueError — confirm.
        for row in search_iter.rows():
            # Copy so the SDK row is untouched; fields may be absent on a row.
            row_fields = dict(row.fields or {})
            text = row_fields.pop(self._text_key, "")
            score = row.score
            # Format the metadata into a dictionary
            metadata_dict = self._format_metadata(row_fields)
            doc_id = row.id

            try:
                node = metadata_dict_to_node(metadata_dict, text)
            except Exception:
                # Deprecated legacy logic for backwards compatibility
                node = TextNode(
                    text=text,
                    id_=doc_id,
                    score=score,
                    metadata=metadata_dict,
                )

            top_k_nodes.append(node)
            top_k_scores.append(score)
            top_k_ids.append(doc_id)

        return VectorStoreQueryResult(
            nodes=top_k_nodes, similarities=top_k_scores, ids=top_k_ids
        )

    @property
    def client(self) -> Any:
        """
        Property function to access the client attribute.
        """
        return self._cluster

    def _check_bucket_exists(self) -> bool:
        """
        Check if the bucket exists in the linked Couchbase cluster.

        Returns:
            True if the bucket manager can fetch the bucket, False if the
            lookup raised any exception (the error is logged at DEBUG).
        """
        bucket_manager = self._cluster.buckets()
        try:
            bucket_manager.get_bucket(self._bucket_name)
        except Exception as e:
            logger.debug("Error checking if bucket exists: %s", e)
            return False
        return True

    def _check_scope_and_collection_exists(self) -> bool:
        """
        Check if the scope and collection exists in the linked Couchbase bucket.

        Returns:
            True if the scope and collection exist in the bucket.

        Raises:
            ValueError: If either the scope or the collection is not found.
        """
        # Map each scope name to the names of its collections.
        scope_collection_map: Dict[str, List[str]] = {
            scope.name: [collection.name for collection in scope.collections]
            for scope in self._bucket.collections().get_all_scopes()
        }

        # Check if the scope exists
        if self._scope_name not in scope_collection_map:
            raise ValueError(
                f"Scope {self._scope_name} not found in Couchbase "
                f"bucket {self._bucket_name}"
            )

        # Check if the collection exists in the scope
        if self._collection_name not in scope_collection_map[self._scope_name]:
            raise ValueError(
                f"Collection {self._collection_name} not found in scope "
                f"{self._scope_name} in Couchbase bucket {self._bucket_name}"
            )

        return True

    def _check_index_exists(self) -> bool:
        """
        Check if the Search index exists in the linked Couchbase cluster.

        Scoped indexes are looked up on the scope, others on the cluster.

        Returns:
            bool: True if the index exists.

        Raises:
            ValueError: If the index does not exist.
        """
        index_manager = (
            self._scope.search_indexes()
            if self._scoped_index
            else self._cluster.search_indexes()
        )
        all_indexes = [index.name for index in index_manager.get_all_indexes()]
        if self._index_name not in all_indexes:
            raise ValueError(
                f"Index {self._index_name} does not exist. "
                " Please create the index before searching."
            )
        return True

    def _format_metadata(self, row_fields: Dict[str, Any]) -> Dict[str, Any]:
        """
        Helper method to format the metadata from the Couchbase Search API.

        Keys of the form ``<metadata_key>.<name>`` are renamed to ``<name>``.
        Only the leading prefix is removed, so nested occurrences inside
        ``<name>`` are preserved. All other keys are kept unchanged.

        Args:
            row_fields (Dict[str, Any]): The fields to format.

        Returns:
            Dict[str, Any]: The formatted metadata.
        """
        prefix = self._metadata_key + "."
        metadata = {}
        for key, value in row_fields.items():
            # Couchbase Search returns metadata keys prefixed with `metadata.`;
            # strip only that leading prefix to recover the original key.
            new_key = key[len(prefix) :] if key.startswith(prefix) else key
            metadata[new_key] = value
        return metadata
def add(self, nodes: List[BaseNode], **kwargs: Any) -> List[str]:
    """
    Add nodes to the collection and return their document IDs.

    Nodes are upserted in batches. Within a batch, if some documents fail,
    the failures are logged as a warning. Only the IDs of documents that
    were written are returned.

    Args:
        nodes (List[BaseNode]): List of nodes to add.
        **kwargs (Any): Additional keyword arguments.
            batch_size (int): Size of the batch for batch insert.

    Returns:
        List[str]: List of document IDs for the added nodes.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    from couchbase.exceptions import DocumentExistsException

    batch_size = kwargs.get("batch_size", self.DEFAULT_BATCH_SIZE)
    if batch_size <= 0:
        # A non-positive step would otherwise raise (0) or silently insert nothing (<0).
        raise ValueError("batch_size must be a positive integer")

    # (document id, document body) pairs, in input order.
    documents = []
    for node in nodes:
        metadata = node_to_metadata_dict(
            node,
            remove_text=True,
            text_field=self._text_key,
            flat_metadata=self.flat_metadata,
        )
        doc = {
            self._text_key: node.get_content(metadata_mode=MetadataMode.NONE),
            self._embedding_key: node.embedding,
            self._metadata_key: metadata,
        }
        documents.append((node.node_id, doc))

    doc_ids: List[str] = []
    for start in range(0, len(documents), batch_size):
        # Duplicate IDs within a batch collapse, last one wins.
        insert_batch = dict(documents[start : start + batch_size])
        try:
            logger.debug(
                "Inserting batch of documents to Couchbase: %d documents",
                len(insert_batch),
            )
            # upsert the batch of documents into the collection
            result = self._collection.upsert_multi(insert_batch)
            logger.debug("Insert result: %s", result.all_ok)
            if result.all_ok:
                doc_ids.extend(insert_batch.keys())
            else:
                failed = result.exceptions
                logger.warning(
                    "Failed to upsert %d of %d documents: %s",
                    len(failed),
                    len(insert_batch),
                    failed,
                )
                doc_ids.extend(key for key in insert_batch if key not in failed)
        except DocumentExistsException as e:
            logger.debug("Document already exists: %s", e)

    logger.debug("Inserted batch of documents to Couchbase")
    return doc_ids
def delete(self, ref_doc_id: str, **kwargs: Any) -> None:
    """
    Delete a document by its reference document ID.

    Deletes every document in the collection whose
    ``<metadata_key>.ref_doc_id`` equals ``ref_doc_id``, using a
    parameterized SQL++ query on ``self._scope``.

    Args:
        ref_doc_id: The reference document ID to be deleted.

    Returns:
        None

    Raises:
        Exception: Any error from the query is logged (with traceback) and re-raised.
    """
    # Backtick-quote identifiers so keys containing special characters stay valid SQL++.
    document_field = f"`{self._metadata_key}`.ref_doc_id"
    query = (
        f"DELETE FROM `{self._collection_name}` "
        f"WHERE {document_field} = $ref_doc_id"
    )
    try:
        self._scope.query(query, ref_doc_id=ref_doc_id).execute()
    except Exception:
        logger.exception("Error deleting document %s", ref_doc_id)
        raise
    logger.debug("Deleted document %s", ref_doc_id)
def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
    """
    Executes a query in the vector store and returns the result.

    Args:
        query (VectorStoreQuery): The query object containing the search parameters.
        **kwargs (Any): Additional keyword arguments.
            cb_search_options (Dict): Search options to pass to Couchbase Search

    Returns:
        VectorStoreQueryResult: The result of the query containing the top-k nodes,
            similarities, and ids.

    Raises:
        ValueError: If both ``query.filters`` and ``cb_search_options`` are
            given, or if issuing the search request fails.
    """
    import couchbase.search as search
    from couchbase.options import SearchOptions
    from couchbase.vector_search import VectorQuery, VectorSearch

    # Copy the requested fields so that adding the text field never mutates the caller's query.
    fields = list(query.output_fields) if query.output_fields else ["*"]
    # Document text field needs to be returned from the search
    if fields != ["*"] and self._text_key not in fields:
        fields.append(self._text_key)
    logger.debug("Output Fields: %s", fields)

    k = query.similarity_top_k

    # Get the search options
    search_options = kwargs.get("cb_search_options", {})
    if search_options and query.filters:
        raise ValueError("Cannot use both filters and cb_search_options")
    if query.filters:
        search_options = _to_couchbase_filter(query.filters)
        logger.debug("Filters transformed to Couchbase: %s", search_options)
    logger.debug("Filters: %s", search_options)

    # Create Search Request
    search_req = search.SearchRequest.create(
        VectorSearch.from_vector_query(
            VectorQuery(
                self._embedding_key,
                query.query_embedding,
                k,
            )
        )
    )

    # Scope-level and cluster-level search share the same call signature.
    searcher = self._scope if self._scoped_index else self._cluster
    try:
        logger.debug("Querying Couchbase")
        search_iter = searcher.search(
            self._index_name,
            search_req,
            SearchOptions(limit=k, fields=fields, raw=search_options),
        )
    except Exception as e:
        logger.debug("Search failed with error %s", e)
        raise ValueError(f"Search failed with error: {e}") from e

    top_k_nodes = []
    top_k_scores = []
    top_k_ids = []

    # NOTE(review): the SDK may defer running the search until rows() is
    # iterated; errors raised there are not wrapped in ValueError — confirm.
    for row in search_iter.rows():
        # Copy so the SDK row is untouched; fields may be absent on a row.
        row_fields = dict(row.fields or {})
        text = row_fields.pop(self._text_key, "")
        score = row.score
        # Format the metadata into a dictionary
        metadata_dict = self._format_metadata(row_fields)
        doc_id = row.id

        try:
            node = metadata_dict_to_node(metadata_dict, text)
        except Exception:
            # Deprecated legacy logic for backwards compatibility
            node = TextNode(
                text=text,
                id_=doc_id,
                score=score,
                metadata=metadata_dict,
            )

        top_k_nodes.append(node)
        top_k_scores.append(score)
        top_k_ids.append(doc_id)

    return VectorStoreQueryResult(
        nodes=top_k_nodes, similarities=top_k_scores, ids=top_k_ids
    )