Source code for langchain_community.callbacks.arthur_callback

"""ArthurAI的回调处理程序。"""
from __future__ import annotations

import os
import uuid
from collections import defaultdict
from datetime import datetime
from time import time
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, List, Optional

import numpy as np
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult

if TYPE_CHECKING:
    import arthurai
    from arthurai.core.models import ArthurModel

PROMPT_TOKENS = "prompt_tokens"
COMPLETION_TOKENS = "completion_tokens"
TOKEN_USAGE = "token_usage"
FINISH_REASON = "finish_reason"
DURATION = "duration"


def _lazy_load_arthur() -> arthurai:
    """延迟加载Arthur。"""
    try:
        import arthurai
    except ImportError as e:
        raise ImportError(
            "To use the ArthurCallbackHandler you need the"
            " `arthurai` package. Please install it with"
            " `pip install arthurai`.",
            e,
        )

    return arthurai


[docs]class ArthurCallbackHandler(BaseCallbackHandler): """回调处理程序,用于记录到Arthur平台。 Arthur帮助企业团队在规模上优化模型操作和性能。Arthur API跟踪表格、NLP和CV模型的性能、可解释性和公平性。我们的API与模型和平台无关,并且持续扩展以满足复杂和动态的企业需求。要了解更多关于Arthur的信息,请访问我们的网站https://www.arthur.ai/或阅读Arthur文档https://docs.arthur.ai/"""
[docs] def __init__( self, arthur_model: ArthurModel, ) -> None: """初始化回调处理程序。""" super().__init__() arthurai = _lazy_load_arthur() Stage = arthurai.common.constants.Stage ValueType = arthurai.common.constants.ValueType self.arthur_model = arthur_model # save the attributes of this model to be used when preparing # inferences to log to Arthur in on_llm_end() self.attr_names = set([a.name for a in self.arthur_model.get_attributes()]) self.input_attr = [ x for x in self.arthur_model.get_attributes() if x.stage == Stage.ModelPipelineInput and x.value_type == ValueType.Unstructured_Text ][0].name self.output_attr = [ x for x in self.arthur_model.get_attributes() if x.stage == Stage.PredictedValue and x.value_type == ValueType.Unstructured_Text ][0].name self.token_likelihood_attr = None if ( len( [ x for x in self.arthur_model.get_attributes() if x.value_type == ValueType.TokenLikelihoods ] ) > 0 ): self.token_likelihood_attr = [ x for x in self.arthur_model.get_attributes() if x.value_type == ValueType.TokenLikelihoods ][0].name self.run_map: DefaultDict[str, Any] = defaultdict(dict)
[docs] @classmethod def from_credentials( cls, model_id: str, arthur_url: Optional[str] = "https://app.arthur.ai", arthur_login: Optional[str] = None, arthur_password: Optional[str] = None, ) -> ArthurCallbackHandler: """从Arthur凭据初始化回调处理程序。 参数: model_id(str):要记录日志的Arthur模型的ID。 arthur_url(str,可选):要记录日志的Arthur实例的URL。 默认为"https://app.arthur.ai"。 arthur_login(str,可选):用于连接到Arthur的登录名。 默认为None。 arthur_password(str,可选):用于连接到Arthur的密码。 默认为None。 返回: ArthurCallbackHandler:初始化的回调处理程序。 """ arthurai = _lazy_load_arthur() ArthurAI = arthurai.ArthurAI ResponseClientError = arthurai.common.exceptions.ResponseClientError # connect to Arthur if arthur_login is None: try: arthur_api_key = os.environ["ARTHUR_API_KEY"] except KeyError: raise ValueError( "No Arthur authentication provided. Either give" " a login to the ArthurCallbackHandler" " or set an ARTHUR_API_KEY as an environment variable." ) arthur = ArthurAI(url=arthur_url, access_key=arthur_api_key) else: if arthur_password is None: arthur = ArthurAI(url=arthur_url, login=arthur_login) else: arthur = ArthurAI( url=arthur_url, login=arthur_login, password=arthur_password ) # get model from Arthur by the provided model ID try: arthur_model = arthur.get_model(model_id) except ResponseClientError: raise ValueError( f"Was unable to retrieve model with id {model_id} from Arthur." " Make sure the ID corresponds to a model that is currently" " registered with your Arthur account." ) return cls(arthur_model)
[docs] def on_llm_start( self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any ) -> None: """在LLM启动时,保存输入提示信息""" run_id = kwargs["run_id"] self.run_map[run_id]["input_texts"] = prompts self.run_map[run_id]["start_time"] = time()
[docs] def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None: """在LLM端,将数据发送给Arthur。""" try: import pytz except ImportError as e: raise ImportError( "Could not import pytz. Please install it with 'pip install pytz'." ) from e run_id = kwargs["run_id"] # get the run params from this run ID, # or raise an error if this run ID has no corresponding metadata in self.run_map try: run_map_data = self.run_map[run_id] except KeyError as e: raise KeyError( "This function has been called with a run_id" " that was never registered in on_llm_start()." " Restart and try running the LLM again" ) from e # mark the duration time between on_llm_start() and on_llm_end() time_from_start_to_end = time() - run_map_data["start_time"] # create inferences to log to Arthur inferences = [] for i, generations in enumerate(response.generations): for generation in generations: inference = { "partner_inference_id": str(uuid.uuid4()), "inference_timestamp": datetime.now(tz=pytz.UTC), self.input_attr: run_map_data["input_texts"][i], self.output_attr: generation.text, } if generation.generation_info is not None: # add finish reason to the inference # if generation info contains a finish reason and # if the ArthurModel was registered to monitor finish_reason if ( FINISH_REASON in generation.generation_info and FINISH_REASON in self.attr_names ): inference[FINISH_REASON] = generation.generation_info[ FINISH_REASON ] # add token likelihoods data to the inference if the ArthurModel # was registered to monitor token likelihoods logprobs_data = generation.generation_info["logprobs"] if ( logprobs_data is not None and self.token_likelihood_attr is not None ): logprobs = logprobs_data["top_logprobs"] likelihoods = [ {k: np.exp(v) for k, v in logprobs[i].items()} for i in range(len(logprobs)) ] inference[self.token_likelihood_attr] = likelihoods # add token usage counts to the inference if the # ArthurModel was registered to monitor token usage if ( isinstance(response.llm_output, dict) and TOKEN_USAGE in response.llm_output ): token_usage = response.llm_output[TOKEN_USAGE] if ( PROMPT_TOKENS in token_usage and PROMPT_TOKENS in self.attr_names ): inference[PROMPT_TOKENS] = token_usage[PROMPT_TOKENS] if ( COMPLETION_TOKENS in token_usage and COMPLETION_TOKENS in self.attr_names ): inference[COMPLETION_TOKENS] = token_usage[COMPLETION_TOKENS] # add inference duration to the inference if the ArthurModel # was registered to monitor inference duration if DURATION in self.attr_names: inference[DURATION] = time_from_start_to_end inferences.append(inference) # send inferences to arthur self.arthur_model.send_inferences(inferences)
[docs] def on_chain_start( self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any ) -> None: """在链启动时,不执行任何操作。"""
[docs] def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None: """在链的末端,不执行任何操作。"""
[docs] def on_llm_error(self, error: BaseException, **kwargs: Any) -> None: """LLM 输出错误时不执行任何操作。"""
[docs] def on_llm_new_token(self, token: str, **kwargs: Any) -> None: """在新的令牌上,传递。"""
[docs] def on_chain_error(self, error: BaseException, **kwargs: Any) -> None: """当LLM链输出错误时不执行任何操作。"""
[docs] def on_tool_start( self, serialized: Dict[str, Any], input_str: str, **kwargs: Any, ) -> None: """工具启动时不执行任何操作。"""
[docs] def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any: """当代理执行特定动作时不执行任何操作。"""
[docs] def on_tool_end( self, output: Any, observation_prefix: Optional[str] = None, llm_prefix: Optional[str] = None, **kwargs: Any, ) -> None: """工具结束时不执行任何操作。"""
[docs] def on_tool_error(self, error: BaseException, **kwargs: Any) -> None: """当工具输出错误时不执行任何操作。"""
[docs] def on_text(self, text: str, **kwargs: Any) -> None: """什么都不做"""
[docs] def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None: """什么都不做"""