Source code for langchain_community.llms.bedrock

import asyncio
import json
import warnings
from abc import ABC
from typing import (
    Any,
    AsyncGenerator,
    AsyncIterator,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    Tuple,
)

from langchain_core._api.deprecation import deprecated
from langchain_core.callbacks import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk
from langchain_core.pydantic_v1 import BaseModel, Extra, Field, root_validator
from langchain_core.utils import get_from_dict_or_env

from langchain_community.llms.utils import enforce_stop_tokens
from langchain_community.utilities.anthropic import (
    get_num_tokens_anthropic,
    get_token_ids_anthropic,
)

AMAZON_BEDROCK_TRACE_KEY = "amazon-bedrock-trace"
GUARDRAILS_BODY_KEY = "amazon-bedrock-guardrailAssessment"
HUMAN_PROMPT = "\n\nHuman:"
ASSISTANT_PROMPT = "\n\nAssistant:"
ALTERNATION_ERROR = (
    "Error: Prompt must alternate between '\n\nHuman:' and '\n\nAssistant:'."
)


def _add_newlines_before_ha(input_text: str) -> str:
    new_text = input_text
    for word in ["Human:", "Assistant:"]:
        new_text = new_text.replace(word, "\n\n" + word)
        for i in range(2):
            new_text = new_text.replace("\n\n\n" + word, "\n\n" + word)
    return new_text


def _human_assistant_format(input_text: str) -> str:
    if input_text.count("Human:") == 0 or (
        input_text.find("Human:") > input_text.find("Assistant:")
        and "Assistant:" in input_text
    ):
        input_text = HUMAN_PROMPT + " " + input_text  # SILENT CORRECTION
    if input_text.count("Assistant:") == 0:
        input_text = input_text + ASSISTANT_PROMPT  # SILENT CORRECTION
    if input_text[: len("Human:")] == "Human:":
        input_text = "\n\n" + input_text
    input_text = _add_newlines_before_ha(input_text)
    count = 0
    # track alternation
    for i in range(len(input_text)):
        if input_text[i : i + len(HUMAN_PROMPT)] == HUMAN_PROMPT:
            if count % 2 == 0:
                count += 1
            else:
                warnings.warn(ALTERNATION_ERROR + f" Received {input_text}")
        if input_text[i : i + len(ASSISTANT_PROMPT)] == ASSISTANT_PROMPT:
            if count % 2 == 1:
                count += 1
            else:
                warnings.warn(ALTERNATION_ERROR + f" Received {input_text}")

    if count % 2 == 1:  # Only saw Human, no Assistant
        input_text = input_text + ASSISTANT_PROMPT  # SILENT CORRECTION

    return input_text
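
# Illustrative examples of the normalization above (editorial addition, not part
# of the original module; results derived by tracing the logic above):
#
#   _human_assistant_format("Hello")
#   # -> "\n\nHuman: Hello\n\nAssistant:"
#
#   _human_assistant_format("Human: Hi\nAssistant: Hello\nHuman: Bye")
#   # -> "\n\nHuman: Hi\n\nAssistant: Hello\n\nHuman: Bye\n\nAssistant:"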


def _stream_response_to_generation_chunk(
    stream_response: Dict[str, Any],
) -> GenerationChunk:
    """将流响应转换为生成块。"""
    if not stream_response["delta"]:
        return GenerationChunk(text="")
    return GenerationChunk(
        text=stream_response["delta"]["text"],
        generation_info=dict(
            finish_reason=stream_response.get("stop_reason", None),
        ),
    )
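
# Illustrative example (editorial addition): a messages-API "content_block_delta"
# event such as {"type": "content_block_delta", "index": 0, "delta": {"text": "Hi"}}
# maps to GenerationChunk(text="Hi", generation_info={"finish_reason": None}).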


class LLMInputOutputAdapter:
    """Adapter class to prepare the inputs from Langchain to a format
    that the LLM model expects.

    It also provides a helper function to extract
    the generated text from the model response."""

    provider_to_output_key_map = {
        "anthropic": "completion",
        "amazon": "outputText",
        "cohere": "text",
        "meta": "generation",
        "mistral": "outputs",
    }
    @classmethod
    def prepare_input(
        cls,
        provider: str,
        model_kwargs: Dict[str, Any],
        prompt: Optional[str] = None,
        system: Optional[str] = None,
        messages: Optional[List[Dict]] = None,
    ) -> Dict[str, Any]:
        input_body = {**model_kwargs}
        if provider == "anthropic":
            if messages:
                input_body["anthropic_version"] = "bedrock-2023-05-31"
                input_body["messages"] = messages
                if system:
                    input_body["system"] = system
                if "max_tokens" not in input_body:
                    input_body["max_tokens"] = 1024
            if prompt:
                input_body["prompt"] = _human_assistant_format(prompt)
                if "max_tokens_to_sample" not in input_body:
                    input_body["max_tokens_to_sample"] = 1024
        elif provider in ("ai21", "cohere", "meta", "mistral"):
            input_body["prompt"] = prompt
        elif provider == "amazon":
            input_body = dict()
            input_body["inputText"] = prompt
            input_body["textGenerationConfig"] = {**model_kwargs}
        else:
            input_body["inputText"] = prompt

        return input_body
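
    # Illustrative example (editorial addition): for the legacy Anthropic
    # prompt path, the body produced above looks like
    #
    #   LLMInputOutputAdapter.prepare_input(
    #       provider="anthropic", model_kwargs={"temperature": 0.5}, prompt="Hello"
    #   )
    #   # -> {"temperature": 0.5,
    #   #     "prompt": "\n\nHuman: Hello\n\nAssistant:",
    #   #     "max_tokens_to_sample": 1024}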
    @classmethod
    def prepare_output(cls, provider: str, response: Any) -> dict:
        text = ""
        if provider == "anthropic":
            response_body = json.loads(response.get("body").read().decode())
            if "completion" in response_body:
                text = response_body.get("completion")
            elif "content" in response_body:
                content = response_body.get("content")
                text = content[0].get("text")
        else:
            response_body = json.loads(response.get("body").read())

            if provider == "ai21":
                text = response_body.get("completions")[0].get("data").get("text")
            elif provider == "cohere":
                text = response_body.get("generations")[0].get("text")
            elif provider == "meta":
                text = response_body.get("generation")
            elif provider == "mistral":
                text = response_body.get("outputs")[0].get("text")
            else:
                text = response_body.get("results")[0].get("outputText")

        headers = response.get("ResponseMetadata", {}).get("HTTPHeaders", {})
        prompt_tokens = int(headers.get("x-amzn-bedrock-input-token-count", 0))
        completion_tokens = int(headers.get("x-amzn-bedrock-output-token-count", 0))
        return {
            "text": text,
            "body": response_body,
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            },
        }
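
    # Illustrative shape of the dict returned above (editorial addition; the
    # values are hypothetical):
    #
    #   {"text": "Hello!",
    #    "body": {...},
    #    "usage": {"prompt_tokens": 10, "completion_tokens": 3, "total_tokens": 13}}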
    @classmethod
    def prepare_output_stream(
        cls,
        provider: str,
        response: Any,
        stop: Optional[List[str]] = None,
        messages_api: bool = False,
    ) -> Iterator[GenerationChunk]:
        stream = response.get("body")

        if not stream:
            return

        if messages_api:
            output_key = "message"
        else:
            output_key = cls.provider_to_output_key_map.get(provider, "")

        if not output_key:
            raise ValueError(
                f"Unknown streaming response output key for provider: {provider}"
            )

        for event in stream:
            chunk = event.get("chunk")
            if not chunk:
                continue

            chunk_obj = json.loads(chunk.get("bytes").decode())

            if provider == "cohere" and (
                chunk_obj["is_finished"] or chunk_obj[output_key] == "<EOS_TOKEN>"
            ):
                return

            elif (
                provider == "mistral"
                and chunk_obj.get(output_key, [{}])[0].get("stop_reason", "") == "stop"
            ):
                return

            elif messages_api and (chunk_obj.get("type") == "content_block_stop"):
                return

            if messages_api and chunk_obj.get("type") in (
                "message_start",
                "content_block_start",
                "content_block_delta",
            ):
                if chunk_obj.get("type") == "content_block_delta":
                    chk = _stream_response_to_generation_chunk(chunk_obj)
                    yield chk
                else:
                    continue
            else:
                # chunk obj format varies with provider
                yield GenerationChunk(
                    text=(
                        chunk_obj[output_key]
                        if provider != "mistral"
                        else chunk_obj[output_key][0]["text"]
                    ),
                    generation_info={
                        GUARDRAILS_BODY_KEY: (
                            chunk_obj.get(GUARDRAILS_BODY_KEY)
                            if GUARDRAILS_BODY_KEY in chunk_obj
                            else None
                        ),
                    },
                )
    @classmethod
    async def aprepare_output_stream(
        cls, provider: str, response: Any, stop: Optional[List[str]] = None
    ) -> AsyncIterator[GenerationChunk]:
        stream = response.get("body")

        if not stream:
            return

        output_key = cls.provider_to_output_key_map.get(provider, None)

        if not output_key:
            raise ValueError(
                f"Unknown streaming response output key for provider: {provider}"
            )

        for event in stream:
            chunk = event.get("chunk")
            if not chunk:
                continue

            chunk_obj = json.loads(chunk.get("bytes").decode())

            if provider == "cohere" and (
                chunk_obj["is_finished"] or chunk_obj[output_key] == "<EOS_TOKEN>"
            ):
                return

            if (
                provider == "mistral"
                and chunk_obj.get(output_key, [{}])[0].get("stop_reason", "") == "stop"
            ):
                return

            yield GenerationChunk(
                text=(
                    chunk_obj[output_key]
                    if provider != "mistral"
                    else chunk_obj[output_key][0]["text"]
                )
            )
class BedrockBase(BaseModel, ABC):
    """Base class for Bedrock models."""

    client: Any = Field(exclude=True)  #: :meta private:

    region_name: Optional[str] = None
    """The aws region, e.g., `us-west-2`. Falls back to the AWS_DEFAULT_REGION
    environment variable or the region specified in ~/.aws/config if not
    provided here."""

    credentials_profile_name: Optional[str] = Field(default=None, exclude=True)
    """The name of the profile in the ~/.aws/credentials or ~/.aws/config files,
    which has either access keys or role information specified.
    If not specified, the default credential profile or, if on an EC2 instance,
    credentials from IMDS will be used.
    See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
    """

    config: Any = None
    """An optional botocore.config.Config instance to pass to the client."""

    provider: Optional[str] = None
    """The model provider, e.g., amazon, cohere, ai21, etc. When not supplied,
    the provider is extracted from the first part of the model_id, e.g. 'amazon'
    in 'amazon.titan-text-express-v1'. This value should be provided for model
    ids that do not have the provider in them, e.g., custom and provisioned
    models that have an ARN associated with them."""

    model_id: str
    """Id of the model to call, e.g., amazon.titan-text-express-v1; this is
    equivalent to the modelId property in the list-foundation-models api. For
    custom and provisioned models, an ARN value is expected."""

    model_kwargs: Optional[Dict] = None
    """Keyword arguments to pass to the model."""

    endpoint_url: Optional[str] = None
    """Needed if you don't want to default to the us-east-1 endpoint."""

    streaming: bool = False
    """Whether to stream the results."""

    provider_stop_sequence_key_name_map: Mapping[str, str] = {
        "anthropic": "stop_sequences",
        "amazon": "stopSequences",
        "ai21": "stop_sequences",
        "cohere": "stop_sequences",
        "mistral": "stop",
    }

    guardrails: Optional[Mapping[str, Any]] = {
        "id": None,
        "version": None,
        "trace": False,
    }
    """
    An optional dictionary to configure guardrails for Bedrock.

    This field 'guardrails' consists of two keys: 'id' and 'version', which
    should be strings, but are initialized to None. It's used to determine if
    specific guardrails are enabled and properly set.

    Type:
        Optional[Mapping[str, str]]: A mapping with 'id' and 'version' keys.

    Example:
    llm = Bedrock(model_id="<model_id>", client=<bedrock_client>,
                  model_kwargs={},
                  guardrails={
                        "id": "<guardrail_id>",
                        "version": "<guardrail_version>"})

    To enable tracing for guardrails, set the 'trace' key to True and pass a
    callback handler to the 'run_manager' parameter of the 'generate' or
    '_call' methods.

    Example:
    llm = Bedrock(model_id="<model_id>", client=<bedrock_client>,
                  model_kwargs={},
                  guardrails={
                        "id": "<guardrail_id>",
                        "version": "<guardrail_version>",
                        "trace": True},
                  callbacks=[BedrockAsyncCallbackHandler()])

    See [https://python.langchain.com/docs/modules/callbacks/] for more
    information on callback handlers.

    class BedrockAsyncCallbackHandler(AsyncCallbackHandler):
        async def on_llm_error(
            self,
            error: BaseException,
            **kwargs: Any,
        ) -> Any:
            reason = kwargs.get("reason")
            if reason == "GUARDRAIL_INTERVENED":
                ...Logic to handle guardrail intervention...
    """  # noqa: E501

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that AWS credentials and the boto3 package exist in the
        environment."""

        # Skip creating new client if passed in constructor
        if values["client"] is not None:
            return values

        try:
            import boto3

            if values["credentials_profile_name"] is not None:
                session = boto3.Session(
                    profile_name=values["credentials_profile_name"]
                )
            else:
                # use default credentials
                session = boto3.Session()

            values["region_name"] = get_from_dict_or_env(
                values,
                "region_name",
                "AWS_DEFAULT_REGION",
                default=session.region_name,
            )

            client_params = {}
            if values["region_name"]:
                client_params["region_name"] = values["region_name"]
            if values["endpoint_url"]:
                client_params["endpoint_url"] = values["endpoint_url"]
            if values["config"]:
                client_params["config"] = values["config"]

            values["client"] = session.client("bedrock-runtime", **client_params)

        except ImportError:
            raise ImportError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            )
        except ValueError as e:
            raise ValueError(f"Error raised by bedrock service: {e}")
        except Exception as e:
            raise ValueError(
                "Could not load credentials to authenticate with AWS client. "
                "Please check that credentials in the specified "
                f"profile name are valid. Bedrock error: {e}"
            ) from e

        return values

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        _model_kwargs = self.model_kwargs or {}
        return {
            **{"model_kwargs": _model_kwargs},
        }

    def _get_provider(self) -> str:
        if self.provider:
            return self.provider
        if self.model_id.startswith("arn"):
            raise ValueError(
                "Model provider should be supplied when passing a model ARN as "
                "model_id"
            )

        return self.model_id.split(".")[0]

    @property
    def _model_is_anthropic(self) -> bool:
        return self._get_provider() == "anthropic"

    @property
    def _guardrails_enabled(self) -> bool:
        """
        Determines if guardrails are enabled and correctly configured.
        Checks if 'guardrails' is a dictionary with non-empty 'id' and 'version'
        keys. Checks if 'guardrails.trace' is true.

        Returns:
            bool: True if guardrails are correctly configured, False otherwise.
        Raises:
            TypeError: If 'guardrails' lacks 'id' or 'version' keys.
        """
        try:
            return (
                isinstance(self.guardrails, dict)
                and bool(self.guardrails["id"])
                and bool(self.guardrails["version"])
            )

        except KeyError as e:
            raise TypeError(
                "Guardrails must be a dictionary with 'id' and 'version' keys."
            ) from e

    def _get_guardrails_canonical(self) -> Dict[str, Any]:
        """
        The canonical way to pass in guardrails to the bedrock service
        adheres to the following format:

        "amazon-bedrock-guardrailDetails": {
            "guardrailId": "string",
            "guardrailVersion": "string"
        }
        """
        return {
            "amazon-bedrock-guardrailDetails": {
                "guardrailId": self.guardrails.get("id"),  # type: ignore[union-attr]
                "guardrailVersion": self.guardrails.get("version"),  # type: ignore[union-attr]
            }
        }

    def _prepare_input_and_invoke(
        self,
        prompt: Optional[str] = None,
        system: Optional[str] = None,
        messages: Optional[List[Dict]] = None,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Tuple[str, Dict[str, Any]]:
        _model_kwargs = self.model_kwargs or {}

        provider = self._get_provider()
        params = {**_model_kwargs, **kwargs}
        if self._guardrails_enabled:
            params.update(self._get_guardrails_canonical())
        input_body = LLMInputOutputAdapter.prepare_input(
            provider=provider,
            model_kwargs=params,
            prompt=prompt,
            system=system,
            messages=messages,
        )
        body = json.dumps(input_body)
        accept = "application/json"
        contentType = "application/json"

        request_options = {
            "body": body,
            "modelId": self.model_id,
            "accept": accept,
            "contentType": contentType,
        }

        if self._guardrails_enabled:
            request_options["guardrail"] = "ENABLED"
            if self.guardrails.get("trace"):  # type: ignore[union-attr]
                request_options["trace"] = "ENABLED"

        try:
            response = self.client.invoke_model(**request_options)

            text, body, usage_info = LLMInputOutputAdapter.prepare_output(
                provider, response
            ).values()

        except Exception as e:
            raise ValueError(f"Error raised by bedrock service: {e}")

        if stop is not None:
            text = enforce_stop_tokens(text, stop)

        # Verify and raise a callback error if any intervention occurs or a
        # signal is sent from a Bedrock service, such as when guardrails are
        # triggered.
        services_trace = self._get_bedrock_services_signal(body)  # type: ignore[arg-type]

        if services_trace.get("signal") and run_manager is not None:
            run_manager.on_llm_error(
                Exception(
                    f"Error raised by bedrock service: {services_trace.get('reason')}"
                ),
                **services_trace,
            )

        return text, usage_info

    def _get_bedrock_services_signal(self, body: dict) -> dict:
        """
        Checks the response body for an interrupt flag or message that indicates
        whether any of the Bedrock services have intervened in the processing
        flow. It is primarily used to identify modifications or interruptions
        imposed by those services during the request-response cycle with a Large
        Language Model (LLM).
        """  # noqa: E501
        if (
            self._guardrails_enabled
            and self.guardrails.get("trace")  # type: ignore[union-attr]
            and self._is_guardrails_intervention(body)
        ):
            return {
                "signal": True,
                "reason": "GUARDRAIL_INTERVENED",
                "trace": body.get(AMAZON_BEDROCK_TRACE_KEY),
            }

        return {
            "signal": False,
            "reason": None,
            "trace": None,
        }

    def _is_guardrails_intervention(self, body: dict) -> bool:
        return body.get(GUARDRAILS_BODY_KEY) == "GUARDRAIL_INTERVENED"

    def _prepare_input_and_invoke_stream(
        self,
        prompt: Optional[str] = None,
        system: Optional[str] = None,
        messages: Optional[List[Dict]] = None,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        _model_kwargs = self.model_kwargs or {}
        provider = self._get_provider()

        if stop:
            if provider not in self.provider_stop_sequence_key_name_map:
                raise ValueError(
                    f"Stop sequence key name for {provider} is not supported."
                )

            # stop sequence from _generate() overrides
            # stop sequences in the class attribute
            _model_kwargs[self.provider_stop_sequence_key_name_map.get(provider)] = stop

        if provider == "cohere":
            _model_kwargs["stream"] = True

        params = {**_model_kwargs, **kwargs}

        if self._guardrails_enabled:
            params.update(self._get_guardrails_canonical())

        input_body = LLMInputOutputAdapter.prepare_input(
            provider=provider,
            prompt=prompt,
            system=system,
            messages=messages,
            model_kwargs=params,
        )
        body = json.dumps(input_body)

        request_options = {
            "body": body,
            "modelId": self.model_id,
            "accept": "application/json",
            "contentType": "application/json",
        }

        if self._guardrails_enabled:
            request_options["guardrail"] = "ENABLED"
            if self.guardrails.get("trace"):  # type: ignore[union-attr]
                request_options["trace"] = "ENABLED"

        try:
            response = self.client.invoke_model_with_response_stream(**request_options)

        except Exception as e:
            raise ValueError(f"Error raised by bedrock service: {e}")

        for chunk in LLMInputOutputAdapter.prepare_output_stream(
            provider, response, stop, True if messages else False
        ):
            yield chunk
            # verify and raise callback error if any middleware intervened
            self._get_bedrock_services_signal(chunk.generation_info)  # type: ignore[arg-type]

            if run_manager is not None:
                run_manager.on_llm_new_token(chunk.text, chunk=chunk)

    async def _aprepare_input_and_invoke_stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[GenerationChunk]:
        _model_kwargs = self.model_kwargs or {}
        provider = self._get_provider()

        if stop:
            if provider not in self.provider_stop_sequence_key_name_map:
                raise ValueError(
                    f"Stop sequence key name for {provider} is not supported."
                )
            _model_kwargs[self.provider_stop_sequence_key_name_map.get(provider)] = stop

        if provider == "cohere":
            _model_kwargs["stream"] = True

        params = {**_model_kwargs, **kwargs}
        input_body = LLMInputOutputAdapter.prepare_input(
            provider=provider, prompt=prompt, model_kwargs=params
        )
        body = json.dumps(input_body)

        response = await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: self.client.invoke_model_with_response_stream(
                body=body,
                modelId=self.model_id,
                accept="application/json",
                contentType="application/json",
            ),
        )

        async for chunk in LLMInputOutputAdapter.aprepare_output_stream(
            provider, response, stop
        ):
            yield chunk
            if run_manager is not None and asyncio.iscoroutinefunction(
                run_manager.on_llm_new_token
            ):
                await run_manager.on_llm_new_token(chunk.text, chunk=chunk)
            elif run_manager is not None:
                run_manager.on_llm_new_token(chunk.text, chunk=chunk)  # type: ignore[unused-coroutine]
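
# Illustrative request produced by the helpers above (editorial addition; the
# model id and prompt are hypothetical). For the "amazon" provider with empty
# model_kwargs, the invocation reduces to:
#
#   client.invoke_model(
#       body='{"inputText": "Hello", "textGenerationConfig": {}}',
#       modelId="amazon.titan-text-express-v1",
#       accept="application/json",
#       contentType="application/json",
#   )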
@deprecated(
    since="0.0.34", removal="0.3", alternative_import="langchain_aws.BedrockLLM"
)
class Bedrock(LLM, BedrockBase):
    """Bedrock models.

    To authenticate, the AWS client uses the following methods to
    automatically load credentials:
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html

    If a specific credential profile should be used, you must pass
    the name of the profile from the ~/.aws/credentials file that is to be used.

    Make sure the credentials / roles used have the required policies to
    access the Bedrock service.

    Example:
        .. code-block:: python

            from bedrock_langchain.bedrock_llm import BedrockLLM

            llm = BedrockLLM(
                credentials_profile_name="default",
                model_id="amazon.titan-text-express-v1",
                streaming=True
            )
    """

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        model_id = values["model_id"]
        if model_id.startswith("anthropic.claude-3"):
            raise ValueError(
                "Claude v3 models are not supported by this LLM. "
                "Please use `from langchain_community.chat_models import BedrockChat` "
                "instead."
            )
        return super().validate_environment(values)

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "amazon_bedrock"
    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Return whether this model can be serialized by Langchain."""
        return True
    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "llms", "bedrock"]
    @property
    def lc_attributes(self) -> Dict[str, Any]:
        attributes: Dict[str, Any] = {}

        if self.region_name:
            attributes["region_name"] = self.region_name

        return attributes

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    def _stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        """Call out to Bedrock service with streaming.

        Args:
            prompt (str): The prompt to pass into the model.
            stop (Optional[List[str]], optional): Stop sequences. These will
                override any stop sequences in the `model_kwargs` attribute.
                Defaults to None.
            run_manager (Optional[CallbackManagerForLLMRun], optional): Callback
                run managers used to process the output. Defaults to None.

        Returns:
            Iterator[GenerationChunk]: Generator that yields the streamed responses.

        Yields:
            Iterator[GenerationChunk]: Responses from the model.
        """
        return self._prepare_input_and_invoke_stream(
            prompt=prompt, stop=stop, run_manager=run_manager, **kwargs
        )

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call out to Bedrock service model.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.

        Returns:
            The string generated by the model.

        Example:
            .. code-block:: python

                response = llm.invoke("Tell me a joke.")
        """

        if self.streaming:
            completion = ""
            for chunk in self._stream(
                prompt=prompt, stop=stop, run_manager=run_manager, **kwargs
            ):
                completion += chunk.text
            return completion

        text, _ = self._prepare_input_and_invoke(
            prompt=prompt, stop=stop, run_manager=run_manager, **kwargs
        )

        return text

    async def _astream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncGenerator[GenerationChunk, None]:
        """Call out to Bedrock service with streaming.

        Args:
            prompt (str): The prompt to pass into the model.
            stop (Optional[List[str]], optional): Stop sequences. These will
                override any stop sequences in the `model_kwargs` attribute.
                Defaults to None.
            run_manager (Optional[AsyncCallbackManagerForLLMRun], optional):
                Callback run managers used to process the output. Defaults to None.

        Yields:
            AsyncGenerator[GenerationChunk, None]: Generator that asynchronously
                yields the streamed responses.
        """
        async for chunk in self._aprepare_input_and_invoke_stream(
            prompt=prompt, stop=stop, run_manager=run_manager, **kwargs
        ):
            yield chunk

    async def _acall(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call out to Bedrock service model.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.

        Returns:
            The string generated by the model.

        Example:
            .. code-block:: python

                response = await llm._acall("Tell me a joke.")
        """

        if not self.streaming:
            raise ValueError("Streaming must be set to True for async operations.")

        chunks = [
            chunk.text
            async for chunk in self._astream(
                prompt=prompt, stop=stop, run_manager=run_manager, **kwargs
            )
        ]
        return "".join(chunks)
    def get_num_tokens(self, text: str) -> int:
        if self._model_is_anthropic:
            return get_num_tokens_anthropic(text)
        else:
            return super().get_num_tokens(text)
    def get_token_ids(self, text: str) -> List[int]:
        if self._model_is_anthropic:
            return get_token_ids_anthropic(text)
        else:
            return super().get_token_ids(text)
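
# Minimal usage sketch (editorial addition, not part of the original module).
# It assumes AWS credentials are already configured for boto3; the model id is
# one example, not a requirement of this class:
#
#   from langchain_community.llms.bedrock import Bedrock
#
#   llm = Bedrock(
#       credentials_profile_name="default",
#       model_id="amazon.titan-text-express-v1",
#   )
#   print(llm.invoke("Tell me a joke."))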