"""Wrapper around TileDB vector database."""from__future__importannotationsimportpickleimportrandomimportsysfromtypingimportAny,Dict,Iterable,List,Mapping,Optional,Tupleimportnumpyasnpfromlangchain_core.documentsimportDocumentfromlangchain_core.embeddingsimportEmbeddingsfromlangchain_core.utilsimportguard_importfromlangchain_core.vectorstoresimportVectorStorefromlangchain_community.vectorstores.utilsimportmaximal_marginal_relevanceINDEX_METRICS=frozenset(["euclidean"])DEFAULT_METRIC="euclidean"DOCUMENTS_ARRAY_NAME="documents"VECTOR_INDEX_NAME="vectors"MAX_UINT64=np.iinfo(np.dtype("uint64")).maxMAX_FLOAT_32=np.finfo(np.dtype("float32")).maxMAX_FLOAT=sys.float_info.max
def dependable_tiledb_import() -> Any:
    """Import tiledb-vector-search if available, otherwise raise error."""
    return (
        guard_import("tiledb.vector_search"),
        guard_import("tiledb"),
    )

def get_vector_index_uri_from_group(group: Any) -> str:
    """Get the URI of the vector index."""
    return group[VECTOR_INDEX_NAME].uri

def get_documents_array_uri_from_group(group: Any) -> str:
    """Get the URI of the documents array from group.

    Args:
        group: TileDB group object.

    Returns:
        URI of the documents array.
    """
    return group[DOCUMENTS_ARRAY_NAME].uri

def get_vector_index_uri(uri: str) -> str:
    """Get the URI of the vector index."""
    return f"{uri}/{VECTOR_INDEX_NAME}"

def get_documents_array_uri(uri: str) -> str:
    """Get the URI of the documents array."""
    return f"{uri}/{DOCUMENTS_ARRAY_NAME}"

class TileDB(VectorStore):
    """TileDB vector store.

    To use, you should have the ``tiledb-vector-search`` python package installed.

    Example:
        .. code-block:: python

            from langchain_community.embeddings import OpenAIEmbeddings
            from langchain_community.vectorstores import TileDB

            embeddings = OpenAIEmbeddings()
            db = TileDB(embeddings, index_uri, metric,
                        allow_dangerous_deserialization=True)
    """

    def __init__(
        self,
        embedding: Embeddings,
        index_uri: str,
        metric: str,
        *,
        vector_index_uri: str = "",
        docs_array_uri: str = "",
        config: Optional[Mapping[str, Any]] = None,
        timestamp: Any = None,
        allow_dangerous_deserialization: bool = False,
        **kwargs: Any,
    ):
        """Initialize with necessary components.

        Args:
            allow_dangerous_deserialization: Whether to allow deserialization
                of the data, which involves loading data using pickle.
                Pickled data can be modified by malicious actors to deliver a
                malicious payload that results in execution of
                arbitrary code on your machine.
        """
        if not allow_dangerous_deserialization:
            raise ValueError(
                "TileDB relies on pickle for serialization and deserialization. "
                "This can be dangerous if the data is intercepted and/or modified "
                "by malicious actors prior to being de-serialized. "
                "If you are sure that the data is safe from modification, you can "
                "set allow_dangerous_deserialization=True to proceed. "
                "Loading of compromised data using pickle can result in execution "
                "of arbitrary code on your machine."
            )
        self.embedding = embedding
        self.embedding_function = embedding.embed_query
        self.index_uri = index_uri
        self.metric = metric
        self.config = config

        tiledb_vs, tiledb = (
            guard_import("tiledb.vector_search"),
            guard_import("tiledb"),
        )
        with tiledb.scope_ctx(ctx_or_config=config):
            index_group = tiledb.Group(self.index_uri, "r")
            self.vector_index_uri = (
                vector_index_uri
                if vector_index_uri != ""
                else get_vector_index_uri_from_group(index_group)
            )
            self.docs_array_uri = (
                docs_array_uri
                if docs_array_uri != ""
                else get_documents_array_uri_from_group(index_group)
            )
            index_group.close()
            group = tiledb.Group(self.vector_index_uri, "r")
            self.index_type = group.meta.get("index_type")
            group.close()
            self.timestamp = timestamp
            if self.index_type == "FLAT":
                self.vector_index = tiledb_vs.flat_index.FlatIndex(
                    uri=self.vector_index_uri,
                    config=self.config,
                    timestamp=self.timestamp,
                    **kwargs,
                )
            elif self.index_type == "IVF_FLAT":
                self.vector_index = tiledb_vs.ivf_flat_index.IVFFlatIndex(
                    uri=self.vector_index_uri,
                    config=self.config,
                    timestamp=self.timestamp,
                    **kwargs,
                )

    def process_index_results(
        self,
        ids: List[int],
        scores: List[float],
        *,
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        score_threshold: float = MAX_FLOAT,
    ) -> List[Tuple[Document, float]]:
        """Turns TileDB results into a list of documents and scores.

        Args:
            ids: List of indices of the documents in the index.
            scores: List of distances of the documents in the index.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, Any]]): Filter by metadata. Defaults to None.
            score_threshold: Optional, a floating point value to filter the
                resulting set of retrieved docs.

        Returns:
            List of Documents and scores.
        """
        tiledb = guard_import("tiledb")
        docs = []
        docs_array = tiledb.open(
            self.docs_array_uri, "r", timestamp=self.timestamp, config=self.config
        )
        for idx, score in zip(ids, scores):
            if idx == 0 and score == 0:
                continue
            if idx == MAX_UINT64 and score == MAX_FLOAT_32:
                continue
            doc = docs_array[idx]
            if doc is None or len(doc["text"]) == 0:
                raise ValueError(f"Could not find document for id {idx}, got {doc}")
            pickled_metadata = doc.get("metadata")
            result_doc = Document(page_content=str(doc["text"][0]))
            if pickled_metadata is not None:
                metadata = pickle.loads(  # ignore[pickle]: explicit-opt-in
                    np.array(pickled_metadata.tolist()).astype(np.uint8).tobytes()
                )
                result_doc.metadata = metadata
            if filter is not None:
                filter = {
                    key: [value] if not isinstance(value, list) else value
                    for key, value in filter.items()
                }
                if all(
                    result_doc.metadata.get(key) in value
                    for key, value in filter.items()
                ):
                    docs.append((result_doc, score))
            else:
                docs.append((result_doc, score))
        docs_array.close()
        docs = [(doc, score) for doc, score in docs if score <= score_threshold]
        return docs[:k]

    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        *,
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query.

        Args:
            embedding: Embedding vector to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, Any]]): Filter by metadata. Defaults to None.
            fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
                Defaults to 20.
            **kwargs: kwargs to be passed to similarity search. Can include:
                nprobe: Optional, number of partitions to check if using IVF_FLAT index
                score_threshold: Optional, a floating point value to filter the
                    resulting set of retrieved docs

        Returns:
            List of documents most similar to the query text and distance
            in float for each. Lower score represents more similarity.
        """
        if "score_threshold" in kwargs:
            score_threshold = kwargs.pop("score_threshold")
        else:
            score_threshold = MAX_FLOAT
        d, i = self.vector_index.query(
            np.array([np.array(embedding).astype(np.float32)]).astype(np.float32),
            k=k if filter is None else fetch_k,
            **kwargs,
        )
        return self.process_index_results(
            ids=i[0], scores=d[0], filter=filter, k=k, score_threshold=score_threshold
        )

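    # A minimal sketch of a low-level query with an explicit score threshold.
    # The variable names (`db`, `embeddings`) and the threshold value are
    # placeholders, not part of this module; `nprobe` applies only to IVF_FLAT
    # indexes.
    #
    #     vector = embeddings.embed_query("query text")
    #     results = db.similarity_search_with_score_by_vector(
    #         vector, k=4, score_threshold=1.5
    #     )
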
    def similarity_search_with_score(
        self,
        query: str,
        *,
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
            fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
                Defaults to 20.

        Returns:
            List of documents most similar to the query text with
            Distance as float. Lower score represents more similarity.
        """
        embedding = self.embedding_function(query)
        docs = self.similarity_search_with_score_by_vector(
            embedding,
            k=k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        return docs

    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
            fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
                Defaults to 20.

        Returns:
            List of Documents most similar to the embedding.
        """
        docs_and_scores = self.similarity_search_with_score_by_vector(
            embedding,
            k=k,
            filter=filter,
            fetch_k=fetch_k,
            **kwargs,
        )
        return [doc for doc, _ in docs_and_scores]

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Dict[str, Any]] = None,
        fetch_k: int = 20,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.
            fetch_k: (Optional[int]) Number of Documents to fetch before filtering.
                Defaults to 20.

        Returns:
            List of Documents most similar to the query.
        """
        docs_and_scores = self.similarity_search_with_score(
            query, k=k, filter=filter, fetch_k=fetch_k, **kwargs
        )
        return [doc for doc, _ in docs_and_scores]

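    # Usage sketch for the text-query entry points above. `db` is assumed to be
    # an already-initialized TileDB instance, and the metadata key "source" is a
    # hypothetical example:
    #
    #     docs = db.similarity_search("query text", k=4)
    #     docs_and_scores = db.similarity_search_with_score(
    #         "query text", k=4, filter={"source": "notes"}
    #     )
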
    def max_marginal_relevance_search_with_score_by_vector(
        self,
        embedding: List[float],
        *,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs and their similarity scores selected using the maximal marginal
            relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch before filtering to
                     pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                        of diversity among the results with 0 corresponding
                        to maximum diversity and 1 to minimum diversity.
                        Defaults to 0.5.

        Returns:
            List of Documents and similarity scores selected by maximal marginal
                relevance and score for each.
        """
        if "score_threshold" in kwargs:
            score_threshold = kwargs.pop("score_threshold")
        else:
            score_threshold = MAX_FLOAT
        scores, indices = self.vector_index.query(
            np.array([np.array(embedding).astype(np.float32)]).astype(np.float32),
            k=fetch_k if filter is None else fetch_k * 2,
            **kwargs,
        )
        results = self.process_index_results(
            ids=indices[0],
            scores=scores[0],
            filter=filter,
            k=fetch_k if filter is None else fetch_k * 2,
            score_threshold=score_threshold,
        )
        embeddings = [
            self.embedding.embed_documents([doc.page_content])[0]
            for doc, _ in results
        ]
        mmr_selected = maximal_marginal_relevance(
            np.array([embedding], dtype=np.float32),
            embeddings,
            k=k,
            lambda_mult=lambda_mult,
        )
        docs_and_scores = []
        for i in mmr_selected:
            docs_and_scores.append(results[i])
        return docs_and_scores

    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch before filtering to
                     pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                        of diversity among the results with 0 corresponding
                        to maximum diversity and 1 to minimum diversity.
                        Defaults to 0.5.

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        docs_and_scores = self.max_marginal_relevance_search_with_score_by_vector(
            embedding,
            k=k,
            fetch_k=fetch_k,
            lambda_mult=lambda_mult,
            filter=filter,
            **kwargs,
        )
        return [doc for doc, _ in docs_and_scores]

    def max_marginal_relevance_search(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance.

        Maximal marginal relevance optimizes for similarity to query AND diversity
        among selected documents.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            fetch_k: Number of Documents to fetch before filtering (if needed) to
                     pass to MMR algorithm.
            lambda_mult: Number between 0 and 1 that determines the degree
                        of diversity among the results with 0 corresponding
                        to maximum diversity and 1 to minimum diversity.
                        Defaults to 0.5.

        Returns:
            List of Documents selected by maximal marginal relevance.
        """
        embedding = self.embedding_function(query)
        docs = self.max_marginal_relevance_search_by_vector(
            embedding,
            k=k,
            fetch_k=fetch_k,
            lambda_mult=lambda_mult,
            filter=filter,
            **kwargs,
        )
        return docs

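    # Sketch of an MMR query. `db` is assumed to be an initialized TileDB
    # instance; the `fetch_k` / `lambda_mult` values are illustrative defaults,
    # not tuned recommendations:
    #
    #     diverse_docs = db.max_marginal_relevance_search(
    #         "query text", k=4, fetch_k=20, lambda_mult=0.5
    #     )
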
    @classmethod
    def create(
        cls,
        index_uri: str,
        index_type: str,
        dimensions: int,
        vector_type: np.dtype,
        *,
        metadatas: bool = True,
        config: Optional[Mapping[str, Any]] = None,
    ) -> None:
        """Create an empty TileDB group holding a vector index and a documents array.

        Args:
            index_uri: URI of the TileDB group to create.
            index_type: Vector index type ("FLAT", "IVF_FLAT").
            dimensions: Dimensionality of the vectors to be indexed.
            vector_type: Numpy dtype of the vectors.
            metadatas: Whether to add a metadata attribute to the documents array.
            config: Optional, TileDB config.
        """
        tiledb_vs, tiledb = (
            guard_import("tiledb.vector_search"),
            guard_import("tiledb"),
        )
        with tiledb.scope_ctx(ctx_or_config=config):
            try:
                tiledb.group_create(index_uri)
            except tiledb.TileDBError as err:
                raise err
            group = tiledb.Group(index_uri, "w")
            vector_index_uri = get_vector_index_uri(group.uri)
            docs_uri = get_documents_array_uri(group.uri)
            if index_type == "FLAT":
                tiledb_vs.flat_index.create(
                    uri=vector_index_uri,
                    dimensions=dimensions,
                    vector_type=vector_type,
                    config=config,
                )
            elif index_type == "IVF_FLAT":
                tiledb_vs.ivf_flat_index.create(
                    uri=vector_index_uri,
                    dimensions=dimensions,
                    vector_type=vector_type,
                    config=config,
                )
            group.add(vector_index_uri, name=VECTOR_INDEX_NAME)

            # Create TileDB array to store Documents
            # TODO add a Document store API to tiledb-vector-search to allow storing
            #  different types of objects and metadata in a more generic way.
            dim = tiledb.Dim(
                name="id",
                domain=(0, MAX_UINT64 - 1),
                dtype=np.dtype(np.uint64),
            )
            dom = tiledb.Domain(dim)

            text_attr = tiledb.Attr(name="text", dtype=np.dtype("U1"), var=True)
            attrs = [text_attr]
            if metadatas:
                metadata_attr = tiledb.Attr(name="metadata", dtype=np.uint8, var=True)
                attrs.append(metadata_attr)
            schema = tiledb.ArraySchema(
                domain=dom,
                sparse=True,
                allows_duplicates=False,
                attrs=attrs,
            )
            tiledb.Array.create(docs_uri, schema)
            group.add(docs_uri, name=DOCUMENTS_ARRAY_NAME)
            group.close()

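    # Sketch of creating an empty index ahead of ingestion. The URI and the
    # dimensionality (1536) are placeholders; `dimensions` must match the output
    # size of whatever embedding model is used later:
    #
    #     TileDB.create(
    #         index_uri="/tmp/tiledb_index",
    #         index_type="FLAT",
    #         dimensions=1536,
    #         vector_type=np.dtype("float32"),
    #     )
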
    @classmethod
    def __from(
        cls,
        texts: List[str],
        embeddings: List[List[float]],
        embedding: Embeddings,
        index_uri: str,
        *,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        metric: str = DEFAULT_METRIC,
        index_type: str = "FLAT",
        config: Optional[Mapping[str, Any]] = None,
        index_timestamp: int = 0,
        **kwargs: Any,
    ) -> TileDB:
        if metric not in INDEX_METRICS:
            raise ValueError(
                (
                    f"Unsupported distance metric: {metric}. "
                    f"Expected one of {list(INDEX_METRICS)}"
                )
            )
        # Validate before deriving the vector dimensionality below.
        if not embeddings:
            raise ValueError("embeddings must be provided to build a TileDB index")
        tiledb_vs, tiledb = (
            guard_import("tiledb.vector_search"),
            guard_import("tiledb"),
        )
        input_vectors = np.array(embeddings).astype(np.float32)
        cls.create(
            index_uri=index_uri,
            index_type=index_type,
            dimensions=input_vectors.shape[1],
            vector_type=input_vectors.dtype,
            metadatas=metadatas is not None,
            config=config,
        )
        with tiledb.scope_ctx(ctx_or_config=config):
            vector_index_uri = get_vector_index_uri(index_uri)
            docs_uri = get_documents_array_uri(index_uri)
            if ids is None:
                ids = [str(random.randint(0, MAX_UINT64 - 1)) for _ in texts]
            external_ids = np.array(ids).astype(np.uint64)
            tiledb_vs.ingestion.ingest(
                index_type=index_type,
                index_uri=vector_index_uri,
                input_vectors=input_vectors,
                external_ids=external_ids,
                index_timestamp=index_timestamp if index_timestamp != 0 else None,
                config=config,
                **kwargs,
            )
            with tiledb.open(docs_uri, "w") as A:
                if external_ids is None:
                    external_ids = np.zeros(len(texts), dtype=np.uint64)
                    for i in range(len(texts)):
                        external_ids[i] = i
                data = {}
                data["text"] = np.array(texts)
                if metadatas is not None:
                    metadata_attr = np.empty([len(metadatas)], dtype=object)
                    i = 0
                    for metadata in metadatas:
                        metadata_attr[i] = np.frombuffer(
                            pickle.dumps(metadata), dtype=np.uint8
                        )
                        i += 1
                    data["metadata"] = metadata_attr
                A[external_ids] = data
        return cls(
            embedding=embedding,
            index_uri=index_uri,
            metric=metric,
            config=config,
            **kwargs,
        )

    def delete(
        self, ids: Optional[List[str]] = None, timestamp: int = 0, **kwargs: Any
    ) -> Optional[bool]:
        """Delete by vector ID or other criteria.

        Args:
            ids: List of ids to delete.
            timestamp: Optional timestamp to delete with.
            **kwargs: Other keyword arguments that subclasses might use.

        Returns:
            Optional[bool]: True if deletion is successful,
            False otherwise, None if not implemented.
        """
        external_ids = np.array(ids).astype(np.uint64)
        self.vector_index.delete_batch(
            external_ids=external_ids, timestamp=timestamp if timestamp != 0 else None
        )
        return True

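    # Deletion sketch. The id string is a placeholder; in practice the ids are
    # the ones returned by add_texts / from_texts:
    #
    #     db.delete(ids=["1234567890"])
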
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        timestamp: int = 0,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional ids of each text object.
            timestamp: Optional timestamp to write new texts with.
            kwargs: vectorstore specific parameters

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        tiledb = guard_import("tiledb")
        # Materialize the iterable once so it can be re-used below.
        texts = list(texts)
        embeddings = self.embedding.embed_documents(texts)
        if ids is None:
            ids = [str(random.randint(0, MAX_UINT64 - 1)) for _ in texts]

        external_ids = np.array(ids).astype(np.uint64)
        vectors = np.empty((len(embeddings)), dtype="O")
        for i in range(len(embeddings)):
            vectors[i] = np.array(embeddings[i], dtype=np.float32)
        self.vector_index.update_batch(
            vectors=vectors,
            external_ids=external_ids,
            timestamp=timestamp if timestamp != 0 else None,
        )

        docs = {}
        docs["text"] = np.array(texts)
        if metadatas is not None:
            metadata_attr = np.empty([len(metadatas)], dtype=object)
            i = 0
            for metadata in metadatas:
                metadata_attr[i] = np.frombuffer(
                    pickle.dumps(metadata), dtype=np.uint8
                )
                i += 1
            docs["metadata"] = metadata_attr

        docs_array = tiledb.open(
            self.docs_array_uri,
            "w",
            timestamp=timestamp if timestamp != 0 else None,
            config=self.config,
        )
        docs_array[external_ids] = docs
        docs_array.close()
        return ids

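    # Incremental-update sketch. The text and the metadata key "source" are
    # hypothetical; `db` is an initialized TileDB instance:
    #
    #     new_ids = db.add_texts(
    #         ["another document"],
    #         metadatas=[{"source": "notes"}],
    #     )
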
    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        metric: str = DEFAULT_METRIC,
        index_uri: str = "/tmp/tiledb_array",
        index_type: str = "FLAT",
        config: Optional[Mapping[str, Any]] = None,
        index_timestamp: int = 0,
        **kwargs: Any,
    ) -> TileDB:
        """Construct a TileDB index from raw documents.

        Args:
            texts: List of documents to index.
            embedding: Embedding function to use.
            metadatas: List of metadata dictionaries to associate with documents.
            ids: Optional ids of each text object.
            metric: Metric to use for indexing. Defaults to "euclidean".
            index_uri: The URI to write the TileDB arrays
            index_type: Optional, Vector index type ("FLAT", "IVF_FLAT")
            config: Optional, TileDB config
            index_timestamp: Optional, timestamp to write new texts with.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import TileDB
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                index = TileDB.from_texts(texts, embeddings)
        """
        embeddings = embedding.embed_documents(texts)
        return cls.__from(
            texts=texts,
            embeddings=embeddings,
            embedding=embedding,
            metadatas=metadatas,
            ids=ids,
            metric=metric,
            index_uri=index_uri,
            index_type=index_type,
            config=config,
            index_timestamp=index_timestamp,
            **kwargs,
        )

    @classmethod
    def from_embeddings(
        cls,
        text_embeddings: List[Tuple[str, List[float]]],
        embedding: Embeddings,
        index_uri: str,
        *,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        metric: str = DEFAULT_METRIC,
        index_type: str = "FLAT",
        config: Optional[Mapping[str, Any]] = None,
        index_timestamp: int = 0,
        **kwargs: Any,
    ) -> TileDB:
        """Construct TileDB index from embeddings.

        Args:
            text_embeddings: List of tuples of (text, embedding)
            embedding: Embedding function to use.
            index_uri: The URI to write the TileDB arrays
            metadatas: List of metadata dictionaries to associate with documents.
            ids: Optional ids of each text object.
            metric: Optional, Metric to use for indexing. Defaults to "euclidean".
            index_type: Optional, Vector index type ("FLAT", "IVF_FLAT")
            config: Optional, TileDB config
            index_timestamp: Optional, timestamp to write new texts with.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import TileDB
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                text_embeddings = embeddings.embed_documents(texts)
                text_embedding_pairs = list(zip(texts, text_embeddings))
                db = TileDB.from_embeddings(text_embedding_pairs, embeddings, index_uri)
        """
        texts = [t[0] for t in text_embeddings]
        embeddings = [t[1] for t in text_embeddings]
        return cls.__from(
            texts=texts,
            embeddings=embeddings,
            embedding=embedding,
            metadatas=metadatas,
            ids=ids,
            metric=metric,
            index_uri=index_uri,
            index_type=index_type,
            config=config,
            index_timestamp=index_timestamp,
            **kwargs,
        )

    @classmethod
    def load(
        cls,
        index_uri: str,
        embedding: Embeddings,
        *,
        metric: str = DEFAULT_METRIC,
        config: Optional[Mapping[str, Any]] = None,
        timestamp: Any = None,
        **kwargs: Any,
    ) -> TileDB:
        """Load a TileDB index from a URI.

        Args:
            index_uri: The URI of the TileDB vector index.
            embedding: Embeddings to use when generating queries.
            metric: Optional, Metric to use for indexing. Defaults to "euclidean".
            config: Optional, TileDB config
            timestamp: Optional, timestamp to use for opening the arrays.
        """
        return cls(
            embedding=embedding,
            index_uri=index_uri,
            metric=metric,
            config=config,
            timestamp=timestamp,
            **kwargs,
        )
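

# End-to-end usage sketch (a hedged example, not part of the module's API):
# `OpenAIEmbeddings`, the texts, and the /tmp path are placeholders for any
# Embeddings implementation, corpus, and writable URI. Passing
# `allow_dangerous_deserialization=True` is assumed to be required because the
# constructor refuses pickle-backed metadata otherwise.
#
#     from langchain_community.embeddings import OpenAIEmbeddings
#     from langchain_community.vectorstores import TileDB
#
#     embeddings = OpenAIEmbeddings()
#     db = TileDB.from_texts(
#         ["doc one", "doc two"],
#         embeddings,
#         index_uri="/tmp/tiledb_array",
#         allow_dangerous_deserialization=True,
#     )
#     db = TileDB.load(
#         "/tmp/tiledb_array", embeddings, allow_dangerous_deserialization=True
#     )
#     print(db.similarity_search("doc", k=1))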