Source code for langchain_community.embeddings.mosaicml
from typing import Any, Dict, List, Mapping, Optional, Tuple
import requests
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
from langchain_core.utils import get_from_dict_or_env
class MosaicMLInstructorEmbeddings(BaseModel, Embeddings):
    """MosaicML embedding service.

    To use, set the environment variable ``MOSAICML_API_TOKEN`` to your API
    token, or pass it to the constructor as the named parameter
    ``mosaicml_api_token``.

    Example:
        .. code-block:: python

            from langchain_community.embeddings.mosaicml import (
                MosaicMLInstructorEmbeddings,
            )

            endpoint_url = (
                "https://models.hosted-on.mosaicml.hosting/instructor-large/v1/predict"
            )
            mosaic_llm = MosaicMLInstructorEmbeddings(
                endpoint_url=endpoint_url,
                mosaicml_api_token="my-api-key"
            )
    """

    endpoint_url: str = (
        "https://models.hosted-on.mosaicml.hosting/instructor-xl/v1/predict"
    )
    """Endpoint URL to use."""
    embed_instruction: str = "Represent the document for retrieval: "
    """Instruction paired with each text when embedding documents."""
    query_instruction: str = (
        "Represent the question for retrieving supporting documents: "
    )
    """Instruction paired with the text when embedding a query."""
    retry_sleep: float = 1.0
    """How long (in seconds) to sleep before the single retry on a rate limit."""

    mosaicml_api_token: Optional[str] = None

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Resolve the API token from the given values or the environment.

        The token is looked up under ``mosaicml_api_token`` in ``values`` and
        falls back to the ``MOSAICML_API_TOKEN`` environment variable; the
        resolved token is stored back into ``values``.
        """
        mosaicml_api_token = get_from_dict_or_env(
            values, "mosaicml_api_token", "MOSAICML_API_TOKEN"
        )
        values["mosaicml_api_token"] = mosaicml_api_token
        return values

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {"endpoint_url": self.endpoint_url}

    def _embed(
        self, input: List[Tuple[str, str]], is_retry: bool = False
    ) -> List[List[float]]:
        """POST instruction/text pairs to the endpoint and return the embeddings.

        Args:
            input: ``(instruction, text)`` pairs sent as the ``inputs`` payload.
            is_retry: True when this call is already the retry after an HTTP
                429, in which case a second 429 is reported as an error.

        Returns:
            The value found under the first of the ``data``, ``output`` or
            ``outputs`` keys of the JSON response. A list of lists (or an
            empty list) is returned as is; anything else is wrapped in a
            one-element list.

        Raises:
            ValueError: If the request fails, the rate limit is still hit
                after one retry, the response is not valid JSON, or the
                response is not a dict containing a known output key.
        """
        payload = {"inputs": input}

        # HTTP headers for authorization
        headers = {
            "Authorization": f"{self.mosaicml_api_token}",
            "Content-Type": "application/json",
        }

        # send request
        try:
            response = requests.post(self.endpoint_url, headers=headers, json=payload)
        except requests.exceptions.RequestException as e:
            raise ValueError(f"Error raised by inference endpoint: {e}") from e

        if response.status_code == 429:
            if not is_retry:
                import time

                # Back off once, then retry exactly one time.
                time.sleep(self.retry_sleep)
                return self._embed(input, is_retry=True)

            raise ValueError(
                f"Error raised by inference API: rate limit exceeded.\nResponse: "
                f"{response.text}"
            )

        # Only the JSON decoding can raise JSONDecodeError, so keep the try tight.
        try:
            parsed_response = response.json()
        except requests.exceptions.JSONDecodeError as e:
            raise ValueError(
                f"Error raised by inference API: {e}.\nResponse: {response.text}"
            ) from e

        # The inference API has changed a couple of times, so we add some handling
        # to be robust to multiple response formats.
        if not isinstance(parsed_response, dict):
            raise ValueError(f"Unexpected response type: {parsed_response}")

        for key in ("data", "output", "outputs"):
            if key in parsed_response:
                output_item = parsed_response[key]
                break
        else:
            raise ValueError(f"No key data or output in response: {parsed_response}")

        # An empty list means "no embeddings"; checking emptiness first avoids
        # an IndexError on output_item[0].
        if isinstance(output_item, list) and (
            not output_item or isinstance(output_item[0], list)
        ):
            return output_item
        # Otherwise treat the item as a single embedding and wrap it.
        return [output_item]

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed documents using a MosaicML deployed instructor embedding model.

        Args:
            texts: The list of texts to embed.

        Returns:
            List of embeddings, one for each text. An empty ``texts`` yields
            an empty list without contacting the endpoint.
        """
        if not texts:
            return []
        instruction_pairs = [(self.embed_instruction, text) for text in texts]
        return self._embed(instruction_pairs)

    def embed_query(self, text: str) -> List[float]:
        """Embed a query using a MosaicML deployed instructor embedding model.

        Args:
            text: The text to embed.

        Returns:
            Embedding for the text (the first embedding of the response).
        """
        instruction_pair = (self.query_instruction, text)
        return self._embed([instruction_pair])[0]