"""
UpTrain Callback Handler

UpTrain is an open-source platform to evaluate and improve LLM applications.
It provides grades for 20+ preconfigured checks (covering language, code, and
embedding use cases), performs root-cause analyses on failure cases, and
provides guidance for resolving them.

This module contains a callback handler that integrates UpTrain seamlessly
into your pipeline and facilitates diverse evaluations. The callback handler
automates various evaluations to assess the performance and effectiveness of
the components within the pipeline.

The evaluations performed include:

1. RAG:
   - Context Relevance: Determines the relevance of the context extracted
     from the query to the response.
   - Factual Accuracy: Assesses whether the LLM is providing accurate
     information or hallucinating.
   - Response Completeness: Checks whether the response contains all the
     information requested by the query.

2. Multi Query Generation:
   MultiQueryRetriever generates multiple variants of a question with
   meanings similar to the original question. This evaluation includes the
   previous ones and adds:
   - Multi Query Accuracy: Ensures that the generated query variants convey
     the same meaning as the original query.

3. Context Compression and Reranking:
   Reranking reorders nodes by relevance to the query and selects the top n
   nodes. Since the number of nodes can shrink after reranking, the following
   evaluations are performed in addition to the RAG evaluations:
   - Context Reranking: Determines whether the order of the reranked nodes
     is more relevant to the query than the original order.
   - Context Conciseness: Checks whether the reduced set of nodes still
     provides all the required information.

Together, these evaluations ensure the robustness and effectiveness of the
RAG query engine, the MultiQueryRetriever, and the reranking process within
the pipeline.

Useful links:
Github: https://github.com/uptrain-ai/uptrain
Website: https://uptrain.ai/
Docs: https://docs.uptrain.ai/getting-started/introduction
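
Example (a minimal sketch; the chain construction is assumed and the API key
shown is a placeholder):

    from langchain_community.callbacks import UpTrainCallbackHandler

    uptrain_callback = UpTrainCallbackHandler(key_type="openai", api_key="sk-...")
    config = {"callbacks": [uptrain_callback]}
    # Pass `config` to any runnable, e.g. `chain.invoke(query, config=config)`;
    # the handler then runs the evaluations described above automatically.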
"""
import logging
import sys
from collections import defaultdict
from typing import (
Any,
DefaultDict,
Dict,
List,
Optional,
Sequence,
Set,
)
from uuid import UUID
from langchain_core.callbacks.base import BaseCallbackHandler
from langchain_core.documents import Document
from langchain_core.outputs import LLMResult
from langchain_core.utils import guard_import
logger = logging.getLogger(__name__)
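# Print evaluation results to stdout as bare messages.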
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("%(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
def import_uptrain() -> Any:
    """Import the `uptrain` package."""
return guard_import("uptrain")
class UpTrainDataSchema:
    """The UpTrain data schema for tracking evaluation results.

    Args:
        project_name (str): The project name to be shown in the UpTrain dashboard.

    Attributes:
        project_name (str): The project name to be shown in the UpTrain dashboard.
        uptrain_results (DefaultDict[str, Any]): Dictionary to store evaluation results.
        eval_types (Set[str]): Set to store the types of evaluations.
        query (str): Query for the RAG evaluation.
        context (str): Context for the RAG evaluation.
        response (str): Response for the RAG evaluation.
        old_context (List[str]): Old context nodes for the Context Conciseness evaluation.
        new_context (List[str]): New context nodes for the Context Conciseness evaluation.
        context_conciseness_run_id (UUID): Run ID for the Context Conciseness evaluation.
        multi_queries (List[str]): List of query variants for the Multi Query evaluation.
        multi_query_run_id (UUID): Run ID for the Multi Query evaluation.
        multi_query_daughter_run_id (UUID): Run ID for the daughter Multi Query run.
    """
    def __init__(self, project_name: str) -> None:
        """Initialize the UpTrain data schema."""
# For tracking project name and results
self.project_name: str = project_name
self.uptrain_results: DefaultDict[str, Any] = defaultdict(list)
# For tracking event types
self.eval_types: Set[str] = set()
## RAG
self.query: str = ""
self.context: str = ""
self.response: str = ""
## CONTEXT CONCISENESS
self.old_context: List[str] = []
self.new_context: List[str] = []
self.context_conciseness_run_id: UUID = UUID(int=0)
# MULTI QUERY
self.multi_queries: List[str] = []
self.multi_query_run_id: UUID = UUID(int=0)
        self.multi_query_daughter_run_id: UUID = UUID(int=0)
class UpTrainCallbackHandler(BaseCallbackHandler):
    """Callback Handler that logs evaluation results to UpTrain and the console.

    Args:
        project_name (str): The project name to be shown in the UpTrain dashboard.
        key_type (str): The key type to use. Must be 'uptrain' or 'openai'.
        api_key (str): The API key for the UpTrain or OpenAI API.
            (This key is required to perform evaluations using GPT.)

    Raises:
        ValueError: If the key type is invalid.
        ImportError: If the `uptrain` package is not installed.
    """
    def __init__(
self,
*,
project_name: str = "langchain",
key_type: str = "openai",
api_key: str = "sk-****************", # The API key to use for evaluation
model: str = "gpt-3.5-turbo", # The model to use for evaluation
log_results: bool = True,
) -> None:
"""初始化`UpTrainCallbackHandler`。"""
super().__init__()
uptrain = import_uptrain()
self.log_results = log_results
# Set uptrain variables
self.schema = UpTrainDataSchema(project_name=project_name)
self.first_score_printed_flag = False
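        # An UpTrain key routes evaluations to the UpTrain server (APIClient);
        # an OpenAI key runs them locally with GPT via EvalLLM.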
if key_type == "uptrain":
settings = uptrain.Settings(uptrain_access_token=api_key, model=model)
self.uptrain_client = uptrain.APIClient(settings=settings)
elif key_type == "openai":
settings = uptrain.Settings(
openai_api_key=api_key, evaluate_locally=True, model=model
)
self.uptrain_client = uptrain.EvalLLM(settings=settings)
else:
raise ValueError("Invalid key type: Must be 'uptrain' or 'openai'")
    def uptrain_evaluate(
self,
evaluation_name: str,
data: List[Dict[str, Any]],
checks: List[str],
) -> None:
"""在UpTrain客户端上对UpTrain服务器进行评估。"""
if self.uptrain_client.__class__.__name__ == "APIClient":
uptrain_result = self.uptrain_client.log_and_evaluate(
project_name=self.schema.project_name,
evaluation_name=evaluation_name,
data=data,
checks=checks,
)
else:
uptrain_result = self.uptrain_client.evaluate(
project_name=self.schema.project_name,
evaluation_name=evaluation_name,
data=data,
checks=checks,
)
self.schema.uptrain_results[self.schema.project_name].append(uptrain_result)
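        # Map UpTrain's raw score column names to human-readable labels.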
score_name_map = {
"score_context_relevance": "Context Relevance Score",
"score_factual_accuracy": "Factual Accuracy Score",
"score_response_completeness": "Response Completeness Score",
"score_sub_query_completeness": "Sub Query Completeness Score",
"score_context_reranking": "Context Reranking Score",
"score_context_conciseness": "Context Conciseness Score",
"score_multi_query_accuracy": "Multi Query Accuracy Score",
}
if self.log_results:
# Set logger level to INFO to print the evaluation results
logger.setLevel(logging.INFO)
for row in uptrain_result:
columns = list(row.keys())
for column in columns:
if column == "question":
logger.info(f"\nQuestion: {row[column]}")
self.first_score_printed_flag = False
elif column == "response":
logger.info(f"Response: {row[column]}")
self.first_score_printed_flag = False
elif column == "variants":
logger.info("Multi Queries:")
for variant in row[column]:
logger.info(f" - {variant}")
self.first_score_printed_flag = False
elif column.startswith("score"):
if not self.first_score_printed_flag:
logger.info("")
self.first_score_printed_flag = True
if column in score_name_map:
logger.info(f"{score_name_map[column]}: {row[column]}")
else:
logger.info(f"{column}: {row[column]}")
if self.log_results:
# Set logger level back to WARNING
# (We are doing this to avoid printing the logs from HTTP requests)
logger.setLevel(logging.WARNING)
    def on_llm_end(
self,
response: LLMResult,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> None:
"""当LLM结束时,将日志记录到uptrain。"""
uptrain = import_uptrain()
self.schema.response = response.generations[0][0].text
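        # Skip LLM runs spawned by the multi-query daughter chain (query-variant
        # generation) so the RAG evaluation runs only on the final answer.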
if (
"qa_rag" in self.schema.eval_types
            and parent_run_id != self.schema.multi_query_daughter_run_id
):
data = [
{
"question": self.schema.query,
"context": self.schema.context,
"response": self.schema.response,
}
]
self.uptrain_evaluate(
evaluation_name="rag",
data=data,
checks=[
uptrain.Evals.CONTEXT_RELEVANCE,
uptrain.Evals.FACTUAL_ACCURACY,
uptrain.Evals.RESPONSE_COMPLETENESS,
],
)
    def on_chain_start(
self,
serialized: Dict[str, Any],
inputs: Dict[str, Any],
*,
run_id: UUID,
tags: Optional[List[str]] = None,
parent_run_id: Optional[UUID] = None,
metadata: Optional[Dict[str, Any]] = None,
run_type: Optional[str] = None,
name: Optional[str] = None,
**kwargs: Any,
) -> None:
"""当链条开始时不执行任何操作"""
if parent_run_id == self.schema.multi_query_run_id:
            self.schema.multi_query_daughter_run_id = run_id
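        # A chain fed exactly {"context", "question"} is treated as the RAG QA chain.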
if isinstance(inputs, dict) and set(inputs.keys()) == {"context", "question"}:
self.schema.eval_types.add("qa_rag")
context = ""
if isinstance(inputs["context"], Document):
context = inputs["context"].page_content
elif isinstance(inputs["context"], list):
for doc in inputs["context"]:
context += doc.page_content + "\n"
elif isinstance(inputs["context"], str):
context = inputs["context"]
self.schema.context = context
self.schema.query = inputs["question"]
    def on_retriever_start(
self,
serialized: Dict[str, Any],
query: str,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> None:
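        """Track retriever runs for the contextual-compression and multi-query evaluations."""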
if "contextual_compression" in serialized["id"]:
self.schema.eval_types.add("contextual_compression")
self.schema.query = query
self.schema.context_conciseness_run_id = run_id
if "multi_query" in serialized["id"]:
self.schema.eval_types.add("multi_query")
self.schema.multi_query_run_id = run_id
self.schema.query = query
elif "multi_query" in self.schema.eval_types:
self.schema.multi_queries.append(query)
    def on_retriever_end(
self,
documents: Sequence[Document],
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
"""当Retriever运行结束时运行。"""
uptrain = import_uptrain()
if run_id == self.schema.multi_query_run_id:
data = [
{
"question": self.schema.query,
"variants": self.schema.multi_queries,
}
]
self.uptrain_evaluate(
evaluation_name="multi_query",
data=data,
checks=[uptrain.Evals.MULTI_QUERY_ACCURACY],
)
if "contextual_compression" in self.schema.eval_types:
if parent_run_id == self.schema.context_conciseness_run_id:
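                # A child of the compression retriever returns the original,
                # pre-compression documents.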
for doc in documents:
self.schema.old_context.append(doc.page_content)
elif run_id == self.schema.context_conciseness_run_id:
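                # The compression retriever itself has ended: these are the
                # reranked/compressed documents, so run the evaluations.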
for doc in documents:
self.schema.new_context.append(doc.page_content)
context = "\n".join(
[
f"{index}. {string}"
for index, string in enumerate(self.schema.old_context, start=1)
]
)
reranked_context = "\n".join(
[
f"{index}. {string}"
for index, string in enumerate(self.schema.new_context, start=1)
]
)
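                # The same reranked text feeds both checks: CONTEXT_CONCISENESS
                # reads `concise_context` and CONTEXT_RERANKING reads `reranked_context`.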
data = [
{
"question": self.schema.query,
"context": context,
"concise_context": reranked_context,
"reranked_context": reranked_context,
}
]
self.uptrain_evaluate(
evaluation_name="context_reranking",
data=data,
checks=[
uptrain.Evals.CONTEXT_CONCISENESS,
uptrain.Evals.CONTEXT_RERANKING,
],
)