如何创建自定义管道?
在本指南中,我们将了解如何创建自定义管道并将其分享到Hub或添加到🤗 Transformers库中。
首先,你需要决定管道能够接受的原始输入。它可以是字符串、原始字节、字典或任何看起来最有可能的输入。尽量保持这些输入尽可能纯Python,因为这使得兼容性更容易(即使通过其他语言通过JSON)。这些将是管道的inputs
(preprocess
)。
然后定义outputs
。与inputs
相同的策略。越简单越好。这些将是postprocess
方法的输出。
首先继承基类 Pipeline
,并实现所需的4个方法:preprocess
、_forward
、postprocess
和 _sanitize_parameters
。
from transformers import Pipeline
class MyPipeline(Pipeline):
def _sanitize_parameters(self, **kwargs):
preprocess_kwargs = {}
if "maybe_arg" in kwargs:
preprocess_kwargs["maybe_arg"] = kwargs["maybe_arg"]
return preprocess_kwargs, {}, {}
def preprocess(self, inputs, maybe_arg=2):
model_input = Tensor(inputs["input_ids"])
return {"model_input": model_input}
def _forward(self, model_inputs):
# model_inputs == {"model_input": model_input}
outputs = self.model(**model_inputs)
# Maybe {"logits": Tensor(...)}
return outputs
def postprocess(self, model_outputs):
best_class = model_outputs["logits"].softmax(-1)
return best_class
这种分解结构的目的是为了相对无缝地支持CPU/GPU,同时支持在不同的线程上在CPU上进行预处理/后处理。
preprocess
将获取最初定义的输入,并将它们转换为可以输入到模型中的内容。它可能包含更多信息,通常是一个 Dict
。
_forward
是实现细节,不打算直接调用。forward
是首选的调用方法,因为它包含确保一切在预期设备上工作的保护措施。如果任何内容链接到真实模型,它属于 _forward
方法,其他内容则在预处理/后处理中。
postprocess
方法将获取 _forward
的输出并将其转换为之前决定的最终输出。
_sanitize_parameters
的存在是为了允许用户在任何时候传递任何参数,无论是在初始化时 pipeline(...., maybe_arg=4)
还是在调用时 pipe = pipeline(...); output = pipe(...., maybe_arg=4)
。
_sanitize_parameters
的返回值是3个kwargs字典,它们将直接传递给 preprocess
、_forward
和 postprocess
。如果调用者没有传递任何额外参数,请不要填充任何内容。这样可以保留函数定义中的默认参数,这通常更“自然”。
一个经典的例子是分类任务后处理中的top_k
参数。
>>> pipe = pipeline("my-new-task")
>>> pipe("This is a test")
[{"label": "1-star", "score": 0.8}, {"label": "2-star", "score": 0.1}, {"label": "3-star", "score": 0.05}
{"label": "4-star", "score": 0.025}, {"label": "5-star", "score": 0.025}]
>>> pipe("This is a test", top_k=2)
[{"label": "1-star", "score": 0.8}, {"label": "2-star", "score": 0.1}]
为了实现这一点,我们将更新我们的postprocess
方法,添加一个默认参数为5
,并编辑_sanitize_parameters
以允许这个新参数。
def postprocess(self, model_outputs, top_k=5):
best_class = model_outputs["logits"].softmax(-1)
# Add logic to handle top_k
return best_class
def _sanitize_parameters(self, **kwargs):
preprocess_kwargs = {}
if "maybe_arg" in kwargs:
preprocess_kwargs["maybe_arg"] = kwargs["maybe_arg"]
postprocess_kwargs = {}
if "top_k" in kwargs:
postprocess_kwargs["top_k"] = kwargs["top_k"]
return preprocess_kwargs, {}, postprocess_kwargs
尽量保持输入/输出非常简单,最好是JSON可序列化的,因为这样可以使管道的使用非常容易,而不需要用户理解新的对象类型。为了便于使用,支持许多不同类型的参数也是相对常见的(例如音频文件,可以是文件名、URL或纯字节)。
将其添加到支持的任务列表中
要将您的new-task
注册到支持的任务列表中,您需要将其添加到PIPELINE_REGISTRY
中:
from transformers.pipelines import PIPELINE_REGISTRY
PIPELINE_REGISTRY.register_pipeline(
"new-task",
pipeline_class=MyPipeline,
pt_model=AutoModelForSequenceClassification,
)
如果您愿意,可以指定一个默认模型,在这种情况下,它应该附带一个特定的修订版本(可以是分支名称或提交哈希,这里我们使用了"abcdef"
)以及类型:
PIPELINE_REGISTRY.register_pipeline(
"new-task",
pipeline_class=MyPipeline,
pt_model=AutoModelForSequenceClassification,
default={"pt": ("user/awesome_model", "abcdef")},
type="text", # current support type: text, audio, image, multimodal
)
在Hub上分享你的管道
要在Hub上分享您的自定义管道,您只需将Pipeline
子类的自定义代码保存在一个python文件中。例如,假设我们想使用一个自定义管道来进行句子对分类,如下所示:
import numpy as np
from transformers import Pipeline
def softmax(outputs):
maxes = np.max(outputs, axis=-1, keepdims=True)
shifted_exp = np.exp(outputs - maxes)
return shifted_exp / shifted_exp.sum(axis=-1, keepdims=True)
class PairClassificationPipeline(Pipeline):
def _sanitize_parameters(self, **kwargs):
preprocess_kwargs = {}
if "second_text" in kwargs:
preprocess_kwargs["second_text"] = kwargs["second_text"]
return preprocess_kwargs, {}, {}
def preprocess(self, text, second_text=None):
return self.tokenizer(text, text_pair=second_text, return_tensors=self.framework)
def _forward(self, model_inputs):
return self.model(**model_inputs)
def postprocess(self, model_outputs):
logits = model_outputs.logits[0].numpy()
probabilities = softmax(logits)
best_class = np.argmax(probabilities)
label = self.model.config.id2label[best_class]
score = probabilities[best_class].item()
logits = logits.tolist()
return {"label": label, "score": score, "logits": logits}
该实现与框架无关,适用于PyTorch和TensorFlow模型。如果我们将其保存在名为pair_classification.py
的文件中,我们可以像这样导入并注册它:
from pair_classification import PairClassificationPipeline
from transformers.pipelines import PIPELINE_REGISTRY
from transformers import AutoModelForSequenceClassification, TFAutoModelForSequenceClassification
PIPELINE_REGISTRY.register_pipeline(
"pair-classification",
pipeline_class=PairClassificationPipeline,
pt_model=AutoModelForSequenceClassification,
tf_model=TFAutoModelForSequenceClassification,
)
一旦完成,我们可以将其与预训练模型一起使用。例如,sgugger/finetuned-bert-mrpc
已经在MRPC数据集上进行了微调,该数据集将句子对分类为是否互为释义。
from transformers import pipeline
classifier = pipeline("pair-classification", model="sgugger/finetuned-bert-mrpc")
然后我们可以使用push_to_hub
方法将其分享到Hub上:
classifier.push_to_hub("test-dynamic-pipeline")
这将复制你在文件夹 "test-dynamic-pipeline"
中定义的 PairClassificationPipeline
文件,
同时保存管道的模型和分词器,然后将所有内容推送到仓库
{your_username}/test-dynamic-pipeline
。之后,只要他们提供选项
trust_remote_code=True
,任何人都可以使用它:
from transformers import pipeline
classifier = pipeline(model="{your_username}/test-dynamic-pipeline", trust_remote_code=True)
将管道添加到 🤗 Transformers
如果你想将你的管道贡献给🤗 Transformers,你需要在pipelines
子模块中添加一个新模块,包含你的管道代码,然后将其添加到pipelines/__init__.py
中定义的任务列表中。
然后你需要添加测试。创建一个新文件 tests/test_pipelines_MY_PIPELINE.py
并参考其他测试的示例。
run_pipeline_test
函数将非常通用,并在由 model_mapping
和 tf_model_mapping
定义的每种可能的架构上运行小型随机模型。
这对于测试未来的兼容性非常重要,意味着如果有人为XXXForQuestionAnswering
添加了一个新模型,那么管道测试将尝试在其上运行。由于模型是随机的,无法检查实际值,这就是为什么有一个辅助工具ANY
,它将简单地尝试匹配管道类型的输出。
你还需要实现2个(最好是4个)测试。
test_small_model_pt
: 为此管道定义一个小的模型(即使结果没有意义也没关系) 并测试管道输出。结果应与test_small_model_tf
相同。test_small_model_tf
: 为此管道定义一个小的模型(即使结果没有意义也没关系) 并测试管道输出。结果应与test_small_model_pt
相同。test_large_model_pt
(optional
): 在实际的管道上测试管道,结果应该是有意义的。这些测试很慢,应该被标记为这样。这里的目的是展示管道,并确保在未来的版本中没有漂移。test_large_model_tf
(optional
): 在实际的管道上测试管道,结果应该是有意义的。这些测试很慢,应该被标记为这样。这里的目的是展示管道,并确保在未来的版本中没有漂移。