Source code for langchain_community.vectorstores.clarifai

from __future__ import annotations

import logging
import os
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable, List, Optional, Tuple

import requests
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore

logger = logging.getLogger(__name__)


[docs]class Clarifai(VectorStore): """`Clarifai AI`向量存储。 要使用,您应该已安装``clarifai`` python SDK包。 示例: .. code-block:: python from langchain_community.vectorstores import Clarifai clarifai_vector_db = Clarifai( user_id=USER_ID, app_id=APP_ID, number_of_docs=NUMBER_OF_DOCS, ) """
[docs] def __init__( self, user_id: Optional[str] = None, app_id: Optional[str] = None, number_of_docs: Optional[int] = 4, pat: Optional[str] = None, token: Optional[str] = None, api_base: Optional[str] = "https://api.clarifai.com", ) -> None: """使用Clarifai客户端进行初始化。 参数: user_id(可选[str],可选):用户ID。默认为None。 app_id(可选[str],可选):应用程序ID。默认为None。 pat(可选[str],可选):个人访问令牌。默认为None。 token(可选[str],可选):会话令牌。默认为None。 number_of_docs(可选[int],可选):在向量搜索期间返回的文档数量。默认为None。 api_base(可选[str],可选):API基础。默认为None。 引发: ValueError: 如果未提供用户ID、应用程序ID或个人访问令牌。 """ _user_id = user_id or os.environ.get("CLARIFAI_USER_ID") _app_id = app_id or os.environ.get("CLARIFAI_APP_ID") if _user_id is None or _app_id is None: raise ValueError( "Could not find CLARIFAI_USER_ID " "or CLARIFAI_APP_ID in your environment. " "Please set those env variables with a valid user ID, app ID" ) self._number_of_docs = number_of_docs try: from clarifai.client.search import Search except ImportError as e: raise ImportError( "Could not import clarifai python package. " "Please install it with `pip install clarifai`." ) from e self._auth = Search( user_id=_user_id, app_id=_app_id, top_k=number_of_docs, pat=pat, token=token, base_url=api_base, ).auth_helper
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, **kwargs: Any, ) -> List[str]: """将文本添加到Clarifai向量存储中。这将把文本推送到Clarifai应用程序。 应用程序使用一个基本工作流程,为每个文本创建和存储嵌入。 确保您正在使用与文本兼容的基本工作流程(例如语言理解)。 参数: texts(Iterable[str]):要添加到向量存储中的文本。 metadatas(Optional[List[dict],optional):元数据的可选列表。 ids(Optional[List[str],optional):ID的可选列表。 """ try: from clarifai.client.input import Inputs from google.protobuf.struct_pb2 import Struct except ImportError as e: raise ImportError( "Could not import clarifai python package. " "Please install it with `pip install clarifai`." ) from e ltexts = list(texts) length = len(ltexts) assert length > 0, "No texts provided to add to the vectorstore." if metadatas is not None: assert length == len( metadatas ), "Number of texts and metadatas should be the same." if ids is not None: assert len(ltexts) == len( ids ), "Number of text inputs and input ids should be the same." input_obj = Inputs.from_auth_helper(auth=self._auth) batch_size = 32 input_job_ids = [] for idx in range(0, length, batch_size): try: batch_texts = ltexts[idx : idx + batch_size] batch_metadatas = ( metadatas[idx : idx + batch_size] if metadatas else None ) if ids is None: batch_ids = [uuid.uuid4().hex for _ in range(len(batch_texts))] else: batch_ids = ids[idx : idx + batch_size] if batch_metadatas is not None: meta_list = [] for meta in batch_metadatas: meta_struct = Struct() meta_struct.update(meta) meta_list.append(meta_struct) input_batch = [ input_obj.get_text_input( input_id=batch_ids[i], raw_text=text, metadata=meta_list[i] if batch_metadatas else None, ) for i, text in enumerate(batch_texts) ] result_id = input_obj.upload_inputs(inputs=input_batch) input_job_ids.extend(result_id) logger.debug("Input posted successfully.") except Exception as error: logger.warning(f"Post inputs failed: {error}") traceback.print_exc() return input_job_ids
[docs] def similarity_search_with_score( self, query: str, k: Optional[int] = None, filters: Optional[dict] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: """使用Clarifai运行带有分数的相似性搜索。 参数: query (str): 要搜索的查询文本。 k (Optional[int]): 要返回的结果数量。如果未设置, 它将取_number_of_docs。默认为None。 filter (Optional[Dict[str, str]]): 按元数据过滤。 默认为None。 返回: List[Document]: 与查询文本最相似的文档列表。 """ try: from clarifai.client.search import Search from clarifai_grpc.grpc.api import resources_pb2 from google.protobuf import json_format # type: ignore except ImportError as e: raise ImportError( "Could not import clarifai python package. " "Please install it with `pip install clarifai`." ) from e # Get number of docs to return top_k = k or self._number_of_docs search_obj = Search.from_auth_helper(auth=self._auth, top_k=top_k) rank = [{"text_raw": query}] # Add filter by metadata if provided. if filters is not None: search_metadata = {"metadata": filters} search_response = search_obj.query(ranks=rank, filters=[search_metadata]) else: search_response = search_obj.query(ranks=rank) # Retrieve hits hits = [hit for data in search_response for hit in data.hits] executor = ThreadPoolExecutor(max_workers=10) def hit_to_document(hit: resources_pb2.Hit) -> Tuple[Document, float]: metadata = json_format.MessageToDict(hit.input.data.metadata) h = dict(self._auth.metadata) request = requests.get(hit.input.data.text.url, headers=h) # override encoding by real educated guess as provided by chardet request.encoding = request.apparent_encoding requested_text = request.text logger.debug( f"\tScore {hit.score:.2f} for annotation: {hit.annotation.id}\ off input: {hit.input.id}, text: {requested_text[:125]}" ) return (Document(page_content=requested_text, metadata=metadata), hit.score) # Iterate over hits and retrieve metadata and text futures = [executor.submit(hit_to_document, hit) for hit in hits] docs_and_scores = [future.result() for future in futures] return docs_and_scores
[docs] @classmethod def from_texts( cls, texts: List[str], embedding: Optional[Embeddings] = None, metadatas: Optional[List[dict]] = None, user_id: Optional[str] = None, app_id: Optional[str] = None, number_of_docs: Optional[int] = None, pat: Optional[str] = None, token: Optional[str] = None, **kwargs: Any, ) -> Clarifai: """从文本列表创建一个Clarifai向量存储。 参数: user_id (str): 用户ID。 app_id (str): 应用程序ID。 texts (List[str]): 要添加的文本列表。 number_of_docs (Optional[int]): 在向量搜索期间返回的文档数量。默认为None。 pat (Optional[str], optional): 个人访问令牌。默认为None。 token (Optional[str], optional): 会话令牌。默认为None。 metadatas (Optional[List[dict]]): 元数据的可选列表。默认为None。 **kwargs: 要传递给搜索的其他关键字参数。 返回: Clarifai: Clarifai向量存储。 """ clarifai_vector_db = cls( user_id=user_id, app_id=app_id, number_of_docs=number_of_docs, pat=pat, token=token, **kwargs, ) clarifai_vector_db.add_texts(texts=texts, metadatas=metadatas) return clarifai_vector_db
[docs] @classmethod def from_documents( cls, documents: List[Document], embedding: Optional[Embeddings] = None, user_id: Optional[str] = None, app_id: Optional[str] = None, number_of_docs: Optional[int] = None, pat: Optional[str] = None, token: Optional[str] = None, **kwargs: Any, ) -> Clarifai: """从文档列表中创建一个Clarifai向量存储。 参数: user_id (str): 用户ID。 app_id (str): 应用程序ID。 documents (List[Document]): 要添加的文档列表。 number_of_docs (Optional[int]): 在向量搜索期间要返回的文档数量。默认为None。 pat (Optional[str], optional): 个人访问令牌。默认为None。 token (Optional[str], optional): 会话令牌。默认为None。 **kwargs: 要传递给搜索的其他关键字参数。 返回: Clarifai: Clarifai向量存储。 """ texts = [doc.page_content for doc in documents] metadatas = [doc.metadata for doc in documents] return cls.from_texts( user_id=user_id, app_id=app_id, texts=texts, number_of_docs=number_of_docs, pat=pat, metadatas=metadatas, token=token, **kwargs, )