import asyncio
import json
import warnings
from abc import ABC
from typing import (
Any,
AsyncGenerator,
AsyncIterator,
Dict,
Iterator,
List,
Mapping,
Optional,
Tuple,
)
from langchain_core._api.deprecation import deprecated
from langchain_core.callbacks import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk
from langchain_core.pydantic_v1 import BaseModel, Extra, Field, root_validator
from langchain_core.utils import get_from_dict_or_env
from langchain_community.llms.utils import enforce_stop_tokens
from langchain_community.utilities.anthropic import (
get_num_tokens_anthropic,
get_token_ids_anthropic,
)
# Response-body keys that BedrockBase inspects for trace data and for the
# guardrail assessment when deciding whether a Bedrock service intervened.
AMAZON_BEDROCK_TRACE_KEY = "amazon-bedrock-trace"
GUARDRAILS_BODY_KEY = "amazon-bedrock-guardrailAssessment"

# Turn markers of the legacy Anthropic "Human:/Assistant:" prompt layout,
# each preceded by a blank line.
HUMAN_PROMPT = "\n\nHuman:"
ASSISTANT_PROMPT = "\n\nAssistant:"

# Warning text emitted when prompt turns do not alternate; built from the
# markers above so the message always names the exact strings being checked.
ALTERNATION_ERROR = (
    f"Error: Prompt must alternate between '{HUMAN_PROMPT}' and '{ASSISTANT_PROMPT}'."
)
def _add_newlines_before_ha(input_text: str) -> str:
new_text = input_text
for word in ["Human:", "Assistant:"]:
new_text = new_text.replace(word, "\n\n" + word)
for i in range(2):
new_text = new_text.replace("\n\n\n" + word, "\n\n" + word)
return new_text
def _human_assistant_format(input_text: str) -> str:
    """Coerce a prompt into the legacy Anthropic ``Human:``/``Assistant:`` layout.

    Steps, in order:

    * Prepend ``HUMAN_PROMPT + " "`` when the text has no ``Human:`` at all,
      or when an ``Assistant:`` appears before the first ``Human:``.
    * Append ``ASSISTANT_PROMPT`` when the text then has no ``Assistant:``.
    * Prefix two newlines when the text starts with ``Human:``.
    * Normalize blank lines before markers with ``_add_newlines_before_ha``.
    * Scan the markers left to right, expecting Human, Assistant, Human, ....
      An out-of-turn marker triggers ``warnings.warn`` and is otherwise
      ignored for tracking.
    * If the scan ends while still expecting an Assistant turn, append
      ``ASSISTANT_PROMPT``.

    Args:
        input_text: The raw prompt.

    Returns:
        The reformatted prompt.
    """
    human_at = input_text.find("Human:")
    assistant_at = input_text.find("Assistant:")
    no_human = human_at == -1
    assistant_first = assistant_at != -1 and human_at > assistant_at
    if no_human or assistant_first:
        input_text = f"{HUMAN_PROMPT} {input_text}"  # SILENT CORRECTION
    if "Assistant:" not in input_text:
        input_text = f"{input_text}{ASSISTANT_PROMPT}"  # SILENT CORRECTION
    if input_text.startswith("Human:"):
        input_text = f"\n\n{input_text}"
    input_text = _add_newlines_before_ha(input_text)

    # The text does not change during the scan, so the message is fixed.
    mismatch_message = f"{ALTERNATION_ERROR} Received {input_text}"
    awaiting_assistant = False
    for pos in range(len(input_text)):
        # The two markers differ at their third character, so at most one
        # of them can start at any given position.
        if input_text.startswith(HUMAN_PROMPT, pos):
            if awaiting_assistant:
                warnings.warn(mismatch_message)
            else:
                awaiting_assistant = True
        elif input_text.startswith(ASSISTANT_PROMPT, pos):
            if awaiting_assistant:
                awaiting_assistant = False
            else:
                warnings.warn(mismatch_message)
    if awaiting_assistant:  # Only saw Human, no Assistant
        input_text = f"{input_text}{ASSISTANT_PROMPT}"  # SILENT CORRECTION
    return input_text
def _stream_response_to_generation_chunk(
    stream_response: Dict[str, Any],
) -> GenerationChunk:
    """Convert one streamed response event into a ``GenerationChunk``.

    An event whose ``"delta"`` entry is falsy becomes an empty chunk.
    Otherwise the chunk carries ``delta["text"]`` and records the event's
    ``"stop_reason"`` (``None`` when absent) as ``finish_reason``.
    A missing ``"delta"`` key raises ``KeyError``.
    """
    delta = stream_response["delta"]
    if delta:
        return GenerationChunk(
            text=delta["text"],
            generation_info={"finish_reason": stream_response.get("stop_reason")},
        )
    return GenerationChunk(text="")
class BedrockBase(BaseModel, ABC):
    """Base class for Bedrock models."""

    client: Any = Field(exclude=True)  #: :meta private:

    region_name: Optional[str] = None
    """The AWS region, e.g. `us-west-2`. Falls back to the AWS_DEFAULT_REGION env
    variable or the region specified in ~/.aws/config if not provided here."""

    credentials_profile_name: Optional[str] = Field(default=None, exclude=True)
    """The name of the profile in the ~/.aws/credentials or ~/.aws/config files,
    which has either access keys or role information specified.
    If not specified, the default credential profile or, if on an EC2 instance,
    credentials from IMDS will be used.
    See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
    """

    config: Any = None
    """An optional botocore.config.Config instance to pass to the client."""

    provider: Optional[str] = None
    """The model provider, e.g. amazon, cohere, ai21, etc. When not supplied, the
    provider is taken from the first part of the model_id, e.g. 'amazon' in
    'amazon.titan-text-express-v1'. This value must be provided for model ids
    that do not contain the provider, e.g. custom and provisioned models that
    are referenced by an ARN."""

    model_id: str
    """Id of the model to call, e.g. amazon.titan-text-express-v1. This is
    equivalent to the modelId property in the list-foundation-models api. For
    custom and provisioned models, an ARN value is expected."""

    model_kwargs: Optional[Dict] = None
    """Keyword arguments to pass to the model."""

    endpoint_url: Optional[str] = None
    """Optional custom endpoint URL passed to the boto3 client. When unset, boto3
    picks the endpoint itself."""

    streaming: bool = False
    """Whether to stream the results."""

    provider_stop_sequence_key_name_map: Mapping[str, str] = {
        "anthropic": "stop_sequences",
        "amazon": "stopSequences",
        "ai21": "stop_sequences",
        "cohere": "stop_sequences",
        "mistral": "stop",
    }

    guardrails: Optional[Mapping[str, Any]] = {
        "id": None,
        "version": None,
        "trace": False,
    }
    """An optional dictionary to configure guardrails for Bedrock.

    The 'guardrails' field holds the keys 'id' and 'version', which should be
    strings but are initialized to None. They determine whether a specific
    guardrail is enabled and properly set. An optional boolean 'trace' key
    enables guardrail tracing.

    Type:
        Optional[Mapping[str, Any]]: A mapping with 'id', 'version' and
        optionally 'trace' keys.

    Example:
    llm = Bedrock(model_id="<model_id>", client=<bedrock_client>,
                  model_kwargs={},
                  guardrails={
                        "id": "<guardrail_id>",
                        "version": "<guardrail_version>"})

    To enable tracing for guardrails, set the 'trace' key to True and pass a
    callback handler to the 'run_manager' parameter of the 'generate' and
    '_call' methods.

    Example:
    llm = Bedrock(model_id="<model_id>", client=<bedrock_client>,
                  model_kwargs={},
                  guardrails={
                        "id": "<guardrail_id>",
                        "version": "<guardrail_version>",
                        "trace": True},
                callbacks=[BedrockAsyncCallbackHandler()])

    See [https://python.langchain.com/docs/modules/callbacks/] for more information on callback handlers.

    class BedrockAsyncCallbackHandler(AsyncCallbackHandler):
        async def on_llm_error(
                self,
                error: BaseException,
                **kwargs: Any,
        ) -> Any:
            reason = kwargs.get("reason")
            if reason == "GUARDRAIL_INTERVENED":
                ...Logic to handle guardrail intervention...
    """  # noqa: E501

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Create a boto3 ``bedrock-runtime`` client unless one was passed in.

        The region is resolved from ``region_name``, then AWS_DEFAULT_REGION,
        then the boto3 session's default region.

        Raises:
            ImportError: If boto3 is not installed.
            ValueError: If the region or credentials cannot be resolved, or
                the client cannot be created.
        """
        # Skip creating new client if passed in constructor
        if values["client"] is not None:
            return values

        try:
            import boto3

            if values["credentials_profile_name"] is not None:
                session = boto3.Session(profile_name=values["credentials_profile_name"])
            else:
                # use default credentials
                session = boto3.Session()

            values["region_name"] = get_from_dict_or_env(
                values,
                "region_name",
                "AWS_DEFAULT_REGION",
                default=session.region_name,
            )

            # Only forward options that were actually configured.
            client_params = {}
            if values["region_name"]:
                client_params["region_name"] = values["region_name"]
            if values["endpoint_url"]:
                client_params["endpoint_url"] = values["endpoint_url"]
            if values["config"]:
                client_params["config"] = values["config"]

            values["client"] = session.client("bedrock-runtime", **client_params)

        except ImportError:
            raise ImportError(
                "Could not import boto3 python package. "
                "Please install it with `pip install boto3`."
            )
        except ValueError as e:
            raise ValueError(f"Error raised by bedrock service: {e}") from e
        except Exception as e:
            raise ValueError(
                "Could not load credentials to authenticate with AWS client. "
                "Please check that credentials in the specified "
                f"profile name are valid. Bedrock error: {e}"
            ) from e

        return values

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Get the identifying parameters."""
        return {"model_kwargs": self.model_kwargs or {}}

    def _get_provider(self) -> str:
        """Return the explicit provider, or the prefix of ``model_id`` before '.'.

        Raises:
            ValueError: If ``model_id`` is an ARN and no provider was supplied.
        """
        if self.provider:
            return self.provider
        if self.model_id.startswith("arn"):
            raise ValueError(
                "Model provider should be supplied when passing a model ARN as "
                "model_id"
            )

        return self.model_id.split(".")[0]

    @property
    def _model_is_anthropic(self) -> bool:
        return self._get_provider() == "anthropic"

    @property
    def _guardrails_enabled(self) -> bool:
        """Whether guardrails are enabled and properly configured.

        Returns True when 'guardrails' is a dict whose 'id' and 'version'
        values are both truthy. The 'trace' key is not consulted here.

        Returns:
            bool: True if guardrails are correctly configured, else False.

        Raises:
            TypeError: If 'guardrails' is a dict missing the 'id' or 'version'
                key.
        """
        try:
            return (
                isinstance(self.guardrails, dict)
                and bool(self.guardrails["id"])
                and bool(self.guardrails["version"])
            )

        except KeyError as e:
            raise TypeError(
                "Guardrails must be a dictionary with 'id' and 'version' keys."
            ) from e

    def _get_guardrails_canonical(self) -> Dict[str, Any]:
        """Return the guardrail settings in the shape sent to Bedrock:

        "amazon-bedrock-guardrailDetails": {
            "guardrailId": "string",
            "guardrailVersion": "string"
        }
        """
        return {
            "amazon-bedrock-guardrailDetails": {
                "guardrailId": self.guardrails.get("id"),  # type: ignore[union-attr]
                "guardrailVersion": self.guardrails.get("version"),  # type: ignore[union-attr]
            }
        }

    def _prepare_input_and_invoke(
        self,
        prompt: Optional[str] = None,
        system: Optional[str] = None,
        messages: Optional[List[Dict]] = None,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Tuple[str, Dict[str, Any]]:
        """Invoke the model once, without streaming.

        Stop sequences are enforced on the returned text rather than sent to
        the model.

        Returns:
            A ``(text, usage_info)`` tuple.

        Raises:
            ValueError: If the Bedrock call or output parsing fails.
        """
        _model_kwargs = self.model_kwargs or {}

        provider = self._get_provider()
        params = {**_model_kwargs, **kwargs}
        if self._guardrails_enabled:
            params.update(self._get_guardrails_canonical())
        input_body = LLMInputOutputAdapter.prepare_input(
            provider=provider,
            model_kwargs=params,
            prompt=prompt,
            system=system,
            messages=messages,
        )
        body = json.dumps(input_body)
        accept = "application/json"
        contentType = "application/json"

        request_options = {
            "body": body,
            "modelId": self.model_id,
            "accept": accept,
            "contentType": contentType,
        }

        if self._guardrails_enabled:
            request_options["guardrail"] = "ENABLED"
            if self.guardrails.get("trace"):  # type: ignore[union-attr]
                request_options["trace"] = "ENABLED"

        try:
            response = self.client.invoke_model(**request_options)

            # NOTE(review): unpacking ``.values()`` assumes prepare_output
            # returns exactly three entries in (text, body, usage) order.
            text, body, usage_info = LLMInputOutputAdapter.prepare_output(
                provider, response
            ).values()

        except Exception as e:
            raise ValueError(f"Error raised by bedrock service: {e}") from e

        if stop is not None:
            text = enforce_stop_tokens(text, stop)

        # Verify and raise a callback error if any intervention occurs or a signal is
        # sent from a Bedrock service, such as when guardrails are triggered.
        self._notify_bedrock_services_signal(body, run_manager)  # type: ignore[arg-type]

        return text, usage_info

    def _notify_bedrock_services_signal(
        self,
        body: Optional[Dict[str, Any]],
        run_manager: Optional[CallbackManagerForLLMRun],
    ) -> None:
        """Report a Bedrock service intervention found in ``body`` to ``run_manager``.

        When ``_get_bedrock_services_signal`` flags an intervention,
        ``run_manager.on_llm_error`` is called with the signal, reason and
        trace as keyword arguments. A ``None`` body is treated as empty.
        Nothing is reported without a ``run_manager``.
        """
        services_trace = self._get_bedrock_services_signal(body or {})
        if services_trace.get("signal") and run_manager is not None:
            reason = services_trace.get("reason")
            run_manager.on_llm_error(
                Exception(f"Error raised by bedrock service: {reason}"),
                **services_trace,
            )

    def _get_bedrock_services_signal(self, body: dict) -> dict:
        """Check a response body for signs that a Bedrock service intervened.

        Currently only guardrail interventions are detected, and only when
        guardrails are enabled with 'trace' set. The result is a dict with
        'signal' (bool), 'reason' (e.g. "GUARDRAIL_INTERVENED" or None) and
        'trace' (the body's trace entry, or None).
        """
        if (
            self._guardrails_enabled
            and self.guardrails.get("trace")  # type: ignore[union-attr]
            and self._is_guardrails_intervention(body)
        ):
            return {
                "signal": True,
                "reason": "GUARDRAIL_INTERVENED",
                "trace": body.get(AMAZON_BEDROCK_TRACE_KEY),
            }

        return {
            "signal": False,
            "reason": None,
            "trace": None,
        }

    def _is_guardrails_intervention(self, body: dict) -> bool:
        """Whether the body's guardrail assessment reports an intervention."""
        return body.get(GUARDRAILS_BODY_KEY) == "GUARDRAIL_INTERVENED"

    def _prepare_input_and_invoke_stream(
        self,
        prompt: Optional[str] = None,
        system: Optional[str] = None,
        messages: Optional[List[Dict]] = None,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        """Invoke the model with a streaming response and yield its chunks.

        Raises:
            ValueError: If ``stop`` is given for a provider without a known
                stop-sequence key, or if the Bedrock call fails.
        """
        # Copy so per-call stop sequences / stream flags never leak into
        # ``self.model_kwargs`` and therefore into later calls.
        _model_kwargs = dict(self.model_kwargs or {})
        provider = self._get_provider()

        if stop:
            if provider not in self.provider_stop_sequence_key_name_map:
                raise ValueError(
                    f"Stop sequence key name for {provider} is not supported."
                )

            # stop sequence from _generate() overrides
            # stop sequences in the class attribute
            _model_kwargs[self.provider_stop_sequence_key_name_map[provider]] = stop

        if provider == "cohere":
            _model_kwargs["stream"] = True

        params = {**_model_kwargs, **kwargs}

        if self._guardrails_enabled:
            params.update(self._get_guardrails_canonical())

        input_body = LLMInputOutputAdapter.prepare_input(
            provider=provider,
            prompt=prompt,
            system=system,
            messages=messages,
            model_kwargs=params,
        )
        body = json.dumps(input_body)

        request_options = {
            "body": body,
            "modelId": self.model_id,
            "accept": "application/json",
            "contentType": "application/json",
        }

        if self._guardrails_enabled:
            request_options["guardrail"] = "ENABLED"
            if self.guardrails.get("trace"):  # type: ignore[union-attr]
                request_options["trace"] = "ENABLED"

        try:
            response = self.client.invoke_model_with_response_stream(**request_options)

        except Exception as e:
            raise ValueError(f"Error raised by bedrock service: {e}") from e

        for chunk in LLMInputOutputAdapter.prepare_output_stream(
            provider, response, stop, bool(messages)
        ):
            yield chunk
            # Verify and raise a callback error if any middleware intervened.
            # generation_info may be None; it is treated as an empty body.
            self._notify_bedrock_services_signal(chunk.generation_info, run_manager)
            if run_manager is not None:
                run_manager.on_llm_new_token(chunk.text, chunk=chunk)

    async def _aprepare_input_and_invoke_stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[GenerationChunk]:
        """Async variant of the streaming invocation.

        The blocking boto3 call runs in the default executor.
        NOTE(review): unlike the sync path, guardrail settings are not applied
        here and client errors are not wrapped in ValueError.

        Raises:
            ValueError: If ``stop`` is given for a provider without a known
                stop-sequence key.
        """
        # Copy so per-call stop sequences / stream flags never leak into
        # ``self.model_kwargs`` and therefore into later calls.
        _model_kwargs = dict(self.model_kwargs or {})
        provider = self._get_provider()

        if stop:
            if provider not in self.provider_stop_sequence_key_name_map:
                raise ValueError(
                    f"Stop sequence key name for {provider} is not supported."
                )
            _model_kwargs[self.provider_stop_sequence_key_name_map[provider]] = stop

        if provider == "cohere":
            _model_kwargs["stream"] = True

        params = {**_model_kwargs, **kwargs}
        input_body = LLMInputOutputAdapter.prepare_input(
            provider=provider, prompt=prompt, model_kwargs=params
        )
        body = json.dumps(input_body)

        response = await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: self.client.invoke_model_with_response_stream(
                body=body,
                modelId=self.model_id,
                accept="application/json",
                contentType="application/json",
            ),
        )

        async for chunk in LLMInputOutputAdapter.aprepare_output_stream(
            provider, response, stop
        ):
            yield chunk
            if run_manager is not None and asyncio.iscoroutinefunction(
                run_manager.on_llm_new_token
            ):
                await run_manager.on_llm_new_token(chunk.text, chunk=chunk)
            elif run_manager is not None:
                run_manager.on_llm_new_token(chunk.text, chunk=chunk)  # type: ignore[unused-coroutine]
@deprecated(
    since="0.0.34", removal="0.3", alternative_import="langchain_aws.BedrockLLM"
)
class Bedrock(LLM, BedrockBase):
    """Bedrock models.

    To authenticate, the AWS client uses the following methods to
    automatically load credentials:
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html

    If a specific credential profile should be used, you must pass
    the name of the profile from the ~/.aws/credentials file that is to be used.

    Make sure the credentials / roles used have the required policies to
    access the Bedrock service.

    Example:
        .. code-block:: python

            from langchain_community.llms import Bedrock

            llm = Bedrock(
                credentials_profile_name="default",
                model_id="amazon.titan-text-express-v1",
                streaming=True
            )
    """

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Reject Claude 3 model ids, then run the base environment validation.

        ``model_id`` may be absent from ``values`` when its own field
        validation failed; in that case the Claude 3 check is skipped and
        pydantic reports the field error.
        """
        model_id = values.get("model_id")
        if model_id and model_id.startswith("anthropic.claude-3"):
            raise ValueError(
                "Claude v3 models are not supported by this LLM. "
                "Please use `from langchain_community.chat_models import BedrockChat` "
                "instead."
            )
        return super().validate_environment(values)

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "amazon_bedrock"

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Return whether this model can be serialized by Langchain."""
        return True

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "llms", "bedrock"]

    @property
    def lc_attributes(self) -> Dict[str, Any]:
        """Extra attributes for LangChain serialization (``region_name`` if set)."""
        attributes: Dict[str, Any] = {}

        if self.region_name:
            attributes["region_name"] = self.region_name

        return attributes

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    def _stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        """Call out to Bedrock service with streaming.

        Args:
            prompt (str): The prompt to pass into the model
            stop (Optional[List[str]], optional): Stop sequences. These will
                override any stop sequences in the `model_kwargs` attribute.
                Defaults to None.
            run_manager (Optional[CallbackManagerForLLMRun], optional): Callback
                run managers used to process the output. Defaults to None.

        Returns:
            Iterator[GenerationChunk]: Generator that yields the streamed responses.

        Yields:
            Iterator[GenerationChunk]: Responses from the model.
        """
        return self._prepare_input_and_invoke_stream(
            prompt=prompt, stop=stop, run_manager=run_manager, **kwargs
        )

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call out to Bedrock service model.

        When ``streaming`` is enabled the streamed chunks are concatenated.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.

        Returns:
            The string generated by the model.

        Example:
            .. code-block:: python

                response = llm.invoke("Tell me a joke.")
        """
        if self.streaming:
            return "".join(
                chunk.text
                for chunk in self._stream(
                    prompt=prompt, stop=stop, run_manager=run_manager, **kwargs
                )
            )

        text, _ = self._prepare_input_and_invoke(
            prompt=prompt, stop=stop, run_manager=run_manager, **kwargs
        )
        return text

    async def _astream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncGenerator[GenerationChunk, None]:
        """Call out to Bedrock service with streaming.

        Args:
            prompt (str): The prompt to pass into the model
            stop (Optional[List[str]], optional): Stop sequences. These will
                override any stop sequences in the `model_kwargs` attribute.
                Defaults to None.
            run_manager (Optional[AsyncCallbackManagerForLLMRun], optional):
                Callback run managers used to process the output. Defaults to None.

        Yields:
            AsyncGenerator[GenerationChunk, None]: Generator that asynchronously
            yields the streamed responses.
        """
        async for chunk in self._aprepare_input_and_invoke_stream(
            prompt=prompt, stop=stop, run_manager=run_manager, **kwargs
        ):
            yield chunk

    async def _acall(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call out to Bedrock service model asynchronously.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.

        Returns:
            The string generated by the model.

        Raises:
            ValueError: If ``streaming`` is not enabled.

        Example:
            .. code-block:: python

                response = await llm._acall("Tell me a joke.")
        """
        if not self.streaming:
            raise ValueError("Streaming must be set to True for async operations. ")

        chunks = [
            chunk.text
            async for chunk in self._astream(
                prompt=prompt, stop=stop, run_manager=run_manager, **kwargs
            )
        ]
        return "".join(chunks)

    def get_num_tokens(self, text: str) -> int:
        """Count tokens with the Anthropic helper for Anthropic models, else the base class."""
        if self._model_is_anthropic:
            return get_num_tokens_anthropic(text)
        else:
            return super().get_num_tokens(text)

    def get_token_ids(self, text: str) -> List[int]:
        """Tokenize with the Anthropic helper for Anthropic models, else the base class."""
        if self._model_is_anthropic:
            return get_token_ids_anthropic(text)
        else:
            return super().get_token_ids(text)