Skip to main content

PipelineDecorator

使用函数装饰器创建管道

使用 PipelineDecorator 类从现有函数创建管道。使用 @PipelineDecorator.component 来标记构成管道步骤的函数,并使用 @PipelineDecorator.pipeline 来标记主管道执行逻辑函数。

@PipelineDecorator.pipeline

使用@PipelineDecorator.pipeline 装饰器将实现管道执行逻辑的函数转换为ClearML管道控制器, 一个独立执行的任务。

Multi-file Pipeline Implementation

如果你的管道是在多个文件中实现的,请确保在@PipelineDecorator.pipeline之前导入管道步骤实现(包含用@PipelineDecorator.component装饰的函数的文件)。

@PipelineDecorator.pipeline(
name='pipeline', project='examples', version='0.1',
args_map={'General':['pickle_url'], 'Mock':['mock_parameter']}
)
def main(pickle_url, mock_parameter='mock'):
data_frame = step_one(pickle_url)
X_train, X_test, y_train, y_test = step_two(data_frame)
model = step_three(X_train, y_train)
accuracy = 100 * step_four(model, X_data=X_test, Y_data=y_test)
print(f"Accuracy={accuracy}%")

参数

  • name - 管道控制器任务的名称
  • project - 存储管道控制器任务的ClearML项目
  • version - 编号版本字符串(例如,1.2.3)。如果未设置,则查找管道的最新版本并递增。如果未找到此类版本,则默认为1.0.0
  • default_queue - 默认的 ClearML 队列,用于将所有管道步骤加入队列(除非在管道步骤中另有指定)。
  • args_map - 将参数映射到其配置部分,格式如下:{'section_name':['param_name']]}。例如,上述代码中的管道将存储pickle_url参数在General部分,mock_parameterMock部分。默认情况下,参数将存储在Args部分。
  • pool_frequency - 监控实验/状态的轮询频率(以分钟为单位)。
  • add_pipeline_tags - 如果为True,则向此管道创建的所有步骤(任务)添加pipe: 标签 (这在具有多个管道的项目中创建更好的可见性以及便于选择时非常有用)(默认值: False)。
  • target_project - 如果提供,所有管道步骤都将克隆到目标项目中。如果未提供,管道步骤将存储在管道本身所在的同一项目中。目标子文件夹有助于更轻松地组织管道执行逻辑(管道任务)和步骤执行任务。示例:"pipeline/component_execution"。
  • abort_on_failure - 如果为False(默认值),失败的管道步骤不会导致管道立即停止。 相反,任何未连接(或间接连接)到失败步骤的步骤仍将被执行。 尽管如此,管道本身将被标记为失败(除非失败步骤特别定义为continue_on_fail=True)。如果为True,任何失败的步骤都将导致管道立即中止,停止所有正在运行的步骤,并将管道标记为失败。
  • pipeline_execution_queue - 用于将管道控制器任务排入的队列。默认值为services队列。要在组件远程执行时在本地运行管道逻辑,请传递pipeline_execution_queue=None
  • skip_global_imports – 如果为True,全局导入将不会包含在步骤的执行中。如果为False(默认值),所有全局导入将在每个步骤执行开始时自动导入。

当函数被调用时,会创建一个相应的ClearML控制器任务:其参数会被记录为任务的参数。当从UI启动新的管道运行时,您可以为新运行修改它们的值。

Pipeline 新运行

@PipelineDecorator.component

使用 @PipelineDecorator.component 装饰器将函数转换为 ClearML 管道步骤,当从管道控制器调用时。

当管道控制器调用管道步骤时,会创建一个相应的ClearML任务。

Package Imports

@PipelineDecorator.pipelineskip_global_imports参数设置为False的情况下,所有全局导入将在每个步骤执行开始时自动导入。否则,如果设置为True,请确保构成管道步骤的每个函数都包含包导入,这些导入将自动记录为管道执行步骤所需的包。

from clearml.automation.controller import PipelineDecorator

@PipelineDecorator.component(return_values=['data_frame'], cache=True)
def step_one(pickle_data_url: str, extra: int = 43):
import sklearn # noqa
import pickle
import pandas as pd
from clearml import StorageManager
local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
with open(local_iris_pkl, 'rb') as f:
iris = pickle.load(f)
data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
data_frame.columns += ['target']
data_frame['target'] = iris['target']
return data_frame

参数

  • return_values - 步骤对应的ClearML任务中用于存储步骤返回对象的工件名称。 在上面的示例中,返回了一个对象并存储为名为data_frame的工件。

  • name (可选) - 管道步骤的名称。如果未提供,则使用函数名称

  • cache - 如果为True,管道控制器会检查是否已经执行了具有相同代码(包括设置,请参见任务执行部分)和输入参数的步骤。如果找到,则使用缓存的步骤输出,而不是重新运行该步骤。

  • packages - 所需包的列表或本地的requirements.txt文件。示例:["tqdm>=2.1", "scikit-learn"]"./requirements.txt"。如果未提供,将根据函数内部使用的导入自动添加包。

  • execution_queue(可选)- 用于将特定步骤加入队列的队列。这将覆盖使用 PipelineDecorator.set_default_execution_queue 方法 设置的队列。

  • continue_on_fail - 如果为True,失败的步骤不会导致管道停止(或标记为失败)。请注意,连接到(或间接连接到)失败步骤的步骤将被跳过(默认为False

  • docker - 指定在远程执行流水线步骤时要使用的Docker镜像

  • docker_args - 为远程执行添加Docker执行参数(使用单个字符串表示所有Docker参数)。

  • docker_bash_setup_script - 在设置任务环境之前,添加一个要在docker内部执行的bash脚本

  • task_type (可选) - 要创建的任务类型

  • repo(可选)- 指定一个仓库以在远程执行时附加到函数。允许用户在指定的仓库内执行函数,使他们能够从仓库加载模块/脚本。注意,执行工作目录将是仓库的根文件夹。支持git仓库URL链接和本地仓库路径(自动转换为远程git/提交,如当前检出的那样)。

    • 示例:
      • 远程URL: "https://github.com/user/repo.git"
      • 本地仓库副本: "./repo" -> 将自动存储远程仓库URL和基于本地克隆副本的提交ID
  • repo_branch(可选)- 指定远程仓库分支(如果使用本地仓库路径,则忽略)

  • repo_commit(可选)- 指定仓库提交ID(如果使用本地仓库路径,则忽略)

  • helper_functions(可选)- 一个辅助函数列表,用于使独立管道步骤可用。默认情况下,管道步骤函数无法访问任何其他函数,通过在此处指定额外的函数,远程管道步骤可以调用这些额外的函数。 例如,假设你有两个函数,parse_data()load_data()[parse_data, load_data]

  • parents(可选)- 管道中父步骤的列表。只有在所有父步骤成功执行后,管道中的当前步骤才会被发送执行。

  • retry_on_failure - 失败时重试步骤的次数。您也可以按照以下格式输入一个可调用函数:

    def example_retry_on_failure_callback(pipeline, node, retries):
    print(node.name, ' failed')
    # allow up to 5 retries (total of 6 runs)
    return retries

    回调函数接受以下参数:

    • PipelineController instance
    • PipelineController.Node that failed
    • Number of times to retry the step if it fails

    该函数必须返回一个布尔值。如果返回True,节点将被重试,并且重试次数减1。如果函数返回False,节点将不会被重试。

  • 回调 - 使用回调函数控制管道执行流程

    • pre_execute_callback and post_execute_callback - Control pipeline flow with callback functions that can be called before and/or after a step's execution. See here.
    • status_change_callback - Callback function called when the status of a step changes. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. The signature of the function must look like this:
      def status_change_callback(
      pipeline, # type: PipelineController,
      node, # type: PipelineController.Node,
      previous_status # type: str
      ):
      pass

此外,您可以使用以下参数启用步骤的指标/工件/模型到管道任务的自动日志记录:

  • monitor_metrics(可选)- 自动将步骤报告的指标也记录在管道任务上。预期的格式是以下之一:
    • 要记录的指标(标题,系列)对列表:[(step_metric_title,step_metric_series),]。示例:[('test', 'accuracy'), ]
    • 元组对列表,用于指定在管道任务上使用的不同目标指标:[((step_metric_title,step_metric_series),(target_metric_title,target_metric_series)),]。示例:[[('test', 'accuracy'), ('model', 'accuracy')], ]
  • monitor_artifacts(可选)- 自动记录管道任务上的步骤工件。
    • 提供了由步骤函数创建的工件名称列表,这些工件也将自动记录在管道任务本身上。示例:['processed_data', ](管道任务上的目标工件名称将与原始工件名称相同)。
    • 或者,提供一对(source_artifact_name, target_artifact_name)的列表,其中第一个字符串是组件任务上显示的工件名称,第二个是要放在管道任务上的目标工件名称。示例:[('processed_data', 'final_processed_data'), ]
  • monitor_models (可选) - 自动记录管道任务中步骤的输出模型。
    • 提供由步骤任务创建的模型名称列表,它们也将出现在管道本身上。示例:['model_weights', ]
    • 要选择最新的(字典序)模型,请使用model_*,或使用*选择最后创建的模型。示例:['model_weights_*', ]
    • 或者,提供一对(source_model_name, target_model_name)的列表,其中第一个字符串是组件任务上显示的模型名称,第二个是要放在管道任务上的目标模型名称。示例:[('model_weights', 'final_model_weights'), ]

您还可以使用以下参数控制管道组件的自动日志记录:

  • auto_connect_frameworks - 控制组件的框架日志记录。您可以完全禁用框架日志记录,或指定要记录的框架。请参阅 Task.initauto_connect_framework 参数
  • auto_connect_arg_parser - 控制argparse对象的自动日志记录。参见Task.initauto_connect_arg_parser 参数

你也可以直接从步骤上传模型或工件到管道控制器,使用 PipelineDecorator.upload_modelPipelineDecorator.upload_artifact 方法分别。

控制管道执行

默认执行队列

PipelineDecorator.set_default_execution_queue 方法允许您设置一个默认队列,所有管道步骤都将通过该队列执行。一旦设置,可以通过 @PipelineDecorator.component 装饰器指定特定步骤的覆盖。

运行管道

要运行管道,请调用管道控制器函数。

ClearML 管道可以在以下模式之一中运行:

远程模式

远程模式是管道控制器的默认模式。在此模式下,管道控制器逻辑在服务队列上执行,所有管道步骤都在其各自的队列上远程启动。

示例:

if __name__ == '__main__':
executing_pipeline(pickle_url='https://example.com/iris_dataset.pkl')
print('pipeline completed')
RUN PIPELINE CONTROLLER LOCALLY

您可以在本地运行管道逻辑,同时保持管道组件的执行在远程进行(由clearml-agent排队和执行)。将pipeline_execution_queue=None传递给@PipelineDecorator.pipeline装饰器。

@PipelineDecorator.pipeline(
name='custom pipeline logic', project='examples', version='0.0.5', pipeline_execution_queue=None
)

调试模式

在调试模式下,管道控制器和所有组件被视为常规的Python函数,组件被同步调用。此模式非常适合调试组件和设计管道,因为整个管道在开发人员的机器上执行,具有完全调试每个函数调用的能力。在主管道逻辑函数调用之前调用PipelineDecorator.debug_pipeline

示例:

if __name__ == '__main__':
PipelineDecorator.debug_pipeline()
executing_pipeline(pickle_url='https://example.com/iris_dataset.pkl')
print('pipeline completed')

本地模式

在本地模式下,管道控制器为每个组件创建任务,组件函数调用被转换为在同一台机器上运行的子进程。请注意,数据在组件和逻辑之间的传递机制与远程模式完全相同(即超参数/工件),唯一的区别是执行本身是本地进行的。请注意,每个子进程使用与主管道逻辑完全相同的Python环境。在主管道逻辑函数之前调用PipelineDecorator.run_locally

示例:

if __name__ == '__main__':
PipelineDecorator.run_locally()
executing_pipeline(pickle_url='https://example.com/iris_dataset.pkl')
print('pipeline completed')