Source code for langchain_community.embeddings.bedrock
import asyncio
import json
import os
from typing import Any, Dict, List, Optional
import numpy as np
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
from langchain_core.runnables.config import run_in_executor
class BedrockEmbeddings(BaseModel, Embeddings):
    """Bedrock embedding models.

    To authenticate, the AWS client automatically loads credentials using the
    methods described at:
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html

    If a specific credential profile should be used, you must pass the name
    of the profile from the ~/.aws/credentials file that is to be used.

    Make sure the credentials / roles used have the required policies to
    access the Bedrock service.

    Example:
        .. code-block:: python

            from langchain_community.embeddings.bedrock import BedrockEmbeddings

            region_name = "us-east-1"
            credentials_profile_name = "default"
            model_id = "amazon.titan-embed-text-v1"

            be = BedrockEmbeddings(
                credentials_profile_name=credentials_profile_name,
                region_name=region_name,
                model_id=model_id,
            )
    """

    client: Any  #: :meta private:
    """Bedrock client."""

    region_name: Optional[str] = None
    """The AWS region, e.g. `us-west-2`. Falls back to the AWS_DEFAULT_REGION
    environment variable or the region specified in ~/.aws/config if it is
    not provided here."""

    credentials_profile_name: Optional[str] = None
    """The name of the profile in the ~/.aws/credentials or ~/.aws/config
    files, which has either access keys or role information specified.
    If not specified, the default credential profile or, if on an EC2
    instance, credentials from IMDS will be used.
    See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html"""

    model_id: str = "amazon.titan-embed-text-v1"
    """Id of the model to call, e.g. amazon.titan-embed-text-v1. This is
    equivalent to the modelId property in the list-foundation-models api."""

    model_kwargs: Optional[Dict] = None
    """Keyword arguments to pass to the model."""

    endpoint_url: Optional[str] = None
    """Needed if you don't want to default to the us-east-1 endpoint."""

    normalize: bool = False
    """Whether the embeddings should be normalized to unit vectors."""

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that AWS credentials and the boto3 package are available.

        If a ``client`` was supplied it is used as-is. Otherwise a boto3
        session is created (optionally from ``credentials_profile_name``) and
        a ``bedrock-runtime`` client is built from it, honouring
        ``region_name`` and ``endpoint_url`` when they are set.

        Args:
            values: The field values collected by pydantic.

        Returns:
            The same mapping, with ``client`` populated.

        Raises:
            ImportError: If boto3 is not installed.
            ValueError: If the session or client cannot be created.
        """
        # ``.get`` is used throughout: a field that failed its own validation
        # is absent from ``values``, and a KeyError here would mask the real
        # validation error.
        if values.get("client") is not None:
            return values

        try:
            import boto3
        except ImportError as e:
            raise ImportError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            ) from e

        try:
            profile_name = values.get("credentials_profile_name")
            if profile_name is not None:
                session = boto3.Session(profile_name=profile_name)
            else:
                # use default credentials
                session = boto3.Session()

            client_params = {}
            if values.get("region_name"):
                client_params["region_name"] = values["region_name"]
            if values.get("endpoint_url"):
                client_params["endpoint_url"] = values["endpoint_url"]

            values["client"] = session.client("bedrock-runtime", **client_params)
        except Exception as e:
            raise ValueError(
                "Could not load credentials to authenticate with AWS client. "
                "Please check that credentials in the specified "
                f"profile name are valid. Bedrock error: {e}"
            ) from e

        return values

    def _embedding_func(self, text: str) -> List[float]:
        """Call out to the Bedrock embedding endpoint for a single text.

        Args:
            text: The text to embed.

        Returns:
            The embedding extracted from the response body: the first entry
            of ``embeddings`` for cohere models, ``embedding`` otherwise.

        Raises:
            ValueError: If invoking the model or decoding its response fails.
        """
        # replace newlines, which can negatively affect performance.
        # NOTE: os.linesep is platform-specific, so only the host platform's
        # line separator is replaced here.
        text = text.replace(os.linesep, " ")

        # format input body for provider; the provider is the part of the
        # model id before the first "."
        provider = self.model_id.split(".")[0]
        # Copy so that the user-supplied model_kwargs are never mutated.
        input_body = dict(self.model_kwargs or {})
        if provider == "cohere":
            input_body.setdefault("input_type", "search_document")
            input_body["texts"] = [text]
        else:
            # includes common provider == "amazon"
            input_body["inputText"] = text
        body = json.dumps(input_body)

        try:
            # invoke bedrock API
            response = self.client.invoke_model(
                body=body,
                modelId=self.model_id,
                accept="application/json",
                contentType="application/json",
            )

            # format output based on provider
            response_body = json.loads(response.get("body").read())
            if provider == "cohere":
                return response_body.get("embeddings")[0]
            # includes common provider == "amazon"
            return response_body.get("embedding")
        except Exception as e:
            raise ValueError(f"Error raised by inference endpoint: {e}") from e

    def _normalize_vector(self, embeddings: List[float]) -> List[float]:
        """Normalize an embedding to a unit vector.

        Args:
            embeddings: The embedding to normalize.

        Returns:
            The embedding divided by its Euclidean norm. An all-zero
            embedding is returned unchanged, since it has no direction.
        """
        emb = np.array(embeddings, dtype=float)
        norm = np.linalg.norm(emb)
        if norm == 0:
            # Dividing by a zero norm would yield NaNs.
            return emb.tolist()
        return (emb / norm).tolist()

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Compute document embeddings using a Bedrock model.

        Args:
            texts: The list of texts to embed.

        Returns:
            A list of embeddings, one for each text, in input order.
        """
        results = [self._embedding_func(text) for text in texts]
        if self.normalize:
            results = [self._normalize_vector(embedding) for embedding in results]
        return results

    def embed_query(self, text: str) -> List[float]:
        """Compute a query embedding using a Bedrock model.

        Args:
            text: The text to embed.

        Returns:
            The embedding for the text.
        """
        embedding = self._embedding_func(text)
        if self.normalize:
            return self._normalize_vector(embedding)
        return embedding

    async def aembed_query(self, text: str) -> List[float]:
        """Asynchronously compute a query embedding using a Bedrock model.

        Delegates to the synchronous ``embed_query`` via ``run_in_executor``.

        Args:
            text: The text to embed.

        Returns:
            The embedding for the text.
        """
        return await run_in_executor(None, self.embed_query, text)

    async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
        """Asynchronously compute document embeddings using a Bedrock model.

        Each text is embedded through ``aembed_query`` and the calls are
        awaited together; results keep the order of ``texts``.

        Args:
            texts: The list of texts to embed.

        Returns:
            A list of embeddings, one for each text.
        """
        result = await asyncio.gather(*[self.aembed_query(text) for text in texts])
        return list(result)