Source code for langchain_community.vectorstores.marqo

from __future__ import annotations

import json
import uuid
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
    Union,
)

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

if TYPE_CHECKING:
    import marqo


[docs]class Marqo(VectorStore): """`Marqo`向量存储。 Marqo索引具有与之关联的自己的模型,用于生成您的嵌入。这意味着您可以从一系列不同的模型中进行选择,还可以使用CLIP模型创建多模态索引,将图像和文本结合在一起。 Marqo还支持使用多个加权项进行更高级的查询,请参见https://docs.marqo.ai/latest/#searching-using-weights-in-queries。 此类可以在其相似性搜索方法中灵活地接受字符串或字典作为加权查询。 要使用,您应该安装`marqo` python包,可以使用`pip install marqo`来实现。 示例: .. code-block:: python import marqo from langchain_community.vectorstores import Marqo client = marqo.Client(url=os.environ["MARQO_URL"], ...) vectorstore = Marqo(client, index_name)"""
[docs] def __init__( self, client: marqo.Client, index_name: str, add_documents_settings: Optional[Dict[str, Any]] = None, searchable_attributes: Optional[List[str]] = None, page_content_builder: Optional[Callable[[Dict[str, Any]], str]] = None, ): """使用Marqo客户端进行初始化。""" try: import marqo except ImportError: raise ImportError( "Could not import marqo python package. " "Please install it with `pip install marqo`." ) if not isinstance(client, marqo.Client): raise ValueError( f"client should be an instance of marqo.Client, got {type(client)}" ) self._client = client self._index_name = index_name self._add_documents_settings = ( {} if add_documents_settings is None else add_documents_settings ) self._searchable_attributes = searchable_attributes self.page_content_builder = page_content_builder self.tensor_fields = ["text"] self._document_batch_size = 1024
@property def embeddings(self) -> Optional[Embeddings]: return None
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, **kwargs: Any, ) -> List[str]: """将带有元数据(属性)的文本上传到Marqo。 您可以让Marqo为每个文档生成ID,也可以通过在元数据对象中包含一个"_id"字段来提供自己的ID。 参数: texts(Iterable[str]):文本的迭代器 - 假定保留与元数据匹配的顺序。 metadatas(Optional[List[dict]],可选):元数据列表。 引发: ValueError:如果提供了元数据,并且元数据的数量与文本的数量不同。 返回: List[str]:已添加的ID列表。 """ if self._client.index(self._index_name).get_settings()["index_defaults"][ "treat_urls_and_pointers_as_images" ]: raise ValueError( "Marqo.add_texts is disabled for multimodal indexes. To add documents " "with a multimodal index use the Python client for Marqo directly." ) documents: List[Dict[str, str]] = [] num_docs = 0 for i, text in enumerate(texts): doc = { "text": text, "metadata": json.dumps(metadatas[i]) if metadatas else json.dumps({}), } documents.append(doc) num_docs += 1 ids = [] for i in range(0, num_docs, self._document_batch_size): response = self._client.index(self._index_name).add_documents( documents[i : i + self._document_batch_size], tensor_fields=self.tensor_fields, **self._add_documents_settings, ) if response["errors"]: err_msg = ( f"Error in upload for documents in index range [{i}," f"{i + self._document_batch_size}], " f"check Marqo logs." ) raise RuntimeError(err_msg) ids += [item["_id"] for item in response["items"]] return ids
[docs] def similarity_search_with_score( self, query: Union[str, Dict[str, float]], k: int = 4, ) -> List[Tuple[Document, float]]: """返回与查询相似的Marqo文档以及它们的分数。 参数: query(str):要搜索的查询,可以是字符串或加权查询。 k(int,可选):要返回的文档数量。默认为4。 返回: List[Tuple[Document, float]]:匹配的文档及其分数,按分数降序排列。 """ results = self.marqo_similarity_search(query=query, k=k) scored_documents = self._construct_documents_from_results_with_score(results) return scored_documents
[docs] def bulk_similarity_search_with_score( self, queries: Iterable[Union[str, Dict[str, float]]], k: int = 4, **kwargs: Any, ) -> List[List[Tuple[Document, float]]]: """返回与Marqo中的查询相似的文档以及它们的分数,使用一批查询。 参数: query(Iterable[Union[str, Dict[str, float]]]):要批量执行的查询的可迭代对象,列表中的查询可以是字符串或加权查询的字典。 k(int,可选):要返回的文档数量。默认为4。 返回: List[Tuple[Document, float]]:每个查询的匹配文档及其分数的列表列表。 """ bulk_results = self.marqo_bulk_similarity_search(queries=queries, k=k) bulk_documents: List[List[Tuple[Document, float]]] = [] for results in bulk_results["result"]: documents = self._construct_documents_from_results_with_score(results) bulk_documents.append(documents) return bulk_documents
def _construct_documents_from_results_with_score( self, results: Dict[str, List[Dict[str, str]]] ) -> List[Tuple[Document, Any]]: """将Marqo结果转换为文档的辅助函数。 参数: results(List[dict]):具有“hits”的Marqo结果对象。 include_scores(bool,可选):是否在文档旁边包含分数。 默认为False。 返回: Union[List[Document],List[Tuple[Document,float]]]:如果`include_scores`为真,则为文档或文档分数对。 """ documents: List[Tuple[Document, Any]] = [] for res in results["hits"]: if self.page_content_builder is None: text = res["text"] else: text = self.page_content_builder(res) metadata = json.loads(res.get("metadata", "{}")) documents.append( (Document(page_content=text, metadata=metadata), res["_score"]) ) return documents def _construct_documents_from_results_without_score( self, results: Dict[str, List[Dict[str, str]]] ) -> List[Document]: """将Marqo结果转换为文档的辅助函数。 参数: results(List[dict]):具有“hits”的Marqo结果对象。 include_scores(bool,可选):是否在文档旁边包含分数。 默认为False。 返回: Union[List[Document],List[Tuple[Document,float]]]:如果`include_scores`为真,则为文档或文档分数对。 """ documents: List[Document] = [] for res in results["hits"]: if self.page_content_builder is None: text = res["text"] else: text = self.page_content_builder(res) metadata = json.loads(res.get("metadata", "{}")) documents.append(Document(page_content=text, metadata=metadata)) return documents
[docs] @classmethod def from_documents( cls: Type[Marqo], documents: List[Document], embedding: Union[Embeddings, None] = None, **kwargs: Any, ) -> Marqo: """从文档初始化的VectorStore。请注意,Marqo不需要嵌入,我们保留参数以遵循Liskov替换原则。 参数: documents (List[Document]): 输入文档 embedding (Any, optional): 嵌入(不是必需的)。默认为None。 返回: VectorStore: 一个Marqo向量存储器 """ texts = [d.page_content for d in documents] metadatas = [d.metadata for d in documents] return cls.from_texts(texts, metadatas=metadatas, **kwargs)
[docs] @classmethod def from_texts( cls, texts: List[str], embedding: Any = None, metadatas: Optional[List[dict]] = None, index_name: str = "", url: str = "http://localhost:8882", api_key: str = "", add_documents_settings: Optional[Dict[str, Any]] = None, searchable_attributes: Optional[List[str]] = None, page_content_builder: Optional[Callable[[Dict[str, str]], str]] = None, index_settings: Optional[Dict[str, Any]] = None, verbose: bool = True, **kwargs: Any, ) -> Marqo: """从文本返回初始化的 Marqo。请注意,Marqo 不需要嵌入,我们保留该参数以遵循 Liskov 替换原则。 这是使用 marqo 快速入门的方法 - 只需提供您的文本和元数据,就可以创建数据存储的实例并对提供的数据进行索引。 要使用此方法知道文档的 id,您需要在每个文本的元数据中将它们包含在键 "_id" 下。 示例: .. code-block:: python from langchain_community.vectorstores import Marqo datastore = Marqo(texts=['text'], index_name='my-first-index', url='http://localhost:8882') 参数: texts (List[str]): 要在创建时索引到 marqo 中的文本列表。 embedding (Any, optional): 嵌入(非必需)。默认为 None。 index_name (str, optional): 要使用的索引的名称,如果未提供,则将使用 UUID 创建一个。默认为 None。 url (str, optional): Marqo 的 URL。默认为 "http://localhost:8882"。 api_key (str, optional): Marqo 的 API 密钥。默认为 ""。 metadatas (Optional[List[dict]], optional): 一个元数据列表,用于陪同文本。默认为 None。 仅在创建新索引时使用。默认为 "cpu"。可以是 "cpu" 或 "cuda"。 add_documents_settings (Optional[Dict[str, Any]], optional): 添加文档的设置,请参阅 https://docs.marqo.ai/0.0.16/API-Reference/documents/#query-parameters。默认为 {}。 index_settings (Optional[Dict[str, Any]], optional): 如果索引不存在,则使用的索引设置,请参阅 https://docs.marqo.ai/0.0.16/API-Reference/indexes/#index-defaults-object。默认为 {}。 返回: Marqo: Marqo 向量存储的实例 """ try: import marqo except ImportError: raise ImportError( "Could not import marqo python package. " "Please install it with `pip install marqo`." ) if not index_name: index_name = str(uuid.uuid4()) client = marqo.Client(url=url, api_key=api_key) try: client.create_index(index_name, settings_dict=index_settings or {}) if verbose: print(f"Created {index_name} successfully.") # noqa: T201 except Exception: if verbose: print(f"Index {index_name} exists.") # noqa: T201 instance: Marqo = cls( client, index_name, searchable_attributes=searchable_attributes, add_documents_settings=add_documents_settings or {}, page_content_builder=page_content_builder, ) instance.add_texts(texts, metadatas) return instance
[docs] def get_indexes(self) -> List[Dict[str, str]]: """帮助查看marqo中可用的索引,如果使用from_texts方法而没有指定索引名称,则很有用 返回: List[Dict[str, str]]: 索引列表 """ return self._client.get_indexes()["results"]
[docs] def get_number_of_documents(self) -> int: """返回索引中文档的数量 返回: int:文档的数量 """ return self._client.index(self._index_name).get_stats()["numberOfDocuments"]