Source code for langchain_community.vectorstores.pgvector

from __future__ import annotations

import contextlib
import enum
import json
import logging
import uuid
from typing import (
    Any,
    Callable,
    Dict,
    Generator,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
)

import numpy as np
import sqlalchemy
from langchain_core._api import deprecated, warn_deprecated
from sqlalchemy import SQLColumnExpression, delete, func
from sqlalchemy.dialects.postgresql import JSON, JSONB, UUID
from sqlalchemy.orm import Session, relationship

try:
    from sqlalchemy.orm import declarative_base
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.runnables.config import run_in_executor
from langchain_core.utils import get_from_dict_or_env
from langchain_core.vectorstores import VectorStore

from langchain_community.vectorstores.utils import maximal_marginal_relevance


class DistanceStrategy(str, enum.Enum):
    """Enumerator of the distance strategies."""

    EUCLIDEAN = "l2"
    COSINE = "cosine"
    MAX_INNER_PRODUCT = "inner"
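
# Illustrative note (not part of the original module): the strategy chosen here
# maps onto a pgvector comparison operator at query time (see the
# `distance_strategy` property on `PGVector` below). A minimal sketch, where
# the connection string and embeddings object are hypothetical:
#
#     store = PGVector(
#         connection_string=CONNECTION_STRING,
#         embedding_function=embeddings,
#         distance_strategy=DistanceStrategy.EUCLIDEAN,
#     )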

DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.COSINE

Base = declarative_base()  # type: Any

_LANGCHAIN_DEFAULT_COLLECTION_NAME = "langchain"

class BaseModel(Base):
    """Base model for the SQL stores."""

    __abstract__ = True
    uuid = sqlalchemy.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

_classes: Any = None

COMPARISONS_TO_NATIVE = {
    "$eq": "==",
    "$ne": "!=",
    "$lt": "<",
    "$lte": "<=",
    "$gt": ">",
    "$gte": ">=",
}

SPECIAL_CASED_OPERATORS = {
    "$in",
    "$nin",
    "$between",
}

TEXT_OPERATORS = {
    "$like",
    "$ilike",
}

LOGICAL_OPERATORS = {"$and", "$or"}

SUPPORTED_OPERATORS = (
    set(COMPARISONS_TO_NATIVE)
    .union(TEXT_OPERATORS)
    .union(LOGICAL_OPERATORS)
    .union(SPECIAL_CASED_OPERATORS)
)


def _get_embedding_collection_store(
    vector_dimension: Optional[int] = None, *, use_jsonb: bool = True
) -> Any:
    global _classes
    if _classes is not None:
        return _classes

    from pgvector.sqlalchemy import Vector

    class CollectionStore(BaseModel):
        """Collection store."""

        __tablename__ = "langchain_pg_collection"

        name = sqlalchemy.Column(sqlalchemy.String)
        cmetadata = sqlalchemy.Column(JSON)

        embeddings = relationship(
            "EmbeddingStore",
            back_populates="collection",
            passive_deletes=True,
        )

        @classmethod
        def get_by_name(
            cls, session: Session, name: str
        ) -> Optional["CollectionStore"]:
            return session.query(cls).filter(cls.name == name).first()  # type: ignore

        @classmethod
        def get_or_create(
            cls,
            session: Session,
            name: str,
            cmetadata: Optional[dict] = None,
        ) -> Tuple["CollectionStore", bool]:
            """Get or create a collection.

            Returns [Collection, bool] where the bool is True
            if the collection was created.
            """
            created = False
            collection = cls.get_by_name(session, name)
            if collection:
                return collection, created

            collection = cls(name=name, cmetadata=cmetadata)
            session.add(collection)
            session.commit()
            created = True
            return collection, created

    if use_jsonb:
        # TODO(PRIOR TO LANDING): Create a gin index on the cmetadata field
        class EmbeddingStore(BaseModel):
            """Embedding store."""

            __tablename__ = "langchain_pg_embedding"

            collection_id = sqlalchemy.Column(
                UUID(as_uuid=True),
                sqlalchemy.ForeignKey(
                    f"{CollectionStore.__tablename__}.uuid",
                    ondelete="CASCADE",
                ),
            )
            collection = relationship(CollectionStore, back_populates="embeddings")

            embedding: Vector = sqlalchemy.Column(Vector(vector_dimension))
            document = sqlalchemy.Column(sqlalchemy.String, nullable=True)
            cmetadata = sqlalchemy.Column(JSONB, nullable=True)

            # custom_id : any user defined id
            custom_id = sqlalchemy.Column(sqlalchemy.String, nullable=True)

            __table_args__ = (
                sqlalchemy.Index(
                    "ix_cmetadata_gin",
                    "cmetadata",
                    postgresql_using="gin",
                    postgresql_ops={"cmetadata": "jsonb_path_ops"},
                ),
            )
    else:
        # For backwards compatibility with older versions of pgvector.
        # This should be removed in the future (remove during migration).
        class EmbeddingStore(BaseModel):  # type: ignore[no-redef]
            """Embedding store."""

            __tablename__ = "langchain_pg_embedding"

            collection_id = sqlalchemy.Column(
                UUID(as_uuid=True),
                sqlalchemy.ForeignKey(
                    f"{CollectionStore.__tablename__}.uuid",
                    ondelete="CASCADE",
                ),
            )
            collection = relationship(CollectionStore, back_populates="embeddings")

            embedding: Vector = sqlalchemy.Column(Vector(vector_dimension))
            document = sqlalchemy.Column(sqlalchemy.String, nullable=True)
            cmetadata = sqlalchemy.Column(JSON, nullable=True)

            # custom_id : any user defined id
            custom_id = sqlalchemy.Column(sqlalchemy.String, nullable=True)

    _classes = (EmbeddingStore, CollectionStore)

    return _classes


def _results_to_docs(docs_and_scores: Any) -> List[Document]:
    """Return docs from docs and scores."""
    return [doc for doc, _ in docs_and_scores]
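
# Illustrative example (not part of the original module): the operator tables
# above define the filter IR accepted by `PGVector` when `use_jsonb=True`.
# A minimal sketch of filter dictionaries that `_create_filter_clause` below
# can translate; the metadata field names are hypothetical:
#
#     filter = {"topic": {"$eq": "animals"}}                         # comparison
#     filter = {"year": {"$between": (2020, 2023)}}                  # special-cased
#     filter = {"author": {"$ilike": "%smith%"}}                     # text operator
#     filter = {"$or": [{"topic": "animals"}, {"topic": "plants"}]}  # logical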

@deprecated(
    since="0.0.31",
    message=(
        "This class is pending deprecation and may be removed in a future version. "
        "You can swap to using the `PGVector`"
        " implementation in `langchain_postgres`. "
        "Please read the guidelines in the doc-string of this class "
        "to follow prior to migrating as there are some differences "
        "between the implementations. "
        "See https://github.com/langchain-ai/langchain-postgres for details about "
        "the new implementation."
    ),
    alternative="from langchain_postgres import PGVector;",
    pending=True,
)
class PGVector(VectorStore):
    """`Postgres`/`PGVector` vector store.

    **DEPRECATED**: This class is pending deprecation and will likely not receive
    updates. An improved version of this class, also named `PGVector`, is
    available in `langchain_postgres`. Please use that class instead.

    When migrating, please keep in mind that:
        * The new implementation works with psycopg3, not with psycopg2
          (this implementation does not work with psycopg3).
        * Filtering syntax has changed to use $ prefixed operators for JSONB
          metadata fields. (The new implementation only uses JSONB fields for
          metadata.)
        * The new implementation made some schema changes to address issues
          with the existing implementation. So you will need to re-create your
          tables and re-index your data, or else carry out a manual migration.

    To use, you should have the ``pgvector`` python package installed.

    Args:
        connection_string: Postgres connection string.
        embedding_function: Any embedding function implementing
            `langchain.embeddings.base.Embeddings` interface.
        embedding_length: The length of the embedding vector. (default: None)
            NOTE: This is not mandatory. Defining it will prevent vectors of
            any other size from being added to the embeddings table but,
            without it, the embeddings can't be indexed.
        collection_name: The name of the collection to use. (default: langchain)
            NOTE: This is not the name of the table, but the name of the
            collection. The tables will be created when initializing the store
            (if they do not exist), so make sure the user has the right
            permissions to create tables.
        distance_strategy: The distance strategy to use. (default: COSINE)
        pre_delete_collection: If True, will delete the collection if it
            exists. (default: False). Useful for testing.
        engine_args: SQLAlchemy's create engine arguments.
        use_jsonb: Use JSONB instead of JSON for metadata. (default: True)
            Using JSON is strongly discouraged, as it is not as efficient for
            querying. It is provided here for backwards compatibility with
            older versions and will be removed in the future.
        create_extension: If True, will create the vector extension if it
            doesn't exist. Disabling creation is useful when using read-only
            databases.

    Example:
        .. code-block:: python

            from langchain_community.vectorstores import PGVector
            from langchain_community.embeddings.openai import OpenAIEmbeddings

            CONNECTION_STRING = "postgresql+psycopg2://hwc@localhost:5432/test3"
            COLLECTION_NAME = "state_of_the_union_test"
            embeddings = OpenAIEmbeddings()
            vectorstore = PGVector.from_documents(
                embedding=embeddings,
                documents=docs,
                collection_name=COLLECTION_NAME,
                connection_string=CONNECTION_STRING,
                use_jsonb=True,
            )
    """
    def __init__(
        self,
        connection_string: str,
        embedding_function: Embeddings,
        embedding_length: Optional[int] = None,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        collection_metadata: Optional[dict] = None,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        pre_delete_collection: bool = False,
        logger: Optional[logging.Logger] = None,
        relevance_score_fn: Optional[Callable[[float], float]] = None,
        *,
        connection: Optional[sqlalchemy.engine.Connection] = None,
        engine_args: Optional[dict[str, Any]] = None,
        use_jsonb: bool = False,
        create_extension: bool = True,
    ) -> None:
        """Initialize the PGVector store."""
        self.connection_string = connection_string
        self.embedding_function = embedding_function
        self._embedding_length = embedding_length
        self.collection_name = collection_name
        self.collection_metadata = collection_metadata
        self._distance_strategy = distance_strategy
        self.pre_delete_collection = pre_delete_collection
        self.logger = logger or logging.getLogger(__name__)
        self.override_relevance_score_fn = relevance_score_fn
        self.engine_args = engine_args or {}
        self._bind = connection if connection else self._create_engine()
        self.use_jsonb = use_jsonb
        self.create_extension = create_extension

        if not use_jsonb:
            # Replace with a deprecation warning.
            warn_deprecated(
                "0.0.29",
                pending=True,
                message=(
                    "Please use JSONB instead of JSON for metadata. "
                    "This change will allow for more efficient querying that "
                    "involves filtering based on metadata. "
                    "Please note that filtering operators have been changed "
                    "when using JSONB metadata to be prefixed with a $ sign "
                    "to avoid name collisions with columns. "
                    "If you're using an existing database, you will need to create a "
                    "db migration for your metadata column to be JSONB and update your "
                    "queries to use the new operators. "
                ),
                alternative=(
                    "Instantiate with use_jsonb=True to use JSONB instead "
                    "of JSON for metadata."
                ),
            )

        self.__post_init__()
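
    # Illustrative usage (not part of the original module): constructing the
    # store directly rather than via a `from_*` classmethod. The connection
    # string and embeddings object are hypothetical:
    #
    #     store = PGVector(
    #         connection_string="postgresql+psycopg2://user:pass@localhost:5432/db",
    #         embedding_function=OpenAIEmbeddings(),
    #         collection_name="my_docs",
    #         use_jsonb=True,
    #     )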
    def __post_init__(
        self,
    ) -> None:
        """Initialize the store."""
        if self.create_extension:
            self.create_vector_extension()

        EmbeddingStore, CollectionStore = _get_embedding_collection_store(
            self._embedding_length, use_jsonb=self.use_jsonb
        )
        self.CollectionStore = CollectionStore
        self.EmbeddingStore = EmbeddingStore
        self.create_tables_if_not_exists()
        self.create_collection()

    def __del__(self) -> None:
        if isinstance(self._bind, sqlalchemy.engine.Connection):
            self._bind.close()

    @property
    def embeddings(self) -> Embeddings:
        return self.embedding_function

    def _create_engine(self) -> sqlalchemy.engine.Engine:
        return sqlalchemy.create_engine(url=self.connection_string, **self.engine_args)
    def create_vector_extension(self) -> None:
        try:
            with Session(self._bind) as session:  # type: ignore[arg-type]
                # The advisory lock fixes issues arising from concurrent
                # creation of the vector extension.
                # https://github.com/langchain-ai/langchain/issues/12933
                # For more information see:
                # https://www.postgresql.org/docs/16/explicit-locking.html#ADVISORY-LOCKS
                statement = sqlalchemy.text(
                    "BEGIN;"
                    "SELECT pg_advisory_xact_lock(1573678846307946496);"
                    "CREATE EXTENSION IF NOT EXISTS vector;"
                    "COMMIT;"
                )
                session.execute(statement)
                session.commit()
        except Exception as e:
            raise Exception(f"Failed to create vector extension: {e}") from e
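
    # Illustrative note (not part of the original module): on read-only
    # databases the extension must be created out of band, after which the
    # store can be constructed with `create_extension=False`. A sketch,
    # assuming a privileged role has already run the statement in psql:
    #
    #     -- once, as a privileged role:
    #     -- CREATE EXTENSION IF NOT EXISTS vector;
    #
    #     store = PGVector(..., create_extension=False)  # other args elided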
    def create_tables_if_not_exists(self) -> None:
        with Session(self._bind) as session, session.begin():  # type: ignore[arg-type]
            Base.metadata.create_all(session.get_bind())
    def drop_tables(self) -> None:
        with Session(self._bind) as session, session.begin():  # type: ignore[arg-type]
            Base.metadata.drop_all(session.get_bind())
    def create_collection(self) -> None:
        if self.pre_delete_collection:
            self.delete_collection()
        with Session(self._bind) as session:  # type: ignore[arg-type]
            self.CollectionStore.get_or_create(
                session, self.collection_name, cmetadata=self.collection_metadata
            )
    def delete_collection(self) -> None:
        self.logger.debug("Trying to delete collection")
        with Session(self._bind) as session:  # type: ignore[arg-type]
            collection = self.get_collection(session)
            if not collection:
                self.logger.warning("Collection not found")
                return
            session.delete(collection)
            session.commit()
    @contextlib.contextmanager
    def _make_session(self) -> Generator[Session, None, None]:
        """Create a context manager for the session, bound to the _conn string."""
        yield Session(self._bind)  # type: ignore[arg-type]
    def delete(
        self,
        ids: Optional[List[str]] = None,
        collection_only: bool = False,
        **kwargs: Any,
    ) -> None:
        """Delete vectors by ids or uuids.

        Args:
            ids: List of ids to delete.
            collection_only: Only delete ids in the collection.
        """
        with Session(self._bind) as session:  # type: ignore[arg-type]
            if ids is not None:
                self.logger.debug(
                    "Trying to delete vectors by ids (represented by the model "
                    "using the custom ids field)"
                )

                stmt = delete(self.EmbeddingStore)

                if collection_only:
                    collection = self.get_collection(session)
                    if not collection:
                        self.logger.warning("Collection not found")
                        return

                    stmt = stmt.where(
                        self.EmbeddingStore.collection_id == collection.uuid
                    )

                stmt = stmt.where(self.EmbeddingStore.custom_id.in_(ids))
                session.execute(stmt)
            session.commit()
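
    # Illustrative usage (not part of the original module): deleting by the
    # custom ids returned from `add_texts`, scoped to this store's collection.
    # `store` and the texts are hypothetical:
    #
    #     ids = store.add_texts(["foo", "bar"])
    #     store.delete(ids=ids, collection_only=True)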
    def get_collection(self, session: Session) -> Any:
        return self.CollectionStore.get_by_name(session, self.collection_name)
    @classmethod
    def _from(
        cls,
        texts: List[str],
        embeddings: List[List[float]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        connection_string: Optional[str] = None,
        pre_delete_collection: bool = False,
        *,
        use_jsonb: bool = False,
        **kwargs: Any,
    ) -> PGVector:
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts]

        if not metadatas:
            metadatas = [{} for _ in texts]
        if connection_string is None:
            connection_string = cls.get_connection_string(kwargs)

        store = cls(
            connection_string=connection_string,
            collection_name=collection_name,
            embedding_function=embedding,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
            use_jsonb=use_jsonb,
            **kwargs,
        )

        store.add_embeddings(
            texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
        )

        return store
    def add_embeddings(
        self,
        texts: Iterable[str],
        embeddings: List[List[float]],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Add embeddings to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            embeddings: List of lists of embedding vectors.
            metadatas: List of metadatas associated with the texts.
            kwargs: vectorstore specific parameters
        """
        if ids is None:
            ids = [str(uuid.uuid4()) for _ in texts]

        if not metadatas:
            metadatas = [{} for _ in texts]

        with Session(self._bind) as session:  # type: ignore[arg-type]
            collection = self.get_collection(session)
            if not collection:
                raise ValueError("Collection not found")
            documents = []
            for text, metadata, embedding, id in zip(texts, metadatas, embeddings, ids):
                embedding_store = self.EmbeddingStore(
                    embedding=embedding,
                    document=text,
                    cmetadata=metadata,
                    custom_id=id,
                    collection_id=collection.uuid,
                )
                documents.append(embedding_store)
            session.bulk_save_objects(documents)
            session.commit()

        return ids
    def add_texts(
        self,
        texts: Iterable[str],
        metadatas: Optional[List[dict]] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.
            kwargs: vectorstore specific parameters

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        embeddings = self.embedding_function.embed_documents(list(texts))
        return self.add_embeddings(
            texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
        )
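
    # Illustrative usage (not part of the original module): adding texts with
    # metadata that can later be filtered on. `store` and values are
    # hypothetical:
    #
    #     store.add_texts(
    #         texts=["cats purr", "dogs bark"],
    #         metadatas=[{"topic": "cats"}, {"topic": "dogs"}],
    #     )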
    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        filter: Optional[dict] = None,
    ) -> List[Tuple[Document, float]]:
        """Return docs most similar to query.

        Args:
            query: Text to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List of Documents most similar to the query and a score for each.
        """
        embedding = self.embedding_function.embed_query(query)
        docs = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, filter=filter
        )
        return docs
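
    # Illustrative usage (not part of the original module): the score is the
    # raw distance under the configured strategy (for cosine and l2, lower
    # means closer). `store` and the query are hypothetical:
    #
    #     for doc, score in store.similarity_search_with_score("feline sounds", k=2):
    #         print(score, doc.page_content)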
    @property
    def distance_strategy(self) -> Any:
        if self._distance_strategy == DistanceStrategy.EUCLIDEAN:
            return self.EmbeddingStore.embedding.l2_distance
        elif self._distance_strategy == DistanceStrategy.COSINE:
            return self.EmbeddingStore.embedding.cosine_distance
        elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
            return self.EmbeddingStore.embedding.max_inner_product
        else:
            raise ValueError(
                f"Got unexpected value for distance: {self._distance_strategy}. "
                f"Should be one of {', '.join([ds.value for ds in DistanceStrategy])}."
            )
    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[dict] = None,
    ) -> List[Tuple[Document, float]]:
        results = self._query_collection(embedding=embedding, k=k, filter=filter)

        return self._results_to_docs_and_scores(results)
    def _results_to_docs_and_scores(self, results: Any) -> List[Tuple[Document, float]]:
        """Return docs and scores from results."""
        docs = [
            (
                Document(
                    page_content=result.EmbeddingStore.document,
                    metadata=result.EmbeddingStore.cmetadata,
                ),
                result.distance if self.embedding_function is not None else None,
            )
            for result in results
        ]
        return docs

    def _handle_field_filter(
        self,
        field: str,
        value: Any,
    ) -> SQLColumnExpression:
        """Create a filter for a specific field.

        Args:
            field: name of the field
            value: value to filter on
                If provided as is, this will be an equality filter.
                If provided as a dictionary, the single key is the operator
                and the value is the value to filter by.

        Returns:
            sqlalchemy expression
        """
        if not isinstance(field, str):
            raise ValueError(
                f"field should be a string but got: {type(field)} with value: {field}"
            )

        if field.startswith("$"):
            raise ValueError(
                f"Invalid filter condition. Expected a field but got an operator: "
                f"{field}"
            )

        # Allow [a-zA-Z0-9_], disallow $ for now until we support escape characters
        if not field.isidentifier():
            raise ValueError(
                f"Invalid field name: {field}. Expected a valid identifier."
            )

        if isinstance(value, dict):
            # This is a filter specification
            if len(value) != 1:
                raise ValueError(
                    "Invalid filter condition. Expected a value which "
                    "is a dictionary with a single key that corresponds to an operator "
                    f"but got a dictionary with {len(value)} keys. The first few "
                    f"keys are: {list(value.keys())[:3]}"
                )
            operator, filter_value = list(value.items())[0]
            # Verify that the operator is a supported operator
            if operator not in SUPPORTED_OPERATORS:
                raise ValueError(
                    f"Invalid operator: {operator}. "
                    f"Expected one of {SUPPORTED_OPERATORS}"
                )
        else:  # Then we assume an equality operator
            operator = "$eq"
            filter_value = value

        if operator in COMPARISONS_TO_NATIVE:
            # Then we implement a comparison filter
            # native is trusted input
            native = COMPARISONS_TO_NATIVE[operator]
            return func.jsonb_path_match(
                self.EmbeddingStore.cmetadata,
                f"$.{field} {native} $value",
                json.dumps({"value": filter_value}),
            )
        elif operator == "$between":
            # Use AND with two comparisons
            low, high = filter_value

            lower_bound = func.jsonb_path_match(
                self.EmbeddingStore.cmetadata,
                f"$.{field} >= $value",
                json.dumps({"value": low}),
            )
            upper_bound = func.jsonb_path_match(
                self.EmbeddingStore.cmetadata,
                f"$.{field} <= $value",
                json.dumps({"value": high}),
            )
            return sqlalchemy.and_(lower_bound, upper_bound)
        elif operator in {"$in", "$nin", "$like", "$ilike"}:
            # Force coercion to text
            if operator in {"$in", "$nin"}:
                for val in filter_value:
                    if not isinstance(val, (str, int, float)):
                        raise NotImplementedError(
                            f"Unsupported type: {type(val)} for value: {val}"
                        )
            queried_field = self.EmbeddingStore.cmetadata[field].astext

            if operator in {"$in"}:
                return queried_field.in_([str(val) for val in filter_value])
            elif operator in {"$nin"}:
                # Note: SQLAlchemy's negated membership operator is `not_in`
                # (the original `nin_` spelling is not a SQLAlchemy method).
                return queried_field.not_in([str(val) for val in filter_value])
            elif operator in {"$like"}:
                return queried_field.like(filter_value)
            elif operator in {"$ilike"}:
                return queried_field.ilike(filter_value)
            else:
                raise NotImplementedError()
        else:
            raise NotImplementedError()

    def _create_filter_clause_deprecated(self, key, value):  # type: ignore[no-untyped-def]
        """Deprecated functionality.

        This is for backwards compatibility with the JSON based schema for
        metadata. It uses incorrect operator syntax (operators are not
        prefixed with $).

        This implementation is not efficient, and has bugs associated with
        the way that it handles numeric filter clauses.
        """
        IN, NIN, BETWEEN, GT, LT, NE = "in", "nin", "between", "gt", "lt", "ne"
        EQ, LIKE, CONTAINS, OR, AND = "eq", "like", "contains", "or", "and"

        value_case_insensitive = {k.lower(): v for k, v in value.items()}
        if IN in map(str.lower, value):
            filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.in_(
                value_case_insensitive[IN]
            )
        elif NIN in map(str.lower, value):
            filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.not_in(
                value_case_insensitive[NIN]
            )
        elif BETWEEN in map(str.lower, value):
            filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.between(
                str(value_case_insensitive[BETWEEN][0]),
                str(value_case_insensitive[BETWEEN][1]),
            )
        elif GT in map(str.lower, value):
            filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext > str(
                value_case_insensitive[GT]
            )
        elif LT in map(str.lower, value):
            filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext < str(
                value_case_insensitive[LT]
            )
        elif NE in map(str.lower, value):
            filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext != str(
                value_case_insensitive[NE]
            )
        elif EQ in map(str.lower, value):
            filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext == str(
                value_case_insensitive[EQ]
            )
        elif LIKE in map(str.lower, value):
            filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.like(
                value_case_insensitive[LIKE]
            )
        elif CONTAINS in map(str.lower, value):
            filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext.contains(
                value_case_insensitive[CONTAINS]
            )
        elif OR in map(str.lower, value):
            or_clauses = [
                self._create_filter_clause_deprecated(key, sub_value)
                for sub_value in value_case_insensitive[OR]
            ]
            filter_by_metadata = sqlalchemy.or_(*or_clauses)
        elif AND in map(str.lower, value):
            and_clauses = [
                self._create_filter_clause_deprecated(key, sub_value)
                for sub_value in value_case_insensitive[AND]
            ]
            filter_by_metadata = sqlalchemy.and_(*and_clauses)
        else:
            filter_by_metadata = None

        return filter_by_metadata

    def _create_filter_clause_json_deprecated(
        self, filter: Any
    ) -> List[SQLColumnExpression]:
        """Convert filters from IR to SQL clauses.

        **DEPRECATED** This functionality will be deprecated in the future.

        It implements translation of filters for a schema that uses JSON
        for metadata rather than the JSONB field, which is more efficient
        for querying.
        """
        filter_clauses = []
        for key, value in filter.items():
            if isinstance(value, dict):
                filter_by_metadata = self._create_filter_clause_deprecated(key, value)

                if filter_by_metadata is not None:
                    filter_clauses.append(filter_by_metadata)
            else:
                filter_by_metadata = self.EmbeddingStore.cmetadata[key].astext == str(
                    value
                )
                filter_clauses.append(filter_by_metadata)
        return filter_clauses

    def _create_filter_clause(self, filters: Any) -> Any:
        """Convert LangChain IR filter representation to matching SQLAlchemy clauses.

        At the top level, we still don't know whether we're working with a
        field or an operator for the keys. Once we've determined that, we can
        call the appropriate logic to handle filter creation.

        Args:
            filters: Dictionary of filters to apply to the query.

        Returns:
            SQLAlchemy clause to apply to the query.
        """
        if isinstance(filters, dict):
            if len(filters) == 1:
                # The only operators allowed at the top level are $AND and $OR
                # First check if an operator or a field
                key, value = list(filters.items())[0]
                if key.startswith("$"):
                    # Then it's an operator
                    if key.lower() not in ["$and", "$or"]:
                        raise ValueError(
                            f"Invalid filter condition. Expected $and or $or "
                            f"but got: {key}"
                        )
                else:
                    # Then it's a field
                    return self._handle_field_filter(key, filters[key])

                # Here we handle the $and and $or operators
                if not isinstance(value, list):
                    raise ValueError(
                        f"Expected a list, but got {type(value)} for value: {value}"
                    )
                if key.lower() == "$and":
                    and_ = [self._create_filter_clause(el) for el in value]
                    if len(and_) > 1:
                        return sqlalchemy.and_(*and_)
                    elif len(and_) == 1:
                        return and_[0]
                    else:
                        raise ValueError(
                            "Invalid filter condition. Expected a dictionary "
                            "but got an empty dictionary"
                        )
                elif key.lower() == "$or":
                    or_ = [self._create_filter_clause(el) for el in value]
                    if len(or_) > 1:
                        return sqlalchemy.or_(*or_)
                    elif len(or_) == 1:
                        return or_[0]
                    else:
                        raise ValueError(
                            "Invalid filter condition. Expected a dictionary "
                            "but got an empty dictionary"
                        )
                else:
                    raise ValueError(
                        f"Invalid filter condition. Expected $and or $or "
                        f"but got: {key}"
                    )
            elif len(filters) > 1:
                # Then all keys have to be fields (they cannot be operators)
                for key in filters.keys():
                    if key.startswith("$"):
                        raise ValueError(
                            f"Invalid filter condition. Expected a field but got: {key}"
                        )
                # These should all be fields and combined using an $and operator
                and_ = [self._handle_field_filter(k, v) for k, v in filters.items()]
                if len(and_) > 1:
                    return sqlalchemy.and_(*and_)
                elif len(and_) == 1:
                    return and_[0]
                else:
                    raise ValueError(
                        "Invalid filter condition. Expected a dictionary "
                        "but got an empty dictionary"
                    )
            else:
                raise ValueError("Got an empty dictionary for filters.")
        else:
            raise ValueError(
                f"Invalid type: Expected a dictionary but got type: {type(filters)}"
            )

    def _query_collection(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[Dict[str, str]] = None,
    ) -> List[Any]:
        """Query the collection."""
        with Session(self._bind) as session:  # type: ignore[arg-type]
            collection = self.get_collection(session)
            if not collection:
                raise ValueError("Collection not found")

            filter_by = [self.EmbeddingStore.collection_id == collection.uuid]
            if filter:
                if self.use_jsonb:
                    filter_clauses = self._create_filter_clause(filter)
                    if filter_clauses is not None:
                        filter_by.append(filter_clauses)
                else:
                    # Old way of doing things
                    filter_clauses = self._create_filter_clause_json_deprecated(filter)
                    filter_by.extend(filter_clauses)

            _type = self.EmbeddingStore

            results: List[Any] = (
                session.query(
                    self.EmbeddingStore,
                    self.distance_strategy(embedding).label("distance"),  # type: ignore
                )
                .filter(*filter_by)
                .order_by(sqlalchemy.asc("distance"))
                .join(
                    self.CollectionStore,
                    self.EmbeddingStore.collection_id == self.CollectionStore.uuid,
                )
                .limit(k)
                .all()
            )

        return results
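
    # Illustrative example (not part of the original module): how a JSONB
    # filter flows through `_create_filter_clause`. With hypothetical metadata
    # fields, a nested filter such as:
    #
    #     {"$and": [{"topic": {"$in": ["cats", "dogs"]}}, {"year": {"$gte": 2020}}]}
    #
    # becomes roughly `cmetadata->>'topic' IN (...)` AND
    # `jsonb_path_match(cmetadata, '$.year >= $value', ...)` before being
    # appended to the collection predicate in `_query_collection`.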
    def similarity_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs most similar to embedding vector.

        Args:
            embedding: Embedding to look up documents similar to.
            k: Number of Documents to return. Defaults to 4.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List of Documents most similar to the query vector.
        """
        docs_and_scores = self.similarity_search_with_score_by_vector(
            embedding=embedding, k=k, filter=filter
        )
        return _results_to_docs(docs_and_scores)
    @classmethod
    def from_texts(
        cls: Type[PGVector],
        texts: List[str],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        *,
        use_jsonb: bool = False,
        **kwargs: Any,
    ) -> PGVector:
        """Return VectorStore initialized from texts and embeddings.

        Postgres connection string is required: either pass it as a parameter
        or set the PGVECTOR_CONNECTION_STRING environment variable.
        """
        embeddings = embedding.embed_documents(list(texts))

        return cls._from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            collection_name=collection_name,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
            use_jsonb=use_jsonb,
            **kwargs,
        )
    @classmethod
    def from_embeddings(
        cls,
        text_embeddings: List[Tuple[str, List[float]]],
        embedding: Embeddings,
        metadatas: Optional[List[dict]] = None,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> PGVector:
        """Construct PGVector wrapper from raw documents and pre-generated
        embeddings.

        Return VectorStore initialized from documents and embeddings.

        Postgres connection string is required: either pass it as a parameter
        or set the PGVECTOR_CONNECTION_STRING environment variable.

        Example:
            .. code-block:: python

                from langchain_community.vectorstores import PGVector
                from langchain_community.embeddings import OpenAIEmbeddings

                embeddings = OpenAIEmbeddings()
                text_embeddings = embeddings.embed_documents(texts)
                text_embedding_pairs = list(zip(texts, text_embeddings))
                vectorstore = PGVector.from_embeddings(
                    text_embedding_pairs, embeddings
                )
        """
        texts = [t[0] for t in text_embeddings]
        embeddings = [t[1] for t in text_embeddings]

        return cls._from(
            texts,
            embeddings,
            embedding,
            metadatas=metadatas,
            ids=ids,
            collection_name=collection_name,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
            **kwargs,
        )
    @classmethod
    def from_existing_index(
        cls: Type[PGVector],
        embedding: Embeddings,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        pre_delete_collection: bool = False,
        **kwargs: Any,
    ) -> PGVector:
        """Get an instance of an existing PGVector store. This method will
        return the instance of the store without inserting any new embeddings.
        """
        connection_string = cls.get_connection_string(kwargs)

        store = cls(
            connection_string=connection_string,
            collection_name=collection_name,
            embedding_function=embedding,
            distance_strategy=distance_strategy,
            pre_delete_collection=pre_delete_collection,
        )

        return store
    @classmethod
    def get_connection_string(cls, kwargs: Dict[str, Any]) -> str:
        connection_string: str = get_from_dict_or_env(
            data=kwargs,
            key="connection_string",
            env_key="PGVECTOR_CONNECTION_STRING",
        )

        if not connection_string:
            raise ValueError(
                "Postgres connection string is required. "
                "Either pass it as a parameter "
                "or set the PGVECTOR_CONNECTION_STRING environment variable."
            )

        return connection_string
    @classmethod
    def from_documents(
        cls: Type[PGVector],
        documents: List[Document],
        embedding: Embeddings,
        collection_name: str = _LANGCHAIN_DEFAULT_COLLECTION_NAME,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        ids: Optional[List[str]] = None,
        pre_delete_collection: bool = False,
        *,
        use_jsonb: bool = False,
        **kwargs: Any,
    ) -> PGVector:
        """Return VectorStore initialized from documents and embeddings.

        Postgres connection string is required: either pass it as a parameter
        or set the PGVECTOR_CONNECTION_STRING environment variable.
        """
        texts = [d.page_content for d in documents]
        metadatas = [d.metadata for d in documents]
        connection_string = cls.get_connection_string(kwargs)

        kwargs["connection_string"] = connection_string

        return cls.from_texts(
            texts=texts,
            pre_delete_collection=pre_delete_collection,
            embedding=embedding,
            distance_strategy=distance_strategy,
            metadatas=metadatas,
            ids=ids,
            collection_name=collection_name,
            use_jsonb=use_jsonb,
            **kwargs,
        )
    @classmethod
    def connection_string_from_db_params(
        cls,
        driver: str,
        host: str,
        port: int,
        database: str,
        user: str,
        password: str,
    ) -> str:
        """Return connection string from database parameters."""
        return f"postgresql+{driver}://{user}:{password}@{host}:{port}/{database}"
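
    # Illustrative usage (not part of the original module): building the
    # connection string from individual parameters. Credentials are
    # hypothetical:
    #
    #     CONNECTION_STRING = PGVector.connection_string_from_db_params(
    #         driver="psycopg2",
    #         host="localhost",
    #         port=5432,
    #         database="postgres",
    #         user="postgres",
    #         password="postgres",
    #     )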
    def _select_relevance_score_fn(self) -> Callable[[float], float]:
        """The "correct" relevance function may differ depending on a few
        things, including:
        - the distance / similarity metric used by the VectorStore
        - the scale of your embeddings (OpenAI's are unit normed; many others are not!)
        - embedding dimensionality
        - etc.
        """
        if self.override_relevance_score_fn is not None:
            return self.override_relevance_score_fn

        # Default strategy is to rely on distance strategy provided
        # in vectorstore constructor
        if self._distance_strategy == DistanceStrategy.COSINE:
            return self._cosine_relevance_score_fn
        elif self._distance_strategy == DistanceStrategy.EUCLIDEAN:
            return self._euclidean_relevance_score_fn
        elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
            return self._max_inner_product_relevance_score_fn
        else:
            raise ValueError(
                "No supported normalization function "
                f"for distance_strategy of {self._distance_strategy}. "
                "Consider providing relevance_score_fn to PGVector constructor."
            )
    def max_marginal_relevance_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs selected using the maximal marginal relevance with score
        to embedding vector.

        Maximal marginal relevance optimizes for similarity to query AND
        diversity among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k (int): Number of Documents to return. Defaults to 4.
            fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
                Defaults to 20.
            lambda_mult (float): Number between 0 and 1 that determines the degree
                of diversity among the results, with 0 corresponding to maximum
                diversity and 1 to minimum diversity. Defaults to 0.5.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List[Tuple[Document, float]]: List of Documents selected by maximal
                marginal relevance to the query and a score for each.
        """
        results = self._query_collection(embedding=embedding, k=fetch_k, filter=filter)

        embedding_list = [result.EmbeddingStore.embedding for result in results]

        mmr_selected = maximal_marginal_relevance(
            np.array(embedding, dtype=np.float32),
            embedding_list,
            k=k,
            lambda_mult=lambda_mult,
        )

        candidates = self._results_to_docs_and_scores(results)

        return [r for i, r in enumerate(candidates) if i in mmr_selected]
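
    # Illustrative usage (not part of the original module): running MMR from a
    # precomputed query embedding. `store` and the query text are hypothetical:
    #
    #     query_vec = store.embedding_function.embed_query("feline sounds")
    #     docs_and_scores = store.max_marginal_relevance_search_with_score_by_vector(
    #         query_vec, k=4, fetch_k=20, lambda_mult=0.5
    #     )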
    def max_marginal_relevance_search_with_score(
        self,
        query: str,
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[dict] = None,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return docs selected using the maximal marginal relevance with score.

        Maximal marginal relevance optimizes for similarity to query AND
        diversity among selected documents.

        Args:
            query (str): Text to look up documents similar to.
            k (int): Number of Documents to return. Defaults to 4.
            fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
                Defaults to 20.
            lambda_mult (float): Number between 0 and 1 that determines the degree
                of diversity among the results, with 0 corresponding to maximum
                diversity and 1 to minimum diversity. Defaults to 0.5.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List[Tuple[Document, float]]: List of Documents selected by maximal
                marginal relevance to the query and a score for each.
        """
        embedding = self.embedding_function.embed_query(query)
        docs = self.max_marginal_relevance_search_with_score_by_vector(
            embedding=embedding,
            k=k,
            fetch_k=fetch_k,
            lambda_mult=lambda_mult,
            filter=filter,
            **kwargs,
        )
        return docs
    def max_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance
        to embedding vector.

        Maximal marginal relevance optimizes for similarity to query AND
        diversity among selected documents.

        Args:
            embedding: Embedding to look up documents similar to.
            k (int): Number of Documents to return. Defaults to 4.
            fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
                Defaults to 20.
            lambda_mult (float): Number between 0 and 1 that determines the degree
                of diversity among the results, with 0 corresponding to maximum
                diversity and 1 to minimum diversity. Defaults to 0.5.
            filter (Optional[Dict[str, str]]): Filter by metadata. Defaults to None.

        Returns:
            List[Document]: List of Documents selected by maximal marginal relevance.
        """
        docs_and_scores = self.max_marginal_relevance_search_with_score_by_vector(
            embedding,
            k=k,
            fetch_k=fetch_k,
            lambda_mult=lambda_mult,
            filter=filter,
            **kwargs,
        )

        return _results_to_docs(docs_and_scores)
    async def amax_marginal_relevance_search_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        fetch_k: int = 20,
        lambda_mult: float = 0.5,
        filter: Optional[Dict[str, str]] = None,
        **kwargs: Any,
    ) -> List[Document]:
        """Return docs selected using the maximal marginal relevance."""
        # This is a temporary workaround to make the similarity search
        # asynchronous. The proper solution is to make the similarity search
        # asynchronous in the vector store implementations.
        return await run_in_executor(
            None,
            self.max_marginal_relevance_search_by_vector,
            embedding,
            k=k,
            fetch_k=fetch_k,
            lambda_mult=lambda_mult,
            filter=filter,
            **kwargs,
        )
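
    # Illustrative usage (not part of the original module): the async variant
    # offloads the sync implementation to an executor, so it can be awaited
    # from async code. `store` and the query text are hypothetical:
    #
    #     import asyncio
    #
    #     async def main() -> None:
    #         query_vec = store.embedding_function.embed_query("feline sounds")
    #         docs = await store.amax_marginal_relevance_search_by_vector(
    #             query_vec, k=4
    #         )
    #
    #     asyncio.run(main())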