Source code for langchain_community.callbacks.uptrain_callback

"""
更新训练回调处理程序

UpTrain是一个开源平台,用于评估和改进LLM应用程序。它提供了20多个预配置的检查等级(涵盖语言、代码、嵌入用例),对失败案例的根本原因进行分析,并提供解决方案的指导。

该模块包含一个回调处理程序,可将UpTrain无缝集成到您的流水线中,并促进各种评估。回调处理程序自动化各种评估,以评估流水线中组件的性能和有效性。

进行的评估包括:

1. RAG:
   - 上下文相关性:确定从查询中提取的上下文与响应的相关性。
   - 事实准确性:评估语言模型(LLM)是否提供准确信息或虚构信息。
   - 响应完整性:检查响应是否包含查询请求的所有信息。

2. 多查询生成:
   MultiQueryRetriever生成具有类似含义的问题的多个变体与原始问题。此评估包括先前的评估,并添加:
   - 多查询准确性:确保生成的多个查询传达与原始查询相同的含义。

3. 上下文压缩和重新排序:
   重新排序涉及根据与查询相关性重新排序节点并选择前n个节点。
   由于重新排序后节点数量可能减少,因此除了RAG评估外,还执行以下评估:
   - 上下文重新排序:确定重新排序节点的顺序是否与查询更相关。
   - 上下文简洁性:检查减少的节点数量是否仍提供所有所需信息。

这些评估共同确保RAG查询引擎、MultiQueryRetriever和流水线内的重新排序过程的稳健性和有效性。

有用的链接:
Github: https://github.com/uptrain-ai/uptrain
网站: https://uptrain.ai/
文档: https://docs.uptrain.ai/getting-started/introduction

"""

import logging
import sys
from collections import defaultdict
from typing import (
    Any,
    DefaultDict,
    Dict,
    List,
    Optional,
    Sequence,
    Set,
)
from uuid import UUID

from langchain_core.callbacks.base import BaseCallbackHandler
from langchain_core.documents import Document
from langchain_core.outputs import LLMResult
from langchain_core.utils import guard_import

logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("%(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)


def import_uptrain() -> Any:
    """Import the `uptrain` package."""
    return guard_import("uptrain")
class UpTrainDataSchema:
    """The UpTrain data schema for tracking evaluation results.

    Args:
        project_name (str): The project name to be shown in the UpTrain dashboard.

    Attributes:
        project_name (str): The project name to be shown in the UpTrain dashboard.
        uptrain_results (DefaultDict[str, Any]): Dictionary to store evaluation
            results.
        eval_types (Set[str]): Set to store the types of evaluations.
        query (str): Query for the RAG evaluation.
        context (str): Context for the RAG evaluation.
        response (str): Response for the RAG evaluation.
        old_context (List[str]): Old context nodes for the Context Conciseness
            evaluation.
        new_context (List[str]): New context nodes for the Context Conciseness
            evaluation.
        context_conciseness_run_id (UUID): Run ID for the Context Conciseness
            evaluation.
        multi_queries (List[str]): List of multi queries for the Multi Query
            evaluation.
        multi_query_run_id (UUID): Run ID for the Multi Query evaluation.
        multi_query_daugher_run_id (UUID): Run ID for the Multi Query daughter
            evaluation.

    """
    def __init__(self, project_name: str) -> None:
        """Initialize the UpTrain data schema."""
        # For tracking the project name and results
        self.project_name: str = project_name
        self.uptrain_results: DefaultDict[str, Any] = defaultdict(list)

        # For tracking event types
        self.eval_types: Set[str] = set()

        ## RAG
        self.query: str = ""
        self.context: str = ""
        self.response: str = ""

        ## CONTEXT CONCISENESS
        self.old_context: List[str] = []
        self.new_context: List[str] = []
        self.context_conciseness_run_id: UUID = UUID(int=0)

        # MULTI QUERY
        self.multi_queries: List[str] = []
        self.multi_query_run_id: UUID = UUID(int=0)
        self.multi_query_daugher_run_id: UUID = UUID(int=0)
class UpTrainCallbackHandler(BaseCallbackHandler):
    """Callback Handler that logs evaluation results to UpTrain and the console.

    Args:
        project_name (str): The project name to be shown in the UpTrain dashboard.
        key_type (str): The key type to use. Must be 'uptrain' or 'openai'.
        api_key (str): The API key for the UpTrain or OpenAI API.
            (This key is required to perform evaluations using GPT.)

    Raises:
        ValueError: If the key type is invalid.
        ImportError: If the `uptrain` package is not installed.

    """
    def __init__(
        self,
        *,
        project_name: str = "langchain",
        key_type: str = "openai",
        api_key: str = "sk-****************",  # The API key to use for evaluation
        model: str = "gpt-3.5-turbo",  # The model to use for evaluation
        log_results: bool = True,
    ) -> None:
        """Initializes the `UpTrainCallbackHandler`."""
        super().__init__()
        uptrain = import_uptrain()

        self.log_results = log_results

        # Set UpTrain variables
        self.schema = UpTrainDataSchema(project_name=project_name)
        self.first_score_printed_flag = False

        if key_type == "uptrain":
            settings = uptrain.Settings(uptrain_access_token=api_key, model=model)
            self.uptrain_client = uptrain.APIClient(settings=settings)
        elif key_type == "openai":
            settings = uptrain.Settings(
                openai_api_key=api_key, evaluate_locally=True, model=model
            )
            self.uptrain_client = uptrain.EvalLLM(settings=settings)
        else:
            raise ValueError("Invalid key type: Must be 'uptrain' or 'openai'")
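    # A hedged instantiation sketch (all key values below are placeholders):
    #
    #     # Evaluate locally with an OpenAI key:
    #     handler = UpTrainCallbackHandler(
    #         project_name="my_rag_project",
    #         key_type="openai",
    #         api_key="sk-...",
    #         model="gpt-4",
    #     )
    #
    #     # Or log evaluations to the UpTrain dashboard with an UpTrain access token:
    #     handler = UpTrainCallbackHandler(
    #         key_type="uptrain", api_key="<UPTRAIN_API_KEY>"
    #     )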
    def uptrain_evaluate(
        self,
        evaluation_name: str,
        data: List[Dict[str, Any]],
        checks: List[str],
    ) -> None:
        """Run an evaluation on the UpTrain server using the UpTrain client."""
        if self.uptrain_client.__class__.__name__ == "APIClient":
            uptrain_result = self.uptrain_client.log_and_evaluate(
                project_name=self.schema.project_name,
                evaluation_name=evaluation_name,
                data=data,
                checks=checks,
            )
        else:
            uptrain_result = self.uptrain_client.evaluate(
                project_name=self.schema.project_name,
                evaluation_name=evaluation_name,
                data=data,
                checks=checks,
            )
        self.schema.uptrain_results[self.schema.project_name].append(uptrain_result)

        score_name_map = {
            "score_context_relevance": "Context Relevance Score",
            "score_factual_accuracy": "Factual Accuracy Score",
            "score_response_completeness": "Response Completeness Score",
            "score_sub_query_completeness": "Sub Query Completeness Score",
            "score_context_reranking": "Context Reranking Score",
            "score_context_conciseness": "Context Conciseness Score",
            "score_multi_query_accuracy": "Multi Query Accuracy Score",
        }

        if self.log_results:
            # Set the logger level to INFO to print the evaluation results
            logger.setLevel(logging.INFO)

        for row in uptrain_result:
            columns = list(row.keys())
            for column in columns:
                if column == "question":
                    logger.info(f"\nQuestion: {row[column]}")
                    self.first_score_printed_flag = False
                elif column == "response":
                    logger.info(f"Response: {row[column]}")
                    self.first_score_printed_flag = False
                elif column == "variants":
                    logger.info("Multi Queries:")
                    for variant in row[column]:
                        logger.info(f"  - {variant}")
                    self.first_score_printed_flag = False
                elif column.startswith("score"):
                    if not self.first_score_printed_flag:
                        logger.info("")
                        self.first_score_printed_flag = True
                    if column in score_name_map:
                        logger.info(f"{score_name_map[column]}: {row[column]}")
                    else:
                        logger.info(f"{column}: {row[column]}")

        if self.log_results:
            # Set the logger level back to WARNING
            # (We do this to avoid printing the logs from HTTP requests)
            logger.setLevel(logging.WARNING)
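    # For reference, a sketch of the contract this method expects, mirroring the
    # call made from `on_llm_end` below (the field values are placeholders):
    #
    #     uptrain = import_uptrain()
    #     handler.uptrain_evaluate(
    #         evaluation_name="rag",
    #         data=[{"question": "...", "context": "...", "response": "..."}],
    #         checks=[uptrain.Evals.CONTEXT_RELEVANCE],
    #     )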
    def on_llm_end(
        self,
        response: LLMResult,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> None:
        """Log records to UpTrain when an LLM ends."""
        uptrain = import_uptrain()
        self.schema.response = response.generations[0][0].text
        if (
            "qa_rag" in self.schema.eval_types
            and parent_run_id != self.schema.multi_query_daugher_run_id
        ):
            data = [
                {
                    "question": self.schema.query,
                    "context": self.schema.context,
                    "response": self.schema.response,
                }
            ]

            self.uptrain_evaluate(
                evaluation_name="rag",
                data=data,
                checks=[
                    uptrain.Evals.CONTEXT_RELEVANCE,
                    uptrain.Evals.FACTUAL_ACCURACY,
                    uptrain.Evals.RESPONSE_COMPLETENESS,
                ],
            )
    def on_chain_start(
        self,
        serialized: Dict[str, Any],
        inputs: Dict[str, Any],
        *,
        run_id: UUID,
        tags: Optional[List[str]] = None,
        parent_run_id: Optional[UUID] = None,
        metadata: Optional[Dict[str, Any]] = None,
        run_type: Optional[str] = None,
        name: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Capture the query and context when a chain starts."""
        if parent_run_id == self.schema.multi_query_run_id:
            self.schema.multi_query_daugher_run_id = run_id
        if isinstance(inputs, dict) and set(inputs.keys()) == {"context", "question"}:
            self.schema.eval_types.add("qa_rag")

            context = ""
            if isinstance(inputs["context"], Document):
                context = inputs["context"].page_content
            elif isinstance(inputs["context"], list):
                for doc in inputs["context"]:
                    context += doc.page_content + "\n"
            elif isinstance(inputs["context"], str):
                context = inputs["context"]
            self.schema.context = context
            self.schema.query = inputs["question"]
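    # The RAG detection above keys on a chain step whose inputs are exactly
    # {"context", "question"}. A typical LCEL chain producing such a step (a
    # sketch; `retriever`, `prompt`, and `llm` are assumed to already exist):
    #
    #     from langchain_core.runnables import RunnablePassthrough
    #
    #     rag_chain = (
    #         {"context": retriever, "question": RunnablePassthrough()}
    #         | prompt
    #         | llm
    #     )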
    def on_retriever_start(
        self,
        serialized: Dict[str, Any],
        query: str,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Detect the retriever type and record the query when a retriever starts."""
        if "contextual_compression" in serialized["id"]:
            self.schema.eval_types.add("contextual_compression")
            self.schema.query = query
            self.schema.context_conciseness_run_id = run_id

        if "multi_query" in serialized["id"]:
            self.schema.eval_types.add("multi_query")
            self.schema.multi_query_run_id = run_id
            self.schema.query = query
        elif "multi_query" in self.schema.eval_types:
            self.schema.multi_queries.append(query)
    def on_retriever_end(
        self,
        documents: Sequence[Document],
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        """Run when the Retriever ends running."""
        uptrain = import_uptrain()
        if run_id == self.schema.multi_query_run_id:
            data = [
                {
                    "question": self.schema.query,
                    "variants": self.schema.multi_queries,
                }
            ]

            self.uptrain_evaluate(
                evaluation_name="multi_query",
                data=data,
                checks=[uptrain.Evals.MULTI_QUERY_ACCURACY],
            )
        if "contextual_compression" in self.schema.eval_types:
            if parent_run_id == self.schema.context_conciseness_run_id:
                for doc in documents:
                    self.schema.old_context.append(doc.page_content)
            elif run_id == self.schema.context_conciseness_run_id:
                for doc in documents:
                    self.schema.new_context.append(doc.page_content)
                context = "\n".join(
                    [
                        f"{index}. {string}"
                        for index, string in enumerate(
                            self.schema.old_context, start=1
                        )
                    ]
                )
                reranked_context = "\n".join(
                    [
                        f"{index}. {string}"
                        for index, string in enumerate(
                            self.schema.new_context, start=1
                        )
                    ]
                )
                data = [
                    {
                        "question": self.schema.query,
                        "context": context,
                        "concise_context": reranked_context,
                        "reranked_context": reranked_context,
                    }
                ]
                self.uptrain_evaluate(
                    evaluation_name="context_reranking",
                    data=data,
                    checks=[
                        uptrain.Evals.CONTEXT_CONCISENESS,
                        uptrain.Evals.CONTEXT_RERANKING,
                    ],
                )
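# The retriever callbacks above match on "multi_query" and "contextual_compression"
# appearing in the serialized retriever id. Retrievers that trigger these
# evaluations (a sketch; `retriever`, `llm`, and `compressor` are assumed to
# already exist):
#
#     from langchain.retrievers import ContextualCompressionRetriever
#     from langchain.retrievers.multi_query import MultiQueryRetriever
#
#     multi_query_retriever = MultiQueryRetriever.from_llm(
#         retriever=retriever, llm=llm
#     )
#     compression_retriever = ContextualCompressionRetriever(
#         base_compressor=compressor, base_retriever=retriever
#     )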