"""VectorStore wrapper around a Postgres-TimescaleVector database."""
from __future__ import annotations
import enum
import logging
import uuid
from datetime import timedelta
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
Union,
)
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.utils import get_from_dict_or_env
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import DistanceStrategy
if TYPE_CHECKING:
from timescale_vector import Predicates
# Distance strategy used whenever a caller does not pass ``distance_strategy``.
DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.COSINE
# Default for ``num_dimensions`` in the constructor.
# NOTE(review): presumably the vector size of OpenAI "ada" embeddings - confirm.
ADA_TOKEN_COUNT = 1536
# Collection name used whenever a caller does not pass ``collection_name``.
_LANGCHAIN_DEFAULT_COLLECTION_NAME = "langchain_store"
[docs]class TimescaleVector(VectorStore):
    """Timescale Postgres vector store.

    To use, you should have the ``timescale_vector`` python package installed.

    Args:
        service_url: Service URL on Timescale cloud.
        embedding: Any embedding function implementing the
            `langchain.embeddings.base.Embeddings` interface.
        collection_name: The name of the collection to use.
            (default: langchain_store)
            This will become the table name used for the collection.
        distance_strategy: The distance strategy to use. (default: COSINE)
        pre_delete_collection: If True, the existing contents of the
            collection are deleted on initialization. (default: False)
            Useful for testing.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import TimescaleVector
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            SERVICE_URL = "postgres://tsdbadmin:<password>@<id>.tsdb.cloud.timescale.com:<port>/tsdb?sslmode=require"
            COLLECTION_NAME = "state_of_the_union_test"
            embeddings = OpenAIEmbeddings()
            vectorstore = TimescaleVector.from_documents(
                embedding=embeddings,
                documents=docs,
                collection_name=COLLECTION_NAME,
                service_url=SERVICE_URL,
            )
    """  # noqa: E501
def __init__(
    self,
    service_url: str,
    embedding: Embeddings,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    num_dimensions: int = ADA_TOKEN_COUNT,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    pre_delete_collection: bool = False,
    logger: Optional[logging.Logger] = None,
    relevance_score_fn: Optional[Callable[[float], float]] = None,
    time_partition_interval: Optional[timedelta] = None,
    **kwargs: Any,
) -> None:
    """Create the store and its sync/async ``timescale_vector`` clients.

    Args:
        service_url: Connection string handed to both clients.
        embedding: Embedding implementation used for texts and queries.
        collection_name: Collection (table) name handed to both clients.
        num_dimensions: Vector dimensionality handed to both clients.
        distance_strategy: Distance strategy; its lower-cased ``value`` is
            handed to both clients.
        pre_delete_collection: If True, ``__post_init__`` empties the
            collection after the tables are created.
        logger: Logger to use; defaults to this module's logger.
        relevance_score_fn: Optional override returned by
            ``_select_relevance_score_fn``.
        time_partition_interval: Forwarded to both clients.
        **kwargs: Forwarded unchanged to ``client.Sync`` and ``client.Async``.

    Raises:
        ImportError: If the ``timescale_vector`` package is not installed.
    """
    try:
        from timescale_vector import client
    except ImportError:
        raise ImportError(
            "Could not import timescale_vector python package. "
            "Please install it with `pip install timescale-vector`."
        )

    self.service_url = service_url
    self.embedding = embedding
    self.collection_name = collection_name
    self.num_dimensions = num_dimensions
    self._distance_strategy = distance_strategy
    self.pre_delete_collection = pre_delete_collection
    self.logger = logger if logger else logging.getLogger(__name__)
    self.override_relevance_score_fn = relevance_score_fn
    self._time_partition_interval = time_partition_interval

    # Both clients are configured identically; only the I/O model differs.
    client_args = (
        self.service_url,
        self.collection_name,
        self.num_dimensions,
        self._distance_strategy.value.lower(),
    )
    self.sync_client = client.Sync(
        *client_args,
        time_partition_interval=self._time_partition_interval,
        **kwargs,
    )
    self.async_client = client.Async(
        *client_args,
        time_partition_interval=self._time_partition_interval,
        **kwargs,
    )
    self.__post_init__()
def __post_init__(
self,
) -> None:
"""
初始化商店。
"""
self.sync_client.create_tables()
if self.pre_delete_collection:
self.sync_client.delete_all()
@property
def embeddings(self) -> Embeddings:
return self.embedding
[docs] def drop_tables(self) -> None:
self.sync_client.drop_table()
@classmethod
def __from(
    cls,
    texts: List[str],
    embeddings: List[List[float]],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    service_url: Optional[str] = None,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> TimescaleVector:
    """Build a store and synchronously insert pre-computed embeddings.

    The vector dimensionality is taken from the first embedding. Missing
    ``ids`` are replaced by random UUID4 strings, missing/empty
    ``metadatas`` by empty dicts, and a missing ``service_url`` is resolved
    with ``get_service_url``. ``kwargs`` are forwarded both to the
    constructor and to ``add_embeddings``.
    """
    # Size the vector column from the data actually supplied.
    num_dimensions = len(embeddings[0])

    ids = ids if ids is not None else [str(uuid.uuid4()) for _ in texts]
    metadatas = metadatas or [{} for _ in texts]
    if service_url is None:
        service_url = cls.get_service_url(kwargs)

    instance = cls(
        service_url=service_url,
        num_dimensions=num_dimensions,
        collection_name=collection_name,
        embedding=embedding,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
        **kwargs,
    )
    instance.add_embeddings(
        texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
    )
    return instance
@classmethod
async def __afrom(
    cls,
    texts: List[str],
    embeddings: List[List[float]],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    service_url: Optional[str] = None,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> TimescaleVector:
    """Build a store and asynchronously insert pre-computed embeddings.

    The vector dimensionality is taken from the first embedding. Missing
    ``ids`` are replaced by random UUID4 strings, missing/empty
    ``metadatas`` by empty dicts, and a missing ``service_url`` is resolved
    with ``get_service_url``. ``kwargs`` are forwarded both to the
    constructor and to ``aadd_embeddings``.
    """
    # Size the vector column from the data actually supplied.
    num_dimensions = len(embeddings[0])

    ids = ids if ids is not None else [str(uuid.uuid4()) for _ in texts]
    metadatas = metadatas or [{} for _ in texts]
    if service_url is None:
        service_url = cls.get_service_url(kwargs)

    instance = cls(
        service_url=service_url,
        num_dimensions=num_dimensions,
        collection_name=collection_name,
        embedding=embedding,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
        **kwargs,
    )
    await instance.aadd_embeddings(
        texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
    )
    return instance
[docs] def add_embeddings(
self,
texts: Iterable[str],
embeddings: List[List[float]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""将嵌入添加到向量存储中。
参数:
texts:要添加到向量存储中的字符串的可迭代对象。
embeddings:嵌入向量的列表的列表。
metadatas:与文本相关联的元数据列表。
kwargs:向量存储特定参数。
"""
if ids is None:
ids = [str(uuid.uuid4()) for _ in texts]
if not metadatas:
metadatas = [{} for _ in texts]
records = list(zip(ids, metadatas, texts, embeddings))
self.sync_client.upsert(records)
return ids
[docs] async def aadd_embeddings(
self,
texts: Iterable[str],
embeddings: List[List[float]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""将嵌入添加到向量存储中。
参数:
texts:要添加到向量存储中的字符串的可迭代对象。
embeddings:嵌入向量的列表的列表。
metadatas:与文本相关联的元数据列表。
kwargs:向量存储特定参数。
"""
if ids is None:
ids = [str(uuid.uuid4()) for _ in texts]
if not metadatas:
metadatas = [{} for _ in texts]
records = list(zip(ids, metadatas, texts, embeddings))
await self.async_client.upsert(records)
return ids
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多文本通过嵌入并添加到向量存储。
参数:
texts:要添加到向量存储的字符串的可迭代对象。
metadatas:与文本相关联的元数据的可选列表。
kwargs:向量存储特定参数
返回:
将文本添加到向量存储中的ID列表。
"""
embeddings = self.embedding.embed_documents(list(texts))
return self.add_embeddings(
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
)
[docs] async def aadd_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多文本通过嵌入并添加到向量存储。
参数:
texts:要添加到向量存储的字符串的可迭代对象。
metadatas:与文本相关联的元数据的可选列表。
kwargs:向量存储特定参数
返回:
将文本添加到向量存储中的ID列表。
"""
embeddings = self.embedding.embed_documents(list(texts))
return await self.aadd_embeddings(
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
)
def _embed_query(self, query: str) -> Optional[List[float]]:
# an empty query should not be embedded
if query is None or query == "" or query.isspace():
return None
else:
return self.embedding.embed_query(query)
[docs] def similarity_search(
self,
query: str,
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Document]:
"""使用TimescaleVector进行相似性搜索,带有距离。
参数:
query (str): 要搜索的查询文本。
k (int): 要返回的结果数量。默认为4。
filter (Optional[Dict[str, str]]): 按元数据进行过滤。默认为None。
返回:
与查询最相似的文档列表。
"""
embedding = self._embed_query(query)
return self.similarity_search_by_vector(
embedding=embedding,
k=k,
filter=filter,
predicates=predicates,
**kwargs,
)
[docs] async def asimilarity_search(
self,
query: str,
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Document]:
"""使用TimescaleVector进行相似性搜索,带有距离。
参数:
query (str): 要搜索的查询文本。
k (int): 要返回的结果数量。默认为4。
filter (Optional[Dict[str, str]]): 按元数据进行过滤。默认为None。
返回:
与查询最相似的文档列表。
"""
embedding = self._embed_query(query)
return await self.asimilarity_search_by_vector(
embedding=embedding,
k=k,
filter=filter,
predicates=predicates,
**kwargs,
)
[docs] def similarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的文档。
参数:
query:要查找相似文档的文本。
k:要返回的文档数量。默认为4。
filter(可选[Dict[str,str]]):按元数据过滤。默认为无。
返回:
返回与查询最相似的文档列表,以及每个文档的得分。
"""
embedding = self._embed_query(query)
docs = self.similarity_search_with_score_by_vector(
embedding=embedding,
k=k,
filter=filter,
predicates=predicates,
**kwargs,
)
return docs
[docs] async def asimilarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的文档。
参数:
query:要查找相似文档的文本。
k:要返回的文档数量。默认为4。
filter(可选[Dict[str,str]]):按元数据过滤。默认为无。
返回:
返回与查询最相似的文档列表,以及每个文档的得分。
"""
embedding = self._embed_query(query)
return await self.asimilarity_search_with_score_by_vector(
embedding=embedding,
k=k,
filter=filter,
predicates=predicates,
**kwargs,
)
[docs] def date_to_range_filter(self, **kwargs: Any) -> Any:
constructor_args = {
key: kwargs[key]
for key in [
"start_date",
"end_date",
"time_delta",
"start_inclusive",
"end_inclusive",
]
if key in kwargs
}
if not constructor_args or len(constructor_args) == 0:
return None
try:
from timescale_vector import client
except ImportError:
raise ImportError(
"Could not import timescale_vector python package. "
"Please install it with `pip install timescale-vector`."
)
return client.UUIDTimeRange(**constructor_args)
def similarity_search_with_score_by_vector(
    self,
    embedding: Optional[List[float]],
    k: int = 4,
    filter: Optional[Union[dict, list]] = None,
    predicates: Optional[Predicates] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Search by embedding vector and return documents with their distances.

    Args:
        embedding: Query vector, or ``None``; passed to the client as-is.
        k: Maximum number of results (passed as ``limit``).
        filter: Metadata filter passed to the client.
        predicates: Predicates passed to the client.
        **kwargs: Time-range options interpreted by ``date_to_range_filter``.

    Returns:
        List of (Document, distance) tuples built from the client results.

    Raises:
        ImportError: If ``timescale_vector`` is not installed.
    """
    try:
        from timescale_vector import client
    except ImportError:
        raise ImportError(
            "Could not import timescale_vector python package. "
            "Please install it with `pip install timescale-vector`."
        )

    time_filter = self.date_to_range_filter(**kwargs)
    rows = self.sync_client.search(
        embedding,
        limit=k,
        filter=filter,
        predicates=predicates,
        uuid_time_filter=time_filter,
    )

    # Each row is indexed with the column positions published by the client.
    scored_documents = []
    for row in rows:
        document = Document(
            page_content=row[client.SEARCH_RESULT_CONTENTS_IDX],
            metadata=row[client.SEARCH_RESULT_METADATA_IDX],
        )
        scored_documents.append((document, row[client.SEARCH_RESULT_DISTANCE_IDX]))
    return scored_documents
async def asimilarity_search_with_score_by_vector(
    self,
    embedding: Optional[List[float]],
    k: int = 4,
    filter: Optional[Union[dict, list]] = None,
    predicates: Optional[Predicates] = None,
    **kwargs: Any,
) -> List[Tuple[Document, float]]:
    """Asynchronously search by embedding vector, returning documents with distances.

    Args:
        embedding: Query vector, or ``None``; passed to the client as-is.
        k: Maximum number of results (passed as ``limit``).
        filter: Metadata filter passed to the client.
        predicates: Predicates passed to the client.
        **kwargs: Time-range options interpreted by ``date_to_range_filter``.

    Returns:
        List of (Document, distance) tuples built from the client results.

    Raises:
        ImportError: If ``timescale_vector`` is not installed.
    """
    try:
        from timescale_vector import client
    except ImportError:
        raise ImportError(
            "Could not import timescale_vector python package. "
            "Please install it with `pip install timescale-vector`."
        )

    time_filter = self.date_to_range_filter(**kwargs)
    rows = await self.async_client.search(
        embedding,
        limit=k,
        filter=filter,
        predicates=predicates,
        uuid_time_filter=time_filter,
    )

    # Each row is indexed with the column positions published by the client.
    scored_documents = []
    for row in rows:
        document = Document(
            page_content=row[client.SEARCH_RESULT_CONTENTS_IDX],
            metadata=row[client.SEARCH_RESULT_METADATA_IDX],
        )
        scored_documents.append((document, row[client.SEARCH_RESULT_DISTANCE_IDX]))
    return scored_documents
[docs] def similarity_search_by_vector(
self,
embedding: Optional[List[float]],
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找与之相似的文档的嵌入。
k: 要返回的文档数量。默认为4。
filter (Optional[Dict[str, str]]): 按元数据筛选。默认为None。
返回:
返回与查询向量最相似的文档列表。
"""
docs_and_scores = self.similarity_search_with_score_by_vector(
embedding=embedding, k=k, filter=filter, predicates=predicates, **kwargs
)
return [doc for doc, _ in docs_and_scores]
[docs] async def asimilarity_search_by_vector(
self,
embedding: Optional[List[float]],
k: int = 4,
filter: Optional[Union[dict, list]] = None,
predicates: Optional[Predicates] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找与之相似的文档的嵌入。
k: 要返回的文档数量。默认为4。
filter (Optional[Dict[str, str]]): 按元数据筛选。默认为None。
返回:
返回与查询向量最相似的文档列表。
"""
docs_and_scores = await self.asimilarity_search_with_score_by_vector(
embedding=embedding, k=k, filter=filter, predicates=predicates, **kwargs
)
return [doc for doc, _ in docs_and_scores]
@classmethod
def from_texts(
    cls: Type[TimescaleVector],
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> TimescaleVector:
    """Return a VectorStore initialized from texts and embeddings.

    A Postgres connection string is required. Either pass it as the
    ``service_url`` parameter or set the TIMESCALE_SERVICE_URL environment
    variable.
    """
    vectors = embedding.embed_documents(list(texts))
    store_options = {
        "metadatas": metadatas,
        "ids": ids,
        "collection_name": collection_name,
        "distance_strategy": distance_strategy,
        "pre_delete_collection": pre_delete_collection,
    }
    return cls.__from(texts, vectors, embedding, **store_options, **kwargs)
@classmethod
async def afrom_texts(
    cls: Type[TimescaleVector],
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> TimescaleVector:
    """Asynchronously return a VectorStore initialized from texts and embeddings.

    A Postgres connection string is required. Either pass it as the
    ``service_url`` parameter or set the TIMESCALE_SERVICE_URL environment
    variable.
    """
    vectors = embedding.embed_documents(list(texts))
    store_options = {
        "metadatas": metadatas,
        "ids": ids,
        "collection_name": collection_name,
        "distance_strategy": distance_strategy,
        "pre_delete_collection": pre_delete_collection,
    }
    return await cls.__afrom(texts, vectors, embedding, **store_options, **kwargs)
@classmethod
def from_embeddings(
    cls,
    text_embeddings: List[Tuple[str, List[float]]],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> TimescaleVector:
    """Construct a TimescaleVector wrapper from raw documents and pre-generated embeddings.

    Return a VectorStore initialized from documents and embeddings.
    A Postgres connection string is required. Either pass it as the
    ``service_url`` parameter or set the TIMESCALE_SERVICE_URL environment
    variable.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import TimescaleVector
            from langchain_community.embeddings import OpenAIEmbeddings

            embeddings = OpenAIEmbeddings()
            text_embeddings = embeddings.embed_documents(texts)
            text_embedding_pairs = list(zip(texts, text_embeddings))
            tvs = TimescaleVector.from_embeddings(text_embedding_pairs, embeddings)
    """
    # Split the (text, vector) pairs into two parallel lists.
    texts = []
    vectors = []
    for pair in text_embeddings:
        texts.append(pair[0])
        vectors.append(pair[1])

    return cls.__from(
        texts,
        vectors,
        embedding,
        metadatas=metadatas,
        ids=ids,
        collection_name=collection_name,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
        **kwargs,
    )
@classmethod
async def afrom_embeddings(
    cls,
    text_embeddings: List[Tuple[str, List[float]]],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    ids: Optional[List[str]] = None,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> TimescaleVector:
    """Asynchronously construct a TimescaleVector wrapper from pre-generated embeddings.

    Return a VectorStore initialized from documents and embeddings.
    A Postgres connection string is required. Either pass it as the
    ``service_url`` parameter or set the TIMESCALE_SERVICE_URL environment
    variable.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import TimescaleVector
            from langchain_community.embeddings import OpenAIEmbeddings

            embeddings = OpenAIEmbeddings()
            text_embeddings = embeddings.embed_documents(texts)
            text_embedding_pairs = list(zip(texts, text_embeddings))
            tvs = await TimescaleVector.afrom_embeddings(
                text_embedding_pairs, embeddings
            )
    """
    # Split the (text, vector) pairs into two parallel lists.
    texts = []
    vectors = []
    for pair in text_embeddings:
        texts.append(pair[0])
        vectors.append(pair[1])

    return await cls.__afrom(
        texts,
        vectors,
        embedding,
        metadatas=metadatas,
        ids=ids,
        collection_name=collection_name,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
        **kwargs,
    )
@classmethod
def from_existing_index(
    cls: Type[TimescaleVector],
    embedding: Embeddings,
    collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
    distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
    pre_delete_collection: bool = False,
    **kwargs: Any,
) -> TimescaleVector:
    """Get an instance of an existing TimescaleVector store.

    This method returns the instance of the store without inserting any new
    embeddings.

    Args:
        embedding: Embedding implementation used for queries.
        collection_name: Name of the existing collection.
        distance_strategy: Distance strategy to use.
        pre_delete_collection: Passed through to the constructor.
        **kwargs: May contain ``service_url`` (otherwise the
            TIMESCALE_SERVICE_URL environment variable is used). Every other
            entry is forwarded to the constructor, e.g. ``num_dimensions``,
            ``time_partition_interval``, ``relevance_score_fn``, ``logger``.

    Returns:
        The store bound to the existing collection.
    """
    service_url = cls.get_service_url(kwargs)

    # ``service_url`` is passed explicitly below; strip it so it is not
    # supplied twice. The remaining options used to be silently dropped.
    constructor_kwargs = {
        key: value for key, value in kwargs.items() if key != "service_url"
    }
    return cls(
        service_url=service_url,
        collection_name=collection_name,
        embedding=embedding,
        distance_strategy=distance_strategy,
        pre_delete_collection=pre_delete_collection,
        **constructor_kwargs,
    )
@classmethod
def get_service_url(cls, kwargs: Dict[str, Any]) -> str:
    """Resolve the Postgres service URL.

    The value is looked up with ``get_from_dict_or_env`` under the
    ``service_url`` key of ``kwargs``, with the TIMESCALE_SERVICE_URL
    environment variable as the alternative source.

    Raises:
        ValueError: If the resolved value is empty.
    """
    service_url: str = get_from_dict_or_env(
        data=kwargs,
        key="service_url",
        env_key="TIMESCALE_SERVICE_URL",
    )

    if not service_url:
        # The original adjacent literals had no separators, so the message
        # ran its sentences together.
        raise ValueError(
            "Postgres connection string is required. "
            "Either pass it as a parameter "
            "or set the TIMESCALE_SERVICE_URL environment variable."
        )

    return service_url
[docs] @classmethod
def service_url_from_db_params(
cls,
host: str,
port: int,
database: str,
user: str,
password: str,
) -> str:
"""从数据库参数返回连接字符串。"""
return f"postgresql://{user}:{password}@{host}:{port}/{database}"
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""“正确”的相关性函数可能会有所不同,取决于一些因素,包括:
- 向量存储中使用的距离/相似度度量
- 嵌入的规模(OpenAI的是单位规范化的。许多其他嵌入不是!)
- 嵌入的维度
- 等等。
"""
if self.override_relevance_score_fn is not None:
return self.override_relevance_score_fn
# Default strategy is to rely on distance strategy provided
# in vectorstore constructor
if self._distance_strategy == DistanceStrategy.COSINE:
return self._cosine_relevance_score_fn
elif self._distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
return self._euclidean_relevance_score_fn
elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
return self._max_inner_product_relevance_score_fn
else:
raise ValueError(
"No supported normalization function"
f" for distance_strategy of {self._distance_strategy}."
"Consider providing relevance_score_fn to TimescaleVector constructor."
)
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
"""根据向量ID或其他条件删除。
参数:
ids:要删除的ID列表。
**kwargs:子类可能使用的其他关键字参数。
返回:
Optional[bool]:如果删除成功则为True,否则为False,如果未实现则为None。
"""
if ids is None:
raise ValueError("No ids provided to delete.")
self.sync_client.delete_by_ids(ids)
return True
# todo should this be part of delete|()?
class IndexType(str, enum.Enum):
    """Enumerator for the supported index types."""

    # create_index maps this to client.TimescaleVectorIndex.
    TIMESCALE_VECTOR = "tsv"
    # create_index maps this to client.IvfflatIndex.
    PGVECTOR_IVFFLAT = "ivfflat"
    # create_index maps this to client.HNSWIndex.
    PGVECTOR_HNSW = "hnsw"

# Index type used by create_index when the caller does not choose one.
DEFAULT_INDEX_TYPE = IndexType.TIMESCALE_VECTOR
def create_index(
    self, index_type: Union[IndexType, str] = DEFAULT_INDEX_TYPE, **kwargs: Any
) -> None:
    """Create an embedding index of the requested type.

    Args:
        index_type: An ``IndexType`` member or its string value
            (``"tsv"``, ``"ivfflat"`` or ``"hnsw"``).
        **kwargs: Passed to the constructor of the selected index class.

    Raises:
        ImportError: If ``timescale_vector`` is not installed.
        ValueError: If ``index_type`` is not a supported index type.
    """
    try:
        from timescale_vector import client
    except ImportError:
        raise ImportError(
            "Could not import timescale_vector python package. "
            "Please install it with `pip install timescale-vector`."
        )

    index_type = (
        index_type.value if isinstance(index_type, self.IndexType) else index_type
    )

    if index_type == self.IndexType.PGVECTOR_IVFFLAT.value:
        index = client.IvfflatIndex(**kwargs)
    elif index_type == self.IndexType.PGVECTOR_HNSW.value:
        index = client.HNSWIndex(**kwargs)
    elif index_type == self.IndexType.TIMESCALE_VECTOR.value:
        index = client.TimescaleVectorIndex(**kwargs)
    else:
        # An unknown type used to fall through without creating any index.
        supported = [member.value for member in self.IndexType]
        raise ValueError(
            f"Unsupported index_type {index_type!r}; expected one of {supported}."
        )

    self.sync_client.create_embedding_index(index)
[docs] def drop_index(self) -> None:
self.sync_client.drop_embedding_index()