Transformers 文档

如何创建一个自定义管道?

如何创建自定义管道?

在本指南中,我们将了解如何创建自定义管道并将其分享到Hub或添加到🤗 Transformers库中。

首先,你需要决定管道能够接受的原始输入。它可以是字符串、原始字节、字典或任何看起来最有可能的输入。尽量保持这些输入尽可能纯Python,因为这使得兼容性更容易(即使通过其他语言通过JSON)。这些将是管道的inputspreprocess)。

然后定义outputs。与inputs相同的策略。越简单越好。这些将是postprocess方法的输出。

首先继承基类 Pipeline,并实现所需的4个方法:preprocess_forwardpostprocess_sanitize_parameters

from transformers import Pipeline


class MyPipeline(Pipeline):
    def _sanitize_parameters(self, **kwargs):
        preprocess_kwargs = {}
        if "maybe_arg" in kwargs:
            preprocess_kwargs["maybe_arg"] = kwargs["maybe_arg"]
        return preprocess_kwargs, {}, {}

    def preprocess(self, inputs, maybe_arg=2):
        model_input = Tensor(inputs["input_ids"])
        return {"model_input": model_input}

    def _forward(self, model_inputs):
        # model_inputs == {"model_input": model_input}
        outputs = self.model(**model_inputs)
        # Maybe {"logits": Tensor(...)}
        return outputs

    def postprocess(self, model_outputs):
        best_class = model_outputs["logits"].softmax(-1)
        return best_class

这种分解结构的目的是为了相对无缝地支持CPU/GPU,同时支持在不同的线程上在CPU上进行预处理/后处理。

preprocess 将获取最初定义的输入,并将它们转换为可以输入到模型中的内容。它可能包含更多信息,通常是一个 Dict

_forward 是实现细节,不打算直接调用。forward 是首选的调用方法,因为它包含确保一切在预期设备上工作的保护措施。如果任何内容链接到真实模型,它属于 _forward 方法,其他内容则在预处理/后处理中。

postprocess 方法将获取 _forward 的输出并将其转换为之前决定的最终输出。

_sanitize_parameters 的存在是为了允许用户在任何时候传递任何参数,无论是在初始化时 pipeline(...., maybe_arg=4) 还是在调用时 pipe = pipeline(...); output = pipe(...., maybe_arg=4)

_sanitize_parameters 的返回值是3个kwargs字典,它们将直接传递给 preprocess_forwardpostprocess。如果调用者没有传递任何额外参数,请不要填充任何内容。这样可以保留函数定义中的默认参数,这通常更“自然”。

一个经典的例子是分类任务后处理中的top_k参数。

>>> pipe = pipeline("my-new-task")
>>> pipe("This is a test")
[{"label": "1-star", "score": 0.8}, {"label": "2-star", "score": 0.1}, {"label": "3-star", "score": 0.05}
{"label": "4-star", "score": 0.025}, {"label": "5-star", "score": 0.025}]

>>> pipe("This is a test", top_k=2)
[{"label": "1-star", "score": 0.8}, {"label": "2-star", "score": 0.1}]

为了实现这一点,我们将更新我们的postprocess方法,添加一个默认参数为5,并编辑_sanitize_parameters以允许这个新参数。

def postprocess(self, model_outputs, top_k=5):
    best_class = model_outputs["logits"].softmax(-1)
    # Add logic to handle top_k
    return best_class


def _sanitize_parameters(self, **kwargs):
    preprocess_kwargs = {}
    if "maybe_arg" in kwargs:
        preprocess_kwargs["maybe_arg"] = kwargs["maybe_arg"]

    postprocess_kwargs = {}
    if "top_k" in kwargs:
        postprocess_kwargs["top_k"] = kwargs["top_k"]
    return preprocess_kwargs, {}, postprocess_kwargs

尽量保持输入/输出非常简单,最好是JSON可序列化的,因为这样可以使管道的使用非常容易,而不需要用户理解新的对象类型。为了便于使用,支持许多不同类型的参数也是相对常见的(例如音频文件,可以是文件名、URL或纯字节)。

将其添加到支持的任务列表中

要将您的new-task注册到支持的任务列表中,您需要将其添加到PIPELINE_REGISTRY中:

from transformers.pipelines import PIPELINE_REGISTRY

PIPELINE_REGISTRY.register_pipeline(
    "new-task",
    pipeline_class=MyPipeline,
    pt_model=AutoModelForSequenceClassification,
)

如果您愿意,可以指定一个默认模型,在这种情况下,它应该附带一个特定的修订版本(可以是分支名称或提交哈希,这里我们使用了"abcdef")以及类型:

PIPELINE_REGISTRY.register_pipeline(
    "new-task",
    pipeline_class=MyPipeline,
    pt_model=AutoModelForSequenceClassification,
    default={"pt": ("user/awesome_model", "abcdef")},
    type="text",  # current support type: text, audio, image, multimodal
)

在Hub上分享你的管道

要在Hub上分享您的自定义管道,您只需将Pipeline子类的自定义代码保存在一个python文件中。例如,假设我们想使用一个自定义管道来进行句子对分类,如下所示:

import numpy as np

from transformers import Pipeline


def softmax(outputs):
    maxes = np.max(outputs, axis=-1, keepdims=True)
    shifted_exp = np.exp(outputs - maxes)
    return shifted_exp / shifted_exp.sum(axis=-1, keepdims=True)


class PairClassificationPipeline(Pipeline):
    def _sanitize_parameters(self, **kwargs):
        preprocess_kwargs = {}
        if "second_text" in kwargs:
            preprocess_kwargs["second_text"] = kwargs["second_text"]
        return preprocess_kwargs, {}, {}

    def preprocess(self, text, second_text=None):
        return self.tokenizer(text, text_pair=second_text, return_tensors=self.framework)

    def _forward(self, model_inputs):
        return self.model(**model_inputs)

    def postprocess(self, model_outputs):
        logits = model_outputs.logits[0].numpy()
        probabilities = softmax(logits)

        best_class = np.argmax(probabilities)
        label = self.model.config.id2label[best_class]
        score = probabilities[best_class].item()
        logits = logits.tolist()
        return {"label": label, "score": score, "logits": logits}

该实现与框架无关,适用于PyTorch和TensorFlow模型。如果我们将其保存在名为pair_classification.py的文件中,我们可以像这样导入并注册它:

from pair_classification import PairClassificationPipeline
from transformers.pipelines import PIPELINE_REGISTRY
from transformers import AutoModelForSequenceClassification, TFAutoModelForSequenceClassification

PIPELINE_REGISTRY.register_pipeline(
    "pair-classification",
    pipeline_class=PairClassificationPipeline,
    pt_model=AutoModelForSequenceClassification,
    tf_model=TFAutoModelForSequenceClassification,
)

一旦完成,我们可以将其与预训练模型一起使用。例如,sgugger/finetuned-bert-mrpc已经在MRPC数据集上进行了微调,该数据集将句子对分类为是否互为释义。

from transformers import pipeline

classifier = pipeline("pair-classification", model="sgugger/finetuned-bert-mrpc")

然后我们可以使用push_to_hub方法将其分享到Hub上:

classifier.push_to_hub("test-dynamic-pipeline")

这将复制你在文件夹 "test-dynamic-pipeline" 中定义的 PairClassificationPipeline 文件, 同时保存管道的模型和分词器,然后将所有内容推送到仓库 {your_username}/test-dynamic-pipeline。之后,只要他们提供选项 trust_remote_code=True,任何人都可以使用它:

from transformers import pipeline

classifier = pipeline(model="{your_username}/test-dynamic-pipeline", trust_remote_code=True)

将管道添加到 🤗 Transformers

如果你想将你的管道贡献给🤗 Transformers,你需要在pipelines子模块中添加一个新模块,包含你的管道代码,然后将其添加到pipelines/__init__.py中定义的任务列表中。

然后你需要添加测试。创建一个新文件 tests/test_pipelines_MY_PIPELINE.py 并参考其他测试的示例。

run_pipeline_test 函数将非常通用,并在由 model_mappingtf_model_mapping 定义的每种可能的架构上运行小型随机模型。

这对于测试未来的兼容性非常重要,意味着如果有人为XXXForQuestionAnswering添加了一个新模型,那么管道测试将尝试在其上运行。由于模型是随机的,无法检查实际值,这就是为什么有一个辅助工具ANY,它将简单地尝试匹配管道类型的输出。

你还需要实现2个(最好是4个)测试。

  • test_small_model_pt : 为此管道定义一个小的模型(即使结果没有意义也没关系) 并测试管道输出。结果应与test_small_model_tf相同。
  • test_small_model_tf : 为此管道定义一个小的模型(即使结果没有意义也没关系) 并测试管道输出。结果应与test_small_model_pt相同。
  • test_large_model_pt (optional): 在实际的管道上测试管道,结果应该是有意义的。这些测试很慢,应该被标记为这样。这里的目的是展示管道,并确保在未来的版本中没有漂移。
  • test_large_model_tf (optional): 在实际的管道上测试管道,结果应该是有意义的。这些测试很慢,应该被标记为这样。这里的目的是展示管道,并确保在未来的版本中没有漂移。
< > Update on GitHub