19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class SentenceTransformerRerank(BaseNodePostprocessor):
    """Rerank nodes with a sentence-transformers ``CrossEncoder``.

    Every node's content is paired with the query string, the pairs are
    scored by the cross-encoder, and the ``top_n`` highest-scoring nodes are
    returned in descending score order.
    """

    model: str = Field(description="Sentence transformer model name.")
    top_n: int = Field(description="Number of nodes to return sorted by score.")
    device: str = Field(
        default="cpu",
        description="Device to use for sentence transformer.",
    )
    keep_retrieval_score: bool = Field(
        default=False,
        description="Whether to keep the retrieval score in metadata.",
    )
    # Loaded CrossEncoder instance; kept private so it is not a model field.
    _model: Any = PrivateAttr()

    def __init__(
        self,
        top_n: int = 2,
        model: str = "cross-encoder/stsb-distilroberta-base",
        device: Optional[str] = None,
        keep_retrieval_score: Optional[bool] = False,
    ):
        """Validate the settings and load the cross-encoder.

        Args:
            top_n: Number of nodes to keep after reranking.
            model: Name passed to ``CrossEncoder`` to select the model.
            device: Device passed to ``CrossEncoder``. When ``None`` the
                value returned by ``infer_torch_device()`` is used.
            keep_retrieval_score: When true, the score a node had before
                reranking is stored in its metadata.

        Raises:
            ImportError: If ``sentence_transformers`` cannot be imported.
        """
        try:
            from sentence_transformers import CrossEncoder
        except ImportError as err:
            # One message string (adjacent literals, no comma) so the text is
            # not rendered as a tuple; chain the cause for easier debugging.
            raise ImportError(
                "Cannot import sentence-transformers or torch package, "
                "please `pip install torch sentence-transformers`"
            ) from err

        device = infer_torch_device() if device is None else device

        # Initialise the base model first: field validation then runs before
        # the potentially expensive model load, and pydantic's private
        # attribute storage exists by the time `_model` is assigned (pydantic
        # v2 rejects private-attribute assignment before base initialisation).
        super().__init__(
            top_n=top_n,
            model=model,
            device=device,
            keep_retrieval_score=keep_retrieval_score,
        )
        self._model = CrossEncoder(
            model, max_length=DEFAULT_SENTENCE_TRANSFORMER_MAX_LENGTH, device=device
        )

    @classmethod
    def class_name(cls) -> str:
        """Return the identifier string for this postprocessor class."""
        return "SentenceTransformerRerank"

    def _postprocess_nodes(
        self,
        nodes: List[NodeWithScore],
        query_bundle: Optional[QueryBundle] = None,
    ) -> List[NodeWithScore]:
        """Score ``nodes`` against the query and return the best ``top_n``.

        The input ``NodeWithScore`` objects are mutated in place: their
        ``score`` is overwritten with the cross-encoder score and, when
        ``keep_retrieval_score`` is set, the previous score is written to
        ``node.metadata["retrieval_score"]``.

        Args:
            nodes: Candidate nodes to rerank.
            query_bundle: Query whose ``query_str`` is paired with each node.

        Returns:
            A new list holding at most ``top_n`` of the input nodes, ordered
            from highest to lowest cross-encoder score. Empty if ``nodes`` is
            empty.

        Raises:
            ValueError: If ``query_bundle`` is ``None``.
        """
        dispatch_event = dispatcher.get_dispatch_event()
        dispatch_event(
            ReRankStartEvent(
                query=query_bundle,
                nodes=nodes,
                top_n=self.top_n,
                model_name=self.model,
            )
        )

        # NOTE(review): both early exits below happen after ReRankStartEvent
        # was dispatched and emit no ReRankEndEvent -- confirm that listeners
        # tolerate an unmatched start event.
        if query_bundle is None:
            raise ValueError("Missing query bundle in extra info.")
        if len(nodes) == 0:
            return []

        # One (query, node text) pair per node, in the same order as `nodes`.
        query_and_nodes = [
            (
                query_bundle.query_str,
                node.node.get_content(metadata_mode=MetadataMode.EMBED),
            )
            for node in nodes
        ]

        with self.callback_manager.event(
            CBEventType.RERANKING,
            payload={
                EventPayload.NODES: nodes,
                EventPayload.MODEL_NAME: self.model,
                EventPayload.QUERY_STR: query_bundle.query_str,
                EventPayload.TOP_K: self.top_n,
            },
        ) as event:
            scores = self._model.predict(query_and_nodes)

            # Internal invariant: one score per input pair.
            assert len(scores) == len(nodes)

            for node, score in zip(nodes, scores):
                if self.keep_retrieval_score:
                    # keep the retrieval score in metadata
                    node.node.metadata["retrieval_score"] = node.score
                node.score = float(score)

            # Negated score as the key gives a descending order; a falsy
            # score (None or 0.0) sorts with key 0.
            new_nodes = sorted(nodes, key=lambda x: -x.score if x.score else 0)[
                : self.top_n
            ]
            event.on_end(payload={EventPayload.NODES: new_nodes})

        dispatch_event(ReRankEndEvent(nodes=new_nodes))
        return new_nodes
|