ChatSambaNovaCloud#

class langchain_community.chat_models.sambanova.ChatSambaNovaCloud[source]#

基础类: BaseChatModel

SambaNova云聊天模型。

Setup:

要使用,您应该设置以下环境变量: SAMBANOVA_URL 设置为您的SambaNova云URL。 SAMBANOVA_API_KEY 设置为您的SambaNova云API密钥。 http://cloud.sambanova.ai/ 示例: .. code-block:: python

ChatSambaNovaCloud(

sambanova_url = SambaNova 云终端 URL, sambanova_api_key = 设置您的 SambaNova 云 API 密钥, model = 模型名称, max_tokens = 生成的最大令牌数, temperature = 模型温度, top_p = 模型顶部 p, top_k = 模型顶部 k, stream_options = 包含使用情况以获取生成指标

)

Key init args — completion params:
model: str

使用的模型名称,例如,Meta-Llama-3-70B-Instruct。

streaming: bool

在使用非流式方法时是否使用流式处理程序

max_tokens: int

生成的最大令牌数

temperature: float

模型温度

top_p: float

模型顶部 p

top_k: int

模型前k个

stream_options: dict

流选项,包括用于获取生成指标的用法

Key init args — client params:
sambanova_url: str

SambaNova 云网址

sambanova_api_key: str

SambaNova云API密钥

Instantiate:
from langchain_community.chat_models import ChatSambaNovaCloud

chat = ChatSambaNovaCloud(
    sambanova_url = SambaNova cloud endpoint URL,
    sambanova_api_key = set with your SambaNova cloud API key,
    model = model name,
    max_tokens = max number of tokens to generate,
    temperature = model temperature,
    top_p = model top p,
    top_k = model top k,
    stream_options = include usage to get generation metrics
)
Invoke:
messages = [
    SystemMessage(content="your are an AI assistant."),
    HumanMessage(content="tell me a joke."),
]
response = chat.invoke(messages)
Stream:
for chunk in chat.stream(messages):
    print(chunk.content, end="", flush=True)
Async:
response = chat.ainvoke(messages)
await response
Tool calling:
from pydantic import BaseModel, Field

class GetWeather(BaseModel):
    '''Get the current weather in a given location'''

    location: str = Field(
        ...,
        description="The city and state, e.g. Los Angeles, CA"
    )

llm_with_tools = llm.bind_tools([GetWeather, GetPopulation])
ai_msg = llm_with_tools.invoke("Should I bring my umbrella today in LA?")
ai_msg.tool_calls
[
    {
        'name': 'GetWeather',
        'args': {'location': 'Los Angeles, CA'},
        'id': 'call_adf61180ea2b4d228a'
    }
]
Structured output:
from typing import Optional

from pydantic import BaseModel, Field

class Joke(BaseModel):
    '''Joke to tell user.'''

    setup: str = Field(description="The setup of the joke")
    punchline: str = Field(description="The punchline to the joke")

structured_model = llm.with_structured_output(Joke)
structured_model.invoke("Tell me a joke about cats")
Joke(setup="Why did the cat join a band?",
punchline="Because it wanted to be the purr-cussionist!")

更多信息请参见 ChatSambanovaCloud.with_structured_output()

Token usage:
response = chat.invoke(messages)
print(response.response_metadata["usage"]["prompt_tokens"]
print(response.response_metadata["usage"]["total_tokens"]
Response metadata
response = chat.invoke(messages)
print(response.response_metadata)

初始化和验证环境变量

注意

ChatSambaNovaCloud 实现了标准的 Runnable Interface。🏃

Runnable Interface 接口在可运行对象上提供了额外的方法,例如 with_types, with_retry, assign, bind, get_graph, 等等。

param additional_headers: Dict[str, Any] = {}#

请求中发送的附加头信息

param cache: BaseCache | bool | None = None#

是否缓存响应。

  • 如果为真,将使用全局缓存。

  • 如果为false,将不使用缓存

  • 如果为None,将使用全局缓存(如果已设置),否则不使用缓存。

  • 如果是 BaseCache 的实例,将使用提供的缓存。

目前不支持对模型的流式方法进行缓存。

param callback_manager: BaseCallbackManager | None = None#

自版本0.1.7起已弃用:请改用callbacks()。它将在pydantic==1.0中移除。

回调管理器以添加到运行跟踪中。

param callbacks: Callbacks = None#

添加到运行跟踪的回调。

param custom_get_token_ids: Callable[[str], list[int]] | None = None#

用于计数标记的可选编码器。

param disable_streaming: bool | Literal['tool_calling'] = False#

是否禁用此模型的流式传输。

如果流式传输被绕过,那么 stream()/astream() 将依赖于 invoke()/ainvoke()

  • 如果为True,将始终绕过流式传输情况。

  • 如果是“tool_calling”,只有在使用tools关键字参数调用模型时,才会绕过流式处理的情况。

  • 如果为 False(默认值),将始终使用流式情况(如果可用)。

param max_tokens: int = 1024#

生成的最大令牌数

param metadata: dict[str, Any] | None = None#

要添加到运行跟踪的元数据。

param model: str = 'Meta-Llama-3.1-8B-Instruct'#

模型的名称

param rate_limiter: BaseRateLimiter | None = None#

一个可选的速率限制器,用于限制请求的数量。

param sambanova_api_key: SecretStr = SecretStr('')#

SambaNova云API密钥

param sambanova_url: str = ''#

SambaNova 云网址

param stream_options: Dict[str, Any] = {'include_usage': True}#

流选项,包括用于获取生成指标的用法

param streaming: bool = False#

在使用非流式方法时是否使用流式处理程序

param tags: list[str] | None = None#

要添加到运行跟踪的标签。

param temperature: float = 0.7#

模型温度

param top_k: int | None = None#

模型前k个

param top_p: float | None = None#

模型顶部 p

param verbose: bool [Optional]#

是否打印出响应文本。

class Config[source]#

基础类:object

populate_by_name = True#
__call__(messages: list[BaseMessage], stop: list[str] | None = None, callbacks: list[BaseCallbackHandler] | BaseCallbackManager | None = None, **kwargs: Any) BaseMessage#

自版本0.1.7起已弃用:请改用invoke()。在langchain-core==1.0之前不会移除。

Parameters:
Return type:

BaseMessage

async abatch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output]#

默认实现使用asyncio.gather并行运行ainvoke。

batch的默认实现对于IO绑定的runnables效果很好。

如果子类能够更高效地进行批处理,则应重写此方法; 例如,如果底层的Runnable使用支持批处理模式的API。

Parameters:
  • inputs (list[Input]) – Runnable 的输入列表。

  • config (RunnableConfig | list[RunnableConfig] | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的‘tags’、‘metadata’,用于控制并行工作量的‘max_concurrency’,以及其他键。更多详情请参考RunnableConfig。默认为None。

  • return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为 False。

  • kwargs (Any | None) – 传递给Runnable的额外关键字参数。

Returns:

Runnable 的输出列表。

Return type:

列表[输出]

async abatch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) AsyncIterator[tuple[int, Output | Exception]]#

在输入列表上并行运行ainvoke,在它们完成时产生结果。

Parameters:
  • inputs (Sequence[Input]) – Runnable 的输入列表。

  • config (RunnableConfig | Sequence[RunnableConfig] | None) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的'tags'、'metadata',用于控制并行工作量的'max_concurrency',以及其他键。有关更多详细信息,请参阅RunnableConfig。默认为None。默认为None。

  • return_exceptions (bool) – 是否返回异常而不是抛出它们。默认为 False。

  • kwargs (Any | None) – 传递给Runnable的额外关键字参数。

Yields:

输入索引和Runnable输出的元组。

Return type:

AsyncIterator[元组[int, Output | 异常]]

async ainvoke(input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs: Any) BaseMessage#

ainvoke的默认实现,从线程调用invoke。

默认实现允许使用异步代码,即使Runnable没有实现本地的异步版本的invoke。

如果子类可以异步运行,则应重写此方法。

Parameters:
  • 输入 (LanguageModelInput)

  • config (可选[RunnableConfig])

  • stop (可选[列表[字符串]])

  • kwargs (Any)

Return type:

BaseMessage

async astream(input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs: Any) AsyncIterator[BaseMessageChunk]#

astream的默认实现,调用ainvoke。 如果子类支持流式输出,则应重写此方法。

Parameters:
  • input (LanguageModelInput) – Runnable 的输入。

  • config (可选[RunnableConfig]) – 用于Runnable的配置。默认为None。

  • kwargs (Any) – 传递给Runnable的额外关键字参数。

  • stop (可选[列表[字符串]])

Yields:

Runnable 的输出。

Return type:

异步迭代器[BaseMessageChunk]

async astream_events(input: Any, config: RunnableConfig | None = None, *, version: Literal['v1', 'v2'], include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) AsyncIterator[StandardStreamEvent | CustomStreamEvent]#

生成事件流。

用于创建一个迭代器,遍历提供实时信息的StreamEvents,包括来自中间结果的StreamEvents。

StreamEvent 是一个具有以下模式的字典:

  • event: str - 事件名称的格式为

    格式: on_[runnable_type]_(start|stream|end).

  • name: str - 生成事件的 Runnable 的名称。

  • run_id: str - 与给定执行相关联的随机生成的ID

    发出事件的Runnable。 作为父Runnable执行的一部分被调用的子Runnable会被分配其自己唯一的ID。

  • parent_ids: List[str] - 生成事件的父可运行对象的ID。

    根可运行对象将有一个空列表。 父ID的顺序是从根到直接父对象。 仅适用于API的v2版本。API的v1版本将返回一个空列表。

  • tags: Optional[List[str]] - 生成事件的Runnable的标签

    事件。

  • metadata: Optional[Dict[str, Any]] - Runnable的元数据

    生成事件的元数据。

  • data: Dict[str, Any]

下表展示了一些可能由不同链发出的事件。为了简洁起见,表中省略了元数据字段。链定义已包含在表后。

注意 此参考表适用于V2版本的架构。

事件

名称

输入

输出

on_chat_model_start

[模型名称]

{“messages”: [[SystemMessage, HumanMessage]]}

on_chat_model_stream

[model name]

AIMessageChunk(content=”hello”)

on_chat_model_end

[model name]

{“messages”: [[SystemMessage, HumanMessage]]}

AIMessageChunk(content=”hello world”)

on_llm_start

[model name]

{‘input’: ‘hello’}

on_llm_stream

[模型名称]

‘Hello’

on_llm_end

[model name]

‘你好,人类!’

链上开始

格式化文档

on_chain_stream

format_docs

“你好世界!,再见世界!”

on_chain_end

format_docs

[Document(…)]

“你好世界!,再见世界!”

on_tool_start

some_tool

{“x”: 1, “y”: “2”}

on_tool_end

some_tool

{“x”: 1, “y”: “2”}

on_retriever_start

[retriever name]

{“query”: “hello”}

on_retriever_end

[retriever name]

{“query”: “hello”}

[Document(…), ..]

on_prompt_start

[template_name]

{“question”: “hello”}

on_prompt_end

[template_name]

{“question”: “hello”}

ChatPromptValue(messages: [SystemMessage, …])

除了标准事件外,用户还可以派发自定义事件(见下面的示例)。

自定义事件将仅在API的v2版本中显示!

自定义事件具有以下格式:

属性

类型

描述

name

str

用户定义的事件名称。

data

Any

与事件相关的数据。这可以是任何内容,但我们建议使其可JSON序列化。

以下是上述标准事件相关的声明:

format_docs:

def format_docs(docs: List[Document]) -> str:
    '''Format the docs.'''
    return ", ".join([doc.page_content for doc in docs])

format_docs = RunnableLambda(format_docs)

some_tool:

@tool
def some_tool(x: int, y: str) -> dict:
    '''Some_tool.'''
    return {"x": x, "y": y}

提示:

template = ChatPromptTemplate.from_messages(
    [("system", "You are Cat Agent 007"), ("human", "{question}")]
).with_config({"run_name": "my_template", "tags": ["my_template"]})

示例:

from langchain_core.runnables import RunnableLambda

async def reverse(s: str) -> str:
    return s[::-1]

chain = RunnableLambda(func=reverse)

events = [
    event async for event in chain.astream_events("hello", version="v2")
]

# will produce the following events (run_id, and parent_ids
# has been omitted for brevity):
[
    {
        "data": {"input": "hello"},
        "event": "on_chain_start",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"chunk": "olleh"},
        "event": "on_chain_stream",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
    {
        "data": {"output": "olleh"},
        "event": "on_chain_end",
        "metadata": {},
        "name": "reverse",
        "tags": [],
    },
]

示例:分发自定义事件

from langchain_core.callbacks.manager import (
    adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio


async def slow_thing(some_input: str, config: RunnableConfig) -> str:
    """Do something that takes a long time."""
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 1 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    await adispatch_custom_event(
        "progress_event",
        {"message": "Finished step 2 of 3"},
        config=config # Must be included for python < 3.10
    )
    await asyncio.sleep(1) # Placeholder for some slow operation
    return "Done"

slow_thing = RunnableLambda(slow_thing)

async for event in slow_thing.astream_events("some_input", version="v2"):
    print(event)
Parameters:
  • input (Any) – Runnable 的输入。

  • config (RunnableConfig | None) – 用于Runnable的配置。

  • version (Literal['v1', 'v2']) – 使用的模式版本,可以是 v2v1。 用户应使用 v2v1 是为了向后兼容,将在 0.4.0 版本中弃用。 在 API 稳定之前不会分配默认值。 自定义事件仅在 v2 中显示。

  • include_names (Sequence[str] | None) – 仅包含来自具有匹配名称的可运行对象的事件。

  • include_types (Sequence[str] | None) – 仅包含来自具有匹配类型的可运行对象的事件。

  • include_tags (Sequence[str] | None) – 仅包含具有匹配标签的可运行对象的事件。

  • exclude_names (Sequence[str] | None) – 排除具有匹配名称的可运行对象的事件。

  • exclude_types (Sequence[str] | None) – 排除具有匹配类型的可运行对象的事件。

  • exclude_tags (Sequence[str] | None) – 排除具有匹配标签的可运行对象的事件。

  • kwargs (Any) – 传递给 Runnable 的额外关键字参数。 这些参数将传递给 astream_log,因为 astream_events 的实现是基于 astream_log 的。

Yields:

一个异步的StreamEvents流。

Raises:

NotImplementedError – 如果版本不是v1v2

Return type:

AsyncIterator[StandardStreamEvent | CustomStreamEvent]

batch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) list[Output]#

默认实现使用线程池执行器并行运行invoke。

batch的默认实现对于IO绑定的runnables效果很好。

如果子类能够更高效地进行批处理,则应重写此方法; 例如,如果底层的Runnable使用支持批处理模式的API。

Parameters:
Return type:

列表[输出]

batch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) Iterator[tuple[int, Output | Exception]]#

在输入列表上并行运行invoke,在它们完成时产生结果。

Parameters:
Return type:

Iterator[元组[int, Output | 异常]]

bind(**kwargs: Any) Runnable[Input, Output]#

将参数绑定到Runnable,返回一个新的Runnable。

当链中的Runnable需要一个不在前一个Runnable输出中或用户输入中的参数时,这很有用。

Parameters:

kwargs (Any) – 绑定到Runnable的参数。

Returns:

一个新的Runnable,参数已绑定。

Return type:

Runnable[Input, Output]

示例:

from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import StrOutputParser

llm = ChatOllama(model='llama2')

# Without bind.
chain = (
    llm
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'

# With bind.
chain = (
    llm.bind(stop=["three"])
    | StrOutputParser()
)

chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
bind_tools(tools: Sequence[Dict[str, Any] | Type[Any] | Callable[[...], Any] | BaseTool], *, tool_choice: Dict[str, Any] | bool | str | None = None, parallel_tool_calls: bool | None = False, **kwargs: Any) Runnable[PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]], BaseMessage][source]#

将类似工具的对象绑定到此聊天模型

tool_choice: 目前不支持“any”,选择应为以下之一 [“auto”, “none”, “required”]

Parameters:
  • 工具 (序列[字典[字符串, 任意] | 类型[任意] | 可调用[[...], 任意] | BaseTool])

  • tool_choice (Dict[str, Any] | bool | str | None)

  • parallel_tool_calls (bool | None)

  • kwargs (Any)

Return type:

Runnable[PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]], BaseMessage]

configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) RunnableSerializable#

配置可以在运行时设置的Runnables的替代方案。

Parameters:
  • which (ConfigurableField) – 将用于选择替代项的ConfigurableField实例。

  • default_key (str) – 如果没有选择其他选项,则使用的默认键。 默认为“default”。

  • prefix_keys (bool) – 是否在键前加上 ConfigurableField 的 id。 默认为 False。

  • **kwargs (Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) – 一个字典,键为Runnable实例或返回Runnable实例的可调用对象。

Returns:

一个新的Runnable,配置了替代方案。

Return type:

RunnableSerializable

from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatAnthropic(
    model_name="claude-3-sonnet-20240229"
).configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="anthropic",
    openai=ChatOpenAI()
)

# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)

# uses ChatOpenAI
print(
    model.with_config(
        configurable={"llm": "openai"}
    ).invoke("which organization created you?").content
)
configurable_fields(**kwargs: ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) RunnableSerializable#

在运行时配置特定的Runnable字段。

Parameters:

**kwargs (ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) – 一个包含ConfigurableField实例的字典,用于配置。

Returns:

一个新的Runnable,其字段已配置。

Return type:

RunnableSerializable

from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(max_tokens=20).configurable_fields(
    max_tokens=ConfigurableField(
        id="output_token_number",
        name="Max tokens in the output",
        description="The maximum number of tokens in the output",
    )
)

# max_tokens = 20
print(
    "max_tokens_20: ",
    model.invoke("tell me something about chess").content
)

# max_tokens = 200
print("max_tokens_200: ", model.with_config(
    configurable={"output_token_number": 200}
    ).invoke("tell me something about chess").content
)
get_num_tokens(text: str) int#

获取文本中存在的标记数量。

用于检查输入是否适合模型的上下文窗口。

Parameters:

文本 (字符串) – 要分词的字符串输入。

Returns:

文本中的标记的整数数量。

Return type:

整数

get_num_tokens_from_messages(messages: list[BaseMessage], tools: Sequence | None = None) int#

获取消息中的令牌数量。

用于检查输入是否适合模型的上下文窗口。

注意: get_num_tokens_from_messages 的基本实现忽略了工具模式。

Parameters:
  • messages (list[BaseMessage]) – 要标记化的消息输入。

  • 工具 (序列 | ) – 如果提供,则为字典、BaseModel、函数或BaseTools的序列,将被转换为工具模式。

Returns:

消息中令牌数量的总和。

Return type:

整数

get_token_ids(text: str) list[int]#

返回文本中标记的有序ID。

Parameters:

文本 (字符串) – 要分词的字符串输入。

Returns:

与文本中的标记对应的ID列表,按它们在文本中出现的顺序排列

在文本中。

Return type:

列表[int]

invoke(input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs: Any) BaseMessage#

将单个输入转换为输出。重写以实现。

Parameters:
  • input (LanguageModelInput) – Runnable 的输入。

  • config (可选[RunnableConfig]) – 调用Runnable时使用的配置。 该配置支持标准键,如用于跟踪目的的‘tags’、‘metadata’,用于控制并行工作量的‘max_concurrency’,以及其他键。更多详情请参考RunnableConfig。

  • stop (可选[列表[字符串]])

  • kwargs (Any)

Returns:

Runnable 的输出。

Return type:

BaseMessage

stream(input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs: Any) Iterator[BaseMessageChunk]#

流的默认实现,调用invoke。 如果子类支持流输出,则应重写此方法。

Parameters:
  • input (LanguageModelInput) – Runnable 的输入。

  • config (可选[RunnableConfig]) – 用于Runnable的配置。默认为None。

  • kwargs (Any) – 传递给Runnable的额外关键字参数。

  • stop (可选[列表[字符串]])

Yields:

Runnable 的输出。

Return type:

迭代器[BaseMessageChunk]

with_alisteners(*, on_start: AsyncListener | None = None, on_end: AsyncListener | None = None, on_error: AsyncListener | None = None) Runnable[Input, Output]#

将异步生命周期监听器绑定到一个Runnable,返回一个新的Runnable。

on_start: 在Runnable开始运行之前异步调用。 on_end: 在Runnable完成运行之后异步调用。 on_error: 如果Runnable抛出错误,则异步调用。

Run对象包含有关运行的信息,包括其id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行中的任何标签或元数据。

Parameters:
  • on_start (Optional[AsyncListener]) – 在Runnable开始运行之前异步调用。 默认为None。

  • on_end (Optional[AsyncListener]) – 在Runnable运行结束后异步调用。 默认为None。

  • on_error (可选[AsyncListener]) – 如果Runnable抛出错误,则异步调用。 默认为None。

Returns:

一个新的Runnable,绑定了监听器。

Return type:

Runnable[Input, Output]

示例:

from langchain_core.runnables import RunnableLambda
import time

async def test_runnable(time_to_sleep : int):
    print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
    await asyncio.sleep(time_to_sleep)
    print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")

async def fn_start(run_obj : Runnable):
    print(f"on start callback starts at {format_t(time.time())}
    await asyncio.sleep(3)
    print(f"on start callback ends at {format_t(time.time())}")

async def fn_end(run_obj : Runnable):
    print(f"on end callback starts at {format_t(time.time())}
    await asyncio.sleep(2)
    print(f"on end callback ends at {format_t(time.time())}")

runnable = RunnableLambda(test_runnable).with_alisteners(
    on_start=fn_start,
    on_end=fn_end
)
async def concurrent_runs():
    await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))

asyncio.run(concurrent_runs())
Result:
on start callback starts at 2024-05-16T14:20:29.637053+00:00
on start callback starts at 2024-05-16T14:20:29.637150+00:00
on start callback ends at 2024-05-16T14:20:32.638305+00:00
on start callback ends at 2024-05-16T14:20:32.638383+00:00
Runnable[3s]: starts at 2024-05-16T14:20:32.638849+00:00
Runnable[5s]: starts at 2024-05-16T14:20:32.638999+00:00
Runnable[3s]: ends at 2024-05-16T14:20:35.640016+00:00
on end callback starts at 2024-05-16T14:20:35.640534+00:00
Runnable[5s]: ends at 2024-05-16T14:20:37.640169+00:00
on end callback starts at 2024-05-16T14:20:37.640574+00:00
on end callback ends at 2024-05-16T14:20:37.640654+00:00
on end callback ends at 2024-05-16T14:20:39.641751+00:00
with_config(config: RunnableConfig | None = None, **kwargs: Any) Runnable[Input, Output]#

将配置绑定到一个可运行对象,返回一个新的可运行对象。

Parameters:
  • config (RunnableConfig | None) – 绑定到Runnable的配置。

  • kwargs (Any) – 传递给Runnable的额外关键字参数。

Returns:

一个新的Runnable,带有绑定的配置。

Return type:

Runnable[Input, Output]

with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: tuple[type[BaseException], ...] = (<class 'Exception'>,), exception_key: Optional[str] = None) RunnableWithFallbacksT[Input, Output]#

为Runnable添加回退,返回一个新的Runnable。

新的Runnable将尝试原始的Runnable,然后在失败时依次尝试每个回退。

Parameters:
  • fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,将尝试的一系列 runnables。

  • exceptions_to_handle (tuple[type[BaseException], ...]) – 要处理的异常类型的元组。 默认为 (Exception,)。

  • exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给后备函数,使用指定的键。如果为 None,异常将不会传递给后备函数。如果使用此参数,基础 Runnable 及其后备函数必须接受字典作为输入。默认为 None。

Returns:

一个新的Runnable,它将在失败时尝试原始的Runnable,然后依次尝试每个回退。

Return type:

RunnableWithFallbacksT[Input, Output]

示例

from typing import Iterator

from langchain_core.runnables import RunnableGenerator


def _generate_immediate_error(input: Iterator) -> Iterator[str]:
    raise ValueError()
    yield ""


def _generate(input: Iterator) -> Iterator[str]:
    yield from "foo bar"


runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
    [RunnableGenerator(_generate)]
    )
print(''.join(runnable.stream({}))) #foo bar
Parameters:
  • fallbacks (Sequence[Runnable[Input, Output]]) – 如果原始 Runnable 失败,将尝试的一系列 runnables。

  • exceptions_to_handle (tuple[type[BaseException], ...]) – 要处理的异常类型的元组。

  • exception_key (Optional[str]) – 如果指定了字符串,则处理的异常将作为输入的一部分传递给后备函数,使用指定的键。如果为 None,异常将不会传递给后备函数。如果使用此参数,基础 Runnable 及其后备函数必须接受字典作为输入。

Returns:

一个新的Runnable,它将在失败时尝试原始的Runnable,然后依次尝试每个回退。

Return type:

RunnableWithFallbacksT[Input, Output]

with_listeners(*, on_start: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None, on_error: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None) Runnable[Input, Output]#

将生命周期监听器绑定到一个Runnable,返回一个新的Runnable。

on_start: 在Runnable开始运行之前调用,带有Run对象。 on_end: 在Runnable完成运行之后调用,带有Run对象。 on_error: 如果Runnable抛出错误时调用,带有Run对象。

Run对象包含有关运行的信息,包括其id、类型、输入、输出、错误、开始时间、结束时间以及添加到运行中的任何标签或元数据。

Parameters:
  • on_start (可选[联合[可调用[[运行], ], 可调用[[运行, RunnableConfig], ]]]) – 在Runnable开始运行之前调用。默认为无。

  • on_end (可选[联合[可调用[[运行], ], 可调用[[运行, RunnableConfig], ]]]) – 在Runnable完成运行后调用。默认为无。

  • on_error (可选[联合[可调用[[运行], ], 可调用[[运行, RunnableConfig], ]]]) – 如果Runnable抛出错误时调用。默认为无。

Returns:

一个新的Runnable,绑定了监听器。

Return type:

Runnable[Input, Output]

示例:

from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run

import time

def test_runnable(time_to_sleep : int):
    time.sleep(time_to_sleep)

def fn_start(run_obj: Run):
    print("start_time:", run_obj.start_time)

def fn_end(run_obj: Run):
    print("end_time:", run_obj.end_time)

chain = RunnableLambda(test_runnable).with_listeners(
    on_start=fn_start,
    on_end=fn_end
)
chain.invoke(2)
with_retry(*, retry_if_exception_type: tuple[type[BaseException], ...] = (<class 'Exception'>,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) Runnable[Input, Output]#

创建一个新的Runnable,在异常时重试原始的Runnable。

Parameters:
  • retry_if_exception_type (tuple[type[BaseException], ...]) – 一个异常类型的元组,用于重试。 默认值为 (Exception,)。

  • wait_exponential_jitter (bool) – 是否在重试之间的等待时间中添加抖动。默认为 True。

  • stop_after_attempt (int) – 在放弃之前尝试的最大次数。默认为3。

Returns:

一个新的Runnable,在异常时重试原始的Runnable。

Return type:

Runnable[Input, Output]

示例:

from langchain_core.runnables import RunnableLambda

count = 0


def _lambda(x: int) -> None:
    global count
    count = count + 1
    if x == 1:
        raise ValueError("x is 1")
    else:
         pass


runnable = RunnableLambda(_lambda)
try:
    runnable.with_retry(
        stop_after_attempt=2,
        retry_if_exception_type=(ValueError,),
    ).invoke(1)
except ValueError:
    pass

assert (count == 2)
Parameters:
  • retry_if_exception_type (tuple[type[BaseException], ...]) – 一个异常类型的元组,用于在发生这些异常时重试

  • wait_exponential_jitter (bool) – 是否在重试之间为等待时间添加抖动

  • stop_after_attempt (int) – 在放弃之前尝试的最大次数

Returns:

一个新的Runnable,在异常时重试原始的Runnable。

Return type:

Runnable[Input, Output]

with_structured_output(schema: Dict[str, Any] | Type[BaseModel] | None = None, *, method: Literal['function_calling', 'json_mode', 'json_schema'] = 'function_calling', include_raw: bool = False, **kwargs: Any) Runnable[PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]], Dict[str, Any] | BaseModel][source]#

模型包装器,返回格式化为匹配给定模式的输出。

Args:
schema:
The output schema. Can be passed in as:
  • 一个OpenAI函数/工具模式,

  • 一个 JSON 模式,

  • 一个 TypedDict 类,

  • 或者是一个 Pydantic.BaseModel 类。

如果 schema 是一个 Pydantic 类,那么模型输出将是该类的 Pydantic 实例,并且模型生成的字段将由 Pydantic 类进行验证。否则,模型输出将是一个字典,并且不会被验证。有关在指定 Pydantic 或 TypedDict 类时如何正确指定模式字段的类型和描述的更多信息,请参见 langchain_core.utils.function_calling.convert_to_openai_tool()

method:

用于指导模型生成的方法,可以是“function_calling”、“json_mode”或“json_schema”。 如果选择“function_calling”,则模式将被转换为OpenAI函数,返回的模型将使用函数调用API。如果选择“json_mode”或“json_schema”,则将使用OpenAI的JSON模式。 请注意,如果使用“json_mode”或“json_schema”,则必须在模型调用中包含将输出格式化为所需模式的指令。

include_raw:

如果为False,则只返回解析后的结构化输出。如果在模型输出解析过程中发生错误,将会抛出。如果为True,则返回原始模型响应(一个BaseMessage)和解析后的模型响应。如果在输出解析过程中发生错误,它将被捕获并返回。最终输出始终是一个包含“raw”、“parsed”和“parsing_error”键的字典。

Returns:

一个Runnable,它接受与langchain_core.language_models.chat.BaseChatModel相同的输入。

如果 include_raw 为 False 且 schema 是一个 Pydantic 类,Runnable 输出 schema 的一个实例(即一个 Pydantic 对象)。

否则,如果 include_raw 为 False,则 Runnable 输出一个字典。

If include_raw is True, then Runnable outputs a dict with keys:
  • “raw”: BaseMessage

  • “parsed”: 如果存在解析错误则为None,否则类型取决于上述描述的schema

  • “parsing_error”: 可选的[BaseException]

Example: schema=Pydantic class, method=”function_calling”, include_raw=False:
from typing import Optional

from langchain_community.chat_models import ChatSambaNovaCloud
from pydantic import BaseModel, Field


class AnswerWithJustification(BaseModel):
    '''An answer to the user question along with justification for the answer.'''

    answer: str
    justification: str = Field(
        description="A justification for the answer."
    )


llm = ChatSambaNovaCloud(model="Meta-Llama-3.1-70B-Instruct", temperature=0)
structured_llm = llm.with_structured_output(AnswerWithJustification)

structured_llm.invoke(
    "What weighs more a pound of bricks or a pound of feathers"
)

# -> AnswerWithJustification(
#     answer='They weigh the same',
#     justification='A pound is a unit of weight or mass, so a pound of bricks and a pound of feathers both weigh the same.'
# )
Example: schema=Pydantic class, method=”function_calling”, include_raw=True:
from langchain_community.chat_models import ChatSambaNovaCloud
from pydantic import BaseModel


class AnswerWithJustification(BaseModel):
    '''An answer to the user question along with justification for the answer.'''

    answer: str
    justification: str


llm = ChatSambaNovaCloud(model="Meta-Llama-3.1-70B-Instruct", temperature=0)
structured_llm = llm.with_structured_output(
    AnswerWithJustification, include_raw=True
)

structured_llm.invoke(
    "What weighs more a pound of bricks or a pound of feathers"
)
# -> {
#     'raw': AIMessage(content='', additional_kwargs={'tool_calls': [{'function': {'arguments': '{"answer": "They weigh the same.", "justification": "A pound is a unit of weight or mass, so one pound of bricks and one pound of feathers both weigh the same amount."}', 'name': 'AnswerWithJustification'}, 'id': 'call_17a431fc6a4240e1bd', 'type': 'function'}]}, response_metadata={'finish_reason': 'tool_calls', 'usage': {'acceptance_rate': 5, 'completion_tokens': 53, 'completion_tokens_after_first_per_sec': 343.7964936837758, 'completion_tokens_after_first_per_sec_first_ten': 439.1205661878638, 'completion_tokens_per_sec': 162.8511306784833, 'end_time': 1731527851.0698032, 'is_last_response': True, 'prompt_tokens': 213, 'start_time': 1731527850.7137961, 'time_to_first_token': 0.20475482940673828, 'total_latency': 0.32545061111450196, 'total_tokens': 266, 'total_tokens_per_sec': 817.3283162354066}, 'model_name': 'Meta-Llama-3.1-70B-Instruct', 'system_fingerprint': 'fastcoe', 'created': 1731527850}, id='95667eaf-447f-4b53-bb6e-b6e1094ded88', tool_calls=[{'name': 'AnswerWithJustification', 'args': {'answer': 'They weigh the same.', 'justification': 'A pound is a unit of weight or mass, so one pound of bricks and one pound of feathers both weigh the same amount.'}, 'id': 'call_17a431fc6a4240e1bd', 'type': 'tool_call'}]),
#     'parsed': AnswerWithJustification(answer='They weigh the same.', justification='A pound is a unit of weight or mass, so one pound of bricks and one pound of feathers both weigh the same amount.'),
#     'parsing_error': None
# }
Example: schema=TypedDict class, method=”function_calling”, include_raw=False:
# IMPORTANT: If you are using Python <=3.8, you need to import Annotated
# from typing_extensions, not from typing.
from typing_extensions import Annotated, TypedDict

from langchain_community.chat_models import ChatSambaNovaCloud


class AnswerWithJustification(TypedDict):
    '''An answer to the user question along with justification for the answer.'''

    answer: str
    justification: Annotated[
        Optional[str], None, "A justification for the answer."
    ]


llm = ChatSambaNovaCloud(model="Meta-Llama-3.1-70B-Instruct", temperature=0)
structured_llm = llm.with_structured_output(AnswerWithJustification)

structured_llm.invoke(
    "What weighs more a pound of bricks or a pound of feathers"
)
# -> {
#     'answer': 'They weigh the same',
#     'justification': 'A pound is a unit of weight or mass, so one pound of bricks and one pound of feathers both weigh the same amount.'
# }
Example: schema=OpenAI function schema, method=”function_calling”, include_raw=False:
from langchain_community.chat_models import ChatSambaNovaCloud

oai_schema = {
    'name': 'AnswerWithJustification',
    'description': 'An answer to the user question along with justification for the answer.',
    'parameters': {
        'type': 'object',
        'properties': {
            'answer': {'type': 'string'},
            'justification': {'description': 'A justification for the answer.', 'type': 'string'}
        },
       'required': ['answer']
   }
}

llm = ChatSambaNovaCloud(model="Meta-Llama-3.1-70B-Instruct", temperature=0)
structured_llm = llm.with_structured_output(oai_schema)

structured_llm.invoke(
    "What weighs more a pound of bricks or a pound of feathers"
)
# -> {
#     'answer': 'They weigh the same',
#     'justification': 'A pound is a unit of weight or mass, so one pound of bricks and one pound of feathers both weigh the same amount.'
# }
Example: schema=Pydantic class, method=”json_mode”, include_raw=True:
from langchain_community.chat_models import ChatSambaNovaCloud
from pydantic import BaseModel

class AnswerWithJustification(BaseModel):
    answer: str
    justification: str

llm = ChatSambaNovaCloud(model="Meta-Llama-3.1-70B-Instruct", temperature=0)
structured_llm = llm.with_structured_output(
    AnswerWithJustification,
    method="json_mode",
    include_raw=True
)

structured_llm.invoke(
    "Answer the following question. "
    "Make sure to return a JSON blob with keys 'answer' and 'justification'.

“一磅砖和一磅羽毛,哪个更重?”

) # -> { # ‘raw’: AIMessage(content=’{

“answer”: “它们的重量相同”, “justification”: “磅是重量或质量的单位,因此一磅砖和一磅羽毛的重量相同,都是一磅。区别在于它们的密度和体积。由于密度的不同,一磅羽毛会比一磅砖占据更多的空间。”

}’, additional_kwargs={}, response_metadata={‘finish_reason’: ‘stop’, ‘usage’: {‘acceptance_rate’: 5.3125, ‘completion_tokens’: 79, ‘completion_tokens_after_first_per_sec’: 292.65701089829776, ‘completion_tokens_after_first_per_sec_first_ten’: 346.43324678555325, ‘completion_tokens_per_sec’: 200.012158915008, ‘end_time’: 1731528071.1708555, ‘is_last_response’: True, ‘prompt_tokens’: 70, ‘start_time’: 1731528070.737394, ‘time_to_first_token’: 0.16693782806396484, ‘total_latency’: 0.3949759876026827, ‘total_tokens’: 149, ‘total_tokens_per_sec’: 377.2381225105847}, ‘model_name’: ‘Meta-Llama-3.1-70B-Instruct’, ‘system_fingerprint’: ‘fastcoe’, ‘created’: 1731528070}, id=’83208297-3eb9-4021-a856-ca78a15758df’),

# ‘parsed’: AnswerWithJustification(answer=’它们的重量相同’, justification=’磅是重量或质量的单位,因此一磅砖和一磅羽毛的重量相同,都是一磅。区别在于它们的密度和体积。由于密度的不同,一磅羽毛会比一磅砖占据更多的空间。’), # ‘parsing_error’: None # }

Example: schema=None, method=”json_mode”, include_raw=True:
from langchain_community.chat_models import ChatSambaNovaCloud

llm = ChatSambaNovaCloud(model="Meta-Llama-3.1-70B-Instruct", temperature=0)
structured_llm = llm.with_structured_output(method="json_mode", include_raw=True)

structured_llm.invoke(
    "Answer the following question. "
    "Make sure to return a JSON blob with keys 'answer' and 'justification'.

“一磅砖和一磅羽毛,哪个更重?”

) # -> { # ‘raw’: AIMessage(content=’{

“answer”: “它们的重量相同”, “justification”: “磅是重量或质量的单位,因此一磅砖和一磅羽毛的重量相同,都是一磅。区别在于它们的密度和体积。由于密度的不同,一磅羽毛会比一磅砖占据更多的空间。”

}’, additional_kwargs={}, response_metadata={‘finish_reason’: ‘stop’, ‘usage’: {‘acceptance_rate’: 4.722222222222222, ‘completion_tokens’: 79, ‘completion_tokens_after_first_per_sec’: 357.1315485254867, ‘completion_tokens_after_first_per_sec_first_ten’: 416.83279609305305, ‘completion_tokens_per_sec’: 240.92819585198137, ‘end_time’: 1731528164.8474727, ‘is_last_response’: True, ‘prompt_tokens’: 70, ‘start_time’: 1731528164.4906917, ‘time_to_first_token’: 0.13837409019470215, ‘total_latency’: 0.3278985247892492, ‘total_tokens’: 149, ‘total_tokens_per_sec’: 454.4088757208256}, ‘model_name’: ‘Meta-Llama-3.1-70B-Instruct’, ‘system_fingerprint’: ‘fastcoe’, ‘created’: 1731528164}, id=’15261eaf-8a25-42ef-8ed5-f63d8bf5b1b0’),

# ‘parsed’: { # ‘answer’: ‘它们的重量相同’, # ‘justification’: ‘磅是重量或质量的单位,因此一磅砖和一磅羽毛的重量相同,都是一磅。区别在于它们的密度和体积。由于密度的不同,一磅羽毛会比一磅砖占据更多的空间。’}, # }, # ‘parsing_error’: None # }

Example: schema=None, method=”json_schema”, include_raw=True:
from langchain_community.chat_models import ChatSambaNovaCloud

class AnswerWithJustification(BaseModel):
    answer: str
    justification: str

llm = ChatSambaNovaCloud(model="Meta-Llama-3.1-70B-Instruct", temperature=0)
structured_llm = llm.with_structured_output(AnswerWithJustification, method="json_schema", include_raw=True)

structured_llm.invoke(
    "Answer the following question. "
    "Make sure to return a JSON blob with keys 'answer' and 'justification'.

“一磅砖和一磅羽毛,哪个更重?”

) # -> { # ‘raw’: AIMessage(content=’{

“answer”: “它们的重量相同”, “justification”: “磅是重量或质量的单位,因此一磅砖和一磅羽毛的重量相同,都是一磅。区别在于它们的密度和体积。由于密度的不同,一磅羽毛会比一磅砖占据更多的空间。”

}’, additional_kwargs={}, response_metadata={‘finish_reason’: ‘stop’, ‘usage’: {‘acceptance_rate’: 5.3125, ‘completion_tokens’: 79, ‘completion_tokens_after_first_per_sec’: 292.65701089829776, ‘completion_tokens_after_first_per_sec_first_ten’: 346.43324678555325, ‘completion_tokens_per_sec’: 200.012158915008, ‘end_time’: 1731528071.1708555, ‘is_last_response’: True, ‘prompt_tokens’: 70, ‘start_time’: 1731528070.737394, ‘time_to_first_token’: 0.16693782806396484, ‘total_latency’: 0.3949759876026827, ‘total_tokens’: 149, ‘total_tokens_per_sec’: 377.2381225105847}, ‘model_name’: ‘Meta-Llama-3.1-70B-Instruct’, ‘system_fingerprint’: ‘fastcoe’, ‘created’: 1731528070}, id=’83208297-3eb9-4021-a856-ca78a15758df’),

# ‘parsed’: AnswerWithJustification(answer=’它们的重量相同’, justification=’磅是重量或质量的单位,因此一磅砖和一磅羽毛的重量相同,都是一磅。区别在于它们的密度和体积。由于密度的不同,一磅羽毛会比一磅砖占据更多的空间。’), # ‘parsing_error’: None # }

Parameters:
  • schema (Dict[str, Any] | Type[BaseModel] | None)

  • 方法 (字面量['function_calling', 'json_mode', 'json_schema'])

  • include_raw (bool)

  • kwargs (Any)

Return type:

Runnable[PromptValue | str | Sequence[BaseMessage | list[str] | tuple[str, str] | str | dict[str, Any]], Dict[str, Any] | BaseModel]

with_types(*, input_type: type[Input] | None = None, output_type: type[Output] | None = None) Runnable[Input, Output]#

将输入和输出类型绑定到一个Runnable,返回一个新的Runnable。

Parameters:
  • input_type (type[Input] | None) – 要绑定到Runnable的输入类型。默认为None。

  • output_type (type[Output] | None) – 要绑定到Runnable的输出类型。默认为None。

Returns:

一个带有类型绑定的新Runnable。

Return type:

Runnable[Input, Output]