Source code for sentence_transformers.evaluation.InformationRetrievalEvaluator
from __future__ import annotations
import heapq
import logging
import os
from contextlib import nullcontext
from typing import TYPE_CHECKING, Callable
import numpy as np
import torch
from torch import Tensor
from tqdm import trange
from sentence_transformers.evaluation.SentenceEvaluator import SentenceEvaluator
from sentence_transformers.similarity_functions import SimilarityFunction
from sentence_transformers.util import cos_sim, dot_score
if TYPE_CHECKING:
from sentence_transformers.SentenceTransformer import SentenceTransformer
logger = logging.getLogger(__name__)
class InformationRetrievalEvaluator(SentenceEvaluator):
"""
This class evaluates an Information Retrieval (IR) setting.
Given a set of queries and a large corpus, it retrieves the top-k most similar documents for each query and measures
Accuracy@k, Precision@k, Recall@k, Mean Reciprocal Rank (MRR), Normalized Discounted Cumulative Gain (NDCG), and Mean Average Precision (MAP).
Example:
::
import random
from sentence_transformers import SentenceTransformer
from sentence_transformers.evaluation import InformationRetrievalEvaluator
from datasets import load_dataset
# Load a model
model = SentenceTransformer('all-mpnet-base-v2')
# Load the Quora IR dataset (https://huggingface.co/datasets/BeIR/quora, https://huggingface.co/datasets/BeIR/quora-qrels)
corpus = load_dataset("BeIR/quora", "corpus", split="corpus")
queries = load_dataset("BeIR/quora", "queries", split="queries")
relevant_docs_data = load_dataset("BeIR/quora-qrels", split="validation")
# Shrink the corpus size heavily to only the relevant documents + 10,000 random documents
required_corpus_ids = list(map(str, relevant_docs_data["corpus-id"]))
required_corpus_ids += random.sample(corpus["_id"], k=10_000)
corpus = corpus.filter(lambda x: x["_id"] in required_corpus_ids)
# Convert the datasets to dictionaries
corpus = dict(zip(corpus["_id"], corpus["text"])) # Our corpus (cid => document)
queries = dict(zip(queries["_id"], queries["text"])) # Our queries (qid => question)
relevant_docs = {}  # Query ID to relevant documents (qid => set([relevant_cids]))
for qid, corpus_ids in zip(relevant_docs_data["query-id"], relevant_docs_data["corpus-id"]):
qid = str(qid)
corpus_ids = str(corpus_ids)
if qid not in relevant_docs:
relevant_docs[qid] = set()
relevant_docs[qid].add(corpus_ids)
# Given queries, a corpus and a mapping with relevant documents, the InformationRetrievalEvaluator computes different IR metrics.
ir_evaluator = InformationRetrievalEvaluator(
queries=queries,
corpus=corpus,
relevant_docs=relevant_docs,
name="BeIR-quora-dev",
)
results = ir_evaluator(model)
'''
Information Retrieval Evaluation of the model on the BeIR-quora-dev dataset:
Queries: 5000
Corpus: 17476
Score-Function: cosine
Accuracy@1: 96.26%
Accuracy@3: 99.38%
Accuracy@5: 99.74%
Accuracy@10: 99.94%
Precision@1: 96.26%
Precision@3: 43.01%
Precision@5: 27.66%
Precision@10: 14.58%
Recall@1: 82.93%
Recall@3: 96.28%
Recall@5: 98.38%
Recall@10: 99.55%
MRR@10: 0.9782
NDCG@10: 0.9807
MAP@100: 0.9732
Score-Function: dot
Accuracy@1: 96.26%
Accuracy@3: 99.38%
Accuracy@5: 99.74%
Accuracy@10: 99.94%
Precision@1: 96.26%
Precision@3: 43.01%
Precision@5: 27.66%
Precision@10: 14.58%
Recall@1: 82.93%
Recall@3: 96.28%
Recall@5: 98.38%
Recall@10: 99.55%
MRR@10: 0.9782
NDCG@10: 0.9807
MAP@100: 0.9732
'''
print(ir_evaluator.primary_metric)
# => "BeIR-quora-dev_cosine_map@100"
print(results[ir_evaluator.primary_metric])
# => 0.9732046108457585
"""
def __init__(
self,
queries: dict[str, str], # qid => query
corpus: dict[str, str], # cid => doc
relevant_docs: dict[str, set[str]], # qid => Set[cid]
corpus_chunk_size: int = 50000,
mrr_at_k: list[int] = [10],
ndcg_at_k: list[int] = [10],
accuracy_at_k: list[int] = [1, 3, 5, 10],
precision_recall_at_k: list[int] = [1, 3, 5, 10],
map_at_k: list[int] = [100],
show_progress_bar: bool = False,
batch_size: int = 32,
name: str = "",
write_csv: bool = True,
truncate_dim: int | None = None,
score_functions: dict[str, Callable[[Tensor, Tensor], Tensor]] = {
SimilarityFunction.COSINE.value: cos_sim,
SimilarityFunction.DOT_PRODUCT.value: dot_score,
}, # Score function, higher=more similar
main_score_function: str | SimilarityFunction | None = None,
) -> None:
"""
Initializes the InformationRetrievalEvaluator.
Args:
queries (Dict[str, str]): A dictionary mapping query IDs to queries.
corpus (Dict[str, str]): A dictionary mapping document IDs to documents.
relevant_docs (Dict[str, Set[str]]): A dictionary mapping query IDs to a set of relevant document IDs.
corpus_chunk_size (int): The size of each chunk of the corpus. Defaults to 50000.
mrr_at_k (List[int]): A list of integers representing the values of k for MRR calculation. Defaults to [10].
ndcg_at_k (List[int]): A list of integers representing the values of k for NDCG calculation. Defaults to [10].
accuracy_at_k (List[int]): A list of integers representing the values of k for accuracy calculation. Defaults to [1, 3, 5, 10].
precision_recall_at_k (List[int]): A list of integers representing the values of k for precision and recall calculation. Defaults to [1, 3, 5, 10].
map_at_k (List[int]): A list of integers representing the values of k for MAP calculation. Defaults to [100].
show_progress_bar (bool): Whether to show a progress bar during evaluation. Defaults to False.
batch_size (int): The batch size for evaluation. Defaults to 32.
name (str): A name for the evaluation. Defaults to "".
write_csv (bool): Whether to write the evaluation results to a CSV file. Defaults to True.
truncate_dim (int, optional): The dimension to truncate the embeddings to. Defaults to None.
score_functions (Dict[str, Callable[[Tensor, Tensor], Tensor]]): A dictionary mapping score function names to score functions. Defaults to {SimilarityFunction.COSINE.value: cos_sim, SimilarityFunction.DOT_PRODUCT.value: dot_score}.
main_score_function (Union[str, SimilarityFunction], optional): The main score function to use for evaluation. Defaults to None.
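Example:
::
# A minimal sketch: evaluate with only cosine similarity and the default cutoffs.
# Assumes `queries`, `corpus` and `relevant_docs` are dicts of the shapes described
# in the Args above; `cos_sim` is imported at the top of this module.
evaluator = InformationRetrievalEvaluator(
queries=queries,
corpus=corpus,
relevant_docs=relevant_docs,
score_functions={"cosine": cos_sim},
name="quora-dev",
)
results = evaluator(model)  # `model` is any loaded SentenceTransformer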
"""
super().__init__()
self.queries_ids = []
for qid in queries:
if qid in relevant_docs and len(relevant_docs[qid]) > 0:
self.queries_ids.append(qid)
self.queries = [queries[qid] for qid in self.queries_ids]
self.corpus_ids = list(corpus.keys())
self.corpus = [corpus[cid] for cid in self.corpus_ids]
self.relevant_docs = relevant_docs
self.corpus_chunk_size = corpus_chunk_size
self.mrr_at_k = mrr_at_k
self.ndcg_at_k = ndcg_at_k
self.accuracy_at_k = accuracy_at_k
self.precision_recall_at_k = precision_recall_at_k
self.map_at_k = map_at_k
self.show_progress_bar = show_progress_bar
self.batch_size = batch_size
self.name = name
self.write_csv = write_csv
self.score_functions = score_functions
self.score_function_names = sorted(list(self.score_functions.keys()))
self.main_score_function = SimilarityFunction(main_score_function) if main_score_function else None
self.truncate_dim = truncate_dim
if name:
name = "_" + name
self.csv_file: str = "Information-Retrieval_evaluation" + name + "_results.csv"
self.csv_headers = ["epoch", "steps"]
for score_name in self.score_function_names:
for k in accuracy_at_k:
self.csv_headers.append(f"{score_name}-Accuracy@{k}")
for k in precision_recall_at_k:
self.csv_headers.append(f"{score_name}-Precision@{k}")
self.csv_headers.append(f"{score_name}-Recall@{k}")
for k in mrr_at_k:
self.csv_headers.append(f"{score_name}-MRR@{k}")
for k in ndcg_at_k:
self.csv_headers.append(f"{score_name}-NDCG@{k}")
for k in map_at_k:
self.csv_headers.append(f"{score_name}-MAP@{k}")
def __call__(
self, model: SentenceTransformer, output_path: str | None = None, epoch: int = -1, steps: int = -1, *args, **kwargs
) -> dict[str, float]:
if epoch != -1:
if steps == -1:
out_txt = f" after epoch {epoch}"
else:
out_txt = f" in epoch {epoch} after {steps} steps"
else:
out_txt = ""
if self.truncate_dim is not None:
out_txt += f" (truncated to {self.truncate_dim})"
logger.info(f"Information Retrieval Evaluation of the model on the {self.name} dataset{out_txt}:")
scores = self.compute_metrices(model, *args, **kwargs)
# Write results to disk
if output_path is not None and self.write_csv:
csv_path = os.path.join(output_path, self.csv_file)
if not os.path.isfile(csv_path):
fOut = open(csv_path, mode="w", encoding="utf-8")
fOut.write(",".join(self.csv_headers))
fOut.write("\n")
else:
fOut = open(csv_path, mode="a", encoding="utf-8")
output_data = [epoch, steps]
for name in self.score_function_names:
for k in self.accuracy_at_k:
output_data.append(scores[name]["accuracy@k"][k])
for k in self.precision_recall_at_k:
output_data.append(scores[name]["precision@k"][k])
output_data.append(scores[name]["recall@k"][k])
for k in self.mrr_at_k:
output_data.append(scores[name]["mrr@k"][k])
for k in self.ndcg_at_k:
output_data.append(scores[name]["ndcg@k"][k])
for k in self.map_at_k:
output_data.append(scores[name]["map@k"][k])
fOut.write(",".join(map(str, output_data)))
fOut.write("\n")
fOut.close()
if not self.primary_metric:
if self.main_score_function is None:
score_function = max(
[(name, scores[name]["map@k"][max(self.map_at_k)]) for name in self.score_function_names],
key=lambda x: x[1],
)[0]
self.primary_metric = f"{score_function}_map@{max(self.map_at_k)}"
else:
self.primary_metric = f"{self.main_score_function.value}_map@{max(self.map_at_k)}"
metrics = {
f"{score_function}_{metric_name.replace('@k', '@' + str(k))}": value
for score_function, values_dict in scores.items()
for metric_name, values in values_dict.items()
for k, value in values.items()
}
metrics = self.prefix_name_to_metrics(metrics, self.name)
self.store_metrics_in_model_card_data(model, metrics)
return metrics
def compute_metrices(
self, model: SentenceTransformer, corpus_model=None, corpus_embeddings: Tensor = None
) -> dict[str, dict[str, dict[int, float]]]:
if corpus_model is None:
corpus_model = model
max_k = max(
max(self.mrr_at_k),
max(self.ndcg_at_k),
max(self.accuracy_at_k),
max(self.precision_recall_at_k),
max(self.map_at_k),
)
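# Only the top max_k documents per query need to be kept, since no configured metric looks deeper than that.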
# Compute embedding for the queries
with nullcontext() if self.truncate_dim is None else model.truncate_sentence_embeddings(self.truncate_dim):
query_embeddings = model.encode(
self.queries,
show_progress_bar=self.show_progress_bar,
batch_size=self.batch_size,
convert_to_tensor=True,
)
queries_result_list = {}
for name in self.score_functions:
queries_result_list[name] = [[] for _ in range(len(query_embeddings))]
# Iterate over chunks of the corpus
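# Processing the corpus in chunks of corpus_chunk_size bounds memory use: at most one chunk of corpus embeddings and one (num_queries x chunk_size) score matrix are held at a time.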
for corpus_start_idx in trange(
0, len(self.corpus), self.corpus_chunk_size, desc="Corpus Chunks", disable=not self.show_progress_bar
):
corpus_end_idx = min(corpus_start_idx + self.corpus_chunk_size, len(self.corpus))
# Encode chunk of corpus
if corpus_embeddings is None:
with nullcontext() if self.truncate_dim is None else corpus_model.truncate_sentence_embeddings(
self.truncate_dim
):
sub_corpus_embeddings = corpus_model.encode(
self.corpus[corpus_start_idx:corpus_end_idx],
show_progress_bar=False,
batch_size=self.batch_size,
convert_to_tensor=True,
)
else:
sub_corpus_embeddings = corpus_embeddings[corpus_start_idx:corpus_end_idx]
# Compute the similarity scores for each score function
for name, score_function in self.score_functions.items():
pair_scores = score_function(query_embeddings, sub_corpus_embeddings)
# Get top-k values
pair_scores_top_k_values, pair_scores_top_k_idx = torch.topk(
pair_scores, min(max_k, len(pair_scores[0])), dim=1, largest=True, sorted=False
)
pair_scores_top_k_values = pair_scores_top_k_values.cpu().tolist()
pair_scores_top_k_idx = pair_scores_top_k_idx.cpu().tolist()
for query_itr in range(len(query_embeddings)):
for sub_corpus_id, score in zip(
pair_scores_top_k_idx[query_itr], pair_scores_top_k_values[query_itr]
):
corpus_id = self.corpus_ids[corpus_start_idx + sub_corpus_id]
if len(queries_result_list[name][query_itr]) < max_k:
heapq.heappush(
queries_result_list[name][query_itr], (score, corpus_id)
)  # heapq orders entries by the first tuple element (the score), so the min-heap keeps the max_k highest-scoring documents
else:
heapq.heappushpop(queries_result_list[name][query_itr], (score, corpus_id))
for name in queries_result_list:
for query_itr in range(len(queries_result_list[name])):
for doc_itr in range(len(queries_result_list[name][query_itr])):
score, corpus_id = queries_result_list[name][query_itr][doc_itr]
queries_result_list[name][query_itr][doc_itr] = {"corpus_id": corpus_id, "score": score}
logger.info(f"Queries: {len(self.queries)}")
logger.info(f"Corpus: {len(self.corpus)}\n")
# Compute scores
scores = {name: self.compute_metrics(queries_result_list[name]) for name in self.score_functions}
# Output
for name in self.score_function_names:
logger.info(f"Score-Function: {name}")
self.output_scores(scores[name])
return scores
def compute_metrics(self, queries_result_list: list[object]):
# Init score computation values
num_hits_at_k = {k: 0 for k in self.accuracy_at_k}
precisions_at_k = {k: [] for k in self.precision_recall_at_k}
recall_at_k = {k: [] for k in self.precision_recall_at_k}
MRR = {k: 0 for k in self.mrr_at_k}
ndcg = {k: [] for k in self.ndcg_at_k}
AveP_at_k = {k: [] for k in self.map_at_k}
# Compute scores on results
for query_itr in range(len(queries_result_list)):
query_id = self.queries_ids[query_itr]
# Sort scores
top_hits = sorted(queries_result_list[query_itr], key=lambda x: x["score"], reverse=True)
query_relevant_docs = self.relevant_docs[query_id]
# Accuracy@k - We count the result as correct if at least one relevant doc is among the top-k documents
for k_val in self.accuracy_at_k:
for hit in top_hits[0:k_val]:
if hit["corpus_id"] in query_relevant_docs:
num_hits_at_k[k_val] += 1
break
# Precision and Recall@k
for k_val in self.precision_recall_at_k:
num_correct = 0
for hit in top_hits[0:k_val]:
if hit["corpus_id"] in query_relevant_docs:
num_correct += 1
precisions_at_k[k_val].append(num_correct / k_val)
recall_at_k[k_val].append(num_correct / len(query_relevant_docs))
# MRR@k
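# Reciprocal rank = 1 / (position of the first relevant document in the top-k), or 0 if none is found; MRR averages this over all queries.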
for k_val in self.mrr_at_k:
for rank, hit in enumerate(top_hits[0:k_val]):
if hit["corpus_id"] in query_relevant_docs:
MRR[k_val] += 1.0 / (rank + 1)
break
# NDCG@k
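# With binary relevance, NDCG@k divides the DCG of the retrieved ranking by the ideal DCG, i.e. the DCG of a ranking that places all relevant documents first.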
for k_val in self.ndcg_at_k:
predicted_relevance = [
1 if top_hit["corpus_id"] in query_relevant_docs else 0 for top_hit in top_hits[0:k_val]
]
true_relevances = [1] * len(query_relevant_docs)
ndcg_value = self.compute_dcg_at_k(predicted_relevance, k_val) / self.compute_dcg_at_k(
true_relevances, k_val
)
ndcg[k_val].append(ndcg_value)
# MAP@k
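# Average precision sums precision@rank at each relevant hit in the top-k and normalizes by min(k, number of relevant documents), the best achievable number of hits.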
for k_val in self.map_at_k:
num_correct = 0
sum_precisions = 0
for rank, hit in enumerate(top_hits[0:k_val]):
if hit["corpus_id"] in query_relevant_docs:
num_correct += 1
sum_precisions += num_correct / (rank + 1)
avg_precision = sum_precisions / min(k_val, len(query_relevant_docs))
AveP_at_k[k_val].append(avg_precision)
# Compute averages
for k in num_hits_at_k:
num_hits_at_k[k] /= len(self.queries)
for k in precisions_at_k:
precisions_at_k[k] = np.mean(precisions_at_k[k])
for k in recall_at_k:
recall_at_k[k] = np.mean(recall_at_k[k])
for k in ndcg:
ndcg[k] = np.mean(ndcg[k])
for k in MRR:
MRR[k] /= len(self.queries)
for k in AveP_at_k:
AveP_at_k[k] = np.mean(AveP_at_k[k])
return {
"accuracy@k": num_hits_at_k,
"precision@k": precisions_at_k,
"recall@k": recall_at_k,
"ndcg@k": ndcg,
"mrr@k": MRR,
"map@k": AveP_at_k,
}
def output_scores(self, scores):
for k in scores["accuracy@k"]:
logger.info("Accuracy@{}: {:.2f}%".format(k, scores["accuracy@k"][k] * 100))
for k in scores["precision@k"]:
logger.info("Precision@{}: {:.2f}%".format(k, scores["precision@k"][k] * 100))
for k in scores["recall@k"]:
logger.info("Recall@{}: {:.2f}%".format(k, scores["recall@k"][k] * 100))
for k in scores["mrr@k"]:
logger.info("MRR@{}: {:.4f}".format(k, scores["mrr@k"][k]))
for k in scores["ndcg@k"]:
logger.info("NDCG@{}: {:.4f}".format(k, scores["ndcg@k"][k]))
for k in scores["map@k"]:
logger.info("MAP@{}: {:.4f}".format(k, scores["map@k"][k]))
@staticmethod
def compute_dcg_at_k(relevances, k):
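# DCG@k = sum over the first k positions of relevances[i] / log2(i + 2); e.g. relevances [1, 0, 1] give 1/log2(2) + 0/log2(3) + 1/log2(4) = 1.5.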
dcg = 0
for i in range(min(len(relevances), k)):
dcg += relevances[i] / np.log2(i + 2)  # +2 because i starts at 0 while ranks start at 1 (denominator is log2(rank + 1))
return dcg