from __future__ import annotations
import contextlib
import enum
import json
import logging
import uuid
from typing import (
Any,
Callable,
Dict,
Generator,
Iterable,
List,
Optional,
Tuple,
Type,
)
import numpy as np
import sqlalchemy
from langchain_core._api import deprecated, warn_deprecated
from sqlalchemy import SQLColumnExpression, delete, func
from sqlalchemy.dialects.postgresql import JSON, JSONB, UUID
from sqlalchemy.orm import Session, relationship
try:
from sqlalchemy.orm import declarative_base
except ImportError:
from sqlalchemy.ext.declarative import declarative_base
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.runnables.config import run_in_executor
from langchain_core.utils import get_from_dict_or_env
from langchain_core.vectorstores import VectorStore
from langchain_community.vectorstores.utils import maximal_marginal_relevance
[docs]class DistanceStrategy(str, enum.Enum):
"""距离策略的枚举器。"""
EUCLIDEAN = "l2"
COSINE = "cosine"
MAX_INNER_PRODUCT = "inner"
DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.COSINE
Base = declarative_base() # type: Any
_LANGCHAIN_DEFAULT_COLLECTION_NAME = "langchain"
[docs]class BaseModel(Base):
"""SQL存储的基本模型。"""
__abstract__ = True
uuid = sqlalchemy.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
_classes: Any = None
COMPARISONS_TO_NATIVE = {
"$eq": "==",
"$ne": "!=",
"$lt": "<",
"$lte": "<=",
"$gt": ">",
"$gte": ">=",
}
SPECIAL_CASED_OPERATORS = {
"$in",
"$nin",
"$between",
}
TEXT_OPERATORS = {
"$like",
"$ilike",
}
LOGICAL_OPERATORS = {"$and", "$or"}
SUPPORTED_OPERATORS = (
set(COMPARISONS_TO_NATIVE)
.union(TEXT_OPERATORS)
.union(LOGICAL_OPERATORS)
.union(SPECIAL_CASED_OPERATORS)
)
def _get_embedding_collection_store(
vector_dimension: Optional[int] = None, *, use_jsonb: bool = True
) -> Any:
global _classes
if _classes is not None:
return _classes
from pgvector.sqlalchemy import Vector
class CollectionStore(BaseModel):
"""集合存储。"""
__tablename__ = "langchain_pg_collection"
name = sqlalchemy.Column(sqlalchemy.String)
cmetadata = sqlalchemy.Column(JSON)
embeddings = relationship(
"EmbeddingStore",
back_populates="collection",
passive_deletes=True,
)
@classmethod
def get_by_name(
cls, session: Session, name: str
) -> Optional["CollectionStore"]:
return session.query(cls).filter(cls.name == name).first() # type: ignore
@classmethod
def get_or_create(
cls,
session: Session,
name: str,
cmetadata: Optional[dict] = None,
) -> Tuple["CollectionStore", bool]:
"""获取或创建一个集合。
返回[Collection, bool],其中bool为True,如果集合被创建。
""" # noqa: E501
created = False
collection = cls.get_by_name(session, name)
if collection:
return collection, created
collection = cls(name=name, cmetadata=cmetadata)
session.add(collection)
session.commit()
created = True
return collection, created
if use_jsonb:
# TODO(PRIOR TO LANDING): Create a gin index on the cmetadata field
class EmbeddingStore(BaseModel):
"""嵌入式存储。"""
__tablename__ = "langchain_pg_embedding"
collection_id = sqlalchemy.Column(
UUID(as_uuid=True),
sqlalchemy.ForeignKey(
f"{CollectionStore.__tablename__}.uuid",
ondelete="CASCADE",
),
)
collection = relationship(CollectionStore, back_populates="embeddings")
embedding: Vector = sqlalchemy.Column(Vector(vector_dimension))
document = sqlalchemy.Column(sqlalchemy.String, nullable=True)
cmetadata = sqlalchemy.Column(JSONB, nullable=True)
# custom_id : any user defined id
custom_id = sqlalchemy.Column(sqlalchemy.String, nullable=True)
__table_args__ = (
sqlalchemy.Index(
"ix_cmetadata_gin",
"cmetadata",
postgresql_using="gin",
postgresql_ops={"cmetadata": "jsonb_path_ops"},
),
)
else:
# For backwards comaptibilty with older versions of pgvector
# This should be removed in the future (remove during migration)
class EmbeddingStore(BaseModel): # type: ignore[no-redef]
"""嵌入式存储。"""
__tablename__ = "langchain_pg_embedding"
collection_id = sqlalchemy.Column(
UUID(as_uuid=True),
sqlalchemy.ForeignKey(
f"{CollectionStore.__tablename__}.uuid",
ondelete="CASCADE",
),
)
collection = relationship(CollectionStore, back_populates="embeddings")
embedding: Vector = sqlalchemy.Column(Vector(vector_dimension))
document = sqlalchemy.Column(sqlalchemy.String, nullable=True)
cmetadata = sqlalchemy.Column(JSON, nullable=True)
# custom_id : any user defined id
custom_id = sqlalchemy.Column(sqlalchemy.String, nullable=True)
_classes = (EmbeddingStore, CollectionStore)
return _classes
def _results_to_docs(docs_and_scores: Any) -> List[Document]:
"""从文档和分数中返回文档。"""
return [doc for doc, _ in docs_and_scores]
[docs]@deprecated(
since="0.0.31",
message=(
"This class is pending deprecation and may be removed in a future version. "
"You can swap to using the `PGVector`"
" implementation in `langchain_postgres`. "
"Please read the guidelines in the doc-string of this class "
"to follow prior to migrating as there are some differences "
"between the implementations. "
"See https://github.com/langchain-ai/langchain-postgres for details about"
"the new implementation."
),
alternative="from langchain_postgres import PGVector;",
pending=True,
)
class PGVector(VectorStore):
"""`Postgres`/`PGVector` 向量存储。
**已弃用** :此类正在等待弃用,并且可能不会收到更新。在`langchain_postgres`中有一个改进版本的类`PGVector`可用。请改用该类。
迁移时请记住:
* 新实现与psycopg3一起工作,而不是与psycopg2一起工作
(此实现不与psycopg3一起工作)。
* 过滤语法已更改为使用$前缀运算符进行JSONB
元数据字段的过滤。(新实现仅使用JSONB字段进行元数据)
* 新实现对现有实现进行了一些模式更改以解决问题。
因此,您需要重新创建表并重新索引数据,否则进行手动
迁移。
要使用,您应该安装``pgvector`` python包。
参数:
connection_string: Postgres连接字符串。
embedding_function: 任何实现
`langchain.embeddings.base.Embeddings` 接口的嵌入函数。
embedding_length: 嵌入向量的长度。(默认值:None)
注意:这不是强制性的。定义它将防止其他大小的向量
被添加到嵌入表中,但是,如果没有定义它,
则无法对嵌入进行索引。
collection_name: 要使用的集合名称。(默认值:langchain)
注意:这不是表的名称,而是集合的名称。
在初始化存储时将创建表(如果不存在)。
因此,请确保用户具有创建表的权限。
distance_strategy: 要使用的距离策略。(默认值:COSINE)
pre_delete_collection: 如果为True,则如果存在,将删除集合。
(默认值:False)。用于测试。
engine_args: SQLAlchemy的创建引擎参数。
use_jsonb: 使用JSONB而不是JSON进行元数据。(默认值:True)
强烈建议不要使用JSON,因为它不够高效
用于查询。
这里提供了与旧版本向后兼容的JSON,将来会被移除。
create_extension: 如果为True,则如果不存在,将创建向量扩展。
当使用只读数据库时,禁用创建是有用的。
示例:
.. code-block:: python
from langchain_community.vectorstores import PGVector
from langchain_community.embeddings.openai import OpenAIEmbeddings
CONNECTION_STRING = "postgresql+psycopg2://hwc@localhost:5432/test3"
COLLECTION_NAME = "state_of_the_union_test"
embeddings = OpenAIEmbeddings()
vectorestore = PGVector.from_documents(
embedding=embeddings,
documents=docs,
collection_name=COLLECTION_NAME,
connection_string=CONNECTION_STRING,
use_jsonb=True,
)
"""
[docs] def __init__(
self,
connection_string: str,
embedding_function: Embeddings,
embedding_length: Optional[int] = None,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
collection_metadata: Optional[dict] = None,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
pre_delete_collection: bool = False,
logger: Optional[logging.Logger] = None,
relevance_score_fn: Optional[Callable[[float], float]] = None,
*,
connection: Optional[sqlalchemy.engine.Connection] = None,
engine_args: Optional[dict[str, Any]] = None,
use_jsonb: bool = False,
create_extension: bool = True,
) -> None:
"""初始化PGVector存储。"""
self.connection_string = connection_string
self.embedding_function = embedding_function
self._embedding_length = embedding_length
self.collection_name = collection_name
self.collection_metadata = collection_metadata
self._distance_strategy = distance_strategy
self.pre_delete_collection = pre_delete_collection
self.logger = logger or logging.getLogger(__name__)
self.override_relevance_score_fn = relevance_score_fn
self.engine_args = engine_args or {}
self._bind = connection if connection else self._create_engine()
self.use_jsonb = use_jsonb
self.create_extension = create_extension
if not use_jsonb:
# Replace with a deprecation warning.
warn_deprecated(
"0.0.29",
pending=True,
message=(
"Please use JSONB instead of JSON for metadata. "
"This change will allow for more efficient querying that "
"involves filtering based on metadata."
"Please note that filtering operators have been changed "
"when using JSOB metadata to be prefixed with a $ sign "
"to avoid name collisions with columns. "
"If you're using an existing database, you will need to create a"
"db migration for your metadata column to be JSONB and update your "
"queries to use the new operators. "
),
alternative=(
"Instantiate with use_jsonb=True to use JSONB instead "
"of JSON for metadata."
),
)
self.__post_init__()
def __post_init__(
self,
) -> None:
"""初始化商店。"""
if self.create_extension:
self.create_vector_extension()
EmbeddingStore, CollectionStore = _get_embedding_collection_store(
self._embedding_length, use_jsonb=self.use_jsonb
)
self.CollectionStore = CollectionStore
self.EmbeddingStore = EmbeddingStore
self.create_tables_if_not_exists()
self.create_collection()
def __del__(self) -> None:
if isinstance(self._bind, sqlalchemy.engine.Connection):
self._bind.close()
@property
def embeddings(self) -> Embeddings:
return self.embedding_function
def _create_engine(self) -> sqlalchemy.engine.Engine:
return sqlalchemy.create_engine(url=self.connection_string, **self.engine_args)
[docs] def create_vector_extension(self) -> None:
try:
with Session(self._bind) as session: # type: ignore[arg-type]
# The advisor lock fixes issue arising from concurrent
# creation of the vector extension.
# https://github.com/langchain-ai/langchain/issues/12933
# For more information see:
# https://www.postgresql.org/docs/16/explicit-locking.html#ADVISORY-LOCKS
statement = sqlalchemy.text(
"BEGIN;"
"SELECT pg_advisory_xact_lock(1573678846307946496);"
"CREATE EXTENSION IF NOT EXISTS vector;"
"COMMIT;"
)
session.execute(statement)
session.commit()
except Exception as e:
raise Exception(f"Failed to create vector extension: {e}") from e
[docs] def create_tables_if_not_exists(self) -> None:
with Session(self._bind) as session, session.begin(): # type: ignore[arg-type]
Base.metadata.create_all(session.get_bind())
[docs] def drop_tables(self) -> None:
with Session(self._bind) as session, session.begin(): # type: ignore[arg-type]
Base.metadata.drop_all(session.get_bind())
[docs] def create_collection(self) -> None:
if self.pre_delete_collection:
self.delete_collection()
with Session(self._bind) as session: # type: ignore[arg-type]
self.CollectionStore.get_or_create(
session, self.collection_name, cmetadata=self.collection_metadata
)
[docs] def delete_collection(self) -> None:
self.logger.debug("Trying to delete collection")
with Session(self._bind) as session: # type: ignore[arg-type]
collection = self.get_collection(session)
if not collection:
self.logger.warning("Collection not found")
return
session.delete(collection)
session.commit()
@contextlib.contextmanager
def _make_session(self) -> Generator[Session, None, None]:
"""为会话创建一个上下文管理器,绑定到_conn字符串。"""
yield Session(self._bind) # type: ignore[arg-type]
[docs] def delete(
self,
ids: Optional[List[str]] = None,
collection_only: bool = False,
**kwargs: Any,
) -> None:
"""根据id或uuid删除向量。
参数:
ids:要删除的id列表。
collection_only:仅删除集合中的id。
"""
with Session(self._bind) as session: # type: ignore[arg-type]
if ids is not None:
self.logger.debug(
"Trying to delete vectors by ids (represented by the model "
"using the custom ids field)"
)
stmt = delete(self.EmbeddingStore)
if collection_only:
collection = self.get_collection(session)
if not collection:
self.logger.warning("Collection not found")
return
stmt = stmt.where(
self.EmbeddingStore.collection_id == collection.uuid
)
stmt = stmt.where(self.EmbeddingStore.custom_id.in_(ids))
session.execute(stmt)
session.commit()
[docs] def get_collection(self, session: Session) -> Any:
return self.CollectionStore.get_by_name(session, self.collection_name)
@classmethod
def _from(
cls,
texts: List[str],
embeddings: List[List[float]],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
connection_string: Optional[str] = None,
pre_delete_collection: bool = False,
*,
use_jsonb: bool = False,
**kwargs: Any,
) -> PGVector:
if ids is None:
ids = [str(uuid.uuid4()) for _ in texts]
if not metadatas:
metadatas = [{} for _ in texts]
if connection_string is None:
connection_string = cls.get_connection_string(kwargs)
store = cls(
connection_string=connection_string,
collection_name=collection_name,
embedding_function=embedding,
distance_strategy=distance_strategy,
pre_delete_collection=pre_delete_collection,
use_jsonb=use_jsonb,
**kwargs,
)
store.add_embeddings(
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
)
return store
[docs] def add_embeddings(
self,
texts: Iterable[str],
embeddings: List[List[float]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""将嵌入添加到向量存储中。
参数:
texts:要添加到向量存储中的字符串的可迭代对象。
embeddings:嵌入向量的列表的列表。
metadatas:与文本相关联的元数据列表。
kwargs:向量存储特定参数。
"""
if ids is None:
ids = [str(uuid.uuid4()) for _ in texts]
if not metadatas:
metadatas = [{} for _ in texts]
with Session(self._bind) as session: # type: ignore[arg-type]
collection = self.get_collection(session)
if not collection:
raise ValueError("Collection not found")
documents = []
for text, metadata, embedding, id in zip(texts, metadatas, embeddings, ids):
embedding_store = self.EmbeddingStore(
embedding=embedding,
document=text,
cmetadata=metadata,
custom_id=id,
collection_id=collection.uuid,
)
documents.append(embedding_store)
session.bulk_save_objects(documents)
session.commit()
return ids
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""运行更多文本通过嵌入并添加到向量存储。
参数:
texts:要添加到向量存储的字符串的可迭代对象。
metadatas:与文本相关联的元数据的可选列表。
kwargs:向量存储特定参数
返回:
将文本添加到向量存储中的ID列表。
"""
embeddings = self.embedding_function.embed_documents(list(texts))
return self.add_embeddings(
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
)
[docs] def similarity_search(
self,
query: str,
k: int = 4,
filter: Optional[dict] = None,
**kwargs: Any,
) -> List[Document]:
"""使用PGVector进行带距离的相似性搜索。
参数:
query(str):要搜索的查询文本。
k(int):要返回的结果数量。默认为4。
filter(Optional[Dict[str, str]]):按元数据进行过滤。默认为None。
返回:
与查询最相似的文档列表。
"""
embedding = self.embedding_function.embed_query(text=query)
return self.similarity_search_by_vector(
embedding=embedding,
k=k,
filter=filter,
)
[docs] def similarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[dict] = None,
) -> List[Tuple[Document, float]]:
"""返回与查询最相似的文档。
参数:
query:要查找与之相似的文档的文本。
k:要返回的文档数量。默认为4。
filter(可选[Dict[str,str]]):按元数据过滤。默认为None。
返回:
与查询最相似的文档列表,以及每个文档的分数。
"""
embedding = self.embedding_function.embed_query(query)
docs = self.similarity_search_with_score_by_vector(
embedding=embedding, k=k, filter=filter
)
return docs
@property
def distance_strategy(self) -> Any:
if self._distance_strategy == DistanceStrategy.EUCLIDEAN:
return self.EmbeddingStore.embedding.l2_distance
elif self._distance_strategy == DistanceStrategy.COSINE:
return self.EmbeddingStore.embedding.cosine_distance
elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
return self.EmbeddingStore.embedding.max_inner_product
else:
raise ValueError(
f"Got unexpected value for distance: {self._distance_strategy}. "
f"Should be one of {', '.join([ds.value for ds in DistanceStrategy])}."
)
[docs] def similarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[dict] = None,
) -> List[Tuple[Document, float]]:
results = self._query_collection(embedding=embedding, k=k, filter=filter)
return self._results_to_docs_and_scores(results)
def _results_to_docs_and_scores(self, results: Any) -> List[Tuple[Document, float]]:
"""返回结果中的文档和分数。"""
docs = [
(
Document(
page_content=result.EmbeddingStore.document,
metadata=result.EmbeddingStore.cmetadata,
),
result.distance if self.embedding_function is not None else None,
)
for result in results
]
return docs
def _handle_field_filter(
self,
field: str,
value: Any,
) -> SQLColumnExpression:
"""为特定字段创建过滤器。
参数:
field:字段名称
value:过滤的值
如果直接提供,则这将是一个相等过滤器
如果提供为字典,则这将是一个过滤器,键将是运算符,值将是要过滤的值
返回:
sqlalchemy表达式
"""
if not isinstance(field, str):
raise ValueError(
f"field should be a string but got: {type(field)} with value: {field}"
)
if field.startswith("$"):
raise ValueError(
f"Invalid filter condition. Expected a field but got an operator: "
f"{field}"
)
# Allow [a-zA-Z0-9_], disallow $ for now until we support escape characters
if not field.isidentifier():
raise ValueError(
f"Invalid field name: {field}. Expected a valid identifier."
)
if isinstance(value, dict):
# This is a filter specification
if len(value) != 1:
raise ValueError(
"Invalid filter condition. Expected a value which "
"is a dictionary with a single key that corresponds to an operator "
f"but got a dictionary with {len(value)} keys. The first few "
f"keys are: {list(value.keys())[:3]}"
)
operator, filter_value = list(value.items())[0]
# Verify that that operator is an operator
if operator not in SUPPORTED_OPERATORS:
raise ValueError(
f"Invalid operator: {operator}. "
f"Expected one of {SUPPORTED_OPERATORS}"
)
else: # Then we assume an equality operator
operator = "$eq"
filter_value = value
if operator in COMPARISONS_TO_NATIVE:
# Then we implement an equality filter
# native is trusted input
native = COMPARISONS_TO_NATIVE[operator]
return func.jsonb_path_match(
self.EmbeddingStore.cmetadata,
f"$.{field} {native} $value",
json.dumps({"value": filter_value}),
)
elif operator == "$between":
# Use AND with two comparisons
low, high = filter_value
lower_bound = func.jsonb_path_match(
self.EmbeddingStore.cmetadata,
f"$.{field} >= $value",
json.dumps({"value": low}),
)
upper_bound = func.jsonb_path_match(
self.EmbeddingStore.cmetadata,
f"$.{field} <= $value",
json.dumps({"value": high}),
)
return sqlalchemy.and_(lower_bound, upper_bound)
elif operator in {"$in", "$nin", "$like", "$ilike"}:
# We'll do force coercion to text
if operator in {"$in", "$nin"}:
for val in filter_value:
if not isinstance(val, (str, int, float)):
raise NotImplementedError(
f"Unsupported type: {type(val)} for value: {val}"
)
queried_field = self.EmbeddingStore.cmetadata[field].astext
if operator in {"$in"}:
return queried_field.in_([str(val) for val in filter_value])
elif operator in {"$nin"}:
return queried_field.nin_([str(val) for val in filter_value])
elif operator in {"$like"}:
return queried_field.like(filter_value)
elif operator in {"$ilike"}:
return queried_field.ilike(filter_value)
else:
raise NotImplementedError()
else:
raise NotImplementedError()
def _create_filter_clause_deprecated(self, key, value): # type: ignore[no-untyped-def]
"""已弃用的功能。
这是为了与基于JSON的元数据模式向后兼容而设计的。
它使用了不正确的运算符语法(运算符没有以$为前缀)。
这个实现不高效,并且存在与处理数值过滤子句的方式相关的错误。
"""
IN, NIN, BETWEEN, GT, LT, NE = "in", "nin", "between", "gt", "lt", "ne"
EQ, LIKE, CONTAINS, OR, AND = "eq", "like", "contains", "or", "and"
value_case_insensitive = {k.lower(): v for k, v in value.items()}
if IN in map(str.lower, value):
filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.in_(
value_case_insensitive[IN]
)
elif NIN in map(str.lower, value):
filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.not_in(
value_case_insensitive[NIN]
)
elif BETWEEN in map(str.lower, value):
filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.between(
str(value_case_insensitive[BETWEEN][0]),
str(value_case_insensitive[BETWEEN][1]),
)
elif GT in map(str.lower, value):
filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext > str(
value_case_insensitive[GT]
)
elif LT in map(str.lower, value):
filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext < str(
value_case_insensitive[LT]
)
elif NE in map(str.lower, value):
filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext != str(
value_case_insensitive[NE]
)
elif EQ in map(str.lower, value):
filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext == str(
value_case_insensitive[EQ]
)
elif LIKE in map(str.lower, value):
filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.like(
value_case_insensitive[LIKE]
)
elif CONTAINS in map(str.lower, value):
filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.contains(
value_case_insensitive[CONTAINS]
)
elif OR in map(str.lower, value):
or_clauses = [
self._create_filter_clause_deprecated(key, sub_value)
for sub_value in value_case_insensitive[OR]
]
filter_by_metadata = sqlalchemy.or_(*or_clauses)
elif AND in map(str.lower, value):
and_clauses = [
self._create_filter_clause_deprecated(key, sub_value)
for sub_value in value_case_insensitive[AND]
]
filter_by_metadata = sqlalchemy.and_(*and_clauses)
else:
filter_by_metadata = None
return filter_by_metadata
def _create_filter_clause_json_deprecated(
self, filter: Any
) -> List[SQLColumnExpression]:
"""将IR中的过滤器转换为SQL子句。
**已弃用** 该功能将来会被弃用。
它实现了对使用JSON而不是更高效的JSONB字段进行元数据查询的模式的过滤器的转换。
"""
filter_clauses = []
for key, value in filter.items():
if isinstance(value, dict):
filter_by_metadata = self._create_filter_clause_deprecated(key, value)
if filter_by_metadata is not None:
filter_clauses.append(filter_by_metadata)
else:
filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext == str(
value
)
filter_clauses.append(filter_by_metadata)
return filter_clauses
def _create_filter_clause(self, filters: Any) -> Any:
"""将LangChain IR过滤器表示转换为匹配的SQLAlchemy子句。
在顶层,我们仍然不知道我们是在处理字段还是操作符键。确定后,我们可以调用适当的逻辑来处理过滤器的创建。
参数:
filters:要应用于查询的过滤器字典。
返回:
适用于查询的SQLAlchemy子句。
"""
if isinstance(filters, dict):
if len(filters) == 1:
# The only operators allowed at the top level are $AND and $OR
# First check if an operator or a field
key, value = list(filters.items())[0]
if key.startswith("$"):
# Then it's an operator
if key.lower() not in ["$and", "$or"]:
raise ValueError(
f"Invalid filter condition. Expected $and or $or "
f"but got: {key}"
)
else:
# Then it's a field
return self._handle_field_filter(key, filters[key])
# Here we handle the $and and $or operators
if not isinstance(value, list):
raise ValueError(
f"Expected a list, but got {type(value)} for value: {value}"
)
if key.lower() == "$and":
and_ = [self._create_filter_clause(el) for el in value]
if len(and_) > 1:
return sqlalchemy.and_(*and_)
elif len(and_) == 1:
return and_[0]
else:
raise ValueError(
"Invalid filter condition. Expected a dictionary "
"but got an empty dictionary"
)
elif key.lower() == "$or":
or_ = [self._create_filter_clause(el) for el in value]
if len(or_) > 1:
return sqlalchemy.or_(*or_)
elif len(or_) == 1:
return or_[0]
else:
raise ValueError(
"Invalid filter condition. Expected a dictionary "
"but got an empty dictionary"
)
else:
raise ValueError(
f"Invalid filter condition. Expected $and or $or "
f"but got: {key}"
)
elif len(filters) > 1:
# Then all keys have to be fields (they cannot be operators)
for key in filters.keys():
if key.startswith("$"):
raise ValueError(
f"Invalid filter condition. Expected a field but got: {key}"
)
# These should all be fields and combined using an $and operator
and_ = [self._handle_field_filter(k, v) for k, v in filters.items()]
if len(and_) > 1:
return sqlalchemy.and_(*and_)
elif len(and_) == 1:
return and_[0]
else:
raise ValueError(
"Invalid filter condition. Expected a dictionary "
"but got an empty dictionary"
)
else:
raise ValueError("Got an empty dictionary for filters.")
else:
raise ValueError(
f"Invalid type: Expected a dictionary but got type: {type(filters)}"
)
def _query_collection(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Dict[str, str]] = None,
) -> List[Any]:
"""查询集合。"""
with Session(self._bind) as session: # type: ignore[arg-type]
collection = self.get_collection(session)
if not collection:
raise ValueError("Collection not found")
filter_by = [self.EmbeddingStore.collection_id == collection.uuid]
if filter:
if self.use_jsonb:
filter_clauses = self._create_filter_clause(filter)
if filter_clauses is not None:
filter_by.append(filter_clauses)
else:
# Old way of doing things
filter_clauses = self._create_filter_clause_json_deprecated(filter)
filter_by.extend(filter_clauses)
_type = self.EmbeddingStore
results: List[Any] = (
session.query(
self.EmbeddingStore,
self.distance_strategy(embedding).label("distance"), # type: ignore
)
.filter(*filter_by)
.order_by(sqlalchemy.asc("distance"))
.join(
self.CollectionStore,
self.EmbeddingStore.collection_id == self.CollectionStore.uuid,
)
.limit(k)
.all()
)
return results
[docs] def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[dict] = None,
**kwargs: Any,
) -> List[Document]:
"""返回与嵌入向量最相似的文档。
参数:
embedding: 要查找与之相似的文档的嵌入。
k: 要返回的文档数量。默认为4。
filter (Optional[Dict[str, str]]): 按元数据筛选。默认为None。
返回:
返回与查询向量最相似的文档列表。
"""
docs_and_scores = self.similarity_search_with_score_by_vector(
embedding=embedding, k=k, filter=filter
)
return _results_to_docs(docs_and_scores)
[docs] @classmethod
def from_texts(
cls: Type[PGVector],
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
ids: Optional[List[str]] = None,
pre_delete_collection: bool = False,
*,
use_jsonb: bool = False,
**kwargs: Any,
) -> PGVector:
"""返回从文本和嵌入初始化的VectorStore。
需要Postgres连接字符串。
"可以作为参数传递
或设置PGVECTOR_CONNECTION_STRING环境变量。
"""
embeddings = embedding.embed_documents(list(texts))
return cls._from(
texts,
embeddings,
embedding,
metadatas=metadatas,
ids=ids,
collection_name=collection_name,
distance_strategy=distance_strategy,
pre_delete_collection=pre_delete_collection,
use_jsonb=use_jsonb,
**kwargs,
)
[docs] @classmethod
def from_embeddings(
cls,
text_embeddings: List[Tuple[str, List[float]]],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
ids: Optional[List[str]] = None,
pre_delete_collection: bool = False,
**kwargs: Any,
) -> PGVector:
"""从原始文档和预生成的嵌入构建PGVector包装器。
从文档和嵌入初始化VectorStore。
需要Postgres连接字符串
“可以作为参数传递
或设置PGVECTOR_CONNECTION_STRING环境变量。
示例:
.. code-block:: python
from langchain_community.vectorstores import PGVector
from langchain_community.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
text_embeddings = embeddings.embed_documents(texts)
text_embedding_pairs = list(zip(texts, text_embeddings))
faiss = PGVector.from_embeddings(text_embedding_pairs, embeddings)
"""
texts = [t[0] for t in text_embeddings]
embeddings = [t[1] for t in text_embeddings]
return cls._from(
texts,
embeddings,
embedding,
metadatas=metadatas,
ids=ids,
collection_name=collection_name,
distance_strategy=distance_strategy,
pre_delete_collection=pre_delete_collection,
**kwargs,
)
[docs] @classmethod
def from_existing_index(
cls: Type[PGVector],
embedding: Embeddings,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
pre_delete_collection: bool = False,
**kwargs: Any,
) -> PGVector:
"""获取现有PGVector存储的实例。此方法将返回存储的实例,而不会插入任何新的嵌入。
"""
connection_string = cls.get_connection_string(kwargs)
store = cls(
connection_string=connection_string,
collection_name=collection_name,
embedding_function=embedding,
distance_strategy=distance_strategy,
pre_delete_collection=pre_delete_collection,
)
return store
[docs] @classmethod
def get_connection_string(cls, kwargs: Dict[str, Any]) -> str:
connection_string: str = get_from_dict_or_env(
data=kwargs,
key="connection_string",
env_key="PGVECTOR_CONNECTION_STRING",
)
if not connection_string:
raise ValueError(
"Postgres connection string is required"
"Either pass it as a parameter"
"or set the PGVECTOR_CONNECTION_STRING environment variable."
)
return connection_string
[docs] @classmethod
def from_documents(
cls: Type[PGVector],
documents: List[Document],
embedding: Embeddings,
collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
ids: Optional[List[str]] = None,
pre_delete_collection: bool = False,
*,
use_jsonb: bool = False,
**kwargs: Any,
) -> PGVector:
"""返回从文档和嵌入初始化的VectorStore。
需要Postgres连接字符串
“可以作为参数传递
或设置PGVECTOR_CONNECTION_STRING环境变量。
"""
texts = [d.page_content for d in documents]
metadatas = [d.metadata for d in documents]
connection_string = cls.get_connection_string(kwargs)
kwargs["connection_string"] = connection_string
return cls.from_texts(
texts=texts,
pre_delete_collection=pre_delete_collection,
embedding=embedding,
distance_strategy=distance_strategy,
metadatas=metadatas,
ids=ids,
collection_name=collection_name,
use_jsonb=use_jsonb,
**kwargs,
)
[docs] @classmethod
def connection_string_from_db_params(
cls,
driver: str,
host: str,
port: int,
database: str,
user: str,
password: str,
) -> str:
"""从数据库参数返回连接字符串。"""
return f"postgresql+{driver}://{user}:{password}@{host}:{port}/{database}"
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""“正确”的相关性函数可能会有所不同,取决于一些因素,包括:
- 向量存储中使用的距离/相似度度量
- 嵌入的规模(OpenAI的是单位规范化的。许多其他嵌入不是!)
- 嵌入的维度
- 等等。
"""
if self.override_relevance_score_fn is not None:
return self.override_relevance_score_fn
# Default strategy is to rely on distance strategy provided
# in vectorstore constructor
if self._distance_strategy == DistanceStrategy.COSINE:
return self._cosine_relevance_score_fn
elif self._distance_strategy == DistanceStrategy.EUCLIDEAN:
return self._euclidean_relevance_score_fn
elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
return self._max_inner_product_relevance_score_fn
else:
raise ValueError(
"No supported normalization function"
f" for distance_strategy of {self._distance_strategy}."
"Consider providing relevance_score_fn to PGVector constructor."
)
[docs] def max_marginal_relevance_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""使用最大边际相关性和分数返回所选文档的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding: 要查找相似文档的嵌入。
k (int): 要返回的文档数量。默认为4。
fetch_k (int): 要获取以传递给MMR算法的文档数量。默认为20。
lambda_mult (float): 0到1之间的数字,确定结果之间多样性的程度,其中0对应最大多样性,1对应最小多样性。默认为0.5。
filter (Optional[Dict[str, str]]): 按元数据筛选。默认为None。
返回:
List[Tuple[Document, float]]: 通过最大边际相关性选择的文档列表,以及每个文档的得分。
"""
results = self._query_collection(embedding=embedding, k=fetch_k, filter=filter)
embedding_list = [result.EmbeddingStore.embedding for result in results]
mmr_selected = maximal_marginal_relevance(
np.array(embedding, dtype=np.float32),
embedding_list,
k=k,
lambda_mult=lambda_mult,
)
candidates = self._results_to_docs_and_scores(results)
return [r for i, r in enumerate(candidates) if i in mmr_selected]
[docs] def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query (str): 要查找类似文档的文本。
k (int): 要返回的文档数量。默认为4。
fetch_k (int): 要获取以传递给MMR算法的文档数量。
默认为20。
lambda_mult (float): 介于0和1之间的数字,确定结果之间多样性的程度,
其中0对应于最大多样性,1对应于最小多样性。
默认为0.5。
filter (Optional[Dict[str, str]]): 按元数据筛选。默认为None。
返回:
List[Document]: 通过最大边际相关性选择的文档列表。
"""
embedding = self.embedding_function.embed_query(query)
return self.max_marginal_relevance_search_by_vector(
embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
**kwargs,
)
[docs] def max_marginal_relevance_search_with_score(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[dict] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""返回使用最大边际相关性和分数选择的文档。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
query (str): 要查找类似文档的文本。
k (int): 要返回的文档数量。默认为4。
fetch_k (int): 要获取以传递给MMR算法的文档数量。
默认为20。
lambda_mult (float): 0到1之间的数字,确定结果之间多样性的程度,
0对应最大多样性,1对应最小多样性。
默认为0.5。
filter (Optional[Dict[str, str]]): 按元数据过滤。默认为None。
返回:
List[Tuple[Document, float]]: 通过最大边际相关性选择的文档列表,
以及每个文档的得分。
"""
embedding = self.embedding_function.embed_query(query)
docs = self.max_marginal_relevance_search_with_score_by_vector(
embedding=embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
**kwargs,
)
return docs
[docs] def max_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档到嵌入向量。
最大边际相关性优化了与查询的相似性和所选文档之间的多样性。
参数:
embedding (str): 要查找类似文档的文本。
k (int): 要返回的文档数量。默认为4。
fetch_k (int): 要获取以传递给MMR算法的文档数量。
默认为20。
lambda_mult (float): 0到1之间的数字,确定结果之间多样性的程度,
0对应最大多样性,1对应最小多样性。
默认为0.5。
filter (Optional[Dict[str, str]]): 按元数据筛选。默认为None。
返回:
List[Document]: 通过最大边际相关性选择的文档列表。
"""
docs_and_scores = self.max_marginal_relevance_search_with_score_by_vector(
embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
**kwargs,
)
return _results_to_docs(docs_and_scores)
[docs] async def amax_marginal_relevance_search_by_vector(
self,
embedding: List[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Dict[str, str]] = None,
**kwargs: Any,
) -> List[Document]:
"""返回使用最大边际相关性选择的文档。"""
# This is a temporary workaround to make the similarity search
# asynchronous. The proper solution is to make the similarity search
# asynchronous in the vector store implementations.
return await run_in_executor(
None,
self.max_marginal_relevance_search_by_vector,
embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
**kwargs,
)