Source code for langchain_community.embeddings.mosaicml

from typing import Any, Dict, List, Mapping, Optional, Tuple

import requests
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
from langchain_core.utils import get_from_dict_or_env


[docs]class MosaicMLInstructorEmbeddings(BaseModel, Embeddings):
    """MosaicML embedding service.

    To use, you should have the environment variable ``MOSAICML_API_TOKEN``
    set with your API token, or pass it as a named parameter to the
    constructor.

    Example:
        .. code-block:: python

            from langchain_community.llms import MosaicMLInstructorEmbeddings
            endpoint_url = (
                "https://models.hosted-on.mosaicml.hosting/instructor-large/v1/predict"
            )
            mosaic_llm = MosaicMLInstructorEmbeddings(
                endpoint_url=endpoint_url,
                mosaicml_api_token="my-api-key"
            )
    """

    endpoint_url: str = (
        "https://models.hosted-on.mosaicml.hosting/instructor-xl/v1/predict"
    )
    """Endpoint URL to use."""
    embed_instruction: str = "Represent the document for retrieval: "
    """Instruction used to embed documents."""
    query_instruction: str = (
        "Represent the question for retrieving supporting documents: "
    )
    """Instruction used to embed the query."""
    retry_sleep: float = 1.0
    """How long to sleep if a rate limit is encountered"""

    mosaicml_api_token: Optional[str] = None

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the API key exists in the environment."""
        mosaicml_api_token = get_from_dict_or_env(
            values, "mosaicml_api_token", "MOSAICML_API_TOKEN"
        )
        values["mosaicml_api_token"] = mosaicml_api_token
        return values

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {"endpoint_url": self.endpoint_url}

    def _embed(
        self, input: List[Tuple[str, str]], is_retry: bool = False
    ) -> List[List[float]]:
        payload = {"inputs": input}

        # HTTP headers for authorization
        headers = {
            "Authorization": f"{self.mosaicml_api_token}",
            "Content-Type": "application/json",
        }

        # send request
        try:
            response = requests.post(self.endpoint_url, headers=headers, json=payload)
        except requests.exceptions.RequestException as e:
            raise ValueError(f"Error raised by inference endpoint: {e}")

        try:
            if response.status_code == 429:
                if not is_retry:
                    import time

                    time.sleep(self.retry_sleep)

                    return self._embed(input, is_retry=True)

                raise ValueError(
                    f"Error raised by inference API: rate limit exceeded.\nResponse: "
                    f"{response.text}"
                )

            parsed_response = response.json()

            # The inference API has changed a couple of times, so we add some handling
            # to be robust to multiple response formats.
            if isinstance(parsed_response, dict):
                output_keys = ["data", "output", "outputs"]
                for key in output_keys:
                    if key in parsed_response:
                        output_item = parsed_response[key]
                        break
                else:
                    raise ValueError(
                        f"No key data or output in response: {parsed_response}"
                    )

                if isinstance(output_item, list) and isinstance(output_item[0], list):
                    embeddings = output_item
                else:
                    embeddings = [output_item]
            else:
                raise ValueError(f"Unexpected response type: {parsed_response}")

        except requests.exceptions.JSONDecodeError as e:
            raise ValueError(
                f"Error raised by inference API: {e}.\nResponse: {response.text}"
            )

        return embeddings
[docs]    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed documents using a MosaicML deployed instructor embedding model.

        Args:
            texts: The list of texts to embed.

        Returns:
            List of embeddings, one for each text.
        """
        instruction_pairs = [(self.embed_instruction, text) for text in texts]
        embeddings = self._embed(instruction_pairs)
        return embeddings
[docs]    def embed_query(self, text: str) -> List[float]:
        """Embed a query using a MosaicML deployed instructor embedding model.

        Args:
            text: The text to embed.

        Returns:
            Embeddings for the text.
        """
        instruction_pair = (self.query_instruction, text)
        embedding = self._embed([instruction_pair])[0]
        return embedding
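

# A minimal usage sketch, assuming the class is exported from
# langchain_community.embeddings, a MosaicML instructor endpoint is reachable,
# and MOSAICML_API_TOKEN is set (the URL and token below are placeholders).
#
# from langchain_community.embeddings import MosaicMLInstructorEmbeddings
#
# embeddings = MosaicMLInstructorEmbeddings(
#     endpoint_url="https://models.hosted-on.mosaicml.hosting/instructor-xl/v1/predict",
#     mosaicml_api_token="my-api-key",  # or rely on the MOSAICML_API_TOKEN env var
# )
#
# # embed_documents pairs each text with embed_instruction before calling _embed
# doc_vectors = embeddings.embed_documents(["MosaicML hosts instructor models."])
#
# # embed_query pairs the text with query_instruction instead
# query_vector = embeddings.embed_query("Which models does MosaicML host?")
#
# print(len(doc_vectors), len(query_vector))  # 1 document vector; its dimensionality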