# Source code for langchain_community.vectorstores.timescalevector

"""VectorStore backed by a Postgres database with the Timescale Vector extension."""
from __future__ import annotations

import enum
import logging
import uuid
from datetime import timedelta
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
    Union,
)

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import get_from_dict_or_env
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import DistanceStrategy

if TYPE_CHECKING:
    from timescale_vector import Predicates


# Collection name used when callers do not supply one; it also becomes the
# table name of the collection.
_LANGCHAIN_DEFAULT_COLLECTION_NAME: str = "langchain_store"

# Default embedding width, used as the ``num_dimensions`` default of
# ``TimescaleVector``. 1536 presumably matches OpenAI ada-002 embeddings (as the
# name suggests) -- despite the name it is a vector width, not a token count.
ADA_TOKEN_COUNT: int = 1536

# Distance metric used when no ``distance_strategy`` is given.
DEFAULT_DISTANCE_STRATEGY: DistanceStrategy = DistanceStrategy.COSINE


class TimescaleVector(VectorStore):
    """Timescale Postgres vector store.

    To use, you should have the ``timescale_vector`` python package installed.

    Args:
        service_url: Service url on Timescale cloud.
        embedding: Any embedding function implementing the
            `langchain.embeddings.base.Embeddings` interface.
        collection_name: The name of the collection to use
            (default: langchain_store). This becomes the table name used for
            the collection.
        num_dimensions: Width of the stored embedding vectors
            (default: ``ADA_TOKEN_COUNT``).
        distance_strategy: The distance strategy to use (default: COSINE).
        pre_delete_collection: If True, ``delete_all()`` is called on the
            collection right after its tables are created (default: False).
            Useful for testing.
        logger: Logger to use; defaults to this module's logger.
        relevance_score_fn: Optional function mapping a raw search distance to
            a relevance score. When omitted, one is chosen from
            ``distance_strategy``.
        time_partition_interval: Optional partition interval forwarded to the
            ``timescale_vector`` sync and async clients.
        **kwargs: Extra keyword arguments forwarded to both
            ``timescale_vector.client.Sync`` and ``client.Async``.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import TimescaleVector
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            SERVICE_URL = "postgres://tsdbadmin:<password>@<id>.tsdb.cloud.timescale.com:<port>/tsdb?sslmode=require"
            COLLECTION_NAME = "state_of_the_union_test"
            embeddings = OpenAIEmbeddings()
            vectorestore = TimescaleVector.from_documents(
                embedding=embeddings,
                documents=docs,
                collection_name=COLLECTION_NAME,
                service_url=SERVICE_URL,
            )
    """  # noqa: E501

    def __init__(
        self,
        service_url: str,
        embedding: Embeddings,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        num_dimensions: int = ADA_TOKEN_COUNT,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        pre_delete_collection: bool = False,
        logger: Optional[logging.Logger] = None,
        relevance_score_fn: Optional[Callable[[float], float]] = None,
        time_partition_interval: Optional[timedelta] = None,
        **kwargs: Any,
    ) -> None:
        client = self._import_client()

        self.service_url = service_url
        self.embedding = embedding
        self.collection_name = collection_name
        self.num_dimensions = num_dimensions
        self._distance_strategy = distance_strategy
        self.pre_delete_collection = pre_delete_collection
        self.logger = logger or logging.getLogger(__name__)
        self.override_relevance_score_fn = relevance_score_fn
        self._time_partition_interval = time_partition_interval

        # Both clients are configured identically. The sync client serves the
        # blocking methods; the async client serves the ``a*`` coroutines.
        client_args = (
            self.service_url,
            self.collection_name,
            self.num_dimensions,
            self._distance_strategy.value.lower(),
        )
        self.sync_client = client.Sync(
            *client_args,
            time_partition_interval=self._time_partition_interval,
            **kwargs,
        )
        self.async_client = client.Async(
            *client_args,
            time_partition_interval=self._time_partition_interval,
            **kwargs,
        )
        self.__post_init__()

    @staticmethod
    def _import_client() -> Any:
        """Import and return the ``timescale_vector.client`` module.

        Raises:
            ImportError: If the ``timescale_vector`` package is not installed.
        """
        try:
            from timescale_vector import client
        except ImportError as e:
            raise ImportError(
                "Could not import timescale_vector python package. "
                "Please install it with `pip install timescale-vector`."
            ) from e
        return client

    def __post_init__(
        self,
    ) -> None:
        """Initialize the store: create its tables, optionally clearing them."""
        self.sync_client.create_tables()
        if self.pre_delete_collection:
            self.sync_client.delete_all()

    @property
    def embeddings(self) -> Embeddings:
        """The embedding function used by this store."""
        return self.embedding

    def drop_tables(self) -> None:
        """Drop the collection's table via the sync client."""
        self.sync_client.drop_table()

    @classmethod
    def _new_store(
        cls,
        embeddings: List[List[float]],
        embedding: Embeddings,
        collection_name: str,
        distance_strategy: DistanceStrategy,
        service_url: Optional[str],
        pre_delete_collection: bool,
        kwargs: Dict[str, Any],
    ) -> TimescaleVector:
        """Construct the (still empty) store shared by ``__from``/``__afrom``.

        The vector width is inferred from the first embedding. When there are
        no embeddings to infer it from, an explicit ``num_dimensions`` in
        ``kwargs`` (or ``ADA_TOKEN_COUNT``) is used instead of failing with
        ``IndexError``.
        """
        # Copy so the caller's kwargs are left untouched. Remove
        # ``num_dimensions`` so it is not passed to the constructor twice.
        store_kwargs = dict(kwargs)
        requested_dims = store_kwargs.pop("num_dimensions", ADA_TOKEN_COUNT)
        num_dimensions = len(embeddings[0]) if embeddings else requested_dims

        if service_url is None:
            service_url = cls.get_service_url(store_kwargs)

        return cls(
            service_url=service_url,
            num_dimensions=num_dimensions,
            collection_name=collection_name,
            embedding=embedding,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
            **store_kwargs,
        )

    @classmethod
    def __from(
        cls,
        texts: List[str],
        embeddings: List[List[float]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        service_url: Optional[str] = None,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> TimescaleVector:
        """Create a store and synchronously insert pre-computed embeddings.

        Missing ``ids``/``metadatas`` are filled in by ``add_embeddings``.
        """
        store = cls._new_store(
            embeddings,
            embedding,
            collection_name,
            distance_strategy,
            service_url,
            pre_delete_collection,
            kwargs,
        )
        store.add_embeddings(
            texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
        )
        return store

    @classmethod
    async def __afrom(
        cls,
        texts: List[str],
        embeddings: List[List[float]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        service_url: Optional[str] = None,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> TimescaleVector:
        """Create a store and asynchronously insert pre-computed embeddings.

        Store construction (table creation) itself uses the sync client.
        """
        store = cls._new_store(
            embeddings,
            embedding,
            collection_name,
            distance_strategy,
            service_url,
            pre_delete_collection,
            kwargs,
        )
        await store.aadd_embeddings(
            texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
        )
        return store

    @staticmethod
    def _build_records(
        texts: Iterable[str],
        embeddings: List[List[float]],
        metadatas: Optional[List[dict]],
        ids: Optional[List[str]],
    ) -> Tuple[List[str], List[Tuple[Any, ...]]]:
        """Fill in default ids/metadata and zip everything into upsert records.

        ``texts`` is materialized first, so a one-shot iterator is not
        exhausted by id generation before the records are built.

        Returns:
            ``(ids, records)`` where each record is
            ``(id, metadata, text, embedding)``.
        """
        texts = list(texts)
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts]
        if not metadatas:
            metadatas = [{} for _ in texts]
        return ids, list(zip(ids, metadatas, texts, embeddings))

    def add_embeddings(
        self,
        texts: Iterable[str],
        embeddings: List[List[float]],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Add pre-computed embeddings to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            embeddings: List of embedding vectors, aligned with ``texts``.
            metadatas: Optional list of metadata dicts aligned with ``texts``;
                empty dicts are used when omitted.
            ids: Optional list of ids aligned with ``texts``; random UUID4
                strings are generated when omitted.
            kwargs: vectorstore specific parameters (currently unused).

        Returns:
            The ids of the added records.
        """
        ids, records = self._build_records(texts, embeddings, metadatas, ids)
        if records:  # nothing to write for empty input
            self.sync_client.upsert(records)
        return ids

    async def aadd_embeddings(
        self,
        texts: Iterable[str],
        embeddings: List[List[float]],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Asynchronously add pre-computed embeddings to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            embeddings: List of embedding vectors, aligned with ``texts``.
            metadatas: Optional list of metadata dicts aligned with ``texts``;
                empty dicts are used when omitted.
            ids: Optional list of ids aligned with ``texts``; random UUID4
                strings are generated when omitted.
            kwargs: vectorstore specific parameters (currently unused).

        Returns:
            The ids of the added records.
        """
        ids, records = self._build_records(texts, embeddings, metadatas, ids)
        if records:  # nothing to write for empty input
            await self.async_client.upsert(records)
        return ids

    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Embed texts and add them to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of ids; generated when omitted.
            kwargs: vectorstore specific parameters.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        # Materialize once: the texts are needed both for embedding and for
        # building the records, and a generator can only be read once.
        texts = list(texts)
        embeddings = self.embedding.embed_documents(texts)
        return self.add_embeddings(
            texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
        )

    async def aadd_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Asynchronously embed texts and add them to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            ids: Optional list of ids; generated when omitted.
            kwargs: vectorstore specific parameters.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        texts = list(texts)
        # Use the async embedding API so the event loop is not blocked.
        embeddings = await self.embedding.aembed_documents(texts)
        return await self.aadd_embeddings(
            texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
        )

    def _embed_query(self, query: str) -> Optional[List[float]]:
        """Embed ``query``; empty or whitespace-only queries yield ``None``."""
        # An empty query is not embedded; None is passed on to the client search.
        if not query or query.isspace():
            return None
        return self.embedding.embed_query(query)

    async def _aembed_query(self, query: str) -> Optional[List[float]]:
        """Async counterpart of ``_embed_query`` using ``aembed_query``."""
        if not query or query.isspace():
            return None
        return await self.embedding.aembed_query(query)

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Union[dict, list]] = None,
        predicates: Optional[Predicates] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to the query, with their distances.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Metadata filter (dict or list) forwarded to the client
                search. Defaults to None.
            predicates: Optional ``timescale_vector.Predicates`` forwarded to
                the client search.
            **kwargs: Time-range keys understood by ``date_to_range_filter``.

        Returns:
            List of (Document, distance) pairs most similar to the query.
        """
        embedding = self._embed_query(query)
        return self.similarity_search_with_score_by_vector(
            embedding=embedding,
            k=k,
            filter=filter,
            predicates=predicates,
            **kwargs,
        )

    async def asimilarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Union[dict, list]] = None,
        predicates: Optional[Predicates] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Asynchronously return docs most similar to the query, with distances.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Metadata filter (dict or list) forwarded to the client
                search. Defaults to None.
            predicates: Optional ``timescale_vector.Predicates`` forwarded to
                the client search.
            **kwargs: Time-range keys understood by ``date_to_range_filter``.

        Returns:
            List of (Document, distance) pairs most similar to the query.
        """
        embedding = await self._aembed_query(query)
        return await self.asimilarity_search_with_score_by_vector(
            embedding=embedding,
            k=k,
            filter=filter,
            predicates=predicates,
            **kwargs,
        )

    def date_to_range_filter(self, **kwargs: Any) -> Any:
        """Build a ``timescale_vector.client.UUIDTimeRange`` from kwargs.

        Only ``start_date``, ``end_date``, ``time_delta``, ``start_inclusive``
        and ``end_inclusive`` are read; other keys are ignored.

        Returns:
            A ``UUIDTimeRange`` when at least one of those keys is present,
            otherwise ``None``.
        """
        constructor_args = {
            key: kwargs[key]
            for key in (
                "start_date",
                "end_date",
                "time_delta",
                "start_inclusive",
                "end_inclusive",
            )
            if key in kwargs
        }
        if not constructor_args:
            return None

        client = self._import_client()
        return client.UUIDTimeRange(**constructor_args)

    def _results_to_docs_and_scores(
        self, results: Iterable[Any]
    ) -> List[Tuple[Document, float]]:
        """Convert raw client search rows into ``(Document, distance)`` pairs."""
        client = self._import_client()
        return [
            (
                Document(
                    page_content=result[client.SEARCH_RESULT_CONTENTS_IDX],
                    metadata=result[client.SEARCH_RESULT_METADATA_IDX],
                ),
                result[client.SEARCH_RESULT_DISTANCE_IDX],
            )
            for result in results
        ]

    def similarity_search_with_score_by_vector(
        self,
        embedding: Optional[List[float]],
        k: int = 4,
        filter: Optional[Union[dict, list]] = None,
        predicates: Optional[Predicates] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return (Document, distance) pairs for the ``k`` nearest rows.

        ``filter``, ``predicates`` and the time range built from ``kwargs`` by
        ``date_to_range_filter`` are forwarded to the sync client's search.
        """
        results = self.sync_client.search(
            embedding,
            limit=k,
            filter=filter,
            predicates=predicates,
            uuid_time_filter=self.date_to_range_filter(**kwargs),
        )
        return self._results_to_docs_and_scores(results)

    async def asimilarity_search_with_score_by_vector(
        self,
        embedding: Optional[List[float]],
        k: int = 4,
        filter: Optional[Union[dict, list]] = None,
        predicates: Optional[Predicates] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Async counterpart of ``similarity_search_with_score_by_vector``."""
        results = await self.async_client.search(
            embedding,
            limit=k,
            filter=filter,
            predicates=predicates,
            uuid_time_filter=self.date_to_range_filter(**kwargs),
        )
        return self._results_to_docs_and_scores(results)

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Union[dict, list]] = None,
        predicates: Optional[Predicates] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return the documents most similar to ``query``.

        Implements the abstract ``VectorStore.similarity_search``; arguments
        are as for ``similarity_search_with_score``.
        """
        embedding = self._embed_query(query)
        return self.similarity_search_by_vector(
            embedding=embedding, k=k, filter=filter, predicates=predicates, **kwargs
        )

    async def asimilarity_search(
        self,
        query: str,
        k: int = 4,
        filter: Optional[Union[dict, list]] = None,
        predicates: Optional[Predicates] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Asynchronously return the documents most similar to ``query``."""
        embedding = await self._aembed_query(query)
        return await self.asimilarity_search_by_vector(
            embedding=embedding, k=k, filter=filter, predicates=predicates, **kwargs
        )

    def similarity_search_by_vector(
        self,
        embedding: Optional[List[float]],
        k: int = 4,
        filter: Optional[Union[dict, list]] = None,
        predicates: Optional[Predicates] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to an embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Metadata filter (dict or list). Defaults to None.
            predicates: Optional ``timescale_vector.Predicates``.
            **kwargs: Time-range keys understood by ``date_to_range_filter``.

        Returns:
            List of Documents most similar to the query vector.
        """
        docs_and_scores = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, filter=filter, predicates=predicates, **kwargs
        )
        return [doc for doc, _ in docs_and_scores]

    async def asimilarity_search_by_vector(
        self,
        embedding: Optional[List[float]],
        k: int = 4,
        filter: Optional[Union[dict, list]] = None,
        predicates: Optional[Predicates] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Asynchronously return docs most similar to an embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter: Metadata filter (dict or list). Defaults to None.
            predicates: Optional ``timescale_vector.Predicates``.
            **kwargs: Time-range keys understood by ``date_to_range_filter``.

        Returns:
            List of Documents most similar to the query vector.
        """
        docs_and_scores = await self.asimilarity_search_with_score_by_vector(
            embedding=embedding, k=k, filter=filter, predicates=predicates, **kwargs
        )
        return [doc for doc, _ in docs_and_scores]

    @classmethod
    def from_texts(
        cls: Type[TimescaleVector],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> TimescaleVector:
        """Return a VectorStore initialized from texts and embeddings.

        A Postgres connection string is required: either pass it as the
        ``service_url`` keyword argument or set the ``TIMESCALE_SERVICE_URL``
        environment variable.
        """
        texts = list(texts)
        embeddings = embedding.embed_documents(texts)

        return cls.__from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            collection_name=collection_name,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
            **kwargs,
        )

    @classmethod
    async def afrom_texts(
        cls: Type[TimescaleVector],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> TimescaleVector:
        """Asynchronously return a VectorStore initialized from texts.

        A Postgres connection string is required: either pass it as the
        ``service_url`` keyword argument or set the ``TIMESCALE_SERVICE_URL``
        environment variable.
        """
        texts = list(texts)
        # Use the async embedding API so the event loop is not blocked.
        embeddings = await embedding.aembed_documents(texts)

        return await cls.__afrom(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            collection_name=collection_name,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
            **kwargs,
        )

    @classmethod
    def from_embeddings(
        cls,
        text_embeddings: List[Tuple[str, List[float]]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> TimescaleVector:
        """Construct a TimescaleVector wrapper from raw documents and
        pre-generated embeddings.

        A Postgres connection string is required: either pass it as the
        ``service_url`` keyword argument or set the ``TIMESCALE_SERVICE_URL``
        environment variable.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import TimescaleVector
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                text_embeddings = embeddings.embed_documents(texts)
                text_embedding_pairs = list(zip(texts, text_embeddings))
                tvs = TimescaleVector.from_embeddings(text_embedding_pairs, embeddings)
        """
        texts = [text for text, _ in text_embeddings]
        embeddings = [vector for _, vector in text_embeddings]

        return cls.__from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            collection_name=collection_name,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
            **kwargs,
        )

    @classmethod
    async def afrom_embeddings(
        cls,
        text_embeddings: List[Tuple[str, List[float]]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> TimescaleVector:
        """Asynchronously construct a TimescaleVector wrapper from raw
        documents and pre-generated embeddings.

        A Postgres connection string is required: either pass it as the
        ``service_url`` keyword argument or set the ``TIMESCALE_SERVICE_URL``
        environment variable.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import TimescaleVector
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                text_embeddings = embeddings.embed_documents(texts)
                text_embedding_pairs = list(zip(texts, text_embeddings))
                tvs = await TimescaleVector.afrom_embeddings(
                    text_embedding_pairs, embeddings
                )
        """
        texts = [text for text, _ in text_embeddings]
        embeddings = [vector for _, vector in text_embeddings]

        return await cls.__afrom(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            collection_name=collection_name,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
            **kwargs,
        )

    @classmethod
    def from_existing_index(
        cls: Type[TimescaleVector],
        embedding: Embeddings,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> TimescaleVector:
        """Get an instance of an existing TimescaleVector store.

        No new embeddings are inserted. The service URL is taken from
        ``kwargs["service_url"]`` or ``TIMESCALE_SERVICE_URL``. The remaining
        keyword arguments (e.g. ``num_dimensions``, ``relevance_score_fn``,
        ``time_partition_interval``) are forwarded to the constructor.
        """
        service_url = cls.get_service_url(kwargs)
        # service_url is already resolved; don't pass it to the constructor twice.
        store_kwargs = {
            key: value for key, value in kwargs.items() if key != "service_url"
        }

        return cls(
            service_url=service_url,
            collection_name=collection_name,
            embedding=embedding,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
            **store_kwargs,
        )

    @classmethod
    def get_service_url(cls, kwargs: Dict[str, Any]) -> str:
        """Resolve the service URL from ``kwargs`` or the environment.

        Looks up ``kwargs["service_url"]``, falling back to the
        ``TIMESCALE_SERVICE_URL`` environment variable.

        Raises:
            ValueError: If the resolved URL is empty. ``get_from_dict_or_env``
                may raise its own error when neither source is set.
        """
        service_url: str = get_from_dict_or_env(
            data=kwargs,
            key="service_url",
            env_key="TIMESCALE_SERVICE_URL",
        )

        if not service_url:
            raise ValueError(
                "Postgres connection string is required. "
                "Either pass it as a parameter "
                "or set the TIMESCALE_SERVICE_URL environment variable."
            )

        return service_url

    @classmethod
    def service_url_from_db_params(
        cls,
        host: str,
        port: int,
        database: str,
        user: str,
        password: str,
    ) -> str:
        """Return a ``postgresql://`` connection string from database params.

        Values are interpolated verbatim (no URL-encoding), so credentials
        containing characters such as ``@`` or ``/`` must be pre-encoded.
        """
        return f"postgresql://{user}:{password}@{host}:{port}/{database}"

    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """Pick the function that turns a search distance into a relevance score.

        The 'correct' relevance function may differ depending on a few
        things, including:
        - the distance / similarity metric used by the VectorStore
        - the scale of your embeddings (OpenAI's are unit normed. Many others
          are not!)
        - embedding dimensionality
        - etc.

        A ``relevance_score_fn`` given to the constructor always wins.

        Raises:
            ValueError: If no function is known for the distance strategy.
        """
        if self.override_relevance_score_fn is not None:
            return self.override_relevance_score_fn

        # Default strategy is to rely on the distance strategy provided in the
        # vectorstore constructor.
        if self._distance_strategy == DistanceStrategy.COSINE:
            return self._cosine_relevance_score_fn
        elif self._distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
            return self._euclidean_relevance_score_fn
        elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
            return self._max_inner_product_relevance_score_fn
        else:
            raise ValueError(
                "No supported normalization function"
                f" for distance_strategy of {self._distance_strategy}. "
                "Consider providing relevance_score_fn to TimescaleVector constructor."
            )

    def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
        """Delete by vector ID.

        Args:
            ids: List of ids to delete.
            **kwargs: Other keyword arguments that subclasses might use.

        Returns:
            True once the sync client's ``delete_by_ids`` call has returned.

        Raises:
            ValueError: If ``ids`` is None.
        """
        if ids is None:
            raise ValueError("No ids provided to delete.")

        self.sync_client.delete_by_ids(ids)
        return True

    # TODO: should this be part of delete()?
    def delete_by_metadata(
        self, filter: Union[Dict[str, str], List[Dict[str, str]]], **kwargs: Any
    ) -> Optional[bool]:
        """Delete all records whose metadata matches ``filter``.

        Args:
            filter: Metadata filter (a dict, or a list of dicts) passed to the
                sync client's ``delete_by_metadata``.
            **kwargs: Other keyword arguments that subclasses might use.

        Returns:
            True once the sync client's ``delete_by_metadata`` call has returned.
        """
        self.sync_client.delete_by_metadata(filter)
        return True

    class IndexType(str, enum.Enum):
        """Enumerator for the supported index types."""

        TIMESCALE_VECTOR = "tsv"
        PGVECTOR_IVFFLAT = "ivfflat"
        PGVECTOR_HNSW = "hnsw"

    DEFAULT_INDEX_TYPE = IndexType.TIMESCALE_VECTOR

    def create_index(
        self, index_type: Union[IndexType, str] = DEFAULT_INDEX_TYPE, **kwargs: Any
    ) -> None:
        """Create an embedding index on the collection.

        Args:
            index_type: An ``IndexType`` member or its string value
                ("tsv", "ivfflat" or "hnsw").
            **kwargs: Forwarded to the matching ``timescale_vector.client``
                index class constructor.

        Raises:
            ValueError: If ``index_type`` is not a supported index type.
        """
        client = self._import_client()

        # Map each supported type to the name of its client index class; the
        # attribute is only looked up for the requested type.
        index_class_names = {
            self.IndexType.PGVECTOR_IVFFLAT.value: "IvfflatIndex",
            self.IndexType.PGVECTOR_HNSW.value: "HNSWIndex",
            self.IndexType.TIMESCALE_VECTOR.value: "TimescaleVectorIndex",
        }
        key = index_type.value if isinstance(index_type, self.IndexType) else index_type
        class_name = index_class_names.get(key)
        if class_name is None:
            raise ValueError(
                f"Unsupported index_type {index_type!r}; expected one of "
                f"{sorted(index_class_names)}."
            )

        index_class = getattr(client, class_name)
        self.sync_client.create_embedding_index(index_class(**kwargs))

    def drop_index(self) -> None:
        """Drop the collection's embedding index via the sync client."""
        self.sync_client.drop_embedding_index()