Source code for langchain_experimental.rl_chain.pick_best_chain

from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional, Tuple, Type, Union

from langchain.base_language import BaseLanguageModel
from langchain.chains.llm import LLMChain
from langchain_core.callbacks.manager import CallbackManagerForChainRun
from langchain_core.prompts import BasePromptTemplate

import langchain_experimental.rl_chain.base as base

logger = logging.getLogger(__name__)

# Sentinel object used to distinguish "argument not provided" from an explicit None.
SENTINEL = object()


class PickBestSelected(base.Selected):
    """Selected class for PickBest chain."""

    index: Optional[int]
    probability: Optional[float]
    score: Optional[float]

    def __init__(
        self,
        index: Optional[int] = None,
        probability: Optional[float] = None,
        score: Optional[float] = None,
    ):
        self.index = index
        self.probability = probability
        self.score = score

class PickBestEvent(base.Event[PickBestSelected]):
    """Event class for PickBest chain."""

    def __init__(
        self,
        inputs: Dict[str, Any],
        to_select_from: Dict[str, Any],
        based_on: Dict[str, Any],
        selected: Optional[PickBestSelected] = None,
    ):
        super().__init__(inputs=inputs, selected=selected)
        self.to_select_from = to_select_from
        self.based_on = based_on

class PickBestFeatureEmbedder(base.Embedder[PickBestEvent]):
    """Embed the `BasedOn` and `ToSelectFrom` inputs into a format that can be used
    by the learning policy.

    Attributes:
        model (Any, optional): The type of embeddings to be used for feature
            representation. Defaults to the BERT SentenceTransformer.
    """  # noqa E501

    def __init__(
        self, auto_embed: bool, model: Optional[Any] = None, *args: Any, **kwargs: Any
    ):
        super().__init__(*args, **kwargs)

        if model is None:
            from sentence_transformers import SentenceTransformer

            model = SentenceTransformer("all-mpnet-base-v2")

        self.model = model
        self.auto_embed = auto_embed
    @staticmethod
    def _str(embedding: List[float]) -> str:
        return " ".join([f"{i}:{e}" for i, e in enumerate(embedding)])
    def get_label(self, event: PickBestEvent) -> tuple:
        cost = None
        if event.selected:
            chosen_action = event.selected.index
            cost = (
                -1.0 * event.selected.score
                if event.selected.score is not None
                else None
            )
            prob = event.selected.probability
            return chosen_action, cost, prob
        else:
            return None, None, None
    def get_context_and_action_embeddings(self, event: PickBestEvent) -> tuple:
        context_emb = base.embed(event.based_on, self.model) if event.based_on else None
        to_select_from_var_name, to_select_from = next(
            iter(event.to_select_from.items()), (None, None)
        )

        action_embs = (
            (
                base.embed(to_select_from, self.model, to_select_from_var_name)
                if event.to_select_from
                else None
            )
            if to_select_from
            else None
        )

        if not context_emb or not action_embs:
            raise ValueError(
                "Context and to_select_from must be provided in the inputs dictionary"
            )
        return context_emb, action_embs
    def get_indexed_dot_product(self, context_emb: List, action_embs: List) -> Dict:
        import numpy as np

        unique_contexts = set()
        for context_item in context_emb:
            for ns, ee in context_item.items():
                if isinstance(ee, list):
                    for ea in ee:
                        unique_contexts.add(f"{ns}={ea}")
                else:
                    unique_contexts.add(f"{ns}={ee}")

        encoded_contexts = self.model.encode(list(unique_contexts))
        context_embeddings = dict(zip(unique_contexts, encoded_contexts))

        unique_actions = set()
        for action in action_embs:
            for ns, e in action.items():
                if isinstance(e, list):
                    for ea in e:
                        unique_actions.add(f"{ns}={ea}")
                else:
                    unique_actions.add(f"{ns}={e}")

        encoded_actions = self.model.encode(list(unique_actions))
        action_embeddings = dict(zip(unique_actions, encoded_actions))

        action_matrix = np.stack([v for k, v in action_embeddings.items()])
        context_matrix = np.stack([v for k, v in context_embeddings.items()])
        dot_product_matrix = np.dot(context_matrix, action_matrix.T)

        indexed_dot_product: Dict = {}

        for i, context_key in enumerate(context_embeddings.keys()):
            indexed_dot_product[context_key] = {}
            for j, action_key in enumerate(action_embeddings.keys()):
                indexed_dot_product[context_key][action_key] = dot_product_matrix[i, j]

        return indexed_dot_product
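
    # Illustrative sketch (not part of the original source): for
    #   context_emb = [{"User": "sports fan"}]
    #   action_embs = [{"action": "football article"}, {"action": "cooking article"}]
    # the returned structure is a nested mapping keyed by "namespace=value" strings:
    #   {"User=sports fan": {"action=football article": <dot product>,
    #                        "action=cooking article": <dot product>}}
    # where each <dot product> is the inner product of the two sentence embeddings.
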
    def format_auto_embed_on(self, event: PickBestEvent) -> str:
        chosen_action, cost, prob = self.get_label(event)
        context_emb, action_embs = self.get_context_and_action_embeddings(event)
        indexed_dot_product = self.get_indexed_dot_product(context_emb, action_embs)

        action_lines = []
        for i, action in enumerate(action_embs):
            line_parts = []
            dot_prods = []
            if cost is not None and chosen_action == i:
                line_parts.append(f"{chosen_action}:{cost}:{prob}")
            for ns, action in action.items():
                line_parts.append(f"|{ns}")
                elements = action if isinstance(action, list) else [action]
                nsa = []
                for elem in elements:
                    line_parts.append(f"{elem}")
                    ns_a = f"{ns}={elem}"
                    nsa.append(ns_a)
                    for k, v in indexed_dot_product.items():
                        dot_prods.append(v[ns_a])
                nsa_str = " ".join(nsa)
                line_parts.append(f"|# {nsa_str}")

            line_parts.append(f"|dotprod {self._str(dot_prods)}")
            action_lines.append(" ".join(line_parts))

        shared = []
        for item in context_emb:
            for ns, context in item.items():
                shared.append(f"|{ns}")
                elements = context if isinstance(context, list) else [context]
                nsc = []
                for elem in elements:
                    shared.append(f"{elem}")
                    nsc.append(f"{ns}={elem}")
                nsc_str = " ".join(nsc)
                shared.append(f"|@ {nsc_str}")

        return "shared " + " ".join(shared) + "\n" + "\n".join(action_lines)
    def format_auto_embed_off(self, event: PickBestEvent) -> str:
        """
        Converts the `BasedOn` and `ToSelectFrom` into a format that can be used by VW
        """
        chosen_action, cost, prob = self.get_label(event)
        context_emb, action_embs = self.get_context_and_action_embeddings(event)

        example_string = ""
        example_string += "shared "
        for context_item in context_emb:
            for ns, based_on in context_item.items():
                e = " ".join(based_on) if isinstance(based_on, list) else based_on
                example_string += f"|{ns} {e} "
        example_string += "\n"

        for i, action in enumerate(action_embs):
            if cost is not None and chosen_action == i:
                example_string += f"{chosen_action}:{cost}:{prob} "
            for ns, action_embedding in action.items():
                e = (
                    " ".join(action_embedding)
                    if isinstance(action_embedding, list)
                    else action_embedding
                )
                example_string += f"|{ns} {e} "
            example_string += "\n"
        # strip the last newline
        return example_string[:-1]
    def format(self, event: PickBestEvent) -> str:
        if self.auto_embed:
            return self.format_auto_embed_on(event)
        else:
            return self.format_auto_embed_off(event)

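For intuition, with `auto_embed` off, an event whose context is `{"User": "Tom"}` and whose actions are the plain strings `["action1", "action2", "action3"]`, with action 1 chosen at probability 0.5 and scored 1.0, serializes roughly as the following VW `cb_explore_adf` text. This is a sketch: the exact feature tokens come from `base.embed`, and the `1:-1.0:0.5` label packs the chosen index, the negated score as cost (see `get_label` above), and the sampling probability:

    shared |User Tom
    |action action1
    1:-1.0:0.5 |action action2
    |action action3
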
class PickBestRandomPolicy(base.Policy[PickBestEvent]):
    """Random policy for PickBest chain."""

    def __init__(self, feature_embedder: base.Embedder, **kwargs: Any):
        self.feature_embedder = feature_embedder
    def predict(self, event: PickBestEvent) -> List[Tuple[int, float]]:
        # event.to_select_from maps a single variable name to the list of actions,
        # so the uniform distribution must range over that list, not the dict keys.
        num_items = len(next(iter(event.to_select_from.values())))
        return [(i, 1.0 / num_items) for i in range(num_items)]
    def learn(self, event: PickBestEvent) -> None:
        pass

    def log(self, event: PickBestEvent) -> None:
        pass

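A minimal sketch of the random policy's output. The stand-in `model=object()` is an assumption used to avoid loading a real SentenceTransformer, which `predict` never touches:

    embedder = PickBestFeatureEmbedder(auto_embed=False, model=object())  # stand-in model
    policy = PickBestRandomPolicy(feature_embedder=embedder)
    event = PickBestEvent(
        inputs={}, to_select_from={"action": ["a", "b", "c"]}, based_on={}
    )
    policy.predict(event)  # [(0, 0.333...), (1, 0.333...), (2, 0.333...)]
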
class PickBest(base.RLChain[PickBestEvent]):
    """Chain that leverages the Vowpal Wabbit (VW) model for reinforcement learning
    with a context, with the goal of modifying the prompt before the LLM call.

    Each invocation of the chain's `run()` method should be equipped with a set of
    potential actions (`ToSelectFrom`) and will result in the selection of a specific
    action based on the `BasedOn` input. This chosen action then informs the LLM
    (Language Model) prompt for the subsequent response generation.

    The standard operation flow of this chain includes:
        1. The chain is called with inputs containing the `BasedOn` criteria and a list of potential actions (`ToSelectFrom`).
        2. An action is chosen based on the `BasedOn` input.
        3. The LLM is called with the dynamic prompt, producing a response.
        4. If a `selection_scorer` is provided, it is used to score the selection.
        5. The internal Vowpal Wabbit model is updated with the `BasedOn` input, the chosen `ToSelectFrom` action, and the resulting score from the scorer.
        6. The final response is returned.

    Expected input dictionary format:
        - At least one variable encapsulated within `BasedOn` to serve as the selection criteria.
        - A single list variable within `ToSelectFrom`, representing potential actions for the VW model. This list can take the form of:
            - A list of strings, e.g. `action = ToSelectFrom(["action1", "action2", "action3"])`
            - A list of lists of strings, e.g. `action = ToSelectFrom([["action1", "another identifier of action1"], ["action2", "another identifier of action2"]])`
            - A list of dictionaries, where each dictionary represents an action with namespace names as keys and the corresponding action strings as values. For example, `action = ToSelectFrom([{"namespace1": ["action1", "another identifier of action1"], "namespace2": "action2"}, {"namespace1": "action3", "namespace2": "action4"}])`.

    Extends:
        RLChain

    Attributes:
        feature_embedder (PickBestFeatureEmbedder, optional): Is an advanced attribute. Responsible for embedding the `BasedOn` and `ToSelectFrom` inputs. If omitted, a default embedder is utilized.
    """  # noqa E501

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ):
        auto_embed = kwargs.get("auto_embed", False)

        feature_embedder = kwargs.get("feature_embedder", None)
        if feature_embedder:
            if "auto_embed" in kwargs:
                logger.warning(
                    "auto_embed will take no effect when explicit feature_embedder is provided"  # noqa E501
                )
            # turn auto_embed off so that the CLI settings below are not applied
            auto_embed = False
        else:
            feature_embedder = PickBestFeatureEmbedder(auto_embed=auto_embed)
        kwargs["feature_embedder"] = feature_embedder

        vw_cmd = kwargs.get("vw_cmd", [])
        if vw_cmd:
            if "--cb_explore_adf" not in vw_cmd:
                raise ValueError(
                    "If vw_cmd is specified, it must include --cb_explore_adf"
                )
        else:
            interactions = ["--interactions=::"]
            if auto_embed:
                interactions = [
                    "--interactions=@#",
                    "--ignore_linear=@",
                    "--ignore_linear=#",
                ]
            vw_cmd = interactions + [
                "--cb_explore_adf",
                "--coin",
                "--squarecb",
                "--quiet",
            ]

        kwargs["vw_cmd"] = vw_cmd

        super().__init__(*args, **kwargs)

    def _call_before_predict(self, inputs: Dict[str, Any]) -> PickBestEvent:
        context, actions = base.get_based_on_and_to_select_from(inputs=inputs)
        if not actions:
            raise ValueError(
                "No variables using 'ToSelectFrom' found in the inputs. Please include at least one variable containing a list to select from."  # noqa E501
            )

        if len(list(actions.values())) > 1:
            raise ValueError(
                "Only one variable using 'ToSelectFrom' can be provided in the inputs for the PickBest chain. Please provide only one variable containing a list to select from."  # noqa E501
            )

        if not context:
            raise ValueError(
                "No variables using 'BasedOn' found in the inputs. Please include at least one variable containing information to base the selection of ToSelectFrom on."  # noqa E501
            )

        event = PickBestEvent(inputs=inputs, to_select_from=actions, based_on=context)
        return event

    def _call_after_predict_before_llm(
        self,
        inputs: Dict[str, Any],
        event: PickBestEvent,
        prediction: List[Tuple[int, float]],
    ) -> Tuple[Dict[str, Any], PickBestEvent]:
        import numpy as np

        prob_sum = sum(prob for _, prob in prediction)
        probabilities = [prob / prob_sum for _, prob in prediction]
        # sample from the probability mass function
        sampled_index = np.random.choice(len(prediction), p=probabilities)
        sampled_ap = prediction[sampled_index]
        sampled_action = sampled_ap[0]
        sampled_prob = sampled_ap[1]
        selected = PickBestSelected(index=sampled_action, probability=sampled_prob)
        event.selected = selected

        # only one key-value pair in event.to_select_from
        key, value = next(iter(event.to_select_from.items()))
        next_chain_inputs = inputs.copy()
        next_chain_inputs.update({key: value[event.selected.index]})
        return next_chain_inputs, event

    def _call_after_llm_before_scoring(
        self, llm_response: str, event: PickBestEvent
    ) -> Tuple[Dict[str, Any], PickBestEvent]:
        next_chain_inputs = event.inputs.copy()
        # only one key-value pair in event.to_select_from
        value = next(iter(event.to_select_from.values()))
        v = (
            value[event.selected.index]
            if event.selected
            else event.to_select_from.values()
        )
        next_chain_inputs.update(
            {
                self.selected_based_on_input_key: str(event.based_on),
                self.selected_input_key: v,
            }
        )
        return next_chain_inputs, event

    def _call_after_scoring_before_learning(
        self, event: PickBestEvent, score: Optional[float]
    ) -> PickBestEvent:
        if event.selected:
            event.selected.score = score
        return event

    def _call(
        self,
        inputs: Dict[str, Any],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Dict[str, Any]:
        return super()._call(run_manager=run_manager, inputs=inputs)

    @property
    def _chain_type(self) -> str:
        return "rl_chain_pick_best"
    @classmethod
    def from_llm(
        cls: Type[PickBest],
        llm: BaseLanguageModel,
        prompt: BasePromptTemplate,
        selection_scorer: Union[base.AutoSelectionScorer, object] = SENTINEL,
        **kwargs: Any,
    ) -> PickBest:
        llm_chain = LLMChain(llm=llm, prompt=prompt)
        if selection_scorer is SENTINEL:
            selection_scorer = base.AutoSelectionScorer(llm=llm_chain.llm)
        return PickBest(
            llm_chain=llm_chain,
            prompt=prompt,
            selection_scorer=selection_scorer,
            **kwargs,
        )
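
A minimal end-to-end sketch of how this chain is typically constructed and used. The LLM class and prompt text are illustrative assumptions, not part of this module:

    from langchain_openai import ChatOpenAI
    from langchain_core.prompts import PromptTemplate
    import langchain_experimental.rl_chain as rl_chain

    prompt = PromptTemplate.from_template(
        "Recommend one of these articles to {user}: {article}"
    )
    chain = rl_chain.PickBest.from_llm(llm=ChatOpenAI(), prompt=prompt)

    response = chain.run(
        user=rl_chain.BasedOn("Tom, a sports fan"),
        article=rl_chain.ToSelectFrom(
            ["football news", "cooking tips", "tech review"]
        ),
    )
    # With no explicit selection_scorer, the default AutoSelectionScorer grades the
    # LLM response and the VW policy is updated online with the resulting score.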