"""回调处理程序,用于打印到streamlit。"""
from __future__ import annotations
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from langchain_community.callbacks.streamlit.mutable_expander import MutableExpander
if TYPE_CHECKING:
from streamlit.delta_generator import DeltaGenerator
def _convert_newlines(text: str) -> str:
"""将换行符转换为markdown换行序列(空格、空格、换行符)。
"""
return text.replace("\n", " \n")
# Emoji (literal characters or ":shortcode:" strings) used as label prefixes.
# Prefix of the label returned by LLMThoughtLabeler.get_final_agent_thought_label.
CHECKMARK_EMOJI = "✅"
# Prefix of the label returned by LLMThoughtLabeler.get_initial_label.
THINKING_EMOJI = ":thinking_face:"
# Prefix of the label returned by LLMThoughtLabeler.get_history_label.
HISTORY_EMOJI = ":books:"
# NOTE(review): not referenced in the visible portion of this file; presumably
# used for error/exception labels elsewhere - confirm before removing.
EXCEPTION_EMOJI = "⚠️"
class LLMThoughtState(Enum):
    """Enumerator of the states an LLMThought can be in."""

    # The LLM is thinking about what to do next. We don't know which tool we'll run.
    THINKING = "THINKING"
    # The LLM has decided to run a tool. We don't have results from the tool yet.
    RUNNING_TOOL = "RUNNING_TOOL"
    # We have results from the tool.
    COMPLETE = "COMPLETE"
class LLMThoughtLabeler:
    """Generate markdown labels for LLMThought containers.

    Pass a custom subclass of this to StreamlitCallbackHandler to override its
    default labeling logic.
    """

    # NOTE(review): LLMThought.complete() calls
    # `labeler.get_tool_label(tool, is_complete=True)`, but no such method is
    # defined in this class as visible here - confirm it is provided elsewhere.

    def get_initial_label(self) -> str:
        """Return the markdown label for a new LLMThought.

        This is the label used before the thought is associated with any tool.
        """
        return f"{THINKING_EMOJI} **Thinking...**"

    def get_history_label(self) -> str:
        """Return the markdown label for the special "history" container.

        The history container holds thoughts that overflowed the visible area.
        """
        return f"{HISTORY_EMOJI} **History**"

    def get_final_agent_thought_label(self) -> str:
        """Return the markdown label for the agent's final thought.

        This is the "Now I have the answer" thought, which involves no tool.
        """
        return f"{CHECKMARK_EMOJI} **Complete!**"
class LLMThought:
    """A single thought in the LLM's thought stream.

    Each thought owns one MutableExpander into which streamed LLM tokens and
    error output are written.
    """

    def __init__(
        self,
        parent_container: DeltaGenerator,
        labeler: LLMThoughtLabeler,
        expanded: bool,
        collapse_on_complete: bool,
    ):
        """Initialize the LLMThought.

        Args:
            parent_container: The container we're writing into.
            labeler: The labeler to use for this thought.
            expanded: Whether the thought should be expanded by default.
            collapse_on_complete: Whether the thought should be collapsed when
                it completes.
        """
        self._container = MutableExpander(
            parent_container=parent_container,
            label=labeler.get_initial_label(),
            expanded=expanded,
        )
        self._state = LLMThoughtState.THINKING
        # Accumulated (newline-converted) text of the tokens streamed so far.
        self._llm_token_stream = ""
        # Index returned by the container for the streamed-token element;
        # None until the first token has been written.
        self._llm_token_writer_idx: Optional[int] = None
        self._last_tool: Optional[ToolRecord] = None
        self._collapse_on_complete = collapse_on_complete
        self._labeler = labeler

    @property
    def container(self) -> MutableExpander:
        """The container we're writing into."""
        return self._container

    @property
    def last_tool(self) -> Optional[ToolRecord]:
        """The last tool executed by this thought, or None."""
        return self._last_tool

    def _reset_llm_token_stream(self) -> None:
        """Forget the accumulated token text and its element index."""
        self._llm_token_stream = ""
        self._llm_token_writer_idx = None

    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str]) -> None:
        """Reset the token stream at the start of an LLM run.

        Both arguments are accepted for interface compatibility and unused.
        """
        self._reset_llm_token_stream()

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Append a streamed token and re-render the accumulated text."""
        # This is only called when the LLM is initialized with `streaming=True`
        self._llm_token_stream += _convert_newlines(token)
        # Passing the previously returned index presumably makes the container
        # overwrite the same element instead of adding a new one - confirm
        # against MutableExpander.markdown.
        self._llm_token_writer_idx = self._container.markdown(
            self._llm_token_stream, index=self._llm_token_writer_idx
        )

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Reset the token stream at the end of an LLM run."""
        # `response` is the concatenation of all the tokens received by the LLM.
        # If we're receiving streaming tokens from `on_llm_new_token`, this response
        # data is redundant
        self._reset_llm_token_stream()

    def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        """Write an error notice followed by the exception to the container."""
        self._container.markdown("**LLM encountered an error...**")
        self._container.exception(error)

    def on_agent_action(
        self, action: AgentAction, color: Optional[str] = None, **kwargs: Any
    ) -> Any:
        """Do nothing; returns None."""
        # Called when we're about to kick off a new tool. The `action` data
        # tells us the tool we're about to use, and the input we'll give it.
        # We don't output anything here, because we'll receive this same data
        # when `on_tool_start` is called immediately after.
        pass

    def complete(self, final_label: Optional[str] = None) -> None:
        """Finish the thought.

        Args:
            final_label: Label to give the container. If None and the thought
                is in the RUNNING_TOOL state, the label is obtained from the
                labeler's `get_tool_label` for the last tool; otherwise None
                is passed through to the container update.
        """
        if final_label is None and self._state == LLMThoughtState.RUNNING_TOOL:
            assert (
                self._last_tool is not None
            ), "_last_tool should never be null when _state == RUNNING_TOOL"
            final_label = self._labeler.get_tool_label(
                self._last_tool, is_complete=True
            )
        self._state = LLMThoughtState.COMPLETE
        # Only pass `new_expanded` when we actually want to collapse, so the
        # container's current expanded state is otherwise left to its default
        # handling.
        if self._collapse_on_complete:
            self._container.update(new_label=final_label, new_expanded=False)
        else:
            self._container.update(new_label=final_label)

    def clear(self) -> None:
        """Remove the thought from the screen. A cleared thought can't be reused."""
        self._container.clear()
class StreamlitCallbackHandler(BaseCallbackHandler):
    """Callback handler that writes to a Streamlit app."""

    def __init__(
        self,
        parent_container: DeltaGenerator,
        *,
        max_thought_containers: int = 4,
        expand_new_thoughts: bool = True,
        collapse_completed_thoughts: bool = True,
        thought_labeler: Optional[LLMThoughtLabeler] = None,
    ):
        """Create a StreamlitCallbackHandler instance.

        Parameters
        ----------
        parent_container
            The `st.container` that will contain all the Streamlit elements that
            the handler creates.
        max_thought_containers
            The max number of completed LLM thought containers to show at once.
            When this threshold is reached, a new thought will cause the oldest
            thoughts to be collapsed into a "History" expander. Values below 1
            are clamped to 1. Defaults to 4.
        expand_new_thoughts
            Each LLM "thought" gets its own `st.expander`. This param controls
            whether that expander is expanded by default. Defaults to True.
        collapse_completed_thoughts
            If True, LLM thought expanders will be collapsed when completed.
            Defaults to True.
        thought_labeler
            An optional custom LLMThoughtLabeler instance. If unspecified, the
            handler will use the default thought labeling logic. Defaults to
            None.
        """
        self._parent_container = parent_container
        # Created before any thought so the history expander sits in its own
        # sub-container of the parent.
        self._history_parent = parent_container.container()
        self._history_container: Optional[MutableExpander] = None
        self._current_thought: Optional[LLMThought] = None
        self._completed_thoughts: List[LLMThought] = []
        self._max_thought_containers = max(max_thought_containers, 1)
        self._expand_new_thoughts = expand_new_thoughts
        self._collapse_completed_thoughts = collapse_completed_thoughts
        self._thought_labeler = thought_labeler or LLMThoughtLabeler()

    def _require_current_thought(self) -> LLMThought:
        """Return our current LLMThought.

        Raises:
            RuntimeError: If there is no current thought.
        """
        if self._current_thought is None:
            raise RuntimeError("Current LLMThought is unexpectedly None!")
        return self._current_thought

    def _get_last_completed_thought(self) -> Optional[LLMThought]:
        """Return our most recently completed LLMThought, or None if there is none."""
        if self._completed_thoughts:
            return self._completed_thoughts[-1]
        return None

    @property
    def _num_thought_containers(self) -> int:
        """The number of "thought containers" currently shown.

        This is the number of completed thoughts, plus one for the history
        container (if it exists), plus one for the current thought (if it
        exists).
        """
        count = len(self._completed_thoughts)
        if self._history_container is not None:
            count += 1
        if self._current_thought is not None:
            count += 1
        return count

    def _complete_current_thought(self, final_label: Optional[str] = None) -> None:
        """Complete the current thought, optionally assigning it a new label.

        The thought is appended to `_completed_thoughts` and the current
        thought is cleared.

        Raises:
            RuntimeError: If there is no current thought.
        """
        thought = self._require_current_thought()
        thought.complete(final_label)
        self._completed_thoughts.append(thought)
        self._current_thought = None

    def _prune_old_thought_containers(self) -> None:
        """Move older thoughts into the history container if too many are shown."""
        while (
            self._num_thought_containers > self._max_thought_containers
            and self._completed_thoughts
        ):
            # Create our history container if it doesn't exist, and if
            # max_thought_containers is > 1. (if max_thought_containers is 1, we don't
            # have room to show history.)
            if self._history_container is None and self._max_thought_containers > 1:
                self._history_container = MutableExpander(
                    self._history_parent,
                    label=self._thought_labeler.get_history_label(),
                    expanded=False,
                )

            oldest_thought = self._completed_thoughts.pop(0)
            if self._history_container is not None:
                # Copy the thought's label and contents into history before
                # removing the original from the screen.
                self._history_container.markdown(oldest_thought.container.label)
                self._history_container.append_copy(oldest_thought.container)
            oldest_thought.clear()

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Create a current thought if there is none, then notify it."""
        if self._current_thought is None:
            self._current_thought = LLMThought(
                parent_container=self._parent_container,
                expanded=self._expand_new_thoughts,
                collapse_on_complete=self._collapse_completed_thoughts,
                labeler=self._thought_labeler,
            )

        self._current_thought.on_llm_start(serialized, prompts)

        # We don't prune_old_thought_containers here, because our container won't
        # be visible until it has a child.

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Forward the token to the current thought, then prune old containers.

        Raises:
            RuntimeError: If there is no current thought.
        """
        self._require_current_thought().on_llm_new_token(token, **kwargs)
        self._prune_old_thought_containers()

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Forward the LLM result to the current thought, then prune old containers.

        Raises:
            RuntimeError: If there is no current thought.
        """
        self._require_current_thought().on_llm_end(response, **kwargs)
        self._prune_old_thought_containers()

    def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        """Forward the error to the current thought, then prune old containers.

        Raises:
            RuntimeError: If there is no current thought.
        """
        self._require_current_thought().on_llm_error(error, **kwargs)
        self._prune_old_thought_containers()

    def on_text(
        self,
        text: str,
        color: Optional[str] = None,
        end: str = "",
        **kwargs: Any,
    ) -> None:
        """Do nothing; text events are not rendered by this handler."""
        pass

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> None:
        """Do nothing; chain start events are not rendered by this handler."""
        pass

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Do nothing; chain end events are not rendered by this handler."""
        pass

    def on_chain_error(self, error: BaseException, **kwargs: Any) -> None:
        """Do nothing; chain error events are not rendered by this handler."""
        pass

    def on_agent_action(
        self, action: AgentAction, color: Optional[str] = None, **kwargs: Any
    ) -> Any:
        """Forward the action to the current thought, then prune old containers.

        Raises:
            RuntimeError: If there is no current thought.
        """
        self._require_current_thought().on_agent_action(action, color, **kwargs)
        self._prune_old_thought_containers()

    def on_agent_finish(
        self, finish: AgentFinish, color: Optional[str] = None, **kwargs: Any
    ) -> None:
        """Complete the current thought (if any) with the final-thought label.

        The finished thought is not added to `_completed_thoughts`; it is
        simply dropped as the current thought.
        """
        if self._current_thought is not None:
            self._current_thought.complete(
                self._thought_labeler.get_final_agent_thought_label()
            )
            self._current_thought = None