UpstashRatelimitHandler#

class langchain_community.callbacks.upstash_ratelimit_callback.UpstashRatelimitHandler(identifier: str, *, token_ratelimit: None = None, request_ratelimit: None = None, include_output_tokens: bool = False)[source]#

回调函数,用于根据请求数量或输入中的令牌数量处理速率限制。

它使用Upstash Ratelimit来跟踪速率限制,该限制利用Upstash Redis来跟踪状态。

在初始化链时不应传递给链。 这是因为处理程序有一个状态,每次调用时都应该是新的。 相反,每次调用时都应初始化和传递一个处理程序。

创建UpstashRatelimitHandler。必须传递一个标识符来进行速率限制,例如用户ID或IP地址。

此外,必须至少传递token_ratelimit或request_ratelimit参数中的一个。

Parameters:
  • Union[int (标识符) – 标识符

  • str] – 标识符

  • 可选[Ratelimit] (request_ratelimit) – 用于限制令牌数量的速率限制。仅适用于OpenAI模型,因为只有这些模型在其输出中提供令牌数量信息。

  • 可选[Ratelimit] – 用于限制请求数量的速率限制

  • bool (include_output_tokens) – 是否在基于令牌数量进行速率限制时计算输出令牌。仅在传递token_ratelimit时使用。默认为False。

  • identifier (str)

  • token_ratelimit (None)

  • request_ratelimit (None)

  • include_output_tokens (bool)

示例

from upstash_redis import Redis
from upstash_ratelimit import Ratelimit, FixedWindow

redis = Redis.from_env()
ratelimit = Ratelimit(
    redis=redis,
    # fixed window to allow 10 requests every 10 seconds:
    limiter=FixedWindow(max_requests=10, window=10),
)

user_id = "foo"
handler = UpstashRatelimitHandler(
    identifier=user_id,
    request_ratelimit=ratelimit
)

# Initialize a simple runnable to test
chain = RunnableLambda(str)

# pass handler as callback:
output = chain.invoke(
    "input",
    config={
        "callbacks": [handler]
    }
)

属性

ignore_agent

是否忽略代理回调。

ignore_chain

是否忽略链式回调。

ignore_chat_model

是否忽略聊天模型回调。

ignore_custom_event

忽略自定义事件。

ignore_llm

是否忽略LLM回调。

ignore_retriever

是否忽略检索器回调。

ignore_retry

是否忽略重试回调。

raise_error

如果发生异常,是否引发错误。

run_inline

是否内联运行回调。

方法

__init__(identifier, *[, token_ratelimit, ...])

创建 UpstashRatelimitHandler。

on_agent_action(action, *, run_id[, ...])

在代理操作时运行。

on_agent_finish(finish, *, run_id[, ...])

在代理结束时运行。

on_chain_end(outputs, *, run_id[, parent_run_id])

当链结束运行时执行。

on_chain_error(error, *, run_id[, parent_run_id])

当链发生错误时运行。

on_chain_start(serialized, inputs, **kwargs)

当链开始运行时执行。

on_chat_model_start(serialized, messages, *, ...)

当聊天模型开始运行时执行。

on_custom_event(name, data, *, run_id[, ...])

重写以定义自定义事件的处理程序。

on_llm_end(response, **kwargs)

当LLM结束运行时执行

on_llm_error(error, *, run_id[, parent_run_id])

当LLM出错时运行。

on_llm_new_token(token, *[, chunk, ...])

在新的LLM令牌上运行。

on_llm_start(serialized, prompts, **kwargs)

当LLM开始运行时执行

on_retriever_end(documents, *, run_id[, ...])

当检索器结束运行时执行。

on_retriever_error(error, *, run_id[, ...])

当检索器出错时运行。

on_retriever_start(serialized, query, *, run_id)

当检索器开始运行时执行。

on_retry(retry_state, *, run_id[, parent_run_id])

在重试事件时运行。

on_text(text, *, run_id[, parent_run_id])

在任意文本上运行。

on_tool_end(output, *, run_id[, parent_run_id])

当工具结束运行时执行。

on_tool_error(error, *, run_id[, parent_run_id])

当工具出错时运行。

on_tool_start(serialized, input_str, *, run_id)

当工具开始运行时执行。

reset([identifier])

创建一个新的UpstashRatelimitHandler对象,具有相同的速率限制配置,但如果提供了新的标识符,则使用新的标识符。

__init__(identifier: str, *, token_ratelimit: None = None, request_ratelimit: None = None, include_output_tokens: bool = False)[source]#

创建UpstashRatelimitHandler。必须传递一个标识符来进行速率限制,例如用户ID或IP地址。

此外,必须至少传递token_ratelimit或request_ratelimit参数中的一个。

Parameters:
  • Union[int (标识符) – 标识符

  • str] – 标识符

  • 可选[Ratelimit] (request_ratelimit) – 用于限制令牌数量的速率限制。仅适用于OpenAI模型,因为只有这些模型在其输出中提供令牌数量信息。

  • 可选[Ratelimit] – 用于限制请求数量的速率限制

  • bool (include_output_tokens) – 是否在基于令牌数量进行速率限制时计算输出令牌。仅在传递token_ratelimit时使用。默认为False。

  • identifier (str)

  • token_ratelimit (None)

  • request_ratelimit (None)

  • include_output_tokens (bool)

示例

from upstash_redis import Redis
from upstash_ratelimit import Ratelimit, FixedWindow

redis = Redis.from_env()
ratelimit = Ratelimit(
    redis=redis,
    # fixed window to allow 10 requests every 10 seconds:
    limiter=FixedWindow(max_requests=10, window=10),
)

user_id = "foo"
handler = UpstashRatelimitHandler(
    identifier=user_id,
    request_ratelimit=ratelimit
)

# Initialize a simple runnable to test
chain = RunnableLambda(str)

# pass handler as callback:
output = chain.invoke(
    "input",
    config={
        "callbacks": [handler]
    }
)
on_agent_action(action: AgentAction, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

在代理操作上运行。

Parameters:
  • action (AgentAction) – 代理动作。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_agent_finish(finish: AgentFinish, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

在代理端运行。

Parameters:
  • finish (AgentFinish) – 代理完成。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_chain_end(outputs: dict[str, Any], *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

当链结束运行时执行。

Parameters:
  • outputs (Dict[str, Any]) – 链的输出。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_chain_error(error: BaseException, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

当链发生错误时运行。

Parameters:
  • error (BaseException) – 发生的错误。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_chain_start(serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any) Any[source]#

当链开始运行时执行。

在链执行期间,on_chain_start 会多次运行。为了确保它只被调用一次,我们保持一个布尔状态 _checked。如果 self._checked 为假,我们使用 request_ratelimit 调用限制,并在标识符被限速时抛出 UpstashRatelimitError

Parameters:
  • serialized (Dict[str, Any])

  • inputs (Dict[str, Any])

  • kwargs (Any)

Return type:

任何

on_chat_model_start(serialized: dict[str, Any], messages: list[list[BaseMessage]], *, run_id: UUID, parent_run_id: UUID | None = None, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, **kwargs: Any) Any#

当聊天模型开始运行时执行。

ATTENTION: This method is called for chat models. If you’re implementing

对于非聊天模型的处理程序,您应该使用 on_llm_start 代替。

Parameters:
  • serialized (Dict[str, Any]) – 序列化的聊天模型。

  • messages (List[List[BaseMessage]]) – 消息。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • tags (可选[列表[字符串]]) – 标签。

  • metadata (可选[字典[字符串, 任意类型]]) – 元数据。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_custom_event(name: str, data: Any, *, run_id: UUID, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, **kwargs: Any) Any#

重写以定义自定义事件的处理程序。

Parameters:
  • name (str) – 自定义事件的名称。

  • data (Any) – 自定义事件的数据。格式将与用户指定的格式匹配。

  • run_id (UUID) – 运行的ID。

  • tags (list[str] | None) – 与自定义事件关联的标签(包括继承的标签)。

  • metadata (dict[str, Any] | None) – 与自定义事件关联的元数据 (包括继承的元数据)。

  • kwargs (Any)

Return type:

任何

在版本0.2.15中添加。

on_llm_end(response: LLMResult, **kwargs: Any) None[source]#

当LLM结束运行时执行

如果include_output_tokens设置为True,则会计算LLM完成中的令牌数量以进行速率限制

Parameters:
Return type:

on_llm_error(error: BaseException, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

当LLM出错时运行。

Parameters:
  • error (BaseException) – 发生的错误。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_llm_new_token(token: str, *, chunk: GenerationChunk | ChatGenerationChunk | None = None, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

在新的LLM令牌上运行。仅在启用流式传输时可用。

Parameters:
  • token (str) – 新的token。

  • chunk (GenerationChunk | ChatGenerationChunk) – 新生成的块,包含内容和其他信息。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_llm_start(serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) None[source]#

当LLM开始运行时执行

Parameters:
  • serialized (Dict[str, Any])

  • 提示 (列表[字符串])

  • kwargs (Any)

Return type:

on_retriever_end(documents: Sequence[Document], *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

当Retriever结束运行时执行。

Parameters:
  • documents (Sequence[Document]) – 检索到的文档。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_retriever_error(error: BaseException, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

当Retriever出错时运行。

Parameters:
  • error (BaseException) – 发生的错误。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_retriever_start(serialized: dict[str, Any], query: str, *, run_id: UUID, parent_run_id: UUID | None = None, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, **kwargs: Any) Any#

当Retriever开始运行时执行。

Parameters:
  • serialized (Dict[str, Any]) – 序列化的检索器。

  • query (str) – 查询。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • tags (可选[列表[字符串]]) – 标签。

  • metadata (可选[字典[字符串, 任意类型]]) – 元数据。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_retry(retry_state: RetryCallState, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

在重试事件上运行。

Parameters:
  • retry_state (RetryCallState) – 重试状态。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_text(text: str, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

在任意文本上运行。

Parameters:
  • 文本 (str) – 文本内容。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_tool_end(output: Any, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

当工具结束运行时执行。

Parameters:
  • output (Any) – 工具的输出。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_tool_error(error: BaseException, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) Any#

当工具出错时运行。

Parameters:
  • error (BaseException) – 发生的错误。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

on_tool_start(serialized: dict[str, Any], input_str: str, *, run_id: UUID, parent_run_id: UUID | None = None, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, inputs: dict[str, Any] | None = None, **kwargs: Any) Any#

当工具开始运行时执行。

Parameters:
  • serialized (Dict[str, Any]) – 序列化的工具。

  • input_str (str) – 输入字符串。

  • run_id (UUID) – 运行ID。这是当前运行的ID。

  • parent_run_id (UUID) – 父运行ID。这是父运行的ID。

  • tags (可选[列表[字符串]]) – 标签。

  • metadata (可选[字典[字符串, 任意类型]]) – 元数据。

  • inputs (可选[Dict[str, Any]]) – 输入。

  • kwargs (Any) – 额外的关键字参数。

Return type:

任何

reset(identifier: str | None = None) UpstashRatelimitHandler[source]#

创建一个新的UpstashRatelimitHandler对象,具有相同的速率限制配置,但如果提供了新的标识符,则使用新的标识符。

同时重置处理程序的状态。

Parameters:

identifier (str | None)

Return type:

UpstashRatelimitHandler

使用 UpstashRatelimitHandler 的示例