from __future__ import annotations
import json
import uuid
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
Type,
Union,
)
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
if TYPE_CHECKING:
import marqo
[docs]class Marqo(VectorStore):
"""`Marqo`向量存储。
Marqo索引具有与之关联的自己的模型,用于生成您的嵌入。这意味着您可以从一系列不同的模型中进行选择,还可以使用CLIP模型创建多模态索引,将图像和文本结合在一起。
Marqo还支持使用多个加权项进行更高级的查询,请参见https://docs.marqo.ai/latest/#searching-using-weights-in-queries。
此类可以在其相似性搜索方法中灵活地接受字符串或字典作为加权查询。
要使用,您应该安装`marqo` python包,可以使用`pip install marqo`来实现。
示例:
.. code-block:: python
import marqo
from langchain_community.vectorstores import Marqo
client = marqo.Client(url=os.environ["MARQO_URL"], ...)
vectorstore = Marqo(client, index_name)"""
[docs] def __init__(
self,
client: marqo.Client,
index_name: str,
add_documents_settings: Optional[Dict[str, Any]] = None,
searchable_attributes: Optional[List[str]] = None,
page_content_builder: Optional[Callable[[Dict[str, Any]], str]] = None,
):
"""使用Marqo客户端进行初始化。"""
try:
import marqo
except ImportError:
raise ImportError(
"Could not import marqo python package. "
"Please install it with `pip install marqo`."
)
if not isinstance(client, marqo.Client):
raise ValueError(
f"client should be an instance of marqo.Client, got {type(client)}"
)
self._client = client
self._index_name = index_name
self._add_documents_settings = (
{} if add_documents_settings is None else add_documents_settings
)
self._searchable_attributes = searchable_attributes
self.page_content_builder = page_content_builder
self.tensor_fields = ["text"]
self._document_batch_size = 1024
@property
def embeddings(self) -> Optional[Embeddings]:
return None
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> List[str]:
"""将带有元数据(属性)的文本上传到Marqo。
您可以让Marqo为每个文档生成ID,也可以通过在元数据对象中包含一个"_id"字段来提供自己的ID。
参数:
texts(Iterable[str]):文本的迭代器 - 假定保留与元数据匹配的顺序。
metadatas(Optional[List[dict]],可选):元数据列表。
引发:
ValueError:如果提供了元数据,并且元数据的数量与文本的数量不同。
返回:
List[str]:已添加的ID列表。
"""
if self._client.index(self._index_name).get_settings()["index_defaults"][
"treat_urls_and_pointers_as_images"
]:
raise ValueError(
"Marqo.add_texts is disabled for multimodal indexes. To add documents "
"with a multimodal index use the Python client for Marqo directly."
)
documents: List[Dict[str, str]] = []
num_docs = 0
for i, text in enumerate(texts):
doc = {
"text": text,
"metadata": json.dumps(metadatas[i]) if metadatas else json.dumps({}),
}
documents.append(doc)
num_docs += 1
ids = []
for i in range(0, num_docs, self._document_batch_size):
response = self._client.index(self._index_name).add_documents(
documents[i : i + self._document_batch_size],
tensor_fields=self.tensor_fields,
**self._add_documents_settings,
)
if response["errors"]:
err_msg = (
f"Error in upload for documents in index range [{i},"
f"{i + self._document_batch_size}], "
f"check Marqo logs."
)
raise RuntimeError(err_msg)
ids += [item["_id"] for item in response["items"]]
return ids
[docs] def similarity_search(
self,
query: Union[str, Dict[str, float]],
k: int = 4,
**kwargs: Any,
) -> List[Document]:
"""在marqo索引中搜索最相似的文档。
参数:
query(Union[str, Dict[str, float]):搜索的查询,可以是字符串或加权查询。
k(int,可选):要返回的文档数量。默认为4。
返回:
List[Document]:按照匹配程度从高到低排序的k个文档。
"""
results = self.marqo_similarity_search(query=query, k=k)
documents = self._construct_documents_from_results_without_score(results)
return documents
[docs] def similarity_search_with_score(
self,
query: Union[str, Dict[str, float]],
k: int = 4,
) -> List[Tuple[Document, float]]:
"""返回与查询相似的Marqo文档以及它们的分数。
参数:
query(str):要搜索的查询,可以是字符串或加权查询。
k(int,可选):要返回的文档数量。默认为4。
返回:
List[Tuple[Document, float]]:匹配的文档及其分数,按分数降序排列。
"""
results = self.marqo_similarity_search(query=query, k=k)
scored_documents = self._construct_documents_from_results_with_score(results)
return scored_documents
[docs] def bulk_similarity_search(
self,
queries: Iterable[Union[str, Dict[str, float]]],
k: int = 4,
**kwargs: Any,
) -> List[List[Document]]:
"""在marqo索引中使用多个查询批量搜索最相似的文档。
参数:
queries (Iterable[Union[str, Dict[str, float]]]): 一个要批量执行的查询可迭代对象,列表中的查询可以是字符串或带有加权查询的字典。
k (int, optional): 每个查询返回的文档数量。默认为4。
返回:
List[List[Document]]: 每个查询的结果列表。
"""
bulk_results = self.marqo_bulk_similarity_search(queries=queries, k=k)
bulk_documents: List[List[Document]] = []
for results in bulk_results["result"]:
documents = self._construct_documents_from_results_without_score(results)
bulk_documents.append(documents)
return bulk_documents
[docs] def bulk_similarity_search_with_score(
self,
queries: Iterable[Union[str, Dict[str, float]]],
k: int = 4,
**kwargs: Any,
) -> List[List[Tuple[Document, float]]]:
"""返回与Marqo中的查询相似的文档以及它们的分数,使用一批查询。
参数:
query(Iterable[Union[str, Dict[str, float]]]):要批量执行的查询的可迭代对象,列表中的查询可以是字符串或加权查询的字典。
k(int,可选):要返回的文档数量。默认为4。
返回:
List[Tuple[Document, float]]:每个查询的匹配文档及其分数的列表列表。
"""
bulk_results = self.marqo_bulk_similarity_search(queries=queries, k=k)
bulk_documents: List[List[Tuple[Document, float]]] = []
for results in bulk_results["result"]:
documents = self._construct_documents_from_results_with_score(results)
bulk_documents.append(documents)
return bulk_documents
def _construct_documents_from_results_with_score(
self, results: Dict[str, List[Dict[str, str]]]
) -> List[Tuple[Document, Any]]:
"""将Marqo结果转换为文档的辅助函数。
参数:
results(List[dict]):具有“hits”的Marqo结果对象。
include_scores(bool,可选):是否在文档旁边包含分数。
默认为False。
返回:
Union[List[Document],List[Tuple[Document,float]]]:如果`include_scores`为真,则为文档或文档分数对。
"""
documents: List[Tuple[Document, Any]] = []
for res in results["hits"]:
if self.page_content_builder is None:
text = res["text"]
else:
text = self.page_content_builder(res)
metadata = json.loads(res.get("metadata", "{}"))
documents.append(
(Document(page_content=text, metadata=metadata), res["_score"])
)
return documents
def _construct_documents_from_results_without_score(
self, results: Dict[str, List[Dict[str, str]]]
) -> List[Document]:
"""将Marqo结果转换为文档的辅助函数。
参数:
results(List[dict]):具有“hits”的Marqo结果对象。
include_scores(bool,可选):是否在文档旁边包含分数。
默认为False。
返回:
Union[List[Document],List[Tuple[Document,float]]]:如果`include_scores`为真,则为文档或文档分数对。
"""
documents: List[Document] = []
for res in results["hits"]:
if self.page_content_builder is None:
text = res["text"]
else:
text = self.page_content_builder(res)
metadata = json.loads(res.get("metadata", "{}"))
documents.append(Document(page_content=text, metadata=metadata))
return documents
[docs] def marqo_similarity_search(
self,
query: Union[str, Dict[str, float]],
k: int = 4,
) -> Dict[str, List[Dict[str, str]]]:
"""从Marqo返回暴露Marqo输出的文档
参数:
query (str): 要搜索的查询。
k (int, 可选): 要返回的文档数量。默认为4。
返回:
List[Dict[str, Any]]: 来自marqo的结果。
"""
results = self._client.index(self._index_name).search(
q=query, searchable_attributes=self._searchable_attributes, limit=k
)
return results
[docs] def marqo_bulk_similarity_search(
self, queries: Iterable[Union[str, Dict[str, float]]], k: int = 4
) -> Dict[str, List[Dict[str, List[Dict[str, str]]]]]:
"""使用批量搜索从Marqo返回文档,直接暴露Marqo的输出
参数:
queries (Iterable[Union[str, Dict[str, float]]]): 查询列表。
k (int, optional): 每个查询返回的文档数量。默认为4。
返回:
Dict[str, Dict[List[Dict[str, Dict[str, Any]]]]]: 一个批量搜索结果对象
"""
bulk_results = {
"result": [
self._client.index(self._index_name).search(
q=query, searchable_attributes=self._searchable_attributes, limit=k
)
for query in queries
]
}
return bulk_results
[docs] @classmethod
def from_documents(
cls: Type[Marqo],
documents: List[Document],
embedding: Union[Embeddings, None] = None,
**kwargs: Any,
) -> Marqo:
"""从文档初始化的VectorStore。请注意,Marqo不需要嵌入,我们保留参数以遵循Liskov替换原则。
参数:
documents (List[Document]): 输入文档
embedding (Any, optional): 嵌入(不是必需的)。默认为None。
返回:
VectorStore: 一个Marqo向量存储器
"""
texts = [d.page_content for d in documents]
metadatas = [d.metadata for d in documents]
return cls.from_texts(texts, metadatas=metadatas, **kwargs)
[docs] @classmethod
def from_texts(
cls,
texts: List[str],
embedding: Any = None,
metadatas: Optional[List[dict]] = None,
index_name: str = "",
url: str = "http://localhost:8882",
api_key: str = "",
add_documents_settings: Optional[Dict[str, Any]] = None,
searchable_attributes: Optional[List[str]] = None,
page_content_builder: Optional[Callable[[Dict[str, str]], str]] = None,
index_settings: Optional[Dict[str, Any]] = None,
verbose: bool = True,
**kwargs: Any,
) -> Marqo:
"""从文本返回初始化的 Marqo。请注意,Marqo 不需要嵌入,我们保留该参数以遵循 Liskov 替换原则。
这是使用 marqo 快速入门的方法 - 只需提供您的文本和元数据,就可以创建数据存储的实例并对提供的数据进行索引。
要使用此方法知道文档的 id,您需要在每个文本的元数据中将它们包含在键 "_id" 下。
示例:
.. code-block:: python
from langchain_community.vectorstores import Marqo
datastore = Marqo(texts=['text'], index_name='my-first-index',
url='http://localhost:8882')
参数:
texts (List[str]): 要在创建时索引到 marqo 中的文本列表。
embedding (Any, optional): 嵌入(非必需)。默认为 None。
index_name (str, optional): 要使用的索引的名称,如果未提供,则将使用 UUID 创建一个。默认为 None。
url (str, optional): Marqo 的 URL。默认为 "http://localhost:8882"。
api_key (str, optional): Marqo 的 API 密钥。默认为 ""。
metadatas (Optional[List[dict]], optional): 一个元数据列表,用于陪同文本。默认为 None。
仅在创建新索引时使用。默认为 "cpu"。可以是 "cpu" 或 "cuda"。
add_documents_settings (Optional[Dict[str, Any]], optional): 添加文档的设置,请参阅
https://docs.marqo.ai/0.0.16/API-Reference/documents/#query-parameters。默认为 {}。
index_settings (Optional[Dict[str, Any]], optional): 如果索引不存在,则使用的索引设置,请参阅
https://docs.marqo.ai/0.0.16/API-Reference/indexes/#index-defaults-object。默认为 {}。
返回:
Marqo: Marqo 向量存储的实例
"""
try:
import marqo
except ImportError:
raise ImportError(
"Could not import marqo python package. "
"Please install it with `pip install marqo`."
)
if not index_name:
index_name = str(uuid.uuid4())
client = marqo.Client(url=url, api_key=api_key)
try:
client.create_index(index_name, settings_dict=index_settings or {})
if verbose:
print(f"Created {index_name} successfully.") # noqa: T201
except Exception:
if verbose:
print(f"Index {index_name} exists.") # noqa: T201
instance: Marqo = cls(
client,
index_name,
searchable_attributes=searchable_attributes,
add_documents_settings=add_documents_settings or {},
page_content_builder=page_content_builder,
)
instance.add_texts(texts, metadatas)
return instance
[docs] def get_indexes(self) -> List[Dict[str, str]]:
"""帮助查看marqo中可用的索引,如果使用from_texts方法而没有指定索引名称,则很有用
返回:
List[Dict[str, str]]: 索引列表
"""
return self._client.get_indexes()["results"]
[docs] def get_number_of_documents(self) -> int:
"""返回索引中文档的数量
返回:
int:文档的数量
"""
return self._client.index(self._index_name).get_stats()["numberOfDocuments"]