LangChain 表达式语言速查表
这是所有最重要的LCEL原语的快速参考。有关更高级的用法,请参阅LCEL操作指南和完整的API参考。
调用一个可运行对象
Runnable.invoke() / Runnable.ainvoke()
from langchain_core.runnables import RunnableLambda
runnable = RunnableLambda(lambda x: str(x))
runnable.invoke(5)
# Async variant:
# await runnable.ainvoke(5)
API Reference:RunnableLambda
'5'
批量处理可运行对象
Runnable.batch() / Runnable.abatch()
from langchain_core.runnables import RunnableLambda
runnable = RunnableLambda(lambda x: str(x))
runnable.batch([7, 8, 9])
# Async variant:
# await runnable.abatch([7, 8, 9])
API Reference:RunnableLambda
['7', '8', '9']
流式处理可运行对象
Runnable.stream() / Runnable.astream()
from langchain_core.runnables import RunnableLambda
def func(x):
    """Lazily yield the string form of every item in ``x``.

    Args:
        x: Any iterable.

    Yields:
        ``str(item)`` for each item, in iteration order.
    """
    for y in x:
        yield str(y)
runnable = RunnableLambda(func)
# stream() is iterated directly; each chunk is printed as it arrives.
for chunk in runnable.stream(range(5)):
    print(chunk)
# Async variant (astream() returns an async iterator, so it is iterated
# with ``async for`` directly and must not be awaited):
# async for chunk in runnable.astream(range(5)):
#     print(chunk)
API Reference:RunnableLambda
0
1
2
3
4
组合可运行对象
管道操作符 |
from langchain_core.runnables import RunnableLambda
runnable1 = RunnableLambda(lambda x: {"foo": x})
runnable2 = RunnableLambda(lambda x: [x] * 2)
chain = runnable1 | runnable2
chain.invoke(2)
API Reference:RunnableLambda
[{'foo': 2}, {'foo': 2}]
并行调用可运行对象
RunnableParallel
from langchain_core.runnables import RunnableLambda, RunnableParallel
runnable1 = RunnableLambda(lambda x: {"foo": x})
runnable2 = RunnableLambda(lambda x: [x] * 2)
chain = RunnableParallel(first=runnable1, second=runnable2)
chain.invoke(2)
API Reference:RunnableLambda | RunnableParallel
{'first': {'foo': 2}, 'second': [2, 2]}
将任何函数转换为可运行对象
RunnableLambda
from langchain_core.runnables import RunnableLambda
def func(x):
    """Return ``x`` increased by 5."""
    return x + 5
runnable = RunnableLambda(func)
runnable.invoke(2)
API Reference:RunnableLambda
7
合并输入和输出字典
RunnablePassthrough.assign
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
runnable1 = RunnableLambda(lambda x: x["foo"] + 7)
chain = RunnablePassthrough.assign(bar=runnable1)
chain.invoke({"foo": 10})
API Reference:RunnableLambda | RunnablePassthrough
{'foo': 10, 'bar': 17}
在输出字典中包含输入字典
RunnablePassthrough
from langchain_core.runnables import (
RunnableLambda,
RunnableParallel,
RunnablePassthrough,
)
runnable1 = RunnableLambda(lambda x: x["foo"] + 7)
chain = RunnableParallel(bar=runnable1, baz=RunnablePassthrough())
chain.invoke({"foo": 10})
{'bar': 17, 'baz': {'foo': 10}}
添加默认调用参数
Runnable.bind
from typing import Optional
from langchain_core.runnables import RunnableLambda
def func(main_arg: dict, other_arg: Optional[str] = None) -> dict:
    """Return ``main_arg``, extended with a ``"foo"`` entry when ``other_arg`` is set.

    Args:
        main_arg: The input dictionary.
        other_arg: Optional value stored under ``"foo"``; ignored when falsy.

    Returns:
        A new dict holding the items of ``main_arg`` plus ``"foo": other_arg``
        when ``other_arg`` is truthy; otherwise ``main_arg`` itself.
    """
    if other_arg:
        return {**main_arg, "foo": other_arg}
    return main_arg
runnable1 = RunnableLambda(func)
bound_runnable1 = runnable1.bind(other_arg="bye")
bound_runnable1.invoke({"bar": "hello"})
API Reference:RunnableLambda
{'bar': 'hello', 'foo': 'bye'}
添加回退
Runnable.with_fallbacks
from langchain_core.runnables import RunnableLambda
runnable1 = RunnableLambda(lambda x: x + "foo")
runnable2 = RunnableLambda(lambda x: str(x) + "foo")
chain = runnable1.with_fallbacks([runnable2])
chain.invoke(5)
API Reference:RunnableLambda
'5foo'
添加重试
Runnable.with_retry
from langchain_core.runnables import RunnableLambda
# Module-level call counter; starts at -1 so the first call divides by zero.
counter = -1


def func(x):
    """Divide ``x`` by a call counter that is incremented on every call.

    The first call increments the counter to 0 and raises
    ``ZeroDivisionError``; the next call divides by 1 and succeeds.
    """
    global counter
    counter += 1
    print(f"attempt with {counter=}")
    return x / counter
chain = RunnableLambda(func).with_retry(stop_after_attempt=2)
chain.invoke(2)
API Reference:RunnableLambda
attempt with counter=0
attempt with counter=1
2.0
配置可运行对象的执行
RunnableConfig
from langchain_core.runnables import RunnableLambda, RunnableParallel
runnable1 = RunnableLambda(lambda x: {"foo": x})
runnable2 = RunnableLambda(lambda x: [x] * 2)
runnable3 = RunnableLambda(lambda x: str(x))
chain = RunnableParallel(first=runnable1, second=runnable2, third=runnable3)
chain.invoke(7, config={"max_concurrency": 2})
API Reference:RunnableLambda | RunnableParallel
{'first': {'foo': 7}, 'second': [7, 7], 'third': '7'}
添加默认配置到可运行对象
Runnable.with_config
from langchain_core.runnables import RunnableLambda, RunnableParallel
runnable1 = RunnableLambda(lambda x: {"foo": x})
runnable2 = RunnableLambda(lambda x: [x] * 2)
runnable3 = RunnableLambda(lambda x: str(x))
chain = RunnableParallel(first=runnable1, second=runnable2, third=runnable3)
# with_config() returns a new runnable and leaves ``chain`` as it was, so
# the configured copy is the one to invoke.
configured_chain = chain.with_config(max_concurrency=2)
configured_chain.invoke(7)
API Reference:RunnableLambda | RunnableParallel
{'first': {'foo': 7}, 'second': [7, 7], 'third': '7'}
使可运行对象的属性可配置
Runnable.configurable_fields
from typing import Any, Optional
from langchain_core.runnables import (
ConfigurableField,
RunnableConfig,
RunnableSerializable,
)
class FooRunnable(RunnableSerializable[dict, dict]):
    """Runnable that subtracts 7 from ``input["foo"]`` and returns it under ``output_key``."""

    # Key under which the result is stored in the output dict.
    output_key: str

    def invoke(
        self, input: Any, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> dict:
        """Run ``subtract_seven`` on ``input`` via ``_call_with_config``.

        Args:
            input: Mapping that contains a ``"foo"`` entry.
            config: Optional runnable config forwarded to ``_call_with_config``.
            **kwargs: Extra keyword arguments forwarded unchanged.

        Returns:
            The dict produced by ``subtract_seven``.
        """
        return self._call_with_config(self.subtract_seven, input, config, **kwargs)

    def subtract_seven(self, input: dict) -> dict:
        """Return ``{self.output_key: input["foo"] - 7}``."""
        return {self.output_key: input["foo"] - 7}
runnable1 = FooRunnable(output_key="bar")
configurable_runnable1 = runnable1.configurable_fields(
output_key=ConfigurableField(id="output_key")
)
configurable_runnable1.invoke(
{"foo": 10}, config={"configurable": {"output_key": "not bar"}}
)
{'not bar': 3}
configurable_runnable1.invoke({"foo": 10})
{'bar': 3}
使链组件可配置
Runnable.configurable_alternatives
from typing import Any, Optional
from langchain_core.runnables import (
    ConfigurableField,
    RunnableConfig,
    RunnableLambda,
    RunnableParallel,
    RunnableSerializable,
)
class ListRunnable(RunnableSerializable[Any, list]):
    """Runnable that wraps its input in a single-element list."""

    def invoke(
        self, input: Any, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> list:
        """Run ``listify`` on ``input`` via ``_call_with_config``.

        Args:
            input: Any value.
            config: Optional runnable config forwarded to ``_call_with_config``.
            **kwargs: Extra keyword arguments forwarded unchanged.

        Returns:
            The list produced by ``listify``.
        """
        return self._call_with_config(self.listify, input, config, **kwargs)

    def listify(self, input: Any) -> list:
        """Return ``[input]``."""
        return [input]
class StrRunnable(RunnableSerializable[Any, str]):
    """Runnable that converts its input to a string."""

    def invoke(
        self, input: Any, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> str:
        """Run ``strify`` on ``input`` via ``_call_with_config``.

        Args:
            input: Any value.
            config: Optional runnable config forwarded to ``_call_with_config``.
            **kwargs: Extra keyword arguments forwarded unchanged.

        Returns:
            The string produced by ``strify``.
        """
        return self._call_with_config(self.strify, input, config, **kwargs)

    def strify(self, input: Any) -> str:
        """Return ``str(input)``."""
        return str(input)
runnable1 = RunnableLambda(lambda x: {"foo": x})
configurable_runnable = ListRunnable().configurable_alternatives(
ConfigurableField(id="second_step"), default_key="list", string=StrRunnable()
)
chain = runnable1 | configurable_runnable
chain.invoke(7, config={"configurable": {"second_step": "string"}})
"{'foo': 7}"
chain.invoke(7)
[{'foo': 7}]
根据输入动态构建链
from langchain_core.runnables import RunnableLambda, RunnableParallel
runnable1 = RunnableLambda(lambda x: {"foo": x})
runnable2 = RunnableLambda(lambda x: [x] * 2)
chain = RunnableLambda(lambda x: runnable1 if x > 6 else runnable2)
chain.invoke(7)
API Reference:RunnableLambda | RunnableParallel
{'foo': 7}
chain.invoke(5)
[5, 5]
生成事件流
Runnable.astream_events
# | echo: false
import nest_asyncio
nest_asyncio.apply()
from langchain_core.runnables import RunnableLambda, RunnableParallel
runnable1 = RunnableLambda(lambda x: {"foo": x}, name="first")
async def func(x):
    """Asynchronously yield ``x`` five times, unchanged."""
    for _ in range(5):
        yield x
runnable2 = RunnableLambda(func, name="second")
chain = runnable1 | runnable2
async for event in chain.astream_events("bar", version="v2"):
print(f"event={event['event']} | name={event['name']} | data={event['data']}")
API Reference:RunnableLambda | RunnableParallel
event=on_chain_start | name=RunnableSequence | data={'input': 'bar'}
event=on_chain_start | name=first | data={}
event=on_chain_stream | name=first | data={'chunk': {'foo': 'bar'}}
event=on_chain_start | name=second | data={}
event=on_chain_end | name=first | data={'output': {'foo': 'bar'}, 'input': 'bar'}
event=on_chain_stream | name=second | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=RunnableSequence | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=second | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=RunnableSequence | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=second | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=RunnableSequence | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=second | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=RunnableSequence | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=second | data={'chunk': {'foo': 'bar'}}
event=on_chain_stream | name=RunnableSequence | data={'chunk': {'foo': 'bar'}}
event=on_chain_end | name=second | data={'output': {'foo': 'bar'}, 'input': {'foo': 'bar'}}
event=on_chain_end | name=RunnableSequence | data={'output': {'foo': 'bar'}}
按完成顺序逐个产出批处理结果
Runnable.batch_as_completed / Runnable.abatch_as_completed
import time
from langchain_core.runnables import RunnableLambda, RunnableParallel
# ``time.sleep`` returns None, so ``or`` falls through to the print call.
runnable1 = RunnableLambda(lambda x: time.sleep(x) or print(f"slept {x}"))
# Each item is an (index, result) pair; ``idx`` is the input's position in the batch.
for idx, result in runnable1.batch_as_completed([5, 1]):
    print(idx, result)
# Async variant:
# async for idx, result in runnable1.abatch_as_completed([5, 1]):
#     print(idx, result)
API Reference:RunnableLambda | RunnableParallel
slept 1
1 None
slept 5
0 None
返回输出字典的子集
Runnable.pick
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
runnable1 = RunnableLambda(lambda x: x["baz"] + 5)
chain = RunnablePassthrough.assign(foo=runnable1).pick(["foo", "bar"])
chain.invoke({"bar": "hi", "baz": 2})
API Reference:RunnableLambda | RunnablePassthrough
{'foo': 7, 'bar': 'hi'}
声明式地创建一个可运行对象的批处理版本
Runnable.map
from langchain_core.runnables import RunnableLambda
runnable1 = RunnableLambda(lambda x: list(range(x)))
runnable2 = RunnableLambda(lambda x: x + 5)
chain = runnable1 | runnable2.map()
chain.invoke(3)
API Reference:RunnableLambda
[5, 6, 7]
获取可运行对象的图形表示
Runnable.get_graph
from langchain_core.runnables import RunnableLambda, RunnableParallel
runnable1 = RunnableLambda(lambda x: {"foo": x})
runnable2 = RunnableLambda(lambda x: [x] * 2)
runnable3 = RunnableLambda(lambda x: str(x))
chain = runnable1 | RunnableParallel(second=runnable2, third=runnable3)
chain.get_graph().print_ascii()
API Reference:RunnableLambda | RunnableParallel
+-------------+
| LambdaInput |
+-------------+
*
*
*
+------------------------------+
| Lambda(lambda x: {'foo': x}) |
+------------------------------+
*
*
*
+-----------------------------+
| Parallel<second,third>Input |
+-----------------------------+
**** ***
**** ****
** **
+---------------------------+ +--------------------------+
| Lambda(lambda x: [x] * 2) | | Lambda(lambda x: str(x)) |
+---------------------------+ +--------------------------+
**** ***
**** ****
** **
+------------------------------+
| Parallel<second,third>Output |
+------------------------------+
获取链中的所有提示
Runnable.get_prompts
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
prompt1 = ChatPromptTemplate.from_messages(
[("system", "good ai"), ("human", "{input}")]
)
prompt2 = ChatPromptTemplate.from_messages(
[
("system", "really good ai"),
("human", "{input}"),
("ai", "{ai_output}"),
("human", "{input2}"),
]
)
fake_llm = RunnableLambda(lambda prompt: "i am good ai")
chain = prompt1.assign(ai_output=fake_llm) | prompt2 | fake_llm
# Print every prompt found in the chain, separated by blank lines.
for i, prompt in enumerate(chain.get_prompts()):
    print(f"**prompt {i=}**\n")
    print(prompt.pretty_repr())
    print("\n" * 3)
API Reference:ChatPromptTemplate | RunnableLambda
**prompt i=0**
================================ System Message ================================
good ai
================================ Human Message =================================
{input}
**prompt i=1**
================================ System Message ================================
really good ai
================================ Human Message =================================
{input}
================================== AI Message ==================================
{ai_output}
================================ Human Message =================================
{input2}
添加生命周期监听器
Runnable.with_listeners
import time
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
def on_start(run_obj: Run):
    """Print the ``start_time`` attribute of ``run_obj``."""
    print("start_time:", run_obj.start_time)
def on_end(run_obj: Run):
    """Print the ``end_time`` attribute of ``run_obj``."""
    print("end_time:", run_obj.end_time)
runnable1 = RunnableLambda(lambda x: time.sleep(x))
chain = runnable1.with_listeners(on_start=on_start, on_end=on_end)
chain.invoke(2)
API Reference:RunnableLambda | Run
start_time: 2024-05-17 23:04:00.951065+00:00
end_time: 2024-05-17 23:04:02.958765+00:00