import os

from llama_index.vector_stores.pinecone import PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec

# Set up Pinecone API key
os.environ["PINECONE_API_KEY"] = "<Your Pinecone API key, from app.pinecone.io>"
api_key = os.environ["PINECONE_API_KEY"]

# Create Pinecone Vector Store
pc = Pinecone(api_key=api_key)

pc.create_index(
    name="quickstart",
    dimension=1536,
    metric="dotproduct",
    spec=ServerlessSpec(cloud="aws", region="us-west-2"),
)

pinecone_index = pc.Index("quickstart")

vector_store = PineconeVectorStore(
    pinecone_index=pinecone_index,
)
Source code in llama-index-integrations/vector_stores/llama-index-vector-stores-pinecone/llama_index/vector_stores/pinecone/base.py
class PineconeVectorStore(BasePydanticVectorStore):
    """
    Pinecone Vector Store.

    In this vector store, embeddings and docs are stored within a
    Pinecone index.

    During query time, the index uses Pinecone to query for the top
    k most similar nodes.

    Args:
        pinecone_index (Optional[Union[pinecone.Pinecone.Index, pinecone.Index]]):
            Pinecone index instance; ``pinecone.Pinecone.Index`` for clients >= 3.0.0,
            ``pinecone.Index`` for older clients.
        insert_kwargs (Optional[Dict]): insert kwargs during `upsert` call.
        add_sparse_vector (bool): whether to add sparse vector to index.
        tokenizer (Optional[Callable]): tokenizer to use to generate sparse
        default_empty_query_vector (Optional[List[float]]): default empty query vector.
            Defaults to None. If not None, then this vector will be used as the query
            vector if the query is empty.

    Examples:
        `pip install llama-index-vector-stores-pinecone`

        ```python
        import os
        from llama_index.vector_stores.pinecone import PineconeVectorStore
        from pinecone import Pinecone, ServerlessSpec

        # Set up Pinecone API key
        os.environ["PINECONE_API_KEY"] = "<Your Pinecone API key, from app.pinecone.io>"
        api_key = os.environ["PINECONE_API_KEY"]

        # Create Pinecone Vector Store
        pc = Pinecone(api_key=api_key)
        pc.create_index(
            name="quickstart",
            dimension=1536,
            metric="dotproduct",
            spec=ServerlessSpec(cloud="aws", region="us-west-2"),
        )
        pinecone_index = pc.Index("quickstart")
        vector_store = PineconeVectorStore(
            pinecone_index=pinecone_index,
        )
        ```

    """

    stores_text: bool = True
    flat_metadata: bool = False

    api_key: Optional[str]
    index_name: Optional[str]
    environment: Optional[str]
    namespace: Optional[str]
    insert_kwargs: Optional[Dict]
    add_sparse_vector: bool
    text_key: str
    batch_size: int
    remove_text_from_metadata: bool

    _pinecone_index: pinecone.Index = PrivateAttr()
    _sparse_embedding_model: Optional[BaseSparseEmbedding] = PrivateAttr()

    def __init__(
        self,
        # Dynamic import prevents specific type hinting here
        pinecone_index: Optional[Any] = None,
        api_key: Optional[str] = None,
        index_name: Optional[str] = None,
        environment: Optional[str] = None,
        namespace: Optional[str] = None,
        insert_kwargs: Optional[Dict] = None,
        add_sparse_vector: bool = False,
        tokenizer: Optional[Callable] = None,
        text_key: str = DEFAULT_TEXT_KEY,
        batch_size: int = DEFAULT_BATCH_SIZE,
        remove_text_from_metadata: bool = False,
        default_empty_query_vector: Optional[List[float]] = None,
        sparse_embedding_model: Optional[BaseSparseEmbedding] = None,
        **kwargs: Any,
    ) -> None:
        # NOTE(review): default_empty_query_vector is accepted but not used in
        # the visible code of this class — confirm against the full module.
        insert_kwargs = insert_kwargs or {}

        # Resolve the sparse-embedding model: explicit model wins, then a
        # tokenizer-backed default, then the plain default; disabled entirely
        # when add_sparse_vector is False.
        if add_sparse_vector:
            if sparse_embedding_model is not None:
                sparse_embedding_model = sparse_embedding_model
            elif tokenizer is not None:
                sparse_embedding_model = DefaultPineconeSparseEmbedding(
                    tokenizer=tokenizer
                )
            else:
                sparse_embedding_model = DefaultPineconeSparseEmbedding()
        else:
            sparse_embedding_model = None

        super().__init__(
            index_name=index_name,
            environment=environment,
            api_key=api_key,
            namespace=namespace,
            insert_kwargs=insert_kwargs,
            add_sparse_vector=add_sparse_vector,
            text_key=text_key,
            batch_size=batch_size,
            remove_text_from_metadata=remove_text_from_metadata,
        )

        self._sparse_embedding_model = sparse_embedding_model

        # TODO: Make following instance check stronger -- check if
        # pinecone_index is not pinecone.Index, else raise ValueError
        if isinstance(pinecone_index, str):
            raise ValueError(
                "`pinecone_index` cannot be of type `str`; should be an instance of pinecone.Index, "
            )

        self._pinecone_index = pinecone_index or self._initialize_pinecone_client(
            api_key, index_name, environment, **kwargs
        )

    @classmethod
    def _initialize_pinecone_client(
        cls,
        api_key: Optional[str],
        index_name: Optional[str],
        environment: Optional[str],
        **kwargs: Any,
    ) -> Any:
        """
        Initialize Pinecone client based on version.

        If client version <3.0.0, use pods-based initialization; else, use
        serverless initialization.
        """
        if not index_name:
            raise ValueError(
                "`index_name` is required for Pinecone client initialization"
            )

        pinecone = _import_pinecone()

        if not _is_pinecone_v3():
            # Old version of Pinecone client (version bifurcation temporary):
            if not environment:
                raise ValueError("environment is required for Pinecone client < 3.0.0")
            pinecone.init(api_key=api_key, environment=environment)
            return pinecone.Index(index_name)
        else:
            # New version of Pinecone client (serverless):
            pinecone_instance = pinecone.Pinecone(
                api_key=api_key, source_tag="llamaindex"
            )
            return pinecone_instance.Index(index_name)

    @classmethod
    def from_params(
        cls,
        api_key: Optional[str] = None,
        index_name: Optional[str] = None,
        environment: Optional[str] = None,
        namespace: Optional[str] = None,
        insert_kwargs: Optional[Dict] = None,
        add_sparse_vector: bool = False,
        tokenizer: Optional[Callable] = None,
        text_key: str = DEFAULT_TEXT_KEY,
        batch_size: int = DEFAULT_BATCH_SIZE,
        remove_text_from_metadata: bool = False,
        default_empty_query_vector: Optional[List[float]] = None,
        **kwargs: Any,
    ) -> "PineconeVectorStore":
        """Alternate constructor that builds the Pinecone client from raw params."""
        pinecone_index = cls._initialize_pinecone_client(
            api_key, index_name, environment, **kwargs
        )
        return cls(
            pinecone_index=pinecone_index,
            api_key=api_key,
            index_name=index_name,
            environment=environment,
            namespace=namespace,
            insert_kwargs=insert_kwargs,
            add_sparse_vector=add_sparse_vector,
            tokenizer=tokenizer,
            text_key=text_key,
            batch_size=batch_size,
            remove_text_from_metadata=remove_text_from_metadata,
            default_empty_query_vector=default_empty_query_vector,
            **kwargs,
        )

    @classmethod
    def class_name(cls) -> str:
        # NOTE(review): "Pincone" is a typo, but this string likely identifies
        # serialized stores — renaming it could break deserialization, so it is
        # deliberately left as-is.
        return "PinconeVectorStore"

    def add(
        self,
        nodes: List[BaseNode],
        **add_kwargs: Any,
    ) -> List[str]:
        """
        Add nodes to index.

        Args:
            nodes: List[BaseNode]: list of nodes with embeddings

        """
        ids = []
        entries = []
        sparse_inputs = []
        for node in nodes:
            node_id = node.node_id
            metadata = node_to_metadata_dict(
                node,
                remove_text=self.remove_text_from_metadata,
                flat_metadata=self.flat_metadata,
            )

            if self.add_sparse_vector and self._sparse_embedding_model is not None:
                sparse_inputs.append(node.get_content(metadata_mode=MetadataMode.EMBED))

            # Prefix the id with the source document id so vectors can later be
            # found by prefix (see delete()'s serverless fallback).
            if node.ref_doc_id is not None:
                node_id = f"{node.ref_doc_id}#{node_id}"

            ids.append(node_id)

            entry = {
                ID_KEY: node_id,
                VECTOR_KEY: node.get_embedding(),
                METADATA_KEY: metadata,
            }
            entries.append(entry)

        # batch sparse embedding generation
        if sparse_inputs:
            sparse_vectors = self._sparse_embedding_model.get_text_embedding_batch(
                sparse_inputs
            )
            for i, sparse_vector in enumerate(sparse_vectors):
                entries[i][SPARSE_VECTOR_KEY] = {
                    "indices": list(sparse_vector.keys()),
                    "values": list(sparse_vector.values()),
                }

        self._pinecone_index.upsert(
            entries,
            namespace=self.namespace,
            batch_size=self.batch_size,
            **self.insert_kwargs,
        )
        return ids

    def get_nodes(
        self,
        node_ids: Optional[List[str]] = None,
        filters: Optional[List[MetadataFilters]] = None,
        limit: int = 100,
        include_values: bool = False,
    ) -> List[BaseNode]:
        """Fetch nodes matching metadata filters (id-based lookup unsupported)."""
        filter = None
        if filters is not None:
            filter = _to_pinecone_filter(filters)

        if node_ids is not None:
            raise ValueError(
                "Getting nodes by node id not supported by Pinecone at the time of writing."
            )

        if node_ids is None and filters is None:
            raise ValueError("Filters must be specified")

        # Pinecone requires a query vector, so default to 0s if not provided
        query_vector = [0.0] * self._pinecone_index.describe_index_stats()["dimension"]

        response = self._pinecone_index.query(
            top_k=limit,
            vector=query_vector,
            namespace=self.namespace,
            filter=filter,
            include_values=include_values,
            include_metadata=True,
        )
        nodes = [metadata_dict_to_node(match.metadata) for match in response.matches]
        if include_values:
            for node, match in zip(nodes, response.matches):
                node.embedding = match.values
        return nodes

    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
        """
        Delete nodes using with ref_doc_id.

        Args:
            ref_doc_id (str): The doc_id of the document to delete.

        """
        try:
            # delete by filtering on the doc_id metadata
            self._pinecone_index.delete(
                filter={"doc_id": {"$eq": ref_doc_id}},
                namespace=self.namespace,
                **delete_kwargs,
            )
        except Exception:
            # fallback to deleting by prefix for serverless
            # TODO: this is a bit of a hack, we should find a better way to handle this
            id_gen = self._pinecone_index.list(
                prefix=ref_doc_id, namespace=self.namespace
            )
            ids_to_delete = list(id_gen)
            if ids_to_delete:
                self._pinecone_index.delete(
                    ids=ids_to_delete, namespace=self.namespace, **delete_kwargs
                )

    def delete_nodes(
        self,
        node_ids: Optional[List[str]] = None,
        filters: Optional[MetadataFilters] = None,
        **delete_kwargs: Any,
    ) -> None:
        """
        Deletes nodes using their ids.

        Args:
            node_ids (Optional[List[str]], optional): List of node IDs. Defaults to None.
            filters (Optional[MetadataFilters], optional): Metadata filters. Defaults to None.

        """
        node_ids = node_ids or []

        if filters is not None:
            filter = _to_pinecone_filter(filters)
        else:
            filter = None

        self._pinecone_index.delete(
            ids=node_ids, namespace=self.namespace, filter=filter, **delete_kwargs
        )

    def clear(self) -> None:
        """Clears the index."""
        self._pinecone_index.delete(namespace=self.namespace, delete_all=True)

    @property
    def client(self) -> Any:
        """Return Pinecone client."""
        return self._pinecone_index

    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
        """
        Query index for top k most similar nodes.

        Args:
            query_embedding (List[float]): query embedding
            similarity_top_k (int): top k most similar nodes

        """
        pinecone_sparse_vector = None
        if (
            query.mode in (VectorStoreQueryMode.SPARSE, VectorStoreQueryMode.HYBRID)
            and self._sparse_embedding_model is not None
        ):
            if query.query_str is None:
                raise ValueError(
                    "query_str must be specified if mode is SPARSE or HYBRID."
                )
            sparse_vector = self._sparse_embedding_model.get_query_embedding(
                query.query_str
            )
            # Hybrid weighting: sparse side is scaled by (1 - alpha).
            if query.alpha is not None:
                pinecone_sparse_vector = {
                    "indices": list(sparse_vector.keys()),
                    "values": [
                        v * (1 - query.alpha) for v in sparse_vector.values()
                    ],
                }
            else:
                pinecone_sparse_vector = {
                    "indices": list(sparse_vector.keys()),
                    "values": list(sparse_vector.values()),
                }

        # pinecone requires a query embedding, so default to 0s if not provided
        if query.query_embedding is not None:
            dimension = len(query.query_embedding)
        else:
            dimension = self._pinecone_index.describe_index_stats()["dimension"]
        query_embedding = [0.0] * dimension

        if query.mode in (VectorStoreQueryMode.DEFAULT, VectorStoreQueryMode.HYBRID):
            query_embedding = cast(List[float], query.query_embedding)
            # Hybrid weighting: dense side is scaled by alpha.
            if query.alpha is not None:
                query_embedding = [v * query.alpha for v in query_embedding]

        if query.filters is not None:
            if "filter" in kwargs or "pinecone_query_filters" in kwargs:
                raise ValueError(
                    "Cannot specify filter via both query and kwargs. "
                    "Use kwargs only for pinecone specific items that are "
                    "not supported via the generic query interface."
                )
            filter = _to_pinecone_filter(query.filters)
        elif "pinecone_query_filters" in kwargs:
            filter = kwargs.pop("pinecone_query_filters")
        else:
            filter = kwargs.pop("filter", {})

        response = self._pinecone_index.query(
            vector=query_embedding,
            sparse_vector=pinecone_sparse_vector,
            top_k=query.similarity_top_k,
            include_values=kwargs.pop("include_values", True),
            include_metadata=kwargs.pop("include_metadata", True),
            namespace=self.namespace,
            filter=filter,
            **kwargs,
        )

        top_k_nodes = []
        top_k_ids = []
        top_k_scores = []
        for match in response.matches:
            try:
                node = metadata_dict_to_node(match.metadata)
                node.embedding = match.values
            except Exception:
                # NOTE: deprecated legacy logic for backward compatibility
                _logger.debug("Failed to parse Node metadata, fallback to legacy logic.")
                metadata, node_info, relationships = legacy_metadata_dict_to_node(
                    match.metadata, text_key=self.text_key
                )
                text = match.metadata[self.text_key]
                id = match.id
                node = TextNode(
                    text=text,
                    id_=id,
                    metadata=metadata,
                    start_char_idx=node_info.get("start", None),
                    end_char_idx=node_info.get("end", None),
                    relationships=relationships,
                )
            top_k_ids.append(match.id)
            top_k_nodes.append(node)
            top_k_scores.append(match.score)

        return VectorStoreQueryResult(
            nodes=top_k_nodes, similarities=top_k_scores, ids=top_k_ids
        )
def add(
    self,
    nodes: List[BaseNode],
    **add_kwargs: Any,
) -> List[str]:
    """
    Add nodes to index.

    Args:
        nodes: List[BaseNode]: list of nodes with embeddings

    """
    ids: List[str] = []
    records = []
    sparse_texts = []

    for node in nodes:
        vector_id = node.node_id
        metadata = node_to_metadata_dict(
            node,
            remove_text=self.remove_text_from_metadata,
            flat_metadata=self.flat_metadata,
        )

        # Collect text for batched sparse-embedding generation below.
        if self.add_sparse_vector and self._sparse_embedding_model is not None:
            sparse_texts.append(node.get_content(metadata_mode=MetadataMode.EMBED))

        # Prefix the id with the source doc id so the vector is discoverable
        # by prefix listing later.
        if node.ref_doc_id is not None:
            vector_id = f"{node.ref_doc_id}#{vector_id}"

        ids.append(vector_id)
        records.append(
            {
                ID_KEY: vector_id,
                VECTOR_KEY: node.get_embedding(),
                METADATA_KEY: metadata,
            }
        )

    # Generate sparse embeddings in one batch and attach them positionally.
    if sparse_texts:
        sparse_embeddings = self._sparse_embedding_model.get_text_embedding_batch(
            sparse_texts
        )
        for idx, embedding in enumerate(sparse_embeddings):
            records[idx][SPARSE_VECTOR_KEY] = {
                "indices": list(embedding.keys()),
                "values": list(embedding.values()),
            }

    self._pinecone_index.upsert(
        records,
        namespace=self.namespace,
        batch_size=self.batch_size,
        **self.insert_kwargs,
    )
    return ids
def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
    """
    Delete nodes using with ref_doc_id.

    Args:
        ref_doc_id (str): The doc_id of the document to delete.

    """
    try:
        # Primary path: delete every vector whose doc_id metadata matches.
        self._pinecone_index.delete(
            filter={"doc_id": {"$eq": ref_doc_id}},
            namespace=self.namespace,
            **delete_kwargs,
        )
    except Exception:
        # Fallback for serverless indexes: list ids sharing the ref_doc_id
        # prefix and delete those explicitly.
        # TODO: this is a bit of a hack, we should find a better way to handle this
        matching_ids = list(
            self._pinecone_index.list(prefix=ref_doc_id, namespace=self.namespace)
        )
        if matching_ids:
            self._pinecone_index.delete(
                ids=matching_ids, namespace=self.namespace, **delete_kwargs
            )
def delete_nodes(
    self,
    node_ids: Optional[List[str]] = None,
    filters: Optional[MetadataFilters] = None,
    **delete_kwargs: Any,
) -> None:
    """
    Deletes nodes using their ids.

    Args:
        node_ids (Optional[List[str]], optional): List of node IDs. Defaults to None.
        filters (Optional[MetadataFilters], optional): Metadata filters. Defaults to None.

    """
    node_ids = node_ids or []
    # Translate generic metadata filters to Pinecone's filter dialect, if any.
    pinecone_filter = _to_pinecone_filter(filters) if filters is not None else None

    self._pinecone_index.delete(
        ids=node_ids,
        namespace=self.namespace,
        filter=pinecone_filter,
        **delete_kwargs,
    )
def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
    """
    Query index for top k most similar nodes.

    Args:
        query_embedding (List[float]): query embedding
        similarity_top_k (int): top k most similar nodes

    """
    # Build the sparse half of the query for SPARSE/HYBRID modes.
    pinecone_sparse_vector = None
    wants_sparse = query.mode in (
        VectorStoreQueryMode.SPARSE,
        VectorStoreQueryMode.HYBRID,
    )
    if wants_sparse and self._sparse_embedding_model is not None:
        if query.query_str is None:
            raise ValueError("query_str must be specified if mode is SPARSE or HYBRID.")

        sparse_embedding = self._sparse_embedding_model.get_query_embedding(
            query.query_str
        )
        if query.alpha is not None:
            # Hybrid weighting: sparse side scaled by (1 - alpha).
            pinecone_sparse_vector = {
                "indices": list(sparse_embedding.keys()),
                "values": [v * (1 - query.alpha) for v in sparse_embedding.values()],
            }
        else:
            pinecone_sparse_vector = {
                "indices": list(sparse_embedding.keys()),
                "values": list(sparse_embedding.values()),
            }

    # pinecone requires a query embedding, so default to 0s if not provided
    if query.query_embedding is not None:
        dimension = len(query.query_embedding)
    else:
        dimension = self._pinecone_index.describe_index_stats()["dimension"]
    dense_vector = [0.0] * dimension

    if query.mode in (VectorStoreQueryMode.DEFAULT, VectorStoreQueryMode.HYBRID):
        dense_vector = cast(List[float], query.query_embedding)
        if query.alpha is not None:
            # Hybrid weighting: dense side scaled by alpha.
            dense_vector = [v * query.alpha for v in dense_vector]

    # Resolve the metadata filter: generic filters and pinecone-specific
    # kwargs are mutually exclusive.
    if query.filters is not None:
        if "filter" in kwargs or "pinecone_query_filters" in kwargs:
            raise ValueError(
                "Cannot specify filter via both query and kwargs. "
                "Use kwargs only for pinecone specific items that are "
                "not supported via the generic query interface."
            )
        filter = _to_pinecone_filter(query.filters)
    elif "pinecone_query_filters" in kwargs:
        filter = kwargs.pop("pinecone_query_filters")
    else:
        filter = kwargs.pop("filter", {})

    response = self._pinecone_index.query(
        vector=dense_vector,
        sparse_vector=pinecone_sparse_vector,
        top_k=query.similarity_top_k,
        include_values=kwargs.pop("include_values", True),
        include_metadata=kwargs.pop("include_metadata", True),
        namespace=self.namespace,
        filter=filter,
        **kwargs,
    )

    matched_nodes = []
    matched_ids = []
    matched_scores = []
    for match in response.matches:
        try:
            node = metadata_dict_to_node(match.metadata)
            node.embedding = match.values
        except Exception:
            # NOTE: deprecated legacy logic for backward compatibility
            _logger.debug("Failed to parse Node metadata, fallback to legacy logic.")
            metadata, node_info, relationships = legacy_metadata_dict_to_node(
                match.metadata, text_key=self.text_key
            )
            text = match.metadata[self.text_key]
            id = match.id
            node = TextNode(
                text=text,
                id_=id,
                metadata=metadata,
                start_char_idx=node_info.get("start", None),
                end_char_idx=node_info.get("end", None),
                relationships=relationships,
            )
        matched_ids.append(match.id)
        matched_nodes.append(node)
        matched_scores.append(match.score)

    return VectorStoreQueryResult(
        nodes=matched_nodes, similarities=matched_scores, ids=matched_ids
    )