Source code for langchain_community.embeddings.bedrock

import asyncio
import json
import os
from typing import Any, Dict, List, Optional

import numpy as np
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
from langchain_core.runnables.config import run_in_executor


class BedrockEmbeddings(BaseModel, Embeddings):
    """Bedrock embedding models.

    To authenticate, the AWS client uses the following methods to
    automatically load credentials:
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html

    If a specific credential profile should be used, you must pass the name
    of the profile from the ~/.aws/credentials file that is to be used.

    Make sure the credentials / roles used have the required policies to
    access the Bedrock service.

    Example:
        .. code-block:: python

            from langchain_community.embeddings.bedrock import BedrockEmbeddings

            region_name = "us-east-1"
            credentials_profile_name = "default"
            model_id = "amazon.titan-embed-text-v1"

            be = BedrockEmbeddings(
                credentials_profile_name=credentials_profile_name,
                region_name=region_name,
                model_id=model_id
            )
    """

    client: Any  #: :meta private:
    """Bedrock client."""

    region_name: Optional[str] = None
    """The AWS region, e.g. `us-west-2`. If not provided, falls back to the
    AWS_DEFAULT_REGION environment variable or the region specified in
    ~/.aws/config."""

    credentials_profile_name: Optional[str] = None
    """The name of the profile in the ~/.aws/credentials or ~/.aws/config
    files, which has either access keys or role information specified.
    If not specified, the default credential profile or, if on an EC2
    instance, credentials from IMDS will be used.
    See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html"""

    model_id: str = "amazon.titan-embed-text-v1"
    """Id of the model to call, e.g. amazon.titan-embed-text-v1. This is
    equivalent to the modelId property in the list-foundation-models api."""

    model_kwargs: Optional[Dict] = None
    """Keyword arguments to pass to the model."""

    endpoint_url: Optional[str] = None
    """Needed if you don't want to default to the us-east-1 endpoint."""

    normalize: bool = False
    """Whether the embeddings should be normalized to unit vectors."""

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that AWS credentials and the boto3 package are available.

        If a client was supplied it is used untouched; otherwise a
        ``bedrock-runtime`` client is created from a boto3 session and stored
        in ``values["client"]``.

        Args:
            values: The field values collected by pydantic.

        Returns:
            The same mapping, with ``client`` populated.

        Raises:
            ImportError: If boto3 cannot be imported.
            ValueError: If the session or client cannot be created.
        """
        # `.get` rather than indexing: a key that is absent from `values`
        # must not surface here as an unrelated KeyError.
        if values.get("client") is not None:
            return values

        try:
            import boto3

            profile_name = values.get("credentials_profile_name")
            if profile_name is not None:
                session = boto3.Session(profile_name=profile_name)
            else:
                # use default credentials
                session = boto3.Session()

            client_params = {}
            if values.get("region_name"):
                client_params["region_name"] = values["region_name"]
            if values.get("endpoint_url"):
                client_params["endpoint_url"] = values["endpoint_url"]

            values["client"] = session.client("bedrock-runtime", **client_params)
        except ImportError as e:
            raise ImportError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            ) from e
        except Exception as e:
            raise ValueError(
                "Could not load credentials to authenticate with AWS client. "
                "Please check that credentials in the specified "
                f"profile name are valid. Bedrock error: {e}"
            ) from e

        return values

    def _embedding_func(self, text: str) -> List[float]:
        """Call out to the Bedrock embedding endpoint for a single text.

        Args:
            text: The text to embed.

        Returns:
            The embedding extracted from the response body.

        Raises:
            ValueError: If invoking the model or decoding its response fails.
        """
        # replace newlines, which can negatively affect performance.
        # NOTE: only the platform line separator (os.linesep) is replaced.
        text = text.replace(os.linesep, " ")

        # The request/response shape depends on the provider, which is the
        # prefix of the model id (e.g. "amazon" in "amazon.titan-embed-text-v1").
        provider = self.model_id.split(".")[0]
        input_body = {**(self.model_kwargs or {})}
        if provider == "cohere":
            # Respect a caller-supplied input_type; default otherwise.
            if "input_type" not in input_body:
                input_body["input_type"] = "search_document"
            input_body["texts"] = [text]
        else:
            # includes common provider == "amazon"
            input_body["inputText"] = text
        body = json.dumps(input_body)

        try:
            # invoke bedrock API
            response = self.client.invoke_model(
                body=body,
                modelId=self.model_id,
                accept="application/json",
                contentType="application/json",
            )

            # format output based on provider
            response_body = json.loads(response.get("body").read())
            if provider == "cohere":
                return response_body.get("embeddings")[0]
            # includes common provider == "amazon"
            return response_body.get("embedding")
        except Exception as e:
            raise ValueError(f"Error raised by inference endpoint: {e}") from e

    def _normalize_vector(self, embeddings: List[float]) -> List[float]:
        """Normalize an embedding to a unit vector.

        Args:
            embeddings: The embedding to normalize.

        Returns:
            The embedding divided by its Euclidean norm. A vector whose norm
            is zero is returned unchanged, because dividing by zero would
            turn every component into NaN.
        """
        emb = np.array(embeddings)
        norm = np.linalg.norm(emb)
        if norm == 0:
            return emb.tolist()
        return (emb / norm).tolist()

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Compute document embeddings using a Bedrock model.

        Args:
            texts: The list of texts to embed.

        Returns:
            List of embeddings, one for each text, in input order.
        """
        results = []
        for text in texts:
            embedding = self._embedding_func(text)
            if self.normalize:
                embedding = self._normalize_vector(embedding)
            results.append(embedding)
        return results

    def embed_query(self, text: str) -> List[float]:
        """Compute a query embedding using a Bedrock model.

        Args:
            text: The text to embed.

        Returns:
            Embedding for the text, normalized when ``normalize`` is set.
        """
        embedding = self._embedding_func(text)
        if self.normalize:
            return self._normalize_vector(embedding)
        return embedding

    async def aembed_query(self, text: str) -> List[float]:
        """Asynchronously compute a query embedding using a Bedrock model.

        Delegates to the synchronous ``embed_query`` via ``run_in_executor``.

        Args:
            text: The text to embed.

        Returns:
            Embedding for the text.
        """
        return await run_in_executor(None, self.embed_query, text)

    async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
        """Asynchronously compute document embeddings using a Bedrock model.

        One ``aembed_query`` call is scheduled per text and all of them are
        awaited together; results are returned in input order.

        Args:
            texts: The list of texts to embed.

        Returns:
            List of embeddings, one for each text.
        """
        result = await asyncio.gather(*[self.aembed_query(text) for text in texts])
        return list(result)