RunnableGenerator#

class langchain_core.runnables.base.RunnableGenerator(transform: Callable[[Iterator[Input]], Iterator[Output]] | Callable[[AsyncIterator[Input]], AsyncIterator[Output]], atransform: Callable[[AsyncIterator[Input]], AsyncIterator[Output]] | None = None, *, name: str | None = None)[source]#

运行生成器函数的Runnable。

RunnableGenerators 可以直接实例化,也可以通过在一个序列中使用生成器来实例化。

RunnableGenerators 可用于实现自定义行为,例如自定义输出解析器,同时保留流式处理能力。给定一个具有签名 Iterator[A] -> Iterator[B] 的生成器函数,将其包装在 RunnableGenerator 中允许它在从上一个步骤流式传输时立即发出输出块。

请注意,如果一个生成器函数的签名为 A -> Iterator[B],这意味着它需要前一步的输入完成后才能发出块(例如,大多数LLMs需要整个提示可用才能开始生成),那么它可以被包装在 RunnableLambda 中。

以下是一个展示RunnableGenerator基本机制的示例:

from typing import Any, AsyncIterator, Iterator

from langchain_core.runnables import RunnableGenerator


def gen(input: Iterator[Any]) -> Iterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token


runnable = RunnableGenerator(gen)
runnable.invoke(None)  # "Have a nice day"
list(runnable.stream(None))  # ["Have", " a", " nice", " day"]
runnable.batch([None, None])  # ["Have a nice day", "Have a nice day"]


# Async version:
async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
    for token in ["Have", " a", " nice", " day"]:
        yield token

runnable = RunnableGenerator(agen)
await runnable.ainvoke(None)  # "Have a nice day"
[p async for p in runnable.astream(None)] # ["Have", " a", " nice", " day"]

RunnableGenerator 使得在流式处理环境中实现自定义行为变得容易。下面我们展示一个示例:

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableGenerator, RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser


model = ChatOpenAI()
chant_chain = (
    ChatPromptTemplate.from_template("Give me a 3 word chant about {topic}")
    | model
    | StrOutputParser()
)

def character_generator(input: Iterator[str]) -> Iterator[str]:
    for token in input:
        if "," in token or "." in token:
            yield "👏" + token
        else:
            yield token


runnable = chant_chain | character_generator
assert type(runnable.last) is RunnableGenerator
"".join(runnable.stream({"topic": "waste"})) # Reduce👏, Reuse👏, Recycle👏.

# Note that RunnableLambda can be used to delay streaming of one step in a
# sequence until the previous step is finished:
def reverse_generator(input: str) -> Iterator[str]:
    # Yield characters of input in reverse order.
    for character in input[::-1]:
        yield character

runnable = chant_chain | RunnableLambda(reverse_generator)
"".join(runnable.stream({"topic": "waste"}))  # ".elcycer ,esuer ,ecudeR"

初始化一个RunnableGenerator。

Parameters:
  • transform (Union[Callable[[Iterator[Input]], Iterator[Output]], Callable[[AsyncIterator[Input]], AsyncIterator[Output]]]) – 转换函数。

  • atransform (可选[Callable[[AsyncIterator[Input]], AsyncIterator[Output]]]) – 异步转换函数。默认为 None。

  • 名称 (字符串 | )

Raises:

TypeError – 如果转换不是生成器函数。

注意

RunnableGenerator 实现了标准的 Runnable Interface。🏃

Runnable Interface 接口在可运行对象上提供了额外的方法,例如 with_types, with_retry, assign, bind, get_graph, 以及更多。

属性

方法

__init__(*args, **kwargs)

初始化自身。

使用 RunnableGenerator 的示例