from __future__ import annotations
import logging
import os
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable, List, Optional, Tuple
import requests
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
logger = logging.getLogger(__name__)
[docs]class Clarifai(VectorStore):
"""`Clarifai AI`向量存储。
要使用,您应该已安装``clarifai`` python SDK包。
示例:
.. code-block:: python
from langchain_community.vectorstores import Clarifai
clarifai_vector_db = Clarifai(
user_id=USER_ID,
app_id=APP_ID,
number_of_docs=NUMBER_OF_DOCS,
)
"""
[docs] def __init__(
self,
user_id: Optional[str] = None,
app_id: Optional[str] = None,
number_of_docs: Optional[int] = 4,
pat: Optional[str] = None,
token: Optional[str] = None,
api_base: Optional[str] = "https://api.clarifai.com",
) -> None:
"""使用Clarifai客户端进行初始化。
参数:
user_id(可选[str],可选):用户ID。默认为None。
app_id(可选[str],可选):应用程序ID。默认为None。
pat(可选[str],可选):个人访问令牌。默认为None。
token(可选[str],可选):会话令牌。默认为None。
number_of_docs(可选[int],可选):在向量搜索期间返回的文档数量。默认为None。
api_base(可选[str],可选):API基础。默认为None。
引发:
ValueError: 如果未提供用户ID、应用程序ID或个人访问令牌。
"""
_user_id = user_id or os.environ.get("CLARIFAI_USER_ID")
_app_id = app_id or os.environ.get("CLARIFAI_APP_ID")
if _user_id is None or _app_id is None:
raise ValueError(
"Could not find CLARIFAI_USER_ID "
"or CLARIFAI_APP_ID in your environment. "
"Please set those env variables with a valid user ID, app ID"
)
self._number_of_docs = number_of_docs
try:
from clarifai.client.search import Search
except ImportError as e:
raise ImportError(
"Could not import clarifai python package. "
"Please install it with `pip install clarifai`."
) from e
self._auth = Search(
user_id=_user_id,
app_id=_app_id,
top_k=number_of_docs,
pat=pat,
token=token,
base_url=api_base,
).auth_helper
[docs] def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""将文本添加到Clarifai向量存储中。这将把文本推送到Clarifai应用程序。
应用程序使用一个基本工作流程,为每个文本创建和存储嵌入。
确保您正在使用与文本兼容的基本工作流程(例如语言理解)。
参数:
texts(Iterable[str]):要添加到向量存储中的文本。
metadatas(Optional[List[dict],optional):元数据的可选列表。
ids(Optional[List[str],optional):ID的可选列表。
"""
try:
from clarifai.client.input import Inputs
from google.protobuf.struct_pb2 import Struct
except ImportError as e:
raise ImportError(
"Could not import clarifai python package. "
"Please install it with `pip install clarifai`."
) from e
ltexts = list(texts)
length = len(ltexts)
assert length > 0, "No texts provided to add to the vectorstore."
if metadatas is not None:
assert length == len(
metadatas
), "Number of texts and metadatas should be the same."
if ids is not None:
assert len(ltexts) == len(
ids
), "Number of text inputs and input ids should be the same."
input_obj = Inputs.from_auth_helper(auth=self._auth)
batch_size = 32
input_job_ids = []
for idx in range(0, length, batch_size):
try:
batch_texts = ltexts[idx : idx + batch_size]
batch_metadatas = (
metadatas[idx : idx + batch_size] if metadatas else None
)
if ids is None:
batch_ids = [uuid.uuid4().hex for _ in range(len(batch_texts))]
else:
batch_ids = ids[idx : idx + batch_size]
if batch_metadatas is not None:
meta_list = []
for meta in batch_metadatas:
meta_struct = Struct()
meta_struct.update(meta)
meta_list.append(meta_struct)
input_batch = [
input_obj.get_text_input(
input_id=batch_ids[i],
raw_text=text,
metadata=meta_list[i] if batch_metadatas else None,
)
for i, text in enumerate(batch_texts)
]
result_id = input_obj.upload_inputs(inputs=input_batch)
input_job_ids.extend(result_id)
logger.debug("Input posted successfully.")
except Exception as error:
logger.warning(f"Post inputs failed: {error}")
traceback.print_exc()
return input_job_ids
[docs] def similarity_search_with_score(
self,
query: str,
k: Optional[int] = None,
filters: Optional[dict] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""使用Clarifai运行带有分数的相似性搜索。
参数:
query (str): 要搜索的查询文本。
k (Optional[int]): 要返回的结果数量。如果未设置,
它将取_number_of_docs。默认为None。
filter (Optional[Dict[str, str]]): 按元数据过滤。
默认为None。
返回:
List[Document]: 与查询文本最相似的文档列表。
"""
try:
from clarifai.client.search import Search
from clarifai_grpc.grpc.api import resources_pb2
from google.protobuf import json_format # type: ignore
except ImportError as e:
raise ImportError(
"Could not import clarifai python package. "
"Please install it with `pip install clarifai`."
) from e
# Get number of docs to return
top_k = k or self._number_of_docs
search_obj = Search.from_auth_helper(auth=self._auth, top_k=top_k)
rank = [{"text_raw": query}]
# Add filter by metadata if provided.
if filters is not None:
search_metadata = {"metadata": filters}
search_response = search_obj.query(ranks=rank, filters=[search_metadata])
else:
search_response = search_obj.query(ranks=rank)
# Retrieve hits
hits = [hit for data in search_response for hit in data.hits]
executor = ThreadPoolExecutor(max_workers=10)
def hit_to_document(hit: resources_pb2.Hit) -> Tuple[Document, float]:
metadata = json_format.MessageToDict(hit.input.data.metadata)
h = dict(self._auth.metadata)
request = requests.get(hit.input.data.text.url, headers=h)
# override encoding by real educated guess as provided by chardet
request.encoding = request.apparent_encoding
requested_text = request.text
logger.debug(
f"\tScore {hit.score:.2f} for annotation: {hit.annotation.id}\
off input: {hit.input.id}, text: {requested_text[:125]}"
)
return (Document(page_content=requested_text, metadata=metadata), hit.score)
# Iterate over hits and retrieve metadata and text
futures = [executor.submit(hit_to_document, hit) for hit in hits]
docs_and_scores = [future.result() for future in futures]
return docs_and_scores
[docs] def similarity_search(
self,
query: str,
k: Optional[int] = None,
**kwargs: Any,
) -> List[Document]:
"""运行使用Clarifai进行相似性搜索。
参数:
query: 要查找与之相似的文档的文本。
k: 要返回的文档数量。
如果未设置,将使用_number_of_docs。默认为None。
返回:
与查询最相似的文档列表,以及每个文档的分数。
"""
docs_and_scores = self.similarity_search_with_score(query, k=k, **kwargs)
return [doc for doc, _ in docs_and_scores]
[docs] @classmethod
def from_texts(
cls,
texts: List[str],
embedding: Optional[Embeddings] = None,
metadatas: Optional[List[dict]] = None,
user_id: Optional[str] = None,
app_id: Optional[str] = None,
number_of_docs: Optional[int] = None,
pat: Optional[str] = None,
token: Optional[str] = None,
**kwargs: Any,
) -> Clarifai:
"""从文本列表创建一个Clarifai向量存储。
参数:
user_id (str): 用户ID。
app_id (str): 应用程序ID。
texts (List[str]): 要添加的文本列表。
number_of_docs (Optional[int]): 在向量搜索期间返回的文档数量。默认为None。
pat (Optional[str], optional): 个人访问令牌。默认为None。
token (Optional[str], optional): 会话令牌。默认为None。
metadatas (Optional[List[dict]]): 元数据的可选列表。默认为None。
**kwargs: 要传递给搜索的其他关键字参数。
返回:
Clarifai: Clarifai向量存储。
"""
clarifai_vector_db = cls(
user_id=user_id,
app_id=app_id,
number_of_docs=number_of_docs,
pat=pat,
token=token,
**kwargs,
)
clarifai_vector_db.add_texts(texts=texts, metadatas=metadatas)
return clarifai_vector_db
[docs] @classmethod
def from_documents(
cls,
documents: List[Document],
embedding: Optional[Embeddings] = None,
user_id: Optional[str] = None,
app_id: Optional[str] = None,
number_of_docs: Optional[int] = None,
pat: Optional[str] = None,
token: Optional[str] = None,
**kwargs: Any,
) -> Clarifai:
"""从文档列表中创建一个Clarifai向量存储。
参数:
user_id (str): 用户ID。
app_id (str): 应用程序ID。
documents (List[Document]): 要添加的文档列表。
number_of_docs (Optional[int]): 在向量搜索期间要返回的文档数量。默认为None。
pat (Optional[str], optional): 个人访问令牌。默认为None。
token (Optional[str], optional): 会话令牌。默认为None。
**kwargs: 要传递给搜索的其他关键字参数。
返回:
Clarifai: Clarifai向量存储。
"""
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return cls.from_texts(
user_id=user_id,
app_id=app_id,
texts=texts,
number_of_docs=number_of_docs,
pat=pat,
metadatas=metadatas,
token=token,
**kwargs,
)