Source code for langchain_community.callbacks.streamlit.streamlit_callback_handler

"""回调处理程序,用于打印到streamlit。"""

from __future__ import annotations

from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional

from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult

from langchain_community.callbacks.streamlit.mutable_expander import MutableExpander

if TYPE_CHECKING:
    from streamlit.delta_generator import DeltaGenerator


def _convert_newlines(text: str) -> str:
    """Convert newline characters to markdown newline sequences
    (space, space, newline).
    """
    return text.replace("\n", "  \n")

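# For example, _convert_newlines("a\nb") returns "a  \nb". The two trailing
# spaces are markdown's hard-line-break marker, so newlines in streamed LLM
# tokens survive st.markdown rendering instead of collapsing into one paragraph.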


CHECKMARK_EMOJI = "✅"
THINKING_EMOJI = ":thinking_face:"
HISTORY_EMOJI = ":books:"
EXCEPTION_EMOJI = "⚠️"


class LLMThoughtState(Enum):
    """Enumerator of the LLMThought state."""

    # The LLM is thinking about what to do next. We don't know which tool we'll run.
    THINKING = "THINKING"
    # The LLM has decided to run a tool. We don't have results from the tool yet.
    RUNNING_TOOL = "RUNNING_TOOL"
    # We have results from the tool.
    COMPLETE = "COMPLETE"

class ToolRecord(NamedTuple):
    """The tool record as a NamedTuple."""

    name: str
    input_str: str

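# For example, ToolRecord(name="search", input_str="weather in SF") records a
# single tool invocation; get_tool_label (below) turns such a record into a
# markdown label for the thought's expander. ("search" is a made-up tool name.)
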
class LLMThoughtLabeler:
    """Generates markdown labels for LLMThought containers.

    Pass a custom subclass of this to StreamlitCallbackHandler to override
    its default labeling logic.
    """

    def get_initial_label(self) -> str:
        """Return the markdown label for a new LLMThought that doesn't have
        an associated tool yet.
        """
        return f"{THINKING_EMOJI} **Thinking...**"

    def get_tool_label(self, tool: ToolRecord, is_complete: bool) -> str:
        """Return the label for an LLMThought that has an associated tool.

        Parameters
        ----------
        tool
            The tool's ToolRecord

        is_complete
            True if the thought is complete; False if the thought
            is still receiving input.

        Returns
        -------
        The markdown label for the thought's container.
        """
        input = tool.input_str
        name = tool.name
        emoji = CHECKMARK_EMOJI if is_complete else THINKING_EMOJI
        if name == "_Exception":
            emoji = EXCEPTION_EMOJI
            name = "Parsing error"
        idx = min([60, len(input)])
        input = input[0:idx]
        if len(tool.input_str) > idx:
            input = input + "..."
        input = input.replace("\n", " ")
        label = f"{emoji} **{name}:** {input}"
        return label

    def get_history_label(self) -> str:
        """Return a markdown label for the special 'history' container that
        contains overflow thoughts.
        """
        return f"{HISTORY_EMOJI} **History**"

    def get_final_agent_thought_label(self) -> str:
        """Return the label for the agent's final thought -
        the "Now I have the answer" thought, that doesn't involve a tool.
        """
        return f"{CHECKMARK_EMOJI} **Complete!**"

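# A minimal sketch of overriding the default labeling logic; the subclass name
# and label text below are hypothetical, and an instance would be passed to
# StreamlitCallbackHandler via its `thought_labeler` parameter:
#
#     class QuietLabeler(LLMThoughtLabeler):
#         def get_initial_label(self) -> str:
#             return "**Working...**"  # drop the emoji
#
# Note that the default get_tool_label truncates tool input to 60 characters,
# so e.g.
#
#     LLMThoughtLabeler().get_tool_label(
#         ToolRecord(name="search", input_str="weather in SF"), is_complete=True
#     )
#
# yields '✅ **search:** weather in SF'.
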
class LLMThought:
    """A thought in the LLM's thought stream."""

    def __init__(
        self,
        parent_container: DeltaGenerator,
        labeler: LLMThoughtLabeler,
        expanded: bool,
        collapse_on_complete: bool,
    ):
        """Initialize the LLMThought.

        Args:
            parent_container: The container we're writing into.
            labeler: The labeler to use for this thought.
            expanded: Whether the thought should be expanded by default.
            collapse_on_complete: Whether the thought should be collapsed
                on completion.
        """
        self._container = MutableExpander(
            parent_container=parent_container,
            label=labeler.get_initial_label(),
            expanded=expanded,
        )
        self._state = LLMThoughtState.THINKING
        self._llm_token_stream = ""
        self._llm_token_writer_idx: Optional[int] = None
        self._last_tool: Optional[ToolRecord] = None
        self._collapse_on_complete = collapse_on_complete
        self._labeler = labeler

    @property
    def container(self) -> MutableExpander:
        """The container we're writing into."""
        return self._container

    @property
    def last_tool(self) -> Optional[ToolRecord]:
        """The last tool executed by this thought."""
        return self._last_tool

    def _reset_llm_token_stream(self) -> None:
        self._llm_token_stream = ""
        self._llm_token_writer_idx = None

    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str]) -> None:
        self._reset_llm_token_stream()

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # This is only called when the LLM is initialized with `streaming=True`
        self._llm_token_stream += _convert_newlines(token)
        self._llm_token_writer_idx = self._container.markdown(
            self._llm_token_stream, index=self._llm_token_writer_idx
        )

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        # `response` is the concatenation of all the tokens received by the LLM.
        # If we're receiving streaming tokens from `on_llm_new_token`, this response
        # data is redundant
        self._reset_llm_token_stream()

    def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        self._container.markdown("**LLM encountered an error...**")
        self._container.exception(error)

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
    ) -> None:
        # Called with the name of the tool we're about to run (in `serialized[name]`),
        # and its input. We change our container's label to be the tool name.
        self._state = LLMThoughtState.RUNNING_TOOL
        tool_name = serialized["name"]
        self._last_tool = ToolRecord(name=tool_name, input_str=input_str)
        self._container.update(
            new_label=self._labeler.get_tool_label(self._last_tool, is_complete=False)
        )

    def on_tool_end(
        self,
        output: Any,
        color: Optional[str] = None,
        observation_prefix: Optional[str] = None,
        llm_prefix: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        self._container.markdown(f"**{str(output)}**")

    def on_tool_error(self, error: BaseException, **kwargs: Any) -> None:
        self._container.markdown("**Tool encountered an error...**")
        self._container.exception(error)

    def on_agent_action(
        self, action: AgentAction, color: Optional[str] = None, **kwargs: Any
    ) -> Any:
        # Called when we're about to kick off a new tool. The `action` data
        # tells us the tool we're about to use, and the input we'll give it.
        # We don't output anything here, because we'll receive this same data
        # when `on_tool_start` is called immediately after.
        pass

    def complete(self, final_label: Optional[str] = None) -> None:
        """Finish the thought."""
        if final_label is None and self._state == LLMThoughtState.RUNNING_TOOL:
            assert (
                self._last_tool is not None
            ), "_last_tool should never be null when _state == RUNNING_TOOL"
            final_label = self._labeler.get_tool_label(
                self._last_tool, is_complete=True
            )
        self._state = LLMThoughtState.COMPLETE
        if self._collapse_on_complete:
            self._container.update(new_label=final_label, new_expanded=False)
        else:
            self._container.update(new_label=final_label)

    def clear(self) -> None:
        """Remove the thought from the screen. A cleared thought can't be reused."""
        self._container.clear()

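# Illustrative lifecycle sketch (assumes it runs inside a Streamlit script with
# `import streamlit as st`; the tool name and strings below are made up):
#
#     thought = LLMThought(
#         parent_container=st.container(),
#         labeler=LLMThoughtLabeler(),
#         expanded=True,
#         collapse_on_complete=True,
#     )
#     thought.on_tool_start({"name": "search"}, "weather in SF")
#     thought.on_tool_end("Sunny, 18C")
#     thought.complete()  # relabels via get_tool_label(..., is_complete=True)
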
class StreamlitCallbackHandler(BaseCallbackHandler):
    """Callback handler that writes to a Streamlit app."""

    def __init__(
        self,
        parent_container: DeltaGenerator,
        *,
        max_thought_containers: int = 4,
        expand_new_thoughts: bool = True,
        collapse_completed_thoughts: bool = True,
        thought_labeler: Optional[LLMThoughtLabeler] = None,
    ):
        """Create a StreamlitCallbackHandler instance.

        Parameters
        ----------
        parent_container
            The `st.container` that will contain all the Streamlit elements that
            the Handler creates.

        max_thought_containers
            The max number of completed LLM thought containers to show at once.
            When this threshold is reached, a new thought will cause the oldest
            thoughts to be collapsed into a "History" expander. Defaults to 4.

        expand_new_thoughts
            Each LLM "thought" gets its own `st.expander`. This param controls
            whether that expander is expanded by default. Defaults to True.

        collapse_completed_thoughts
            If True, LLM thought expanders will be collapsed when completed.
            Defaults to True.

        thought_labeler
            An optional custom LLMThoughtLabeler instance. If unspecified, the
            handler will use the default thought labeling logic. Defaults to None.
        """
        self._parent_container = parent_container
        self._history_parent = parent_container.container()
        self._history_container: Optional[MutableExpander] = None
        self._current_thought: Optional[LLMThought] = None
        self._completed_thoughts: List[LLMThought] = []
        self._max_thought_containers = max(max_thought_containers, 1)
        self._expand_new_thoughts = expand_new_thoughts
        self._collapse_completed_thoughts = collapse_completed_thoughts
        self._thought_labeler = thought_labeler or LLMThoughtLabeler()

    def _require_current_thought(self) -> LLMThought:
        """Return our current LLMThought. Raise an error if we have no
        current thought.
        """
        if self._current_thought is None:
            raise RuntimeError("Current LLMThought is unexpectedly None!")
        return self._current_thought

    def _get_last_completed_thought(self) -> Optional[LLMThought]:
        """Return our most recent completed LLMThought, or None if we don't have one."""
        if len(self._completed_thoughts) > 0:
            return self._completed_thoughts[len(self._completed_thoughts) - 1]
        return None

    @property
    def _num_thought_containers(self) -> int:
        """The number of 'thought containers' we're currently showing: the
        number of completed thought containers, the history container (if it
        exists), and the current thought container (if it exists).
        """
        count = len(self._completed_thoughts)
        if self._history_container is not None:
            count += 1
        if self._current_thought is not None:
            count += 1
        return count

    def _complete_current_thought(self, final_label: Optional[str] = None) -> None:
        """Complete the current thought, optionally assigning it a new label.
        Add it to our _completed_thoughts list.
        """
        thought = self._require_current_thought()
        thought.complete(final_label)
        self._completed_thoughts.append(thought)
        self._current_thought = None

    def _prune_old_thought_containers(self) -> None:
        """If we have too many thoughts onscreen, move older thoughts to the
        'history container'.
        """
        while (
            self._num_thought_containers > self._max_thought_containers
            and len(self._completed_thoughts) > 0
        ):
            # Create our history container if it doesn't exist, and if
            # max_thought_containers is > 1. (if max_thought_containers is 1, we don't
            # have room to show history.)
            if self._history_container is None and self._max_thought_containers > 1:
                self._history_container = MutableExpander(
                    self._history_parent,
                    label=self._thought_labeler.get_history_label(),
                    expanded=False,
                )

            oldest_thought = self._completed_thoughts.pop(0)
            if self._history_container is not None:
                self._history_container.markdown(oldest_thought.container.label)
                self._history_container.append_copy(oldest_thought.container)
            oldest_thought.clear()

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        if self._current_thought is None:
            self._current_thought = LLMThought(
                parent_container=self._parent_container,
                expanded=self._expand_new_thoughts,
                collapse_on_complete=self._collapse_completed_thoughts,
                labeler=self._thought_labeler,
            )

        self._current_thought.on_llm_start(serialized, prompts)

        # We don't prune_old_thought_containers here, because our container won't
        # be visible until it has a child.

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        self._require_current_thought().on_llm_new_token(token, **kwargs)
        self._prune_old_thought_containers()

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        self._require_current_thought().on_llm_end(response, **kwargs)
        self._prune_old_thought_containers()

    def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        self._require_current_thought().on_llm_error(error, **kwargs)
        self._prune_old_thought_containers()

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
    ) -> None:
        self._require_current_thought().on_tool_start(serialized, input_str, **kwargs)
        self._prune_old_thought_containers()

    def on_tool_end(
        self,
        output: Any,
        color: Optional[str] = None,
        observation_prefix: Optional[str] = None,
        llm_prefix: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        output = str(output)
        self._require_current_thought().on_tool_end(
            output, color, observation_prefix, llm_prefix, **kwargs
        )
        self._complete_current_thought()

    def on_tool_error(self, error: BaseException, **kwargs: Any) -> None:
        self._require_current_thought().on_tool_error(error, **kwargs)
        self._prune_old_thought_containers()

    def on_text(
        self,
        text: str,
        color: Optional[str] = None,
        end: str = "",
        **kwargs: Any,
    ) -> None:
        pass

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> None:
        pass

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        pass

    def on_chain_error(self, error: BaseException, **kwargs: Any) -> None:
        pass

    def on_agent_action(
        self, action: AgentAction, color: Optional[str] = None, **kwargs: Any
    ) -> Any:
        self._require_current_thought().on_agent_action(action, color, **kwargs)
        self._prune_old_thought_containers()

    def on_agent_finish(
        self, finish: AgentFinish, color: Optional[str] = None, **kwargs: Any
    ) -> None:
        if self._current_thought is not None:
            self._current_thought.complete(
                self._thought_labeler.get_final_agent_thought_label()
            )
            self._current_thought = None

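# Typical usage, as a sketch built from the public API above (`agent_executor`
# and `prompt` are assumed to be defined elsewhere in the Streamlit app):
#
#     import streamlit as st
#
#     st_callback = StreamlitCallbackHandler(
#         st.container(),
#         max_thought_containers=4,
#         expand_new_thoughts=True,
#         collapse_completed_thoughts=True,
#     )
#     response = agent_executor.invoke(
#         {"input": prompt}, {"callbacks": [st_callback]}
#     )
#     st.write(response["output"])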