ChatBedrockConverse#
- class langchain_aws.chat_models.bedrock_converse.ChatBedrockConverse[source]#
基础类:
BaseChatModel
基于Bedrock converse API构建的Bedrock聊天模型集成。
一旦Bedrock converse API与旧版Bedrock API功能相当,此实现将最终取代现有的ChatBedrock实现。具体来说,converse API目前还不支持自定义Bedrock模型。
- Setup:
要使用Amazon Bedrock,请确保您已完成以下所有步骤: https://docs.aws.amazon.com/bedrock/latest/userguide/setting-up.html
完成后,安装LangChain集成:
pip install -U langchain-aws
- Key init args — completion params:
- model: str
使用的BedrockConverse模型名称。
- temperature: float
采样温度。
- max_tokens: Optional[int]
生成的最大令牌数。
- Key init args — client params:
- region_name: Optional[str]
使用的AWS区域,例如‘us-west-2’。
- base_url: Optional[str]
使用的Bedrock端点。如果您不想默认使用us-east-1端点,则需要此设置。
- credentials_profile_name: Optional[str]
配置文件在 ~/.aws/credentials 或 ~/.aws/config 文件中的名称。
请参阅参数部分中支持的初始化参数及其描述的完整列表。
- Instantiate:
from langchain_aws import ChatBedrockConverse llm = ChatBedrockConverse( model="anthropic.claude-3-sonnet-20240229-v1:0", temperature=0, max_tokens=None, # other params... )
- Invoke:
messages = [ ("system", "You are a helpful translator. Translate the user sentence to French."), ("human", "I love programming."), ] llm.invoke(messages)
AIMessage(content=[{'type': 'text', 'text': "J'aime la programmation."}], response_metadata={'ResponseMetadata': {'RequestId': '9ef1e313-a4c1-4f79-b631-171f658d3c0e', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Sat, 15 Jun 2024 01:19:24 GMT', 'content-type': 'application/json', 'content-length': '205', 'connection': 'keep-alive', 'x-amzn-requestid': '9ef1e313-a4c1-4f79-b631-171f658d3c0e'}, 'RetryAttempts': 0}, 'stopReason': 'end_turn', 'metrics': {'latencyMs': 609}}, id='run-754e152b-2b41-4784-9538-d40d71a5c3bc-0', usage_metadata={'input_tokens': 25, 'output_tokens': 11, 'total_tokens': 36})
- Stream:
for chunk in llm.stream(messages): print(chunk)
AIMessageChunk(content=[], id='run-da3c2606-4792-440a-ac66-72e0d1f6d117') AIMessageChunk(content=[{'type': 'text', 'text': 'J', 'index': 0}], id='run-da3c2606-4792-440a-ac66-72e0d1f6d117') AIMessageChunk(content=[{'text': "'", 'index': 0}], id='run-da3c2606-4792-440a-ac66-72e0d1f6d117') AIMessageChunk(content=[{'text': 'a', 'index': 0}], id='run-da3c2606-4792-440a-ac66-72e0d1f6d117') AIMessageChunk(content=[{'text': 'ime', 'index': 0}], id='run-da3c2606-4792-440a-ac66-72e0d1f6d117') AIMessageChunk(content=[{'text': ' la', 'index': 0}], id='run-da3c2606-4792-440a-ac66-72e0d1f6d117') AIMessageChunk(content=[{'text': ' programm', 'index': 0}], id='run-da3c2606-4792-440a-ac66-72e0d1f6d117') AIMessageChunk(content=[{'text': 'ation', 'index': 0}], id='run-da3c2606-4792-440a-ac66-72e0d1f6d117') AIMessageChunk(content=[{'text': '.', 'index': 0}], id='run-da3c2606-4792-440a-ac66-72e0d1f6d117') AIMessageChunk(content=[{'index': 0}], id='run-da3c2606-4792-440a-ac66-72e0d1f6d117') AIMessageChunk(content=[], response_metadata={'stopReason': 'end_turn'}, id='run-da3c2606-4792-440a-ac66-72e0d1f6d117') AIMessageChunk(content=[], response_metadata={'metrics': {'latencyMs': 581}}, id='run-da3c2606-4792-440a-ac66-72e0d1f6d117', usage_metadata={'input_tokens': 25, 'output_tokens': 11, 'total_tokens': 36})
stream = llm.stream(messages) full = next(stream) for chunk in stream: full += chunk full
AIMessageChunk(content=[{'type': 'text', 'text': "J'aime la programmation.", 'index': 0}], response_metadata={'stopReason': 'end_turn', 'metrics': {'latencyMs': 554}}, id='run-56a5a5e0-de86-412b-9835-624652dc3539', usage_metadata={'input_tokens': 25, 'output_tokens': 11, 'total_tokens': 36})
- Tool calling:
from pydantic import BaseModel, Field class GetWeather(BaseModel): '''Get the current weather in a given location''' location: str = Field(..., description="The city and state, e.g. San Francisco, CA") class GetPopulation(BaseModel): '''Get the current population in a given location''' location: str = Field(..., description="The city and state, e.g. San Francisco, CA") llm_with_tools = llm.bind_tools([GetWeather, GetPopulation]) ai_msg = llm_with_tools.invoke("Which city is hotter today and which is bigger: LA or NY?") ai_msg.tool_calls
[{'name': 'GetWeather', 'args': {'location': 'Los Angeles, CA'}, 'id': 'tooluse_Mspi2igUTQygp-xbX6XGVw'}, {'name': 'GetWeather', 'args': {'location': 'New York, NY'}, 'id': 'tooluse_tOPHiDhvR2m0xF5_5tyqWg'}, {'name': 'GetPopulation', 'args': {'location': 'Los Angeles, CA'}, 'id': 'tooluse__gcY_klbSC-GqB-bF_pxNg'}, {'name': 'GetPopulation', 'args': {'location': 'New York, NY'}, 'id': 'tooluse_-1HSoGX0TQCSaIg7cdFy8Q'}]
有关更多信息,请参见
ChatBedrockConverse.bind_tools()
方法。- Structured output:
from typing import Optional from pydantic import BaseModel, Field class Joke(BaseModel): '''Joke to tell user.''' setup: str = Field(description="The setup of the joke") punchline: str = Field(description="The punchline to the joke") rating: Optional[int] = Field(description="How funny the joke is, from 1 to 10") structured_llm = llm.with_structured_output(Joke) structured_llm.invoke("Tell me a joke about cats")
Joke(setup='What do you call a cat that gets all dressed up?', punchline='A purrfessional!', rating=7)
更多信息请参见
ChatBedrockConverse.with_structured_output()
。- Image input:
import base64 import httpx from langchain_core.messages import HumanMessage image_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg" image_data = base64.b64encode(httpx.get(image_url).content).decode("utf-8") message = HumanMessage( content=[ {"type": "text", "text": "describe the weather in this image"}, { "type": "image", "source": {"type": "base64", "media_type": "image/jpeg", "data": image_data}, }, ], ) ai_msg = llm.invoke([message]) ai_msg.content
[{'type': 'text', 'text': 'The image depicts a sunny day with a partly cloudy sky. The sky is a brilliant blue color with scattered white clouds drifting across. The lighting and cloud patterns suggest pleasant, mild weather conditions. The scene shows an open grassy field or meadow, indicating warm temperatures conducive for vegetation growth. Overall, the weather portrayed in this scenic outdoor image appears to be sunny with some clouds, likely representing a nice, comfortable day.'}]
- Token usage:
ai_msg = llm.invoke(messages) ai_msg.usage_metadata
{'input_tokens': 25, 'output_tokens': 11, 'total_tokens': 36}
- Response metadata
ai_msg = llm.invoke(messages) ai_msg.response_metadata
{'ResponseMetadata': {'RequestId': '776a2a26-5946-45ae-859e-82dc5f12017c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Mon, 17 Jun 2024 01:37:05 GMT', 'content-type': 'application/json', 'content-length': '206', 'connection': 'keep-alive', 'x-amzn-requestid': '776a2a26-5946-45ae-859e-82dc5f12017c'}, 'RetryAttempts': 0}, 'stopReason': 'end_turn', 'metrics': {'latencyMs': 1290}}
注意
ChatBedrockConverse 实现了标准的
Runnable Interface
。🏃Runnable Interface
接口在可运行对象上提供了额外的方法,例如with_types
,with_retry
,assign
,bind
,get_graph
, 等等。- param additional_model_request_fields: Dict[str, Any] | None = None#
模型支持的额外推理参数。
除了Converse在inferenceConfig字段中支持的基本推理参数集之外的参数。
- param additional_model_response_field_paths: List[str] | None = None#
在响应中返回的附加模型参数字段路径。
Converse 返回请求的字段作为 JSON Pointer 对象在 additionalModelResponseFields 字段中。以下是 additionalModelResponseFieldPaths 的示例 JSON。
- param aws_access_key_id: SecretStr | None [Optional]#
AWS访问密钥ID。
如果提供了aws_secret_access_key,则还必须提供aws_secret_access_key。 如果未指定,将使用默认的凭证配置文件,或者如果在EC2实例上,则使用来自IMDS的凭证。 参见:https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
如果未提供,将从‘AWS_ACCESS_KEY_ID’环境变量中读取。
- param aws_secret_access_key: SecretStr | None [Optional]#
AWS secret_access_key。
如果提供了aws_secret_access_key,则必须同时提供aws_access_key_id。 如果未指定,将使用默认的凭证配置文件,或者如果在EC2实例上,则使用来自IMDS的凭证。 参见:https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
如果未提供,将从‘AWS_SECRET_ACCESS_KEY’环境变量中读取。
- param aws_session_token: SecretStr | None [Optional]#
AWS会话令牌。
如果提供了aws_access_key_id和aws_secret_access_key,则必须同时提供。除非使用临时凭证,否则不需要。参见:https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
如果未提供,将从‘AWS_SESSION_TOKEN’环境变量中读取。
- param cache: BaseCache | bool | None = None#
是否缓存响应。
如果为真,将使用全局缓存。
如果为false,将不使用缓存
如果为None,将使用全局缓存(如果已设置),否则不使用缓存。
如果是 BaseCache 的实例,将使用提供的缓存。
目前不支持对模型的流式方法进行缓存。
- param callback_manager: BaseCallbackManager | None = None#
自版本0.1.7起已弃用:请改用
callbacks()
。它将在pydantic==1.0中被移除。回调管理器以添加到运行跟踪中。
- param callbacks: Callbacks = None#
添加到运行跟踪的回调。
- param config: Any = None#
一个可选的botocore.config.Config实例,传递给客户端。
- param credentials_profile_name: str | None = None#
配置文件在 ~/.aws/credentials 或 ~/.aws/config 文件中的名称。
配置文件应指定访问密钥或角色信息。 如果未指定,将使用默认的凭证配置文件,如果在EC2实例上,则使用来自IMDS的凭证。 参见:https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
- param custom_get_token_ids: Callable[[str], list[int]] | None = None#
用于计数标记的可选编码器。
- param disable_streaming: bool | Literal['tool_calling'] = False#
是否禁用此模型的流式传输。
如果流式传输被绕过,那么
stream()/astream()
将依赖于invoke()/ainvoke()
。如果为True,将始终绕过流式传输情况。
如果是“tool_calling”,只有在使用
tools
关键字参数调用模型时,才会绕过流式处理的情况。如果为 False(默认值),将始终使用流式情况(如果可用)。
- param endpoint_url: str | None = None (alias 'base_url')#
如果您不想默认使用 us-east-1 端点,则需要
- param guardrail_config: Dict[str, Any] | None = None (alias 'guardrails')#
请求中要使用的防护栏的配置信息。
- param max_tokens: int | None = None#
生成的最大令牌数。
- param metadata: dict[str, Any] | None = None#
要添加到运行跟踪的元数据。
- param model_id: str [Required] (alias 'model')#
要调用的模型的ID。
例如,
"anthropic.claude-3-sonnet-20240229-v1:0"
。这等同于 list-foundation-models API 中的 modelID 属性。对于自定义和预置 模型,预期为 ARN 值。请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/model-ids.html#model-ids-arns 以获取所有支持的内置模型列表。
- param provider: str = ''#
模型提供者,例如,amazon, cohere, ai21等。
当未提供时,提供者从model_id的第一部分提取,例如在'amazon.titan-text-express-v1'中的'amazon'。对于不包含提供者的模型ID,如具有关联ARN的自定义和预配模型,应提供此值。
- param rate_limiter: BaseRateLimiter | None = None#
一个可选的速率限制器,用于限制请求的数量。
- param region_name: str | None = None#
aws 区域,例如 us-west-2。
如果未在此处提供,则回退到AWS_DEFAULT_REGION环境变量或~/.aws/config中指定的区域。
- param stop_sequences: List[str] | None = None (alias 'stop')#
如果出现任何这些子字符串,则停止生成。
- param supports_tool_choice_values: Sequence[Literal['auto', 'any', 'tool']] | None = None#
模型支持哪些类型的 tool_choice 值。
如果未指定,则推断。如果使用‘claude-3’模型,则推断为(‘auto’,‘any’,‘tool’);如果使用‘mistral-large’模型,则推断为(‘auto’,‘any’);如果使用‘nova’模型,则推断为(‘auto’),否则为空。
- param tags: list[str] | None = None#
要添加到运行跟踪的标签。
- param temperature: float | None = None#
采样温度。必须在0到1之间。
- param top_p: float | None = None#
考虑用于下一个令牌的最可能候选者的百分比。
必须为0到1。
例如,如果您选择topP的值为0.8,模型将从可能的下一个标记的概率分布的前80%中进行选择。
- param verbose: bool [Optional]#
是否打印出响应文本。
- __call__(messages: list[BaseMessage], stop: list[str] | None = None, callbacks: list[BaseCallbackHandler] | BaseCallbackManager | None = None, **kwargs: Any) BaseMessage #
自版本0.1.7起已弃用:请改用
invoke()
。在langchain-core==1.0之前不会移除。- Parameters:
messages (列表[BaseMessage])
stop (列表[字符串] | 无)
callbacks (列表[BaseCallbackHandler] | BaseCallbackManager | 无)
kwargs (Any)
- Return type:
- async abatch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output] #
默认实现使用asyncio.gather并行运行ainvoke。
batch的默认实现对于IO绑定的runnables效果很好。
如果子类能够更高效地进行批处理,则应重写此方法; 例如,如果底层的Runnable使用支持批处理模式的API。
- Parameters:
inputs (list[Input]) – Runnable 的输入列表。
config (RunnableConfig | list[RunnableConfig] | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的‘tags’、‘metadata’,用于控制并行工作量的‘max_concurrency’,以及其他键。更多详情请参考RunnableConfig。默认为None。
return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为 False。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Returns:
Runnable 的输出列表。
- Return type:
列表[输出]
- async abatch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) AsyncIterator[tuple[int, Output | Exception]] #
在输入列表上并行运行ainvoke,在它们完成时产生结果。
- Parameters:
inputs (Sequence[Input]) – Runnable 的输入列表。
config (RunnableConfig | Sequence[RunnableConfig] | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的'tags'、'metadata',用于控制并行工作量的'max_concurrency',以及其他键。有关更多详细信息,请参阅RunnableConfig。默认为None。默认为None。
return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为 False。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Yields:
输入索引和Runnable输出的元组。
- Return type:
AsyncIterator[元组[int, Output | 异常]]
- async ainvoke(input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs: Any) BaseMessage #
ainvoke的默认实现,从线程调用invoke。
默认实现允许使用异步代码,即使Runnable没有实现本地的异步版本的invoke。
如果子类可以异步运行,则应重写此方法。
- Parameters:
输入 (LanguageModelInput)
config (可选[RunnableConfig])
stop (可选[列表[字符串]])
kwargs (Any)
- Return type:
- async astream(input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs: Any) AsyncIterator[BaseMessageChunk] #
astream的默认实现,调用ainvoke。 如果子类支持流式输出,则应重写此方法。
- Parameters:
input (LanguageModelInput) – Runnable 的输入。
config (可选[RunnableConfig]) – 用于Runnable的配置。默认为None。
kwargs (Any) – 传递给Runnable的额外关键字参数。
stop (可选[列表[字符串]])
- Yields:
Runnable 的输出。
- Return type:
异步迭代器[BaseMessageChunk]
- async astream_events(input: Any, config: RunnableConfig | None = None, *, version: Literal['v1', 'v2'], include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) AsyncIterator[StandardStreamEvent | CustomStreamEvent] #
生成事件流。
用于创建一个迭代器,遍历提供实时信息的StreamEvents,包括来自中间结果的StreamEvents。
StreamEvent 是一个具有以下模式的字典:
event
: str - 事件名称的格式为格式: on_[runnable_type]_(start|stream|end).
name
: str - 生成事件的 Runnable 的名称。run_id
: str - 与给定执行相关联的随机生成的ID发出事件的Runnable。 作为父Runnable执行的一部分被调用的子Runnable会被分配其自己唯一的ID。
parent_ids
: List[str] - 生成事件的父可运行对象的ID。根可运行对象将有一个空列表。 父ID的顺序是从根到直接父对象。 仅适用于API的v2版本。API的v1版本将返回一个空列表。
tags
: Optional[List[str]] - 生成事件的Runnable的标签事件。
metadata
: Optional[Dict[str, Any]] - Runnable的元数据生成事件的元数据。
data
: Dict[str, Any]
下表展示了一些可能由不同链发出的事件。为了简洁起见,表中省略了元数据字段。链定义已包含在表后。
注意 此参考表适用于V2版本的架构。
事件
名称
块
输入
输出
on_chat_model_start
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream
[model name]
AIMessageChunk(content=”hello”)
on_chat_model_end
[model name]
{“messages”: [[SystemMessage, HumanMessage]]}
AIMessageChunk(content=”hello world”)
on_llm_start
[model name]
{‘input’: ‘hello’}
on_llm_stream
[模型名称]
‘Hello’
on_llm_end
[model name]
‘你好,人类!’
链上开始
格式化文档
on_chain_stream
format_docs
“你好世界!,再见世界!”
on_chain_end
format_docs
[Document(…)]
“你好世界!,再见世界!”
on_tool_start
some_tool
{“x”: 1, “y”: “2”}
on_tool_end
some_tool
{“x”: 1, “y”: “2”}
on_retriever_start
[retriever name]
{“query”: “hello”}
on_retriever_end
[retriever name]
{“query”: “hello”}
[Document(…), ..]
on_prompt_start
[template_name]
{“question”: “hello”}
on_prompt_end
[template_name]
{“question”: “hello”}
ChatPromptValue(messages: [SystemMessage, …])
除了标准事件外,用户还可以派发自定义事件(见下面的示例)。
自定义事件将仅在API的v2版本中显示!
自定义事件具有以下格式:
属性
类型
描述
name
str
用户定义的事件名称。
data
Any
与事件相关的数据。这可以是任何内容,但我们建议使其可JSON序列化。
以下是上述标准事件相关的声明:
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
some_tool:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
提示:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例:
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:分发自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- Parameters:
input (Any) – Runnable 的输入。
config (RunnableConfig | None) – 用于Runnable的配置。
version (Literal['v1', 'v2']) – 使用的模式版本,可以是 v2 或 v1。 用户应使用 v2。 v1 是为了向后兼容,将在 0.4.0 版本中弃用。 在 API 稳定之前不会分配默认值。 自定义事件仅在 v2 中显示。
include_names (Sequence[str] | None) – 仅包含来自具有匹配名称的可运行对象的事件。
include_types (Sequence[str] | None) – 仅包含来自具有匹配类型的可运行对象的事件。
include_tags (Sequence[str] | None) – 仅包含具有匹配标签的可运行对象的事件。
exclude_names (Sequence[str] | None) – 排除具有匹配名称的可运行对象的事件。
exclude_types (Sequence[str] | None) – 排除具有匹配类型的可运行对象的事件。
exclude_tags (Sequence[str] | None) – 排除具有匹配标签的可运行对象的事件。
kwargs (Any) – 传递给 Runnable 的额外关键字参数。 这些参数将传递给 astream_log,因为 astream_events 的实现是基于 astream_log 的。
- Yields:
一个异步的StreamEvents流。
- Raises:
NotImplementedError – 如果版本不是v1或v2。
- Return type:
AsyncIterator[StandardStreamEvent | CustomStreamEvent]
- batch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output] #
默认实现使用线程池执行器并行运行invoke。
batch的默认实现对于IO绑定的runnables效果很好。
如果子类能够更高效地进行批处理,则应重写此方法; 例如,如果底层的Runnable使用支持批处理模式的API。
- Parameters:
inputs (列表[Input])
config (RunnableConfig | list[RunnableConfig] | None)
return_exceptions (bool)
kwargs (任意 | 无)
- Return type:
列表[输出]
- batch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) Iterator[tuple[int, Output | Exception]] #
在输入列表上并行运行invoke,在它们完成时产生结果。
- Parameters:
inputs (Sequence[Input])
config (RunnableConfig | Sequence[RunnableConfig] | None)
return_exceptions (bool)
kwargs (任意 | 无)
- Return type:
Iterator[元组[int, Output | 异常]]
- bind(**kwargs: Any) Runnable[Input, Output] #
将参数绑定到Runnable,返回一个新的Runnable。
当链中的Runnable需要一个不在前一个Runnable输出中或用户输入中的参数时,这很有用。
- Parameters:
kwargs (Any) – 绑定到Runnable的参数。
- Returns:
一个新的Runnable,参数已绑定。
- Return type:
Runnable[Input, Output]
示例:
from langchain_community.chat_models import ChatOllama from langchain_core.output_parsers import StrOutputParser llm = ChatOllama(model='llama2') # Without bind. chain = ( llm | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two three four five.' # With bind. chain = ( llm.bind(stop=["three"]) | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two'
- bind_tools(tools: Sequence[Dict[str, Any] | type[BaseModel] | Callable | BaseTool], *, tool_choice: dict | str | Literal['auto', 'any'] | None = None, **kwargs: Any) Runnable[PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]], BaseMessage] [source]#
- Parameters:
工具 (序列[字典[字符串, 任意类型] | 类型[基础模型] | 可调用对象 | 基础工具])
tool_choice (dict | str | Literal['auto', 'any'] | None)
kwargs (Any)
- Return type:
Runnable[PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]], BaseMessage]
- configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) RunnableSerializable #
配置可以在运行时设置的Runnables的替代方案。
- Parameters:
which (ConfigurableField) – 将用于选择替代项的ConfigurableField实例。
default_key (str) – 如果没有选择其他选项,则使用的默认键。 默认为“default”。
prefix_keys (bool) – 是否在键前加上 ConfigurableField 的 id。 默认为 False。
**kwargs (Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) – 一个字典,键为Runnable实例或返回Runnable实例的可调用对象。
- Returns:
一个新的Runnable,配置了替代方案。
- Return type:
from langchain_anthropic import ChatAnthropic from langchain_core.runnables.utils import ConfigurableField from langchain_openai import ChatOpenAI model = ChatAnthropic( model_name="claude-3-sonnet-20240229" ).configurable_alternatives( ConfigurableField(id="llm"), default_key="anthropic", openai=ChatOpenAI() ) # uses the default model ChatAnthropic print(model.invoke("which organization created you?").content) # uses ChatOpenAI print( model.with_config( configurable={"llm": "openai"} ).invoke("which organization created you?").content )
- configurable_fields(**kwargs: ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) RunnableSerializable #
在运行时配置特定的Runnable字段。
- Parameters:
**kwargs (ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) – 一个包含ConfigurableField实例的字典,用于配置。
- Returns:
一个新的Runnable,其字段已配置。
- Return type:
from langchain_core.runnables import ConfigurableField from langchain_openai import ChatOpenAI model = ChatOpenAI(max_tokens=20).configurable_fields( max_tokens=ConfigurableField( id="output_token_number", name="Max tokens in the output", description="The maximum number of tokens in the output", ) ) # max_tokens = 20 print( "max_tokens_20: ", model.invoke("tell me something about chess").content ) # max_tokens = 200 print("max_tokens_200: ", model.with_config( configurable={"output_token_number": 200} ).invoke("tell me something about chess").content )
- get_num_tokens(text: str) int #
获取文本中存在的标记数量。
用于检查输入是否适合模型的上下文窗口。
- Parameters:
文本 (字符串) – 要分词的字符串输入。
- Returns:
文本中的标记的整数数量。
- Return type:
整数
- get_num_tokens_from_messages(messages: list[BaseMessage], tools: Sequence | None = None) int #
获取消息中的令牌数量。
用于检查输入是否适合模型的上下文窗口。
注意: get_num_tokens_from_messages 的基本实现忽略了工具模式。
- Parameters:
messages (list[BaseMessage]) – 要标记化的消息输入。
工具 (序列 | 无) – 如果提供,则为字典、BaseModel、函数或BaseTools的序列,将被转换为工具模式。
- Returns:
消息中令牌数量的总和。
- Return type:
整数
- get_token_ids(text: str) list[int] #
返回文本中标记的有序ID。
- Parameters:
文本 (字符串) – 要分词的字符串输入。
- Returns:
- 与文本中的标记对应的ID列表,按它们在文本中出现的顺序排列
在文本中。
- Return type:
列表[int]
- invoke(input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs: Any) BaseMessage #
将单个输入转换为输出。重写以实现。
- Parameters:
input (LanguageModelInput) – Runnable 的输入。
config (可选[RunnableConfig]) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的‘tags’、‘metadata’,用于控制并行工作量的‘max_concurrency’,以及其他键。更多详情请参考RunnableConfig。
stop (可选[列表[字符串]])
kwargs (Any)
- Returns:
Runnable 的输出。
- Return type:
- stream(input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs: Any) Iterator[BaseMessageChunk] #
流的默认实现,调用invoke。 如果子类支持流输出,则应重写此方法。
- Parameters:
input (LanguageModelInput) – Runnable 的输入。
config (可选[RunnableConfig]) – 用于Runnable的配置。默认为None。
kwargs (Any) – 传递给Runnable的额外关键字参数。
stop (可选[列表[字符串]])
- Yields:
Runnable 的输出。
- Return type:
迭代器[BaseMessageChunk]
- with_alisteners(*, on_start: AsyncListener | None = None, on_end: AsyncListener | None = None, on_error: AsyncListener | None = None) Runnable[Input, Output] #
将异步生命周期监听器绑定到一个Runnable,返回一个新的Runnable。
on_start: 在Runnable开始运行之前异步调用。 on_end: 在Runnable完成运行之后异步调用。 on_error: 如果Runnable抛出错误,则异步调用。
Run对象包含有关运行的信息,包括其id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行中的任何标签或元数据。
- Parameters:
on_start (Optional[AsyncListener]) – 在Runnable开始运行之前异步调用。 默认为None。
on_end (Optional[AsyncListener]) – 在Runnable运行结束后异步调用。 默认为None。
on_error (可选[AsyncListener]) – 如果Runnable抛出错误,则异步调用。 默认为None。
- Returns:
一个新的Runnable,绑定了监听器。
- Return type:
Runnable[Input, Output]
示例:
from langchain_core.runnables import RunnableLambda import time async def test_runnable(time_to_sleep : int): print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}") await asyncio.sleep(time_to_sleep) print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}") async def fn_start(run_obj : Runnable): print(f"on start callback starts at {format_t(time.time())} await asyncio.sleep(3) print(f"on start callback ends at {format_t(time.time())}") async def fn_end(run_obj : Runnable): print(f"on end callback starts at {format_t(time.time())} await asyncio.sleep(2) print(f"on end callback ends at {format_t(time.time())}") runnable = RunnableLambda(test_runnable).with_alisteners( on_start=fn_start, on_end=fn_end ) async def concurrent_runs(): await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3)) asyncio.run(concurrent_runs()) Result: on start callback starts at 2024-05-16T14:20:29.637053+00:00 on start callback starts at 2024-05-16T14:20:29.637150+00:00 on start callback ends at 2024-05-16T14:20:32.638305+00:00 on start callback ends at 2024-05-16T14:20:32.638383+00:00 Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00 Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00 Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00 on end callback starts at 2024-05-16T14:20:35.640534+00:00 Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00 on end callback starts at 2024-05-16T14:20:37.640574+00:00 on end callback ends at 2024-05-16T14:20:37.640654+00:00 on end callback ends at 2024-05-16T14:20:39.641751+00:00
- with_config(config: RunnableConfig | None = None, **kwargs: Any) Runnable[Input, Output] #
将配置绑定到一个可运行对象,返回一个新的可运行对象。
- Parameters:
config (RunnableConfig | None) – 绑定到Runnable的配置。
kwargs (Any) – 传递给Runnable的额外关键字参数。
- Returns:
一个新的Runnable,带有绑定的配置。
- Return type:
Runnable[Input, Output]
- with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: tuple[type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT[Input, Output] #
为Runnable添加回退,返回一个新的Runnable。
新的Runnable将尝试原始的Runnable,然后在失败时依次尝试每个回退。
- Parameters:
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,将尝试的一系列 runnables。
exceptions_to_handle (tuple[type[BaseException], ...]) – 要处理的异常类型的元组。 默认为 (Exception,)。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给后备函数,使用指定的键。如果为 None,异常将不会传递给后备函数。如果使用此参数,基础 Runnable 及其后备函数必须接受字典作为输入。默认为 None。
- Returns:
一个新的Runnable,它将在失败时尝试原始的Runnable,然后依次尝试每个回退。
- Return type:
RunnableWithFallbacksT[Input, Output]
示例
from typing import Iterator from langchain_core.runnables import RunnableGenerator def _generate_immediate_error(input: Iterator) -> Iterator[str]: raise ValueError() yield "" def _generate(input: Iterator) -> Iterator[str]: yield from "foo bar" runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks( [RunnableGenerator(_generate)] ) print(''.join(runnable.stream({}))) #foo bar
- Parameters:
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,将尝试的一系列 runnables。
exceptions_to_handle (tuple[type[BaseException], ...]) – 要处理的异常类型的元组。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给后备函数,使用指定的键。如果为 None,异常将不会传递给后备函数。如果使用此参数,基础 Runnable 及其后备函数必须接受字典作为输入。
- Returns:
一个新的Runnable,它将在失败时尝试原始的Runnable,然后依次尝试每个回退。
- Return type:
RunnableWithFallbacksT[Input, Output]
- with_listeners(*, on_start: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_error: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None) Runnable[Input, Output] #
将生命周期监听器绑定到一个Runnable,返回一个新的Runnable。
on_start: 在Runnable开始运行之前调用,带有Run对象。 on_end: 在Runnable完成运行之后调用,带有Run对象。 on_error: 如果Runnable抛出错误时调用,带有Run对象。
Run对象包含有关运行的信息,包括其id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行中的任何标签或元数据。
- Parameters:
on_start (可选[联合[可调用[[运行], 无], 可调用[[运行, RunnableConfig], 无]]]) – 在Runnable开始运行之前调用。默认为无。
on_end (可选[联合[可调用[[运行], 无], 可调用[[运行, RunnableConfig], 无]]]) – 在Runnable完成运行后调用。默认为无。
on_error (可选[联合[可调用[[运行], 无], 可调用[[运行, RunnableConfig], 无]]]) – 如果Runnable抛出错误时调用。默认为无。
- Returns:
一个新的Runnable,绑定了监听器。
- Return type:
Runnable[Input, Output]
示例:
from langchain_core.runnables import RunnableLambda from langchain_core.tracers.schemas import Run import time def test_runnable(time_to_sleep : int): time.sleep(time_to_sleep) def fn_start(run_obj: Run): print("start_time:", run_obj.start_time) def fn_end(run_obj: Run): print("end_time:", run_obj.end_time) chain = RunnableLambda(test_runnable).with_listeners( on_start=fn_start, on_end=fn_end ) chain.invoke(2)
- with_retry(*, retry_if_exception_type: tuple[type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable[Input, Output] #
创建一个新的Runnable,在异常时重试原始的Runnable。
- Parameters:
retry_if_exception_type (tuple[type[BaseException], ...]) – 一个异常类型的元组,用于重试。 默认值为 (Exception,)。
wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动。默认为 True。
stop_after_attempt (int) – 在放弃之前尝试的最大次数。默认为3。
- Returns:
一个新的Runnable,在异常时重试原始的Runnable。
- Return type:
Runnable[Input, Output]
示例:
from langchain_core.runnables import RunnableLambda count = 0 def _lambda(x: int) -> None: global count count = count + 1 if x == 1: raise ValueError("x is 1") else: pass runnable = RunnableLambda(_lambda) try: runnable.with_retry( stop_after_attempt=2, retry_if_exception_type=(ValueError,), ).invoke(1) except ValueError: pass assert (count == 2)
- Parameters:
retry_if_exception_type (tuple[type[BaseException], ...]) – 一个异常类型的元组,用于在发生这些异常时重试
wait_exponential_jitter (bool) – 是否在重试之间为等待时间添加抖动
stop_after_attempt (int) – 在放弃之前尝试的最大次数
- Returns:
一个新的Runnable,在异常时重试原始的Runnable。
- Return type:
Runnable[Input, Output]
- with_structured_output(schema: Dict[str, Any] | Type[_BM] | Type, *, include_raw: bool = False, **kwargs: Any) Runnable[PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]], Dict | BaseModel] [source]#
模型包装器,返回格式化为匹配给定模式的输出。
- Parameters:
schema (Dict[str, Any] | Type[_BM] | Type) –
- 输出模式。可以作为以下形式传入:
一个OpenAI函数/工具模式,
一个JSON模式,
一个TypedDict类,
或一个Pydantic类。
如果
schema
是一个Pydantic类,那么模型输出将是该类的一个Pydantic实例,并且模型生成的字段将由Pydantic类进行验证。否则,模型输出将是一个字典,并且不会被验证。有关在指定Pydantic或TypedDict类时如何正确指定模式字段的类型和描述的更多信息,请参见langchain_core.utils.function_calling.convert_to_openai_tool()
。include_raw (bool) – 如果为False,则仅返回解析后的结构化输出。如果在模型输出解析过程中发生错误,将会抛出。如果为True,则返回原始模型响应(一个BaseMessage)和解析后的模型响应。如果在输出解析过程中发生错误,它将被捕获并返回。最终输出始终是一个包含“raw”、“parsed”和“parsing_error”键的字典。
kwargs (Any)
- Returns:
一个Runnable,它接受与
langchain_core.language_models.chat.BaseChatModel
相同的输入。如果
include_raw
为False且schema
是一个Pydantic类,Runnable会输出schema
的一个实例(即一个Pydantic对象)。否则,如果
include_raw
为False,则Runnable输出一个字典。- 如果
include_raw
为True,则Runnable输出一个包含以下键的字典: "raw"
: BaseMessage"parsed"
: 如果解析错误则为None,否则类型取决于schema
,如上所述。"parsing_error"
: Optional[BaseException]
- 如果
- Return type:
Runnable[PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]], Dict | BaseModel]
- Example: Pydantic schema (include_raw=False):
from pydantic import BaseModel class AnswerWithJustification(BaseModel): '''An answer to the user question along with justification for the answer.''' answer: str justification: str llm = ChatModel(model="model-name", temperature=0) structured_llm = llm.with_structured_output(AnswerWithJustification) structured_llm.invoke("What weighs more a pound of bricks or a pound of feathers") # -> AnswerWithJustification( # answer='They weigh the same', # justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.' # )
- Example: Pydantic schema (include_raw=True):
from pydantic import BaseModel class AnswerWithJustification(BaseModel): '''An answer to the user question along with justification for the answer.''' answer: str justification: str llm = ChatModel(model="model-name", temperature=0) structured_llm = llm.with_structured_output(AnswerWithJustification, include_raw=True) structured_llm.invoke("What weighs more a pound of bricks or a pound of feathers") # -> { # 'raw': AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Ao02pnFYXD6GN1yzc0uXPsvF', 'function': {'arguments': '{"answer":"They weigh the same.","justification":"Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ."}', 'name': 'AnswerWithJustification'}, 'type': 'function'}]}), # 'parsed': AnswerWithJustification(answer='They weigh the same.', justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'), # 'parsing_error': None # }
- Example: Dict schema (include_raw=False):
from pydantic import BaseModel from langchain_core.utils.function_calling import convert_to_openai_tool class AnswerWithJustification(BaseModel): '''An answer to the user question along with justification for the answer.''' answer: str justification: str dict_schema = convert_to_openai_tool(AnswerWithJustification) llm = ChatModel(model="model-name", temperature=0) structured_llm = llm.with_structured_output(dict_schema) structured_llm.invoke("What weighs more a pound of bricks or a pound of feathers") # -> { # 'answer': 'They weigh the same', # 'justification': 'Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume and density of the two substances differ.' # }
在版本0.2.26中更改:增加了对TypedDict类的支持。
- with_types(*, input_type: type[Input] | None = None, output_type: type[Output] | None = None) Runnable[Input, Output] #
将输入和输出类型绑定到一个Runnable,返回一个新的Runnable。
- Parameters:
input_type (type[Input] | None) – 要绑定到Runnable的输入类型。默认为None。
output_type (type[Output] | None) – 要绑定到Runnable的输出类型。默认为None。
- Returns:
一个带有类型绑定的新Runnable。
- Return type:
Runnable[Input, Output]
使用 ChatBedrockConverse 的示例