from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional, Tuple, Type, Union
from langchain.base_language import BaseLanguageModel
from langchain.chains.llm import LLMChain
from langchain_core.callbacks.manager import CallbackManagerForChainRun
from langchain_core.prompts import BasePromptTemplate
import langchain_experimental.rl_chain.base as base
logger = logging.getLogger(__name__)
# 用作区分不同情况的哨兵对象
SENTINEL = object()
[docs]class PickBestSelected(base.Selected):
"""选择用于PickBest链的类。"""
index: Optional[int]
probability: Optional[float]
score: Optional[float]
[docs] def __init__(
self,
index: Optional[int] = None,
probability: Optional[float] = None,
score: Optional[float] = None,
):
self.index = index
self.probability = probability
self.score = score
[docs]class PickBestEvent(base.Event[PickBestSelected]):
"""PickBest链的事件类。"""
[docs] def __init__(
self,
inputs: Dict[str, Any],
to_select_from: Dict[str, Any],
based_on: Dict[str, Any],
selected: Optional[PickBestSelected] = None,
):
super().__init__(inputs=inputs, selected=selected)
self.to_select_from = to_select_from
self.based_on = based_on
[docs]class PickBestFeatureEmbedder(base.Embedder[PickBestEvent]):
"""将`BasedOn`和`ToSelectFrom`输入嵌入到可以被学习策略使用的格式中。
属性:
model name (Any, optional): 用于特征表示的嵌入类型。默认为BERT SentenceTransformer。""" # noqa E501 表示忽略 PEP 8 中的行长度限制错误。 表示忽略PEP 8规范中的行长度限制。 表示不要对代码进行 E501 格式的检查。 E501 通常是指代码行超过了最大长度限制。 表示忽略 PEP 8 中的行长度限制错误。 不要检查行的最大长度限制 表示忽略PEP 8规范中的行长度限制。
[docs] def __init__(
self, auto_embed: bool, model: Optional[Any] = None, *args: Any, **kwargs: Any
):
super().__init__(*args, **kwargs)
if model is None:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-mpnet-base-v2")
self.model = model
self.auto_embed = auto_embed
@staticmethod
def _str(embedding: List[float]) -> str:
return " ".join([f"{i}:{e}" for i, e in enumerate(embedding)])
[docs] def get_label(self, event: PickBestEvent) -> tuple:
cost = None
if event.selected:
chosen_action = event.selected.index
cost = (
-1.0 * event.selected.score
if event.selected.score is not None
else None
)
prob = event.selected.probability
return chosen_action, cost, prob
else:
return None, None, None
[docs] def get_context_and_action_embeddings(self, event: PickBestEvent) -> tuple:
context_emb = base.embed(event.based_on, self.model) if event.based_on else None
to_select_from_var_name, to_select_from = next(
iter(event.to_select_from.items()), (None, None)
)
action_embs = (
(
base.embed(to_select_from, self.model, to_select_from_var_name)
if event.to_select_from
else None
)
if to_select_from
else None
)
if not context_emb or not action_embs:
raise ValueError(
"Context and to_select_from must be provided in the inputs dictionary"
)
return context_emb, action_embs
[docs] def get_indexed_dot_product(self, context_emb: List, action_embs: List) -> Dict:
import numpy as np
unique_contexts = set()
for context_item in context_emb:
for ns, ee in context_item.items():
if isinstance(ee, list):
for ea in ee:
unique_contexts.add(f"{ns}={ea}")
else:
unique_contexts.add(f"{ns}={ee}")
encoded_contexts = self.model.encode(list(unique_contexts))
context_embeddings = dict(zip(unique_contexts, encoded_contexts))
unique_actions = set()
for action in action_embs:
for ns, e in action.items():
if isinstance(e, list):
for ea in e:
unique_actions.add(f"{ns}={ea}")
else:
unique_actions.add(f"{ns}={e}")
encoded_actions = self.model.encode(list(unique_actions))
action_embeddings = dict(zip(unique_actions, encoded_actions))
action_matrix = np.stack([v for k, v in action_embeddings.items()])
context_matrix = np.stack([v for k, v in context_embeddings.items()])
dot_product_matrix = np.dot(context_matrix, action_matrix.T)
indexed_dot_product: Dict = {}
for i, context_key in enumerate(context_embeddings.keys()):
indexed_dot_product[context_key] = {}
for j, action_key in enumerate(action_embeddings.keys()):
indexed_dot_product[context_key][action_key] = dot_product_matrix[i, j]
return indexed_dot_product
[docs]class PickBestRandomPolicy(base.Policy[PickBestEvent]):
"""PickBest链的随机策略。"""
[docs] def __init__(self, feature_embedder: base.Embedder, **kwargs: Any):
self.feature_embedder = feature_embedder
[docs] def predict(self, event: PickBestEvent) -> List[Tuple[int, float]]:
num_items = len(event.to_select_from)
return [(i, 1.0 / num_items) for i in range(num_items)]
[docs] def learn(self, event: PickBestEvent) -> None:
pass
[docs] def log(self, event: PickBestEvent) -> None:
pass
[docs]class PickBest(base.RLChain[PickBestEvent]):
"""利用Vowpal Wabbit(VW)模型进行强化学习的链,带有上下文,旨在在LLM调用之前修改提示。
该链的每次`run()`方法调用应配备一组潜在动作(`ToSelectFrom`),并将基于`BasedOn`输入选择特定动作。然后选择的动作通知LLM(语言模型)提示,用于生成后续响应。
该链的标准操作流程包括:
1. 使用包含`BasedOn`标准和潜在动作列表(`ToSelectFrom`)的输入调用链。
2. 基于`BasedOn`输入选择一个动作。
3. 使用动态提示调用LLM,生成响应。
4. 如果提供了`selection_scorer`,则用于对选择进行评分。
5. 使用`BasedOn`输入、选择的`ToSelectFrom`动作以及评分器的结果更新内部Vowpal Wabbit模型。
6. 返回最终响应。
预期输入字典格式:
- 至少一个封装在`BasedOn`中的变量,作为选择标准。
- `ToSelectFrom`中的单个列表变量,表示VW模型的潜在动作。该列表可以采用以下形式:
- 字符串列表,例如`action = ToSelectFrom(["action1", "action2", "action3"])`
- 字符串列表的列表,例如`action = ToSelectFrom([["action1", "action1的另一个标识"], ["action2", "action2的另一个标识"]])`
- 字典列表,其中每个字典表示具有命名空间名称作为键和相应动作字符串作为值的动作。例如,`action = ToSelectFrom([{"namespace1": ["action1", "action1的另一个标识"], "namespace2": "action2"}, {"namespace1": "action3", "namespace2": "action4"}])`。
扩展:
RLChain
属性:
feature_embedder(PickBestFeatureEmbedder,可选):是一个高级属性。负责嵌入`BasedOn`和`ToSelectFrom`输入。如果省略,将使用默认的嵌入器。
""" # noqa E501 表示忽略 PEP 8 中的行长度限制错误。 表示忽略PEP 8规范中的行长度限制。 表示不要对代码进行 E501 格式的检查。 E501 通常是指代码行超过了最大长度限制。 表示忽略 PEP 8 中的行长度限制错误。 不要检查行的最大长度限制 表示忽略PEP 8规范中的行长度限制。
def __init__(
self,
*args: Any,
**kwargs: Any,
):
auto_embed = kwargs.get("auto_embed", False)
feature_embedder = kwargs.get("feature_embedder", None)
if feature_embedder:
if "auto_embed" in kwargs:
logger.warning(
"auto_embed will take no effect when explicit feature_embedder is provided" # noqa E501 表示忽略 PEP 8 中的行长度限制错误。 表示忽略PEP 8规范中的行长度限制。 表示不要对代码进行 E501 格式的检查。 E501 通常是指代码行超过了最大长度限制。 表示忽略 PEP 8 中的行长度限制错误。 不要检查行的最大长度限制 表示忽略PEP 8规范中的行长度限制。
)
# 关闭自动嵌入以设置以下cli设置
auto_embed = False
else:
feature_embedder = PickBestFeatureEmbedder(auto_embed=auto_embed)
kwargs["feature_embedder"] = feature_embedder
vw_cmd = kwargs.get("vw_cmd", [])
if vw_cmd:
if "--cb_explore_adf" not in vw_cmd:
raise ValueError(
"If vw_cmd is specified, it must include --cb_explore_adf"
)
else:
interactions = ["--interactions=::"]
if auto_embed:
interactions = [
"--interactions=@#",
"--ignore_linear=@",
"--ignore_linear=#",
]
vw_cmd = interactions + [
"--cb_explore_adf",
"--coin",
"--squarecb",
"--quiet",
]
kwargs["vw_cmd"] = vw_cmd
super().__init__(*args, **kwargs)
def _call_before_predict(self, inputs: Dict[str, Any]) -> PickBestEvent:
context, actions = base.get_based_on_and_to_select_from(inputs=inputs)
if not actions:
raise ValueError(
"No variables using 'ToSelectFrom' found in the inputs. Please include at least one variable containing a list to select from." # noqa E501 表示忽略 PEP 8 中的行长度限制错误。 表示忽略PEP 8规范中的行长度限制。 表示不要对代码进行 E501 格式的检查。 E501 通常是指代码行超过了最大长度限制。 表示忽略 PEP 8 中的行长度限制错误。 不要检查行的最大长度限制 表示忽略PEP 8规范中的行长度限制。
)
if len(list(actions.values())) > 1:
raise ValueError(
"Only one variable using 'ToSelectFrom' can be provided in the inputs for the PickBest chain. Please provide only one variable containing a list to select from." # noqa E501 表示忽略 PEP 8 中的行长度限制错误。 表示忽略PEP 8规范中的行长度限制。 表示不要对代码进行 E501 格式的检查。 E501 通常是指代码行超过了最大长度限制。 表示忽略 PEP 8 中的行长度限制错误。 不要检查行的最大长度限制 表示忽略PEP 8规范中的行长度限制。
)
if not context:
raise ValueError(
"No variables using 'BasedOn' found in the inputs. Please include at least one variable containing information to base the selected of ToSelectFrom on." # noqa E501 表示忽略 PEP 8 中的行长度限制错误。 表示忽略PEP 8规范中的行长度限制。 表示不要对代码进行 E501 格式的检查。 E501 通常是指代码行超过了最大长度限制。 表示忽略 PEP 8 中的行长度限制错误。 不要检查行的最大长度限制 表示忽略PEP 8规范中的行长度限制。
)
event = PickBestEvent(inputs=inputs, to_select_from=actions, based_on=context)
return event
def _call_after_predict_before_llm(
self,
inputs: Dict[str, Any],
event: PickBestEvent,
prediction: List[Tuple[int, float]],
) -> Tuple[Dict[str, Any], PickBestEvent]:
import numpy as np
prob_sum = sum(prob for _, prob in prediction)
probabilities = [prob / prob_sum for _, prob in prediction]
## 从概率质量函数中取样
sampled_index = np.random.choice(len(prediction), p=probabilities)
sampled_ap = prediction[sampled_index]
sampled_action = sampled_ap[0]
sampled_prob = sampled_ap[1]
selected = PickBestSelected(index=sampled_action, probability=sampled_prob)
event.selected = selected
# 事件中只有一个键值对可供选择
key, value = next(iter(event.to_select_from.items()))
next_chain_inputs = inputs.copy()
next_chain_inputs.update({key: value[event.selected.index]})
return next_chain_inputs, event
def _call_after_llm_before_scoring(
self, llm_response: str, event: PickBestEvent
) -> Tuple[Dict[str, Any], PickBestEvent]:
next_chain_inputs = event.inputs.copy()
# 事件中只有一个键值对可供选择
value = next(iter(event.to_select_from.values()))
v = (
value[event.selected.index]
if event.selected
else event.to_select_from.values()
)
next_chain_inputs.update(
{
self.selected_based_on_input_key: str(event.based_on),
self.selected_input_key: v,
}
)
return next_chain_inputs, event
def _call_after_scoring_before_learning(
self, event: PickBestEvent, score: Optional[float]
) -> PickBestEvent:
if event.selected:
event.selected.score = score
return event
def _call(
self,
inputs: Dict[str, Any],
run_manager: Optional[CallbackManagerForChainRun] = None,
) -> Dict[str, Any]:
return super()._call(run_manager=run_manager, inputs=inputs)
@property
def _chain_type(self) -> str:
return "rl_chain_pick_best"
[docs] @classmethod
def from_llm(
cls: Type[PickBest],
llm: BaseLanguageModel,
prompt: BasePromptTemplate,
selection_scorer: Union[base.AutoSelectionScorer, object] = SENTINEL,
**kwargs: Any,
) -> PickBest:
llm_chain = LLMChain(llm=llm, prompt=prompt)
if selection_scorer is SENTINEL:
selection_scorer = base.AutoSelectionScorer(llm=llm_chain.llm)
return PickBest(
llm_chain=llm_chain,
prompt=prompt,
selection_scorer=selection_scorer,
**kwargs,
)