MapRerankDocumentsChain#
- class langchain.chains.combine_documents.map_rerank.MapRerankDocumentsChain[source]#
-
自版本0.3.1起已弃用:此类已弃用。请参阅迁移指南以获取推荐的替代方案:https://python.langchain.com/docs/versions/migrating_chains/map_rerank_docs_chain/ 在langchain==1.0之前不会移除。
通过映射链组合文档,然后重新排序结果。
该算法在每个输入文档上调用一个LLMChain。LLMChain预期会有一个OutputParser,将结果解析为答案(answer_key)和分数(rank_key)。然后返回分数最高的答案。
- Example:
from langchain.chains import MapRerankDocumentsChain, LLMChain from langchain_core.prompts import PromptTemplate from langchain_community.llms import OpenAI from langchain.output_parsers.regex import RegexParser document_variable_name = "context" llm = OpenAI() # The prompt here should take as an input variable the # `document_variable_name` # The actual prompt will need to be a lot more complex, this is just # an example. prompt_template = ( "Use the following context to tell me the chemical formula " "for water. Output both your answer and a score of how confident " "you are. Context: {context}" ) output_parser = RegexParser( regex=r"(.*?)
- Score: (.*)”,
输出键=[“answer”, “score”],
) prompt = PromptTemplate(
template=prompt_template, input_variables=[“context”], output_parser=output_parser,
) llm_chain = LLMChain(llm=llm, prompt=prompt) chain = MapRerankDocumentsChain(
llm_chain=llm_chain, document_variable_name=document_variable_name, rank_key=”score”, answer_key=”answer”,
)
注意
MapRerankDocumentsChain 实现了标准的
Runnable Interface
。🏃Runnable Interface
接口在可运行对象上提供了额外的方法,例如with_types
,with_retry
,assign
,bind
,get_graph
, 等等。- param answer_key: str [Required]#
在llm_chain的输出中输入键以返回答案。
- param callback_manager: BaseCallbackManager | None = None#
[已弃用] 请使用 callbacks 代替。
- param callbacks: Callbacks = None#
可选的回调处理程序(或回调管理器)列表。默认为None。 回调处理程序在调用链的整个生命周期中被调用, 从on_chain_start开始,到on_chain_end或on_chain_error结束。 每个自定义链可以选择性地调用额外的回调方法,详情请参阅Callback文档。
- param document_variable_name: str [Required]#
在llm_chain中用于存放文档的变量名。如果llm_chain中只有一个变量,则无需提供此变量名。
- param memory: BaseMemory | None = None#
可选的内存对象。默认为None。 Memory是一个在每条链的开始和结束时被调用的类。在开始时,内存加载变量并将它们传递到链中。在结束时,它保存任何返回的变量。 有许多不同类型的内存 - 请参阅内存文档以获取完整目录。
- param metadata: Dict[str, Any] | None = None#
与链关联的可选元数据。默认为None。 此元数据将与每次调用此链相关联, 并作为参数传递给callbacks中定义的处理程序。 您可以使用这些元数据来识别具有特定用例的链实例。
- param metadata_keys: List[str] | None = None#
从所选文档返回的附加元数据。
- param rank_key: str [Required]#
在llm_chain的输出中输入关键词以进行排名。
- param return_intermediate_steps: bool = False#
返回中间步骤。 中间步骤包括在每个文档上调用llm_chain的结果。
- param tags: List[str] | None = None#
与链关联的可选标签列表。默认为None。 这些标签将与每次调用此链相关联, 并作为参数传递给callbacks中定义的处理程序。 您可以使用这些标签来识别具有特定用例的链实例。
- param verbose: bool [Optional]#
是否以详细模式运行。在详细模式下,一些中间日志将被打印到控制台。默认为全局verbose值,可通过langchain.globals.get_verbose()访问。
- __call__(inputs: Dict[str, Any] | Any, return_only_outputs: bool = False, callbacks: list[BaseCallbackHandler] | BaseCallbackManager | None = None, *, tags: List[str] | None = None, metadata: Dict[str, Any] | None = None, run_name: str | None = None, include_run_info: bool = False) Dict[str, Any] #
自版本0.1.0起已弃用:请改用
invoke()
。在langchain==1.0之前不会移除。执行链。
- Parameters:
inputs (Dict[str, Any] | Any) – 输入的字典,如果链只期望一个参数,则为单个输入。应包含在Chain.input_keys中指定的所有输入,除了将由链的内存设置的输入。
return_only_outputs (bool) – 是否仅在响应中返回输出。如果为True,则仅返回此链生成的新键。如果为False,则返回输入键和此链生成的新键。默认为False。
callbacks (list[BaseCallbackHandler] | BaseCallbackManager | None) – 用于此链运行的回调。这些回调将在构造期间传递给链的回调之外调用,但只有这些运行时回调会传播到对其他对象的调用中。
tags (List[str] | None) – 传递给所有回调的字符串标签列表。这些标签将与在构造期间传递给链的标签一起传递,但只有这些运行时标签会传播到对其他对象的调用中。
metadata (Dict[str, Any] | None) – 可选的与链相关的元数据。默认为 None
include_run_info (bool) – 是否在响应中包含运行信息。默认为False。
run_name (str | None)
- Returns:
- 一个包含命名输出的字典。应包含所有在
Chain.output_keys中指定的输出。
- Return type:
Dict[str, Any]
- async abatch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output] #
默认实现使用asyncio.gather并行运行ainvoke。
batch的默认实现对于IO绑定的runnables效果很好。
如果子类能够更高效地进行批处理,则应重写此方法; 例如,如果底层的Runnable使用支持批处理模式的API。
- Parameters:
inputs (list[Input]) – Runnable 的输入列表。
config (RunnableConfig | list[RunnableConfig] | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的‘tags’、‘metadata’,用于控制并行工作量的‘max_concurrency’,以及其他键。更多详情请参考RunnableConfig。默认为None。
return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为 False。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Returns:
Runnable 的输出列表。
- Return type:
列表[输出]
- async abatch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) AsyncIterator[tuple[int, Output | Exception]] #
在输入列表上并行运行ainvoke,在它们完成时产生结果。
- Parameters:
inputs (Sequence[Input]) – Runnable 的输入列表。
config (RunnableConfig | Sequence[RunnableConfig] | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的'tags'、'metadata',用于控制并行工作量的'max_concurrency',以及其他键。有关更多详细信息,请参阅RunnableConfig。默认为None。默认为None。
return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为 False。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Yields:
输入索引和Runnable输出的元组。
- Return type:
AsyncIterator[元组[int, Output | 异常]]
- async acall(inputs: Dict[str, Any] | Any, return_only_outputs: bool = False, callbacks: list[BaseCallbackHandler] | BaseCallbackManager | None = None, *, tags: List[str] | None = None, metadata: Dict[str, Any] | None = None, run_name: str | None = None, include_run_info: bool = False) Dict[str, Any] #
自版本0.1.0起已弃用:请改用
ainvoke()
。在langchain==1.0之前不会移除。异步执行链。
- Parameters:
inputs (Dict[str, Any] | Any) – 输入的字典,如果链只期望一个参数,则为单个输入。应包含在Chain.input_keys中指定的所有输入,除了将由链的内存设置的输入。
return_only_outputs (bool) – 是否仅在响应中返回输出。如果为True,则仅返回此链生成的新键。如果为False,则返回输入键和此链生成的新键。默认为False。
callbacks (list[BaseCallbackHandler] | BaseCallbackManager | None) – 用于此链运行的回调。这些回调将在构造期间传递给链的回调之外调用,但只有这些运行时回调会传播到对其他对象的调用中。
tags (List[str] | None) – 传递给所有回调的字符串标签列表。这些标签将与在构造期间传递给链的标签一起传递,但只有这些运行时标签会传播到对其他对象的调用中。
metadata (Dict[str, Any] | None) – 可选的与链相关的元数据。默认为 None
include_run_info (bool) – 是否在响应中包含运行信息。默认为False。
run_name (str | None)
- Returns:
- 一个包含命名输出的字典。应包含所有在
Chain.output_keys中指定的输出。
- Return type:
Dict[str, Any]
- async acombine_docs(docs: List[Document], callbacks: list[BaseCallbackHandler] | BaseCallbackManager | None = None, **kwargs: Any) Tuple[str, dict] [source]#
以地图重新排序的方式合并文档。
通过首先在所有文档上映射链,然后重新排序结果来组合。
- Parameters:
docs (List[Document]) – 要合并的文档列表
callbacks (list[BaseCallbackHandler] | BaseCallbackManager | None) – 要传递的回调函数
**kwargs (Any) – 传递给LLM调用的额外参数(例如除了文档之外的其他输入变量)
- Returns:
返回的第一个元素是单个字符串输出。返回的第二个元素是其他键的字典。
- Return type:
元组[str, dict]
- async ainvoke(input: Dict[str, Any], config: RunnableConfig | None = None, **kwargs: Any) Dict[str, Any] #
ainvoke的默认实现,从线程调用invoke。
默认实现允许使用异步代码,即使Runnable没有实现本地的异步版本的invoke。
如果子类可以异步运行,则应重写此方法。
- Parameters:
输入 (字典[字符串, 任意类型])
config (RunnableConfig | None)
kwargs (Any)
- Return type:
Dict[str, Any]
- apply(input_list: List[Dict[str, Any]], callbacks: list[BaseCallbackHandler] | BaseCallbackManager | None = None) List[Dict[str, str]] #
自版本0.1.0起已弃用:请改用
batch()
。在langchain==1.0之前不会移除。在列表中的所有输入上调用链。
- Parameters:
input_list (List[Dict[str, Any]])
callbacks (列表[BaseCallbackHandler] | BaseCallbackManager | 无)
- Return type:
列表[字典[str, str]]
- async aprep_inputs(inputs: Dict[str, Any] | Any) Dict[str, str] #
准备链输入,包括从内存中添加输入。
- Parameters:
inputs (Dict[str, Any] | Any) – 原始输入的字典,如果链只期望一个参数,则为单个输入。应包含在Chain.input_keys中指定的所有输入,除了将由链的内存设置的输入。
- Returns:
所有输入的字典,包括由链的内存添加的那些。
- Return type:
字典[str, str]
- async aprep_outputs(inputs: Dict[str, str], outputs: Dict[str, str], return_only_outputs: bool = False) Dict[str, str] #
验证并准备链输出,并将此运行的信息保存到内存中。
- Parameters:
inputs (Dict[str, str]) – 链输入的字典,包括由链内存添加的任何输入。
outputs (Dict[str, str]) – 初始链输出的字典。
return_only_outputs (bool) – 是否仅返回链的输出。如果为False,输入也会添加到最终输出中。
- Returns:
最终链输出的字典。
- Return type:
字典[str, str]
- async arun(*args: Any, callbacks: list[BaseCallbackHandler] | BaseCallbackManager | None = None, tags: List[str] | None = None, metadata: Dict[str, Any] | None = None, **kwargs: Any) Any #
自版本0.1.0起已弃用:请改用
ainvoke()
。在langchain==1.0之前不会移除。执行链的便捷方法。
此方法与Chain.__call__的主要区别在于,此方法期望输入直接作为位置参数或关键字参数传递,而Chain.__call__期望一个包含所有输入的单一输入字典
- Parameters:
*args (Any) – 如果链期望单个输入,则可以将其作为唯一的位置参数传递。
callbacks (list[BaseCallbackHandler] | BaseCallbackManager | None) – 用于此链运行的回调。这些回调将在构造期间传递给链的回调之外调用,但只有这些运行时回调会传播到对其他对象的调用中。
tags (List[str] | None) – 传递给所有回调的字符串标签列表。这些标签将与在构造期间传递给链的标签一起传递,但只有这些运行时标签会传播到对其他对象的调用中。
**kwargs (Any) – 如果链期望多个输入,它们可以直接作为关键字参数传递。
metadata (Dict[str, Any] | None)
**kwargs
- Returns:
链式输出。
- Return type:
任何
示例
# Suppose we have a single-input chain that takes a 'question' string: await chain.arun("What's the temperature in Boise, Idaho?") # -> "The temperature in Boise is..." # Suppose we have a multi-input chain that takes a 'question' string # and 'context' string: question = "What's the temperature in Boise, Idaho?" context = "Weather report for Boise, Idaho on 07/03/23..." await chain.arun(question=question, context=context) # -> "The temperature in Boise is..."
- async astream(input: Input, config: RunnableConfig | None = None, **kwargs: Any | None) AsyncIterator[Output] #
astream的默认实现,调用ainvoke。 如果子类支持流式输出,则应重写此方法。
- Parameters:
input (Input) – Runnable 的输入。
config (RunnableConfig | None) – 用于Runnable的配置。默认为None。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Yields:
Runnable 的输出。
- Return type:
AsyncIterator[Output]
- async astream_events(input: Any, config: RunnableConfig | None = None, *, version: Literal['v1', 'v2'], include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) AsyncIterator[StandardStreamEvent | CustomStreamEvent] #
生成事件流。
用于创建一个迭代器,遍历提供实时信息的StreamEvents,包括来自中间结果的StreamEvents。
StreamEvent 是一个具有以下模式的字典:
event
: str - 事件名称的格式为格式: on_[runnable_type]_(start|stream|end).
name
: str - 生成事件的 Runnable 的名称。run_id
: str - 与给定执行相关联的随机生成的ID发出事件的Runnable。 作为父Runnable执行的一部分被调用的子Runnable会被分配其自己唯一的ID。
parent_ids
: List[str] - 生成事件的父可运行对象的ID。根可运行对象将有一个空列表。 父ID的顺序是从根到直接父对象。 仅适用于API的v2版本。API的v1版本将返回一个空列表。
tags
: Optional[List[str]] - 生成事件的Runnable的标签事件。
metadata
: Optional[Dict[str, Any]] - Runnable的元数据生成事件的元数据。
data
: Dict[str, Any]
下表展示了一些可能由不同链发出的事件。为了简洁起见,表中省略了元数据字段。链定义已包含在表后。
注意 此参考表适用于V2版本的架构。
事件
名称
块
输入
输出
on_chat_model_start
[模型名称]
{“messages”: [[SystemMessage, HumanMessage]]}
on_chat_model_stream
[model name]
AIMessageChunk(content=”hello”)
on_chat_model_end
[model name]
{“messages”: [[SystemMessage, HumanMessage]]}
AIMessageChunk(content=”hello world”)
on_llm_start
[model name]
{‘input’: ‘hello’}
on_llm_stream
[模型名称]
‘Hello’
on_llm_end
[model name]
‘你好,人类!’
链上开始
格式化文档
on_chain_stream
format_docs
“你好世界!,再见世界!”
on_chain_end
format_docs
[Document(…)]
“你好世界!,再见世界!”
on_tool_start
some_tool
{“x”: 1, “y”: “2”}
on_tool_end
some_tool
{“x”: 1, “y”: “2”}
on_retriever_start
[retriever name]
{“query”: “hello”}
on_retriever_end
[retriever name]
{“query”: “hello”}
[Document(…), ..]
on_prompt_start
[template_name]
{“question”: “hello”}
on_prompt_end
[template_name]
{“question”: “hello”}
ChatPromptValue(messages: [SystemMessage, …])
除了标准事件外,用户还可以派发自定义事件(见下面的示例)。
自定义事件将仅在API的v2版本中显示!
自定义事件具有以下格式:
属性
类型
描述
name
str
用户定义的事件名称。
data
Any
与事件相关的数据。这可以是任何内容,但我们建议使其可JSON序列化。
以下是上述标准事件相关的声明:
format_docs:
def format_docs(docs: List[Document]) -> str: '''Format the docs.''' return ", ".join([doc.page_content for doc in docs]) format_docs = RunnableLambda(format_docs)
some_tool:
@tool def some_tool(x: int, y: str) -> dict: '''Some_tool.''' return {"x": x, "y": y}
提示:
template = ChatPromptTemplate.from_messages( [("system", "You are Cat Agent 007"), ("human", "{question}")] ).with_config({"run_name": "my_template", "tags": ["my_template"]})
示例:
from langchain_core.runnables import RunnableLambda async def reverse(s: str) -> str: return s[::-1] chain = RunnableLambda(func=reverse) events = [ event async for event in chain.astream_events("hello", version="v2") ] # will produce the following events (run_id, and parent_ids # has been omitted for brevity): [ { "data": {"input": "hello"}, "event": "on_chain_start", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"chunk": "olleh"}, "event": "on_chain_stream", "metadata": {}, "name": "reverse", "tags": [], }, { "data": {"output": "olleh"}, "event": "on_chain_end", "metadata": {}, "name": "reverse", "tags": [], }, ]
示例:分发自定义事件
from langchain_core.callbacks.manager import ( adispatch_custom_event, ) from langchain_core.runnables import RunnableLambda, RunnableConfig import asyncio async def slow_thing(some_input: str, config: RunnableConfig) -> str: """Do something that takes a long time.""" await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 1 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation await adispatch_custom_event( "progress_event", {"message": "Finished step 2 of 3"}, config=config # Must be included for python < 3.10 ) await asyncio.sleep(1) # Placeholder for some slow operation return "Done" slow_thing = RunnableLambda(slow_thing) async for event in slow_thing.astream_events("some_input", version="v2"): print(event)
- Parameters:
input (Any) – Runnable 的输入。
config (RunnableConfig | None) – 用于Runnable的配置。
version (Literal['v1', 'v2']) – 使用的模式版本,可以是 v2 或 v1。 用户应使用 v2。 v1 是为了向后兼容,将在 0.4.0 版本中弃用。 在 API 稳定之前不会分配默认值。 自定义事件仅在 v2 中显示。
include_names (Sequence[str] | None) – 仅包含来自具有匹配名称的可运行对象的事件。
include_types (Sequence[str] | None) – 仅包含来自具有匹配类型的可运行对象的事件。
include_tags (Sequence[str] | None) – 仅包含具有匹配标签的可运行对象的事件。
exclude_names (Sequence[str] | None) – 排除具有匹配名称的可运行对象的事件。
exclude_types (Sequence[str] | None) – 排除具有匹配类型的可运行对象的事件。
exclude_tags (Sequence[str] | None) – 排除具有匹配标签的可运行对象的事件。
kwargs (Any) – 传递给 Runnable 的额外关键字参数。 这些参数将传递给 astream_log,因为 astream_events 的实现是基于 astream_log 的。
- Yields:
一个异步的StreamEvents流。
- Raises:
NotImplementedError – 如果版本不是v1或v2。
- Return type:
AsyncIterator[StandardStreamEvent | CustomStreamEvent]
- batch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output] #
默认实现使用线程池执行器并行运行invoke。
batch的默认实现对于IO绑定的runnables效果很好。
如果子类能够更高效地进行批处理,则应重写此方法; 例如,如果底层的Runnable使用支持批处理模式的API。
- Parameters:
inputs (列表[Input])
config (RunnableConfig | list[RunnableConfig] | None)
return_exceptions (bool)
kwargs (任意 | 无)
- Return type:
列表[输出]
- batch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) Iterator[tuple[int, Output | Exception]] #
在输入列表上并行运行invoke,在它们完成时产生结果。
- Parameters:
inputs (Sequence[Input])
config (RunnableConfig | Sequence[RunnableConfig] | None)
return_exceptions (bool)
kwargs (任意 | 无)
- Return type:
Iterator[元组[int, Output | 异常]]
- bind(**kwargs: Any) Runnable[Input, Output] #
将参数绑定到Runnable,返回一个新的Runnable。
当链中的Runnable需要一个不在前一个Runnable输出中或用户输入中的参数时,这很有用。
- Parameters:
kwargs (Any) – 绑定到Runnable的参数。
- Returns:
一个新的Runnable,参数已绑定。
- Return type:
Runnable[Input, Output]
示例:
from langchain_community.chat_models import ChatOllama from langchain_core.output_parsers import StrOutputParser llm = ChatOllama(model='llama2') # Without bind. chain = ( llm | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two three four five.' # With bind. chain = ( llm.bind(stop=["three"]) | StrOutputParser() ) chain.invoke("Repeat quoted words exactly: 'One two three four five.'") # Output is 'One two'
- combine_docs(docs: List[Document], callbacks: list[BaseCallbackHandler] | BaseCallbackManager | None = None, **kwargs: Any) Tuple[str, dict] [source]#
以地图重新排序的方式合并文档。
通过首先在所有文档上映射链,然后重新排序结果来组合。
- Parameters:
docs (List[Document]) – 要合并的文档列表
callbacks (list[BaseCallbackHandler] | BaseCallbackManager | None) – 要传递的回调函数
**kwargs (Any) – 传递给LLM调用的额外参数(例如除了文档之外的其他输入变量)
- Returns:
返回的第一个元素是单个字符串输出。返回的第二个元素是其他键的字典。
- Return type:
元组[str, dict]
- configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) RunnableSerializable #
配置可以在运行时设置的Runnables的替代方案。
- Parameters:
which (ConfigurableField) – 将用于选择替代项的ConfigurableField实例。
default_key (str) – 如果没有选择其他选项,则使用的默认键。 默认为“default”。
prefix_keys (bool) – 是否在键前加上 ConfigurableField 的 id。 默认为 False。
**kwargs (Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) – 一个字典,键为Runnable实例或返回Runnable实例的可调用对象。
- Returns:
一个新的Runnable,配置了替代方案。
- Return type:
from langchain_anthropic import ChatAnthropic from langchain_core.runnables.utils import ConfigurableField from langchain_openai import ChatOpenAI model = ChatAnthropic( model_name="claude-3-sonnet-20240229" ).configurable_alternatives( ConfigurableField(id="llm"), default_key="anthropic", openai=ChatOpenAI() ) # uses the default model ChatAnthropic print(model.invoke("which organization created you?").content) # uses ChatOpenAI print( model.with_config( configurable={"llm": "openai"} ).invoke("which organization created you?").content )
- configurable_fields(**kwargs: ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) RunnableSerializable #
在运行时配置特定的Runnable字段。
- Parameters:
**kwargs (ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) – 一个包含ConfigurableField实例的字典,用于配置。
- Returns:
一个新的Runnable,其字段已配置。
- Return type:
from langchain_core.runnables import ConfigurableField from langchain_openai import ChatOpenAI model = ChatOpenAI(max_tokens=20).configurable_fields( max_tokens=ConfigurableField( id="output_token_number", name="Max tokens in the output", description="The maximum number of tokens in the output", ) ) # max_tokens = 20 print( "max_tokens_20: ", model.invoke("tell me something about chess").content ) # max_tokens = 200 print("max_tokens_200: ", model.with_config( configurable={"output_token_number": 200} ).invoke("tell me something about chess").content )
- invoke(input: Dict[str, Any], config: RunnableConfig | None = None, **kwargs: Any) Dict[str, Any] #
将单个输入转换为输出。重写以实现。
- Parameters:
input (Dict[str, Any]) – Runnable 的输入。
config (RunnableConfig | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的‘tags’、‘metadata’,用于控制并行工作量的‘max_concurrency’,以及其他键。更多详情请参考RunnableConfig。
kwargs (Any)
- Returns:
Runnable 的输出。
- Return type:
Dict[str, Any]
- prep_inputs(inputs: Dict[str, Any] | Any) Dict[str, str] #
准备链输入,包括从内存中添加输入。
- Parameters:
inputs (Dict[str, Any] | Any) – 原始输入的字典,如果链只期望一个参数,则为单个输入。应包含在Chain.input_keys中指定的所有输入,除了将由链的内存设置的输入。
- Returns:
所有输入的字典,包括由链的内存添加的那些。
- Return type:
字典[str, str]
- prep_outputs(inputs: Dict[str, str], outputs: Dict[str, str], return_only_outputs: bool = False) Dict[str, str] #
验证并准备链输出,并将此运行的信息保存到内存中。
- Parameters:
inputs (Dict[str, str]) – 链输入的字典,包括由链内存添加的任何输入。
outputs (Dict[str, str]) – 初始链输出的字典。
return_only_outputs (bool) – 是否仅返回链的输出。如果为False,输入也会添加到最终输出中。
- Returns:
最终链输出的字典。
- Return type:
字典[str, str]
- prompt_length(docs: List[Document], **kwargs: Any) int | None #
返回传入文档的提示长度。
调用者可以使用此功能来确定传入文档列表是否会超过某个提示长度。这在尝试确保提示的大小保持在某个上下文限制以下时非常有用。
- Parameters:
docs (List[Document]) – List[Document], 用于计算总提示长度的文档列表。
kwargs (Any)
- Returns:
如果该方法不依赖于提示长度,则返回 None,否则返回提示的长度(以标记为单位)。
- Return type:
整数 | 无
- run(*args: Any, callbacks: list[BaseCallbackHandler] | BaseCallbackManager | None = None, tags: List[str] | None = None, metadata: Dict[str, Any] | None = None, **kwargs: Any) Any #
自版本0.1.0起已弃用:请改用
invoke()
。在langchain==1.0之前不会移除。执行链的便捷方法。
此方法与Chain.__call__的主要区别在于,此方法期望输入直接作为位置参数或关键字参数传递,而Chain.__call__期望一个包含所有输入的单一输入字典
- Parameters:
*args (Any) – 如果链期望单个输入,则可以将其作为唯一的位置参数传递。
callbacks (list[BaseCallbackHandler] | BaseCallbackManager | None) – 用于此链运行的回调。这些回调将在构造期间传递给链的回调之外调用,但只有这些运行时回调会传播到对其他对象的调用中。
tags (List[str] | None) – 传递给所有回调的字符串标签列表。这些标签将与在构造期间传递给链的标签一起传递,但只有这些运行时标签会传播到对其他对象的调用中。
**kwargs (Any) – 如果链期望多个输入,它们可以直接作为关键字参数传递。
metadata (Dict[str, Any] | None)
**kwargs
- Returns:
链式输出。
- Return type:
任何
示例
# Suppose we have a single-input chain that takes a 'question' string: chain.run("What's the temperature in Boise, Idaho?") # -> "The temperature in Boise is..." # Suppose we have a multi-input chain that takes a 'question' string # and 'context' string: question = "What's the temperature in Boise, Idaho?" context = "Weather report for Boise, Idaho on 07/03/23..." chain.run(question=question, context=context) # -> "The temperature in Boise is..."
- save(file_path: Path | str) None #
保存链。
- Expects Chain._chain_type property to be implemented and for memory to be
空值。
- Parameters:
file_path (Path | str) – 保存链的文件路径。
- Return type:
无
示例
chain.save(file_path="path/chain.yaml")
- stream(input: Input, config: RunnableConfig | None = None, **kwargs: Any | None) Iterator[Output] #
流的默认实现,调用invoke。 如果子类支持流输出,则应重写此方法。
- Parameters:
input (Input) – Runnable 的输入。
config (RunnableConfig | None) – 用于Runnable的配置。默认为None。
kwargs (Any | None) – 传递给Runnable的额外关键字参数。
- Yields:
Runnable 的输出。
- Return type:
迭代器[输出]
- with_alisteners(*, on_start: AsyncListener | None = None, on_end: AsyncListener | None = None, on_error: AsyncListener | None = None) Runnable[Input, Output] #
将异步生命周期监听器绑定到一个Runnable,返回一个新的Runnable。
on_start: 在Runnable开始运行之前异步调用。 on_end: 在Runnable完成运行之后异步调用。 on_error: 如果Runnable抛出错误,则异步调用。
Run对象包含有关运行的信息,包括其id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行中的任何标签或元数据。
- Parameters:
on_start (Optional[AsyncListener]) – 在Runnable开始运行之前异步调用。 默认为None。
on_end (Optional[AsyncListener]) – 在Runnable运行结束后异步调用。 默认为None。
on_error (可选[AsyncListener]) – 如果Runnable抛出错误,则异步调用。 默认为None。
- Returns:
一个新的Runnable,绑定了监听器。
- Return type:
Runnable[Input, Output]
示例:
from langchain_core.runnables import RunnableLambda import time async def test_runnable(time_to_sleep : int): print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}") await asyncio.sleep(time_to_sleep) print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}") async def fn_start(run_obj : Runnable): print(f"on start callback starts at {format_t(time.time())} await asyncio.sleep(3) print(f"on start callback ends at {format_t(time.time())}") async def fn_end(run_obj : Runnable): print(f"on end callback starts at {format_t(time.time())} await asyncio.sleep(2) print(f"on end callback ends at {format_t(time.time())}") runnable = RunnableLambda(test_runnable).with_alisteners( on_start=fn_start, on_end=fn_end ) async def concurrent_runs(): await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3)) asyncio.run(concurrent_runs()) Result: on start callback starts at 2024-05-16T14:20:29.637053+00:00 on start callback starts at 2024-05-16T14:20:29.637150+00:00 on start callback ends at 2024-05-16T14:20:32.638305+00:00 on start callback ends at 2024-05-16T14:20:32.638383+00:00 Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00 Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00 Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00 on end callback starts at 2024-05-16T14:20:35.640534+00:00 Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00 on end callback starts at 2024-05-16T14:20:37.640574+00:00 on end callback ends at 2024-05-16T14:20:37.640654+00:00 on end callback ends at 2024-05-16T14:20:39.641751+00:00
- with_config(config: RunnableConfig | None = None, **kwargs: Any) Runnable[Input, Output] #
将配置绑定到一个可运行对象,返回一个新的可运行对象。
- Parameters:
config (RunnableConfig | None) – 绑定到Runnable的配置。
kwargs (Any) – 传递给Runnable的额外关键字参数。
- Returns:
一个新的Runnable,带有绑定的配置。
- Return type:
Runnable[Input, Output]
- with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: tuple[type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT[Input, Output] #
为Runnable添加回退,返回一个新的Runnable。
新的Runnable将尝试原始的Runnable,然后在失败时依次尝试每个回退。
- Parameters:
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,将尝试的一系列 runnables。
exceptions_to_handle (tuple[type[BaseException], ...]) – 要处理的异常类型的元组。 默认为 (Exception,)。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给后备函数,使用指定的键。如果为 None,异常将不会传递给后备函数。如果使用此参数,基础 Runnable 及其后备函数必须接受字典作为输入。默认为 None。
- Returns:
一个新的Runnable,它将在失败时尝试原始的Runnable,然后依次尝试每个回退。
- Return type:
RunnableWithFallbacksT[Input, Output]
示例
from typing import Iterator from langchain_core.runnables import RunnableGenerator def _generate_immediate_error(input: Iterator) -> Iterator[str]: raise ValueError() yield "" def _generate(input: Iterator) -> Iterator[str]: yield from "foo bar" runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks( [RunnableGenerator(_generate)] ) print(''.join(runnable.stream({}))) #foo bar
- Parameters:
fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,将尝试的一系列 runnables。
exceptions_to_handle (tuple[type[BaseException], ...]) – 要处理的异常类型的元组。
exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给后备函数,使用指定的键。如果为 None,异常将不会传递给后备函数。如果使用此参数,基础 Runnable 及其后备函数必须接受字典作为输入。
- Returns:
一个新的Runnable,它将在失败时尝试原始的Runnable,然后依次尝试每个回退。
- Return type:
RunnableWithFallbacksT[Input, Output]
- with_listeners(*, on_start: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_error: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None) Runnable[Input, Output] #
将生命周期监听器绑定到一个Runnable,返回一个新的Runnable。
on_start: 在Runnable开始运行之前调用,带有Run对象。 on_end: 在Runnable完成运行之后调用,带有Run对象。 on_error: 如果Runnable抛出错误时调用,带有Run对象。
Run对象包含有关运行的信息,包括其id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行中的任何标签或元数据。
- Parameters:
on_start (可选[联合[可调用[[运行], 无], 可调用[[运行, RunnableConfig], 无]]]) – 在Runnable开始运行之前调用。默认为无。
on_end (可选[联合[可调用[[运行], 无], 可调用[[运行, RunnableConfig], 无]]]) – 在Runnable完成运行后调用。默认为无。
on_error (可选[联合[可调用[[运行], 无], 可调用[[运行, RunnableConfig], 无]]]) – 如果Runnable抛出错误时调用。默认为无。
- Returns:
一个新的Runnable,绑定了监听器。
- Return type:
Runnable[Input, Output]
示例:
from langchain_core.runnables import RunnableLambda from langchain_core.tracers.schemas import Run import time def test_runnable(time_to_sleep : int): time.sleep(time_to_sleep) def fn_start(run_obj: Run): print("start_time:", run_obj.start_time) def fn_end(run_obj: Run): print("end_time:", run_obj.end_time) chain = RunnableLambda(test_runnable).with_listeners( on_start=fn_start, on_end=fn_end ) chain.invoke(2)
- with_retry(*, retry_if_exception_type: tuple[type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable[Input, Output] #
创建一个新的Runnable,在异常时重试原始的Runnable。
- Parameters:
retry_if_exception_type (tuple[type[BaseException], ...]) – 一个异常类型的元组,用于重试。 默认值为 (Exception,)。
wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动。默认为 True。
stop_after_attempt (int) – 在放弃之前尝试的最大次数。默认为3。
- Returns:
一个新的Runnable,在异常时重试原始的Runnable。
- Return type:
Runnable[Input, Output]
示例:
from langchain_core.runnables import RunnableLambda count = 0 def _lambda(x: int) -> None: global count count = count + 1 if x == 1: raise ValueError("x is 1") else: pass runnable = RunnableLambda(_lambda) try: runnable.with_retry( stop_after_attempt=2, retry_if_exception_type=(ValueError,), ).invoke(1) except ValueError: pass assert (count == 2)
- Parameters:
retry_if_exception_type (tuple[type[BaseException], ...]) – 一个异常类型的元组,用于在发生这些异常时重试
wait_exponential_jitter (bool) – 是否在重试之间为等待时间添加抖动
stop_after_attempt (int) – 在放弃之前尝试的最大次数
- Returns:
一个新的Runnable,在异常时重试原始的Runnable。
- Return type:
Runnable[Input, Output]
- with_types(*, input_type: type[Input] | None = None, output_type: type[Output] | None = None) Runnable[Input, Output] #
将输入和输出类型绑定到一个Runnable,返回一个新的Runnable。
- Parameters:
input_type (type[Input] | None) – 要绑定到Runnable的输入类型。默认为None。
output_type (type[Output] | None) – 要绑定到Runnable的输出类型。默认为None。
- Returns:
一个带有类型绑定的新Runnable。
- Return type:
Runnable[Input, Output]
使用 MapRerankDocumentsChain 的示例