人工干预¶
本指南使用新的 interrupt 函数。
从 LangGraph 0.2.57 开始,推荐的设置断点方式是使用 interrupt 函数,因为它简化了**人工干预**模式。
如果您在寻找依赖于静态断点和 NodeInterrupt 异常的先前版本的概念指南,可以在 这里 找到。
人工干预(或称“在循环中”)工作流将人类输入整合到自动化过程中,在关键阶段允许决策、验证或修正。这在**基于 LLM 的应用**中尤其有用,因为基础模型可能会产生偶尔的不准确性。在合规、决策或内容生成等低误差容忍场景中,人类的参与通过允许审查、修正或覆盖模型输出来确保可靠性。
使用案例¶
基于 LLM 应用中的**人工干预**工作流的关键使用案例包括:
- 🛠️ 审查工具调用:人类可以在工具执行之前审查、编辑或批准 LLM 请求的工具调用。
- ✅ 验证 LLM 输出:人类可以审查、编辑或批准 LLM 生成的内容。
- 💡 提供上下文:使 LLM 能够明确请求人类输入以进行澄清或提供额外细节,或支持多轮对话。
interrupt¶
interrupt 函数 在 LangGraph 中通过在特定节点暂停图形、向人类展示信息以及使用他们的输入恢复图形,从而启用人工干预工作流。该函数对于批准、编辑或收集额外输入等任务非常有用。interrupt 函数 与 Command 对象结合使用,以人类提供的值恢复图形。
from langgraph.types import interrupt
def human_node(state: State):
value = interrupt(
# 任何可序列化为 JSON 的值,供人类查看。
# 例如,一个问题、一段文本或状态中的一组键
{
"text_to_revise": state["some_text"]
}
)
# 使用人类的输入更新状态或根据输入调整图形。
return {
"some_text": value
}
graph = graph_builder.compile(
checkpointer=checkpointer # `interrupt` 工作所需
)
# 运行图形直到遇到中断
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(some_input, config=thread_config)
# 用人类的输入恢复图形
graph.invoke(Command(resume=value_from_human), config=thread_config)
Warning
中断非常强大且易于使用。然而,尽管它们在开发者体验上可能类似于 Python 的 input() 函数,但重要的是要注意,它们不会自动从中断点恢复执行。相反,它们会重新运行使用中断的整个节点。 因此,中断通常最好放置在节点的开头或一个专用的节点中。请阅读 从中断恢复 部分以获取更多细节。
完整代码
这是如何在图形中使用 interrupt 的完整示例,如果您想看到代码在实际中的应用。
from typing import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
class State(TypedDict):
"""图形状态。"""
some_text: str
def human_node(state: State):
value = interrupt(
# 任何可序列化为 JSON 的值,供人类查看。
# 例如,一个问题、一段文本或状态中的一组键
{
"text_to_revise": state["some_text"]
}
)
return {
# 使用人类的输入更新状态
"some_text": value
}
# 构建图形
graph_builder = StateGraph(State)
# 将人类节点添加到图形中
graph_builder.add_node("human_node", human_node)
graph_builder.add_edge(START, "human_node")
# 使用中断功能需要一个检查点。
checkpointer = MemorySaver()
graph = graph_builder.compile(
checkpointer=checkpointer
)
# 使用 stream() 直接展示 `__interrupt__` 信息。
for chunk in graph.stream({"some_text": "原始文本"}, config=thread_config):
print(chunk)
# 使用 Command 恢复
for chunk in graph.stream(Command(resume="编辑后的文本"), config=thread_config):
print(chunk)
要求¶
要在图形中使用 interrupt,您需要:
- 指定一个检查点,以在每一步后保存图形状态。
- 在适当位置调用
interrupt()。有关示例,请参见 设计模式 部分。 - 使用 线程 ID 运行图形,直到触发
interrupt。 - 使用
invoke/ainvoke/stream/astream恢复执行(见Command原语)。
设计模式¶
您通常可以采取三种不同的**行动**来实现人工干预工作流:
- 批准或拒绝:在关键步骤之前暂停图形,例如 API 调用,以审查和批准该操作。如果拒绝该操作,您可以阻止图形执行该步骤,并可能采取替代行动。此模式通常涉及根据人类的输入**路由**图形。
- 编辑图形状态:暂停图形以审查和编辑图形状态。这对于纠正错误或使用额外信息更新状态非常有用。此模式通常涉及使用人类的输入**更新**状态。
- 获取输入:在图形的特定步骤明确请求人类输入。这对于收集额外信息或上下文以帮助代理的决策过程或支持**多轮对话**非常有用。
下面我们展示可以使用这些**行动**实现的不同设计模式。
批准或拒绝¶

在关键步骤之前暂停图形,例如 API 调用,以审查和批准该操作。如果拒绝该操作,您可以阻止图形执行该步骤,并可能采取替代行动。
from typing import Literal
from langgraph.types import interrupt, Command
def human_approval(state: State) -> Command[Literal["some_node", "another_node"]]:
is_approved = interrupt(
{
"question": "这是正确的吗?",
# 展示应由人类审查和批准的输出。
"llm_output": state["llm_output"]
}
)
if is_approved:
return Command(goto="some_node")
else:
return Command(goto="another_node")
# 将节点添加到图形中的适当位置并连接到相关节点。
graph_builder.add_node("human_approval", human_approval)
graph = graph_builder.compile(checkpointer=checkpointer)
# 在运行图形并触发中断后,图形将暂停。
# 用批准或拒绝恢复。
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(Command(resume=True), config=thread_config)
请参见 如何审查工具调用 获取更详细的示例。
审查与编辑状态¶

from langgraph.types import interrupt
def human_editing(state: State):
...
result = interrupt(
# 中断信息以展示给客户端。
# 可以是任何可序列化为 JSON 的值。
{
"task": "审查 LLM 的输出并进行必要的编辑。",
"llm_generated_summary": state["llm_generated_summary"]
}
)
# 使用编辑后的文本更新状态
return {
"llm_generated_summary": result["edited_text"]
}
# 将节点添加到图形中的适当位置并连接到相关节点。
graph_builder.add_node("human_editing", human_editing)
graph = graph_builder.compile(checkpointer=checkpointer)
...
# 在运行图形并触发中断后,图形将暂停。
# 用编辑后的文本恢复。
thread_config = {"configurable": {"thread_id": "some_id"}}
graph.invoke(
Command(resume={"edited_text": "编辑后的文本"}),
config=thread_config
)
请参见 如何使用中断等待用户输入 获取更详细的示例。
审查工具调用¶

def human_review_node(state) -> Command[Literal["call_llm", "run_tool"]]:
# 这是我们将通过 Command(resume=<human_review>) 提供的值
human_review = interrupt(
{
"question": "这个正确吗?",
# 提供工具调用以供审查
"tool_call": tool_call
}
)
review_action, review_data = human_review
# 批准工具调用并继续
if review_action == "continue":
return Command(goto="run_tool")
# 手动修改工具调用,然后继续
elif review_action == "update":
...
updated_msg = get_updated_msg(review_data)
# 请记住,要修改现有消息,您需要
# 传递具有匹配ID的消息。
return Command(goto="run_tool", update={"messages": [updated_message]})
# 提供自然语言反馈,然后将其传回代理
elif review_action == "feedback":
...
feedback_msg = get_feedback_msg(review_data)
return Command(goto="call_llm", update={"messages": [feedback_msg]})
请参阅 如何审查工具调用 获取更详细的示例。
多轮对话¶

**多轮对话**涉及代理与人类之间的多次来回互动,这可以使代理以对话的方式从人类那里获取更多信息。
这种设计模式在由多个代理组成的LLM应用中是有用的。一个或多个代理可能需要与人类进行多轮对话,其中人类在对话的不同阶段提供输入或反馈。为了简单起见,下面的代理实现被示为单个节点,但实际上它可能是由多个节点组成的更大图形的一部分,并包括一个条件边。
在此模式中,每个代理都有自己的用于收集用户输入的人类节点。可以通过为人类节点命名上独特的名称(例如,“代理1的人类”,“代理2的人类”)或使用子图,其中子图包含一个人类节点和一个代理节点来实现。
from langgraph.types import interrupt
def human_input(state: State):
human_message = interrupt("human_input")
return {
"messages": [
{
"role": "human",
"content": human_message
}
]
}
def agent(state: State):
# 代理逻辑
...
graph_builder.add_node("human_input", human_input)
graph_builder.add_edge("human_input", "agent")
graph = graph_builder.compile(checkpointer=checkpointer)
# 运行图形并触发中断后,图形将暂停。
# 使用人类的输入恢复它。
graph.invoke(
Command(resume="你好!"),
config=thread_config
)
在此模式中,单个人类节点用于为多个代理收集用户输入。根据状态确定活动代理,因此在收集用户输入后,图可以路由到正确的代理。
from langgraph.types import interrupt
def human_node(state: MessagesState) -> Command[Literal["agent_1", "agent_2", ...]]:
"""一个用于收集用户输入的节点。"""
user_input = interrupt(value="准备好进行用户输入。")
# 从状态中确定**活动代理**,因此
# 收集输入后可以路由到正确的代理。
# 例如,可以在状态中添加一个字段,或使用最后一个活动代理。
# 或填写代理生成的AI消息的`name`属性。
active_agent = ...
return Command(
update={
"messages": [{
"role": "human",
"content": user_input,
}]
},
goto=active_agent,
)
请参阅 如何实现多轮对话 获取更详细的示例。
验证人类输入¶
如果您需要在图中验证人类提供的输入(而不是在客户端进行验证),可以通过在单个节点内使用多个中断调用来实现。
from langgraph.types import interrupt
def human_node(state: State):
"""带验证的人类节点。"""
question = "你多大年龄?"
while True:
answer = interrupt(question)
# 验证答案,如果答案无效,则重新请求输入。
if not isinstance(answer, int) or answer < 0:
question = f"'{answer} 不是一个有效的年龄。你多大年龄?"
answer = None
continue
else:
# 如果答案有效,我们可以继续。
break
print(f"回路中的人类年龄为 {answer} 岁。")
return {
"age": answer
}
Command 原语¶
当使用 interrupt 函数时,图将在中断位置暂停并等待用户输入。
可以使用 Command 原语恢复图的执行,Command 可通过 invoke、ainvoke、stream 或 astream 方法传递。
Command 原语提供多种选项,以控制和修改图在恢复期间的状态:
-
传递值给
interrupt:使用Command(resume=value)将数据(如用户的响应)提供给图。执行从使用interrupt的节点的开头开始,但是这一次interrupt(...)调用将返回传递到Command(resume=value)中的值,而不是暂停图。 -
更新图的状态:使用
Command(update=update)修改图的状态。请注意,恢复从使用interrupt的节点的开头开始。执行从使用interrupt的节点的开头恢复,但状态已更新。
通过利用 Command,您可以恢复图形执行、处理用户输入并动态调整图的状态。
与 invoke 和 ainvoke 一起使用¶
当您使用 stream 或 astream 运行图形时,您将收到一个 Interrupt 事件,提示您中断已被触发。
invoke 和 ainvoke 不返回中断信息。要访问此信息,您必须使用 get_state 方法获取在调用 invoke 或 ainvoke 后图的状态。
# 运行图形直到中断
result = graph.invoke(inputs, thread_config)
# 获取图的状态以获取中断信息。
state = graph.get_state(thread_config)
# 打印状态值
print(state.values)
# 打印待处理任务
print(state.tasks)
# 使用用户的输入恢复图。
graph.invoke(Command(resume={"age": "25"}), thread_config)
{'foo': 'bar'} # 状态值
(
PregelTask(
id='5d8ffc92-8011-0c9b-8b59-9d3545b7e553',
name='node_foo',
path=('__pregel_pull', 'node_foo'),
error=None,
interrupts=(Interrupt(value='value_in_interrupt', resumable=True, ns=['node_foo:5d8ffc92-8011-0c9b-8b59-9d3545b7e553'], when='during'),), state=None,
result=None
),
) # 待处理任务. interrupts
中断后的恢复是如何工作的?¶
Warning
从interrupt恢复与Python的 input() 函数是 不同的,后者的执行从调用 input() 函数的确切位置恢复。
使用 interrupt 的一个关键方面是理解恢复的工作原理。当您在 interrupt 后恢复执行时,图的执行从上一个触发interrupt的**图节点**的**开头**开始。
**所有**从节点的开头到 interrupt 的代码将被重新执行。
counter = 0
def node(state: State):
# 从节点开始到中断的所有代码将在图恢复时重新执行。
global counter
counter += 1
print(f"> 进入节点: {counter} 次")
# 暂停图并等待用户输入。
answer = interrupt()
print("计数器的值是:", counter)
...
在**恢复**图形时,计数器将第二次递增,输出如下:
常见误区¶
副作用¶
将具有副作用的代码(如API调用)放在 interrupt 之后,以避免重复,因为这些代码将在每次恢复节点时重新触发。
当节点从 interrupt 中恢复时,此代码将再次执行 API 调用。
如果 API 调用不是幂等的或者代价高昂,这可能会导致问题。
作为函数调用的子图¶
当以函数 调用子图 时,**父图**将从调用子图(以及触发 interrupt 的地方)的**节点**开始恢复执行。类似地,**子图**将从调用 interrupt() 函数的**节点**开始恢复执行。
例如,
def node_in_parent_graph(state: State):
some_code() # <-- 这将在子图恢复时再次执行。
# 以函数方式调用子图。
# 子图包含一个 `interrupt` 调用。
subgraph_result = subgraph.invoke(some_input)
...
示例:父图和子图执行流
假设我们有一个包含 3 个节点的父图:
父图:node_1 → node_2 (子图调用)→ node_3
而子图有 3 个节点,其中第二个节点包含一个 interrupt:
子图:sub_node_1 → sub_node_2 (interrupt) → sub_node_3
在恢复图时,执行流程如下:
- 跳过父图中的
node_1(已执行,图状态在快照中保存)。 - 从头重新执行父图中的
node_2。 - 跳过子图中的
sub_node_1(已执行,图状态在快照中保存)。 - 从头重新执行子图中的
sub_node_2。 - 继续执行
sub_node_3和后续节点。
这里有一个简化的示例代码,您可以用它来理解子图如何与中断一起工作。 它计算每个节点被进入的次数并打印计数。
import uuid
from typing import TypedDict
from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
class State(TypedDict):
"""图状态。"""
state_counter: int
counter_node_in_subgraph = 0
def node_in_subgraph(state: State):
"""子图中的一个节点。"""
global counter_node_in_subgraph
counter_node_in_subgraph += 1 # 这段代码将 **不会** 再次运行!
print(f"进入 `node_in_subgraph` 总共 {counter_node_in_subgraph} 次")
counter_human_node = 0
def human_node(state: State):
global counter_human_node
counter_human_node += 1 # 这段代码将再次运行!
print(f"在子图中进入 human_node 总共 {counter_human_node} 次")
answer = interrupt("你叫什么名字?")
print(f"得到了答案:{answer}")
checkpointer = MemorySaver()
subgraph_builder = StateGraph(State)
subgraph_builder.add_node("some_node", node_in_subgraph)
subgraph_builder.add_node("human_node", human_node)
subgraph_builder.add_edge(START, "some_node")
subgraph_builder.add_edge("some_node", "human_node")
subgraph = subgraph_builder.compile(checkpointer=checkpointer)
counter_parent_node = 0
def parent_node(state: State):
"""这个父节点将调用子图。"""
global counter_parent_node
counter_parent_node += 1 # 这段代码将在恢复时再次运行!
print(f"进入 `parent_node` 总共 {counter_parent_node} 次")
# 请注意,我们故意在图状态中增加状态计数器
# 以演示子图更新相同键不会与父图冲突(直至
subgraph_state = subgraph.invoke(state)
return subgraph_state
builder = StateGraph(State)
builder.add_node("parent_node", parent_node)
builder.add_edge(START, "parent_node")
# 必须启用检查点器才能使中断功能正常工作!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {
"configurable": {
"thread_id": uuid.uuid4(),
}
}
for chunk in graph.stream({"state_counter": 1}, config):
print(chunk)
print('--- 恢复 ---')
for chunk in graph.stream(Command(resume="35"), config):
print(chunk)
这将打印出
--- 第一次调用 ---
在父节点中: {'foo': 'bar'}
进入 `parent_node` 总共 1 次
进入 `node_in_subgraph` 总共 1 次
在子图中进入 human_node 总共 1 次
{'__interrupt__': (Interrupt(value='你叫什么名字?', resumable=True, ns=['parent_node:0b23d72f-aaba-0329-1a59-ca4f3c8bad3b', 'human_node:25df717c-cb80-57b0-7410-44e20aac8f3c'], when='during'),)}
--- 恢复 ---
在父节点中: {'foo': 'bar'}
进入 `parent_node` 总共 2 次
在子图中进入 human_node 总共 2 次
得到了答案:35
{'parent_node': None}
使用多个中断¶
在 单个 节点中使用多个中断可以帮助处理如 验证人类输入 等模式。然而,如果处理不当,在同一节点中使用多个中断可能会导致意外行为。
当一个节点包含多个中断调用时,LangGraph 会保持一个特定于执行该节点的任务的恢复值列表。每当执行恢复时,它会从节点的开头开始。对于遇到的每个中断,LangGraph 会检查任务的恢复列表中是否存在匹配的值。匹配是 严格基于索引的,因此节点中中断调用的顺序是关键。
为避免问题,请避免在执行之间动态更改节点的结构。这包括添加、删除或重新排序中断调用,因为此类更改可能导致索引不匹配。这些问题通常源于非常规模式,例如通过 Command(resume=..., update=SOME_STATE_MUTATION) 修改状态或依赖全局变量动态修改节点的结构。
不正确代码的示例
import uuid
from typing import TypedDict, Optional
from langgraph.graph import StateGraph
from langgraph.constants import START
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
class State(TypedDict):
"""图状态。"""
age: Optional[str]
name: Optional[str]
def human_node(state: State):
if not state.get('name'):
name = interrupt("你叫什么名字?")
else:
name = "N/A"
if not state.get('age'):
age = interrupt("你多大了?")
else:
age = "N/A"
print(f"名字: {name}. 年龄: {age}")
return {
"age": age,
"name": name,
}
builder = StateGraph(State)
builder.add_node("human_node", human_node)
builder.add_edge(START, "human_node")
# 必须启用检查点器才能使中断功能正常工作!
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {
"configurable": {
"thread_id": uuid.uuid4(),
}
}
for chunk in graph.stream({"age": None, "name": None}, config):
print(chunk)
for chunk in graph.stream(Command(resume="John", update={"name": "foo"}), config):
print(chunk)