Skip to main content

PipelineController

class PipelineController()

管道控制器。 管道是一个基础任务的有向无环图(DAG),每个任务将被克隆(根据需要更改参数)、执行和监控。 管道进程(任务)本身可以手动执行或由clearml-agent服务队列执行。 注意:管道控制器的生命周期与管道本身的执行时间相同。

创建一个新的管道控制器。新创建的对象将启动并监控新的实验。

  • 参数

    • name (str) – 提供管道名称(如果存在主任务,它将覆盖其名称)

    • 项目 (str) – 提供存储管道的项目(如果存在主任务,它将覆盖其项目)

    • 版本 (Optional[str]) – 管道版本。此版本允许唯一标识管道模板执行。语义版本的示例:version=’1.0.1’ , version=’23’, version=’1.2’. 如果未设置,则查找管道的最新版本并递增。如果未找到此类版本,则默认为‘1.0.0’

    • pool_frequency (float ) – 监控实验/状态的池化频率(以分钟为单位)。

    • add_pipeline_tags (bool ) – (默认: False) 如果为 True,则为由此管道创建的所有步骤(任务)添加 pipe: 标签。

    • target_project (str ) – 如果提供,所有管道步骤将被克隆到目标项目中。 如果为True,管道步骤将被存储到管道项目中

    • auto_version_bump (bool ) – (已弃用) 如果为True,如果相同的管道版本已经存在 (与当前版本有任何差异),当前管道版本将升级到一个新版本 版本升级示例:1.0.0 -> 1.0.1 , 1.2 -> 1.3, 10 -> 11 等。

    • abort_on_failure (bool ) – 如果为False(默认值),失败的管道步骤不会导致管道立即停止,而是任何未连接到(或间接连接到)失败步骤的步骤仍将被执行。尽管如此,管道本身将被标记为失败,除非失败步骤特别定义为“continue_on_fail=True”。 如果为True,任何失败的步骤将导致管道立即中止,停止所有正在运行的步骤,并将管道标记为失败。

    • add_run_number (bool) – 如果为True(默认),将管道的运行编号添加到管道名称中。 例如,第二次启动管道“best pipeline”时,我们将其重命名为“best pipeline #2”

    • retry_on_failure (Union[int, Callable[[PipelineController, Node, int], bool], None]) – 整数(重试次数)或返回True以允许重试的回调函数

      • 整数:在节点故障的情况下,按照此参数指定的次数重试该节点。

      • 可调用:在节点失败时调用的函数。参数如下:

      PipelineController实例,失败的PipelineController.Node以及一个表示失败节点先前重试次数的整数。 如果节点应重试,则函数必须返回True,否则返回False。 如果为True,节点将重新排队,剩余的重试次数将减少1。 默认情况下,如果未指定此回调,函数将按照retry_on_failure指示的次数重试。

      def example_retry_on_failure_callback(pipeline, node, retries):
      print(node.name, ' failed')
      # allow up to 5 retries (total of 6 runs)
      return retries < 5
    • docker (Optional[str]) – 选择要在远程会话中执行的docker镜像

    • docker_args (Optional[str]) – 添加docker参数,传递一个单独的字符串

    • docker_bash_setup_script (Optional[str]) – 添加要在设置任务环境之前在docker内部执行的bash脚本

    • packages (Union[bool, str, Sequence[str], None]) – 手动指定所需的包列表或本地的requirements.txt文件。 示例:[“tqdm>=2.1”, “scikit-learn”] 或 “./requirements.txt” 如果未提供,包将自动添加。 使用False从您的git仓库中的“requirements.txt”安装需求

    • repo (Optional[str]) – 可选,指定一个仓库附加到管道控制器,当远程执行时。 允许用户在指定的仓库内执行控制器,使他们能够从仓库加载模块/脚本。 注意执行工作目录将是仓库的根文件夹。 支持git仓库的url链接,以及本地仓库路径(自动转换为远程git/提交,如当前检出的)。 示例远程url: ‘https://github.com/user/repo.git’ 示例本地仓库副本: ‘./repo’ -> 将自动存储远程仓库url和基于本地克隆副本的提交ID 使用空字符串(“”)来禁用任何仓库的自动检测

    • repo_branch (Optional[str]) – 可选,指定远程仓库分支(如果使用本地仓库路径,则忽略)

    • repo_commit (Optional[str]) – 可选,指定仓库提交ID(如果使用本地仓库路径,则忽略)

    • always_create_from_code (bool) – 如果为True(默认),管道始终从代码构建, 如果为False,管道将从管道任务本身的管道配置部分生成。 这允许在不更改原始代码库的情况下编辑(也可以添加/删除)管道步骤。

    • artifact_serialization_function (Optional[Callable[[Any], Union[bytes, bytearray]]]) – 一个序列化函数,它接受一个任意类型的参数,该参数是要序列化的对象。该函数应返回一个表示序列化对象的 bytes 或 bytearray 对象。管道上传的所有参数/返回工件都将使用此函数进行序列化。所有相关的导入必须在此函数中完成。例如:

    def serialize(obj):
    import dill
    return dill.dumps(obj)
    • artifact_deserialization_function (Optional[Callable[[bytes], Any]]) – A deserialization function that takes one parameter of type bytes, which represents the serialized object. This function should return the deserialized object. All parameter/return artifacts fetched by the pipeline will be deserialized using this function. All relevant imports must be done in this function. For example:
    def deserialize(bytes_):
    import dill
    return dill.loads(bytes_)
    • output_uri (Union[str, bool, None]) – 此管道的存储/输出URL。这是输出模型和其他工件的默认位置。有关更多信息,请查看Task.init参考文档(output_uri是一个参数)。此管道步骤的output_uri将默认为此值。

    • skip_global_imports (bool) – 如果为True,在从函数创建步骤时,全局导入将不会包含在步骤的执行中,否则所有全局导入将在每个步骤执行开始时以安全的方式自动导入。默认值为False

    • working_dir (Optional[str]) – 启动管道的工作目录。

    • enable_local_imports (bool) – 如果为True,允许管道步骤通过将管道控制器脚本所在的目录附加到每个步骤的PYTHONPATH(sys.path[0])来从本地文件导入。 如果为False,该目录将不会附加到PYTHONPATH。默认值为True。 在远程运行时忽略。


添加函数步骤

add_function_step(name, function, function_kwargs=None, function_return=None, project_name=None, task_name=None, task_type=None, auto_connect_frameworks=None, auto_connect_arg_parser=None, packages=None, repo=None, repo_branch=None, repo_commit=None, helper_functions=None, docker=None, docker_args=None, docker_bash_setup_script=None, parents=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False, retry_on_failure=None, status_change_callback=None, tags=None, output_uri=None, draft=False, working_dir=None, continue_behaviour=None)

从函数创建任务,包括将函数输入参数包装到超参数部分作为kwargs,并将函数结果存储为命名的工件

示例:

def mock_func(a=6, b=9):
c = a*b
print(a, b, c)
return c, c**2

create_task_from_function(mock_func, function_return=['mul', 'square'])

来自其他任务的示例参数(artifact):

def mock_func(matrix_np):
c = matrix_np*matrix_np
print(matrix_np, c)
return c

create_task_from_function(
mock_func,
function_kwargs={'matrix_np': 'aabb1122.previous_matrix'},
function_return=['square_matrix']
)
  • 参数

    • name (str ) – 步骤的唯一标识。例如 stage1

    • function (Callable ) – 一个全局函数,用于转换为独立的Task

    • function_kwargs (可选 [ 字典 [ 字符串 *, * 任意 ] ] ) – 可选,提供函数参数和默认值的子集以暴露。 如果未提供,则自动获取所有函数参数和默认值 可选,从其他任务的输出工件中传递输入参数给函数。 例如,从任务ID aabbcc的工件名称answer中传递名为numpy_matrix的参数: {'numpy_matrix': 'aabbcc.answer'}

    • function_return (可选 [ 列表 [ 字符串 ] ] ) – 提供所有结果的名称列表。 如果未提供,则不会将结果存储为工件。

    • project_name (可选 [ str ] ) – 设置任务的名称。如果 base_task_id 为 None,则必须设置。

    • task_name (可选 [ str ] ) – 设置远程任务的名称,如果未提供,则使用名称参数。

    • task_type (可选 [ str ] ) – 可选,要创建的任务类型。支持的值:'training', 'testing', 'inference', 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'

    • auto_connect_frameworks (可选 [ 字典 ] ) – 控制框架的自动连接,参见 Task.init auto_connect_frameworks

    • auto_connect_arg_parser (可选 [ 字典 ] ) – 控制ArgParser的自动连接,参见Task.init auto_connect_arg_parser

    • packages (可选 [ Union [ bool *, * str *, * Sequence [ str ] ] ] ) – 手动指定所需的包列表或本地的requirements.txt文件。 示例:[“tqdm>=2.1”, “scikit-learn”] 或 “./requirements.txt” 如果未提供,将根据函数中使用的导入自动添加包。 使用False从git仓库中的“requirements.txt”安装需求

    • repo (可选 [ str ] ) – 可选,指定一个仓库附加到函数上,当远程执行时。 允许用户在指定的仓库内执行函数,从而能够从仓库加载模块/脚本 注意执行工作目录将是仓库的根文件夹。 支持git仓库的url链接,以及本地仓库路径。 示例远程url: ‘https://github.com/user/repo.git’ 示例本地仓库副本: ‘./repo’ -> 将自动存储远程仓库的url和基于本地克隆副本的提交ID

    • repo_branch (可选 [ str ] ) – 可选,指定远程仓库分支(如果使用本地仓库路径,则忽略)

    • repo_commit (可选 [ str ] ) – 可选,指定仓库提交ID(如果使用本地仓库路径,则忽略)

    • helper_functions (可选 [ 序列 [ 可调用 ] ] ) – 可选,一个辅助函数列表,用于独立的函数任务。

    • docker (可选 [ str ] ) – 选择要在远程会话中执行的docker镜像

    • docker_args (可选 [ str ] ) – 添加docker参数,传递一个字符串

    • docker_bash_setup_script (可选 [ str ] ) – 添加要在设置任务环境之前在docker内部执行的bash脚本

    • parents (可选 [ 序列 [ str ] ] ) – DAG 中父节点的可选列表。 只有在所有父节点成功执行后,管道中的当前步骤才会被发送执行。

    • execution_queue (可选 [ str ] ) – 可选,用于执行此特定步骤的队列。 如果未提供,任务将被发送到默认的执行队列,如类中所定义。

    • monitor_metrics (可选 [ 列表 [ 联合 [ 元组 [ str *, * str ] *, * 元组 [ ( str *, * str ) *, * ( str *, * str ) ] ] ] ] ) – 可选,记录管道任务步骤的指标。 格式是要记录的指标对(标题,系列)的列表:

    [(步骤指标标题, 步骤指标系列), ] 示例: [(‘test’, ‘accuracy’), ]

    或者是一个元组对的列表,用于指定在管道任务上使用的不同目标指标:

    [((步骤指标标题, 步骤指标系列), (目标指标标题, 目标指标系列)), ] 示例: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]

    • monitor_artifacts (可选 [ 列表 [ 联合 [ str *, * 元组 [ str *, * str ] ] ] ] ) – 可选,记录管道任务中步骤的工件。 提供了步骤任务中存在的工件名称列表,它们也将出现在管道本身上。 示例:[(‘processed_data’, ‘final_processed_data’), ] 或者用户也可以提供一个要监控的工件列表 (目标工件名称将与原始工件名称相同) 示例:[‘processed_data’, ]

    • monitor_models (可选 [ 列表 [ 联合 [ 字符串 *, * 元组 [ 字符串 *, * 字符串 ] ] ] ] ) – 可选,记录管道任务上步骤的输出模型。 提供步骤任务上存在的模型名称列表,它们也将出现在管道本身上。 示例:[(‘model_weights’, ‘final_model_weights’), ] 或者用户也可以提供一个要监控的模型列表 (目标模型名称将与原始模型相同) 示例:[‘model_weights’, ] 要选择最新的(字典序)模型,请使用“model_*”,或者使用“*”选择最后创建的模型 示例: [‘model_weights_*’, ]

    • time_limit (可选 [ float ] ) – 默认值为 None,表示没有时间限制。 步骤执行时间限制,如果超过此限制,任务将被中止,管道将停止并标记为失败。

    • continue_on_fail (bool ) – (已弃用,请使用 continue_behaviour 代替)。 如果为 True,失败的步骤不会导致管道停止(或标记为失败)。请注意,连接到(或间接连接到)失败步骤的步骤将被跳过。默认为 False

    • pre_execute_callback (Optional [ Callable [ [ PipelineController *, * PipelineController.Node *, * dict ] *, * bool ] *] * # noqa ) – 回调函数,在步骤(任务)创建时调用,并在发送执行之前调用。允许用户在启动前修改任务。 使用 node.job 访问 ClearmlJob 对象,或使用 node.job.task 直接访问 Task 对象。 参数是传递给 ClearmlJob 的配置参数。

    如果回调返回值为False,则该节点将被跳过,依赖于此节点的DAG中的任何节点也将被跳过。

    请注意参数已经被解析, 例如 ${step1.parameters.Args/param} 已被替换为相关值。

    def step_created_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    parameters, # type: dict
    ):
    pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController *, * PipelineController.Node ] *, * None ] *] * # noqa ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.
    def step_completed_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    ):
    pass
    • cache_executed_step (bool ) – 如果为True,在启动新步骤之前, 在更新最新配置后,检查是否已经执行了具有相同参数/代码的完全相同的任务。如果找到,则使用它而不是启动新任务。 默认值:False,始终使用base_task的新克隆副本。 注意:如果git仓库引用没有特定的提交ID,则永远不会使用该任务。

    • retry_on_failure (可选 [ 联合 [ 整数 *, * 可调用 [ [ PipelineController *, * PipelineController.Node *, * 整数 ] *, * 布尔值 ] ] *] * # noqa ) – 整数(重试次数)或返回True以允许重试的回调函数

      • 整数:在节点故障的情况下,按照此参数指定的次数重试该节点。

      • 可调用:在节点失败时调用的函数。参数如下:

      PipelineController实例,失败的PipelineController.Node以及一个表示失败节点先前重试次数的整数。 如果节点应重试,则函数必须返回True,否则返回False。 如果为True,节点将重新排队,剩余的重试次数将减少1。 默认情况下,如果未指定此回调,函数将按照retry_on_failure指示的次数重试。

      def example_retry_on_failure_callback(pipeline, node, retries):
      print(node.name, ' failed')
      # allow up to 5 retries (total of 6 runs)
      return retries < 5
    • status_change_callback (可选 [ 可调用 [ [ PipelineController *, * PipelineController.Node *, * str ] *, * None ] *] * # noqa ) – 回调函数,当步骤(任务)的状态发生变化时调用。 使用 node.job 访问 ClearmlJob 对象,或使用 node.job.task 直接访问 Task 对象。 函数的签名必须如下所示:

    def status_change_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    previous_status # type: str
    ):
    pass
    • tags (可选 [ 联合 [ str *, * 序列 [ str ] ] ] ) – 特定管道步骤的标签列表。 当远程执行管道时 (即从UI启动管道/将其加入队列),此方法无效。

    • output_uri (可选 [ 联合 [ str *, * bool ] ] ) – 此步骤的存储/输出URL。这是输出模型和其他工件的默认位置。有关更多信息,请查看Task.init参考文档(output_uri是一个参数)。

    • 草稿 (可选 [ 布尔值 ] ) – (默认 False). 如果为 True,任务将作为草稿任务创建。

    • working_dir (可选 [ str ] ) – 启动脚本的工作目录。

    • continue_behaviour (可选 [ 字典 ] ) – 控制管道在步骤失败/中止后是否继续运行。可以使用布尔选项的字典来设置不同的行为。支持的选项有:

    • continue_on_fail - If True, the pipeline will continue even if the step failed.

    如果为False,管道将停止

    • continue_on_abort - If True, the pipeline will continue even if the step was aborted.

    如果为False,管道将停止

    • skip_children_on_fail - If True, the children of this step will be skipped if it failed.

    如果为False,即使此步骤失败,子步骤仍将运行。 从失败的步骤传递给其子步骤的任何参数将默认为None

    • skip_children_on_abort - If True, the children of this step will be skipped if it was aborted.

    如果为False,即使此步骤被中止,子步骤仍将运行。 从失败的步骤传递给其子步骤的任何参数将默认为None

    如果字典中不存在这些键,它们的值将默认为 True

  • 返回类型

    布尔

  • 返回

    如果成功则为真


add_parameter

add_parameter(名称, 默认值=None, 描述=None, 参数类型=None)

向管道任务添加一个参数。 该参数可以用作管道中任何步骤的输入参数。 注意所有参数将出现在PipelineController任务的高级参数 -> 管道部分 示例:pipeline.add_parameter(name=’dataset’, description=’要处理管道的数据集ID’) 然后在其中一个步骤中,我们可以使用'${pipeline.dataset}'来引用参数的值

  • 参数

    • name (str) – 参数的字符串名称。

    • default (Optional[Any]) – 默认值,将作为默认值设置(可以在UI中稍后更改)

    • 描述 (Optional[str]) – 参数的字符串描述及其在管道中的使用

    • param_type (Optional[str]) – 可选,参数类型信息(用于提示类型转换和描述)

  • 返回类型

    None


添加步骤

add_step(名称, 基础任务ID=无, 父任务=无, 参数覆盖=无, 配置覆盖=无, 任务覆盖=无, 执行队列=无, 监控指标=无, 监控工件=无, 监控模型=无, 时间限制=无, 基础任务项目=无, 基础任务名称=无, 克隆基础任务=真, 失败继续=假, 预执行回调=无, 后执行回调=无, 缓存执行步骤=假, 基础任务工厂=无, 失败重试=无, 状态变更回调=无, 递归解析参数=假, 输出URI=无, 继续行为=无)

向管道执行DAG添加一个步骤。 每个步骤必须具有唯一的名称(此名称稍后将用于寻址步骤)

  • 参数

    • name (str ) – 步骤的唯一标识。例如 stage1

    • base_task_id (可选 [ str ] ) – 用于步骤的任务ID。每次执行步骤时, 基础任务会被克隆,然后克隆的任务将被发送执行。

    • parents (可选 [ 序列 [ str ] ] ) – DAG 中父节点的可选列表。 只有在所有父节点成功执行后,管道中的当前步骤才会被发送执行。

    • parameter_override (可选 [ 映射 [ str *, * 任意 ] ] ) – 可选的参数覆盖字典。

      字典值可以使用以下形式引用先前执行的步骤 '${step_name}'。示例:

      • 工件访问 parameter_override={'Args/input_file': '${<step_name>.artifacts.<artifact_name>.url}' }

      • 模型访问(最后使用的模型) parameter_override={'Args/input_file': '${<step_name>.models.output.-1.url}' }

      • 参数访问 parameter_override={'Args/input_file': '${<step_name>.parameters.Args/input_file}' }

      • 管道任务参数(参见 Pipeline.add_parameter)parameter_override={'Args/input_file': '${pipeline.<pipeline_parameter>}' }

      • 任务ID parameter_override={'Args/input_file': '${stage3.id}' }

    • recursively_parse_parameters (bool ) – 如果为True,则递归地从parameter_override中的列表、字典或元组中解析参数。 示例:

      • parameter_override={'Args/input_file': ['${&lt;step_name&gt;.artifacts.&lt;artifact_name&gt;.url}', 'file2.txt']} will be correctly parsed.
        • parameter_override={'Args/input_file': ('${.parameters.Args/input_file}', '${.parameters.Args/input_file}')} 将被正确解析。

      • configuration_overrides (可选 [ 映射 [ str *, * 联合 [ str *, * 映射 ] ] ] ) – 可选,覆盖任务配置对象。 预期的配置对象名称和配置对象内容的字典。 示例:

    {'General': dict(key='value')} {'General': '配置文件内容'} {'OmegaConf': YAML.dumps(full_hydra_dict)}

    • task_overrides (可选 [ 映射 [ str *, * 任意 ] ] ) – 可选的任务部分覆盖字典。

      字典值可以使用以下形式引用先前执行的步骤 '${step_name}'。示例:

      • 从特定分支获取最新提交 task_overrides={'script.version_num': '', 'script.branch': 'main'}

      • 将git仓库分支匹配到上一步 task_overrides={'script.branch': '${stage1.script.branch}', 'script.version_num': ''}

      • 更改容器镜像 task_overrides={'container.image': 'nvidia/cuda:11.6.0-devel-ubuntu20.04', 'container.arguments': '--ipc=host'}

      • 将容器镜像与上一步匹配 task_overrides={'container.image': '${stage1.container.image}'}

      • 重置需求(代理将使用仓库内的“requirements.txt”) task_overrides={'script.requirements.pip': ""}

    • execution_queue (可选 [ str ] ) – 可选,用于执行此特定步骤的队列。 如果未提供,任务将被发送到默认的执行队列,如类中所定义。

    • monitor_metrics (可选 [ 列表 [ 联合 [ 元组 [ str *, * str ] *, * 元组 [ ( str *, * str ) *, * ( str *, * str ) ] ] ] ] ) – 可选,记录管道任务步骤的指标。 格式是要记录的指标对(标题,系列)的列表:

    [(步骤指标标题, 步骤指标系列), ] 示例: [(‘test’, ‘accuracy’), ]

    或者是一个元组对的列表,用于指定在管道任务上使用的不同目标指标:

    [((步骤指标标题, 步骤指标系列), (目标指标标题, 目标指标系列)), ] 示例: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]

    • monitor_artifacts (可选 [ 列表 [ 联合 [ str *, * 元组 [ str *, * str ] ] ] ] ) – 可选,记录管道任务中步骤的工件。 提供了步骤任务中存在的工件名称列表,它们也将出现在管道本身上。 示例:[(‘processed_data’, ‘final_processed_data’), ] 或者用户也可以提供一个要监控的工件列表 (目标工件名称将与原始工件名称相同) 示例:[‘processed_data’, ]

    • monitor_models (可选 [ 列表 [ 联合 [ 字符串 *, * 元组 [ 字符串 *, * 字符串 ] ] ] ] ) – 可选,记录管道任务上步骤的输出模型。 提供步骤任务上存在的模型名称列表,它们也将出现在管道本身上。 示例:[(‘model_weights’, ‘final_model_weights’), ] 或者用户也可以提供一个要监控的模型列表 (目标模型名称将与原始模型相同) 示例:[‘model_weights’, ] 要选择最新的(字典序)模型,请使用“model_*”,或者使用“*”选择最后创建的模型 示例: [‘model_weights_*’, ]

    • time_limit (可选 [ float ] ) – 默认值为 None,表示没有时间限制。 步骤执行时间限制,如果超过此限制,任务将被中止,管道将停止并标记为失败。

    • base_task_project (可选 [ str ] ) – 如果未提供 base_task_id, 则使用 base_task_project 和 base_task_name 组合来检索用于步骤的 base_task_id。

    • base_task_name (可选 [ str ] ) – 如果未提供 base_task_id, 则使用 base_task_project 和 base_task_name 组合来检索用于步骤的 base_task_id。

    • clone_base_task (bool ) – 如果为 True(默认),管道将克隆基础任务,并修改/排队克隆的任务。如果为 False,则直接使用基础任务,注意它必须处于草稿模式(已创建)。

    • continue_on_fail (bool ) – (已弃用,请使用 continue_behaviour 代替)。 如果为 True,失败的步骤不会导致管道停止(或标记为失败)。请注意,连接到(或间接连接到)失败步骤的步骤将被跳过。默认为 False

    • pre_execute_callback (Optional [ Callable [ [ PipelineController *, * PipelineController.Node *, * dict ] *, * bool ] *] * # noqa ) – 回调函数,在步骤(任务)创建时调用,并在发送执行之前调用。允许用户在启动前修改任务。 使用 node.job 访问 ClearmlJob 对象,或使用 node.job.task 直接访问 Task 对象。 参数是传递给 ClearmlJob 的配置参数。

    如果回调返回值为False,则该节点将被跳过,依赖于此节点的DAG中的任何节点也将被跳过。

    请注意参数已经被解析, 例如 ${step1.parameters.Args/param} 已被替换为相关值。

    def step_created_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    parameters, # type: dict
    ):
    pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController *, * PipelineController.Node ] *, * None ] *] * # noqa ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.
    def step_completed_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    ):
    pass
    • cache_executed_step (bool ) – 如果为True,在启动新步骤之前, 在更新最新配置后,检查是否已经执行过具有相同参数/代码的完全相同的任务。如果找到,则使用它而不是启动新任务。 默认值:False,始终使用base_task的新克隆副本。 注意:如果git仓库引用没有特定的提交ID,则永远不会使用该任务。 如果clone_base_task为False,则不会进行克隆,因此使用base_task。

    • base_task_factory (可选 [ Callable [ [ PipelineController.Node ] , * 任务 ]* ] ) – 可选,代替提供一个已存在的任务,提供一个可调用的函数来创建任务(返回任务对象)

    • retry_on_failure (可选 [ 联合 [ 整数 *, * 可调用 [ [ PipelineController *, * PipelineController.Node *, * 整数 ] *, * 布尔值 ] ] *] * # noqa ) – 整数(重试次数)或返回True以允许重试的回调函数

      • 整数:在节点故障的情况下,按照此参数指定的次数重试该节点。

      • 可调用:在节点失败时调用的函数。参数如下:

      PipelineController实例,失败的PipelineController.Node以及一个表示失败节点先前重试次数的整数。 如果节点应重试,则函数必须返回True,否则返回False。 如果为True,节点将重新排队,剩余的重试次数将减少1。 默认情况下,如果未指定此回调,函数将按照retry_on_failure指示的次数重试。

      def example_retry_on_failure_callback(pipeline, node, retries):
      print(node.name, ' failed')
      # allow up to 5 retries (total of 6 runs)
      return retries &lt; 5
    • status_change_callback (可选 [ 可调用 [ [ PipelineController *, * PipelineController.Node *, * str ] *, * None ] *] * # noqa ) – 回调函数,当步骤(任务)的状态发生变化时调用。 使用 node.job 访问 ClearmlJob 对象,或使用 node.job.task 直接访问 Task 对象。 函数的签名必须如下所示:

    def status_change_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    previous_status # type: str
    ):
    pass
    • output_uri (可选 [ 联合 [ str *, * bool ] ] ) – 此步骤的存储/输出URL。这是输出模型和其他工件的默认位置。有关更多信息,请查看Task.init参考文档(output_uri是一个参数)。

    • continue_behaviour (可选 [ 字典 ] ) – 控制管道在步骤失败/中止后是否继续运行。可以使用布尔选项的字典来设置不同的行为。支持的选项有:

    • continue_on_fail - If True, the pipeline will continue even if the step failed.

    如果为False,管道将停止

    • continue_on_abort - If True, the pipeline will continue even if the step was aborted.

    如果为False,管道将停止

    • skip_children_on_fail - If True, the children of this step will be skipped if it failed.

    如果为False,即使此步骤失败,子步骤仍将运行。 从失败的步骤传递给其子步骤的任何参数将默认为None

    • skip_children_on_abort - If True, the children of this step will be skipped if it was aborted.

    如果为False,即使此步骤被中止,子步骤仍将运行。 从失败的步骤传递给其子步骤的任何参数将默认为None

    如果字典中不存在这些键,它们的值将默认为 True

  • 返回类型

    布尔

  • 返回

    如果成功则为真


添加标签

添加标签(tags)

向此管道添加标签。旧标签不会被删除。 当远程执行管道时 (即从用户界面启动管道/将其加入队列),此方法无效。

  • 参数

    标签 (Union[Sequence[str], str]) – 此管道的标签列表。

  • 返回类型

    None


PipelineController.clone

classmethod clone(pipeline_controller, name=None, comment=None, parent=None, project=None, version=None)

创建一个管道(实验)的副本(克隆)。克隆管道的状态为Draft并且可以修改。

  • 参数

    • pipeline_controller (str ) – 要克隆的管道。指定一个 PipelineController 对象或一个 ID。

    • name (str ) – 新克隆管道的名称。

    • comment (str ) – 新克隆管道的注释/描述。

    • parent (str ) – 新管道的父任务的ID。

      • 如果未指定parent,则parent设置为source_task.parent

      • 如果未指定parentsource_task.parent不可用,

    然后 parent 设置为 source_task

    • project (str ) – 创建新管道的项目名称。 如果 None,克隆将继承原始管道的项目

    • 版本 (str ) – 新克隆管道的版本。如果为 None,则克隆继承原始管道的版本

  • 返回类型

    PipelineController

  • 返回

    新克隆的PipelineController


连接配置

connect_configuration(configuration, name=None, description=None)

将配置字典或配置文件(pathlib.Path / str)连接到PipelineController对象。 在读取配置文件之前应调用此方法。

例如,一个本地文件:

config_file = pipe.connect_configuration(config_file)
my_params = json.load(open(config_file,'rt'))

一个参数字典/列表:

my_params = pipe.connect_configuration(my_params)
  • 参数

    • 配置 (Union[Mapping, list, Path, str]) – 配置。这通常是用于模型训练过程中的配置。

      指定以下之一:

      • 一个字典/列表 - 包含配置的字典。ClearML 将配置存储在

      ClearML 服务器 (后端), 以 HOCON 格式 (类似 JSON 的格式) 可编辑。

      • 一个 pathlib2.Path 字符串 - 配置文件的路径。ClearML 存储文件的内容。

      本地路径必须是相对路径。当在远程工作节点上执行管道时,从ClearML服务器(后端)带来的内容会覆盖文件的内容。

    • name (str ) – 配置部分的名称。默认值:'General' 允许用户存储多个配置字典/文件

    • 描述 (str ) – 配置部分的描述(文本)。默认值:无

  • 返回类型

    Union[dict, Path, str]

  • 返回

    如果指定了字典,则返回一个字典。如果指定了pathlib2.Path / 字符串,则返回本地配置文件的路径。配置对象。


PipelineController.create

classmethod create(project_name, task_name, repo=None, branch=None, commit=None, script=None, working_directory=None, packages=None, requirements_file=None, docker=None, docker_args=None, docker_bash_setup_script=None, argparse_args=None, force_single_script_file=False, version=None, add_run_number=True)

在系统中手动创建并填充一个新的Pipeline。 支持来自函数、装饰器和任务的pipelines。

  • 参数

    • project_name (str) – 为管道设置项目名称。

    • task_name (str) – 设置远程管道的名称。

    • repo (Optional[str]) – 要使用的仓库的远程URL,或本地git仓库的路径。 示例:'https://github.com/allegroai/clearml.git' 或 '~/project/repo'。如果指定了repo,则 还必须指定script参数

    • branch (Optional[str]) – 选择特定的仓库分支/标签(意味着从该分支获取最新的提交)

    • commit (Optional[str]) – 选择特定的提交ID使用(默认:最新提交,或与本地仓库匹配的本地提交ID)

    • script (Optional[str]) – 指定远程执行的入口脚本。当与远程git仓库一起使用时,脚本应该是仓库内的相对路径,例如:'./source/train.py'。当与本地仓库路径一起使用时,它支持直接指向本地仓库内的文件路径,例如:'~/project/source/train.py'

    • working_directory (Optional[str]) – 启动脚本的工作目录。默认值:仓库根文件夹。 相对于仓库根目录或本地文件夹。

    • packages (Union[bool, Sequence[str], None]) – 手动指定所需的包列表。示例:["tqdm>=2.1", "scikit-learn"] 或设置为 True 以根据本地安装的包自动创建需求(仓库必须是本地的)。

    • requirements_file (Union[str, Path, None]) – 指定在设置会话时要安装的requirements.txt文件。 如果未提供,则将使用存储库中的requirements.txt。

    • docker (Optional[str]) – 选择要在远程会话中执行的docker镜像

    • docker_args (Optional[str]) – 添加docker参数,传递一个单独的字符串

    • docker_bash_setup_script (Optional[str]) – 添加要在设置任务环境之前在docker内部执行的bash脚本

    • argparse_args (Optional[Sequence[Tuple[str, str]]]) – 传递给远程执行的参数,字符串对的列表(参数,值) 注意,仅在代码库本身使用 argparse.ArgumentParser 时支持。

    • force_single_script_file (bool) – 如果为True,则不自动检测本地仓库

    • 版本 (可选 [ 字符串 ] ) –

    • add_run_number (bool ) –

  • 返回类型

    PipelineController

  • 返回

    新创建的PipelineController


创建草稿

create_draft()

可选,手动创建并序列化管道任务(谨慎使用,用于手动创建多个管道)。

注意 推荐的流程是调用 pipeline.start(queue=None),这将产生类似的效果,并允许您在以后克隆/入队。

调用Pipeline.create()后,用户可以在UI中编辑管道并将其加入队列以执行。

注意:此函数应用于以编程方式创建管道以供后续使用。 要自动创建并启动管道,请调用 start() 方法。

  • 返回类型

    None


已用时间

elapsed()

返回从控制器启动时间戳开始经过的分钟数。

  • 返回类型

    float

  • 返回

    控制器启动时间的分钟数。负值表示进程尚未启动。


PipelineController.enqueue

classmethod enqueue(pipeline_controller, queue_name=None, queue_id=None, force=False)

将PipelineController加入执行队列以进行执行。

info

工作守护进程必须在队列中监听,以便工作器获取任务并执行它,请参阅ClearML文档中的“ClearML Agent”。

  • 参数

    • pipeline_controller (Union[PipelineController, str]) – 要排队的PipelineController。指定一个PipelineController对象或PipelineController ID

    • queue_name (Optional[str]) – 队列的名称。如果未指定,则必须指定 queue_id

    • queue_id (Optional[str]) – 队列的ID。如果未指定,则必须指定queue_name

    • force (bool ) – 如果为True,在将其加入队列之前,必要时重置PipelineController

  • 返回类型

    Any

  • 返回

    一个入队JSON响应。

    {
    "queued": 1,
    "updated": 1,
    "fields": {
    "status": "queued",
    "status_reason": "",
    "status_message": "",
    "status_changed": "2020-02-24T15:05:35.426770+00:00",
    "last_update": "2020-02-24T15:05:35.426770+00:00",
    "execution.queue": "2bd96ab2d9e54b578cc2fb195e52c7cf"
    }
    }
    • queued - 已排队的任务数量(一个整数或null)。

    • updated - 更新的任务数量(一个整数或null)。

    • fields

      • status - 实验的状态。

      • status_reason - 上次状态变更的原因。

      • status_message - 关于状态的信息。

      • status_changed - 最后状态更改的日期和时间(ISO 8601格式)。

      • last_update - 任务的最后更新时间,包括任务的创建、更新、更改或此任务的事件(ISO 8601格式)。

      • execution.queue - 任务入队的队列ID。null 表示未入队。


PipelineController.get

classmethod get(pipeline_id=None, pipeline_project=None, pipeline_name=None, pipeline_version=None, pipeline_tags=None, shallow_search=False)

获取特定的PipelineController。如果找到多个管道控制器,则返回具有最高语义版本的管道控制器。如果未找到语义版本,则返回最近更新的管道控制器。如果未找到管道控制器,此函数将引发异常。

注意:要运行此函数返回的管道控制器,请使用 PipelineController.enqueue

  • 参数

    • pipeline_id (Optional[str]) – 请求的PipelineController ID

    • pipeline_project (Optional[str]) – 请求的 PipelineController 项目

    • pipeline_name (Optional[str]) – 请求的PipelineController名称

    • pipeline_tags (Optional[Sequence[str]]) – 请求的PipelineController标签(标签字符串列表)

    • shallow_search (bool) – 如果为True,仅搜索前500个结果(第一页)

    • pipeline_version (可选 [ str ] ) –

  • 返回类型

    ForwardRef


PipelineController.get_logger

classmethod get_logger()

返回一个连接到管道任务的日志记录器。 该日志记录器可以被管道执行的任何函数/任务使用,以便直接向管道任务本身报告。它也可以从主管道控制任务中调用。

如果无法定位主Pipeline任务,则引发ValueError。

  • 返回类型

    Logger

  • 返回

    用于报告指标(标量、图表、调试样本等)的Logger对象


获取参数

get_parameters()

返回管道参数字典 :rtype: dict :return: 字典 str -> str

  • 返回类型

    字典


获取管道DAG

get_pipeline_dag()

返回管道执行图,DAG中的每个节点都是PipelineController.Node对象。 图本身是一个节点字典(基于节点名称的键), 每个节点都持有指向其父节点的链接(通过其唯一名称标识)

  • 返回类型

    Mapping[str, Node]

  • 返回

    执行树,作为一个嵌套字典。示例:

{
'stage1' : Node() {
name: 'stage1'
job: ClearmlJob
...
},
}

获取已处理的节点

get_processed_nodes()

返回已处理的管道节点列表,列表中的每个条目都是PipelineController.Node对象。

  • 返回类型

    Sequence[Node]

  • 返回

    已执行(不包括当前正在执行的)节点列表


获取运行中的节点

get_running_nodes()

返回当前正在运行的管道节点列表,列表中的每个条目都是PipelineController.Node对象。

  • 返回类型

    Sequence[Node]

  • 返回

    当前运行的节点列表


is_running

is_running()

如果管道控制器正在运行,则返回True。

  • 返回类型

    bool

  • 返回

    一个布尔值,指示管道控制器是否处于活动状态(仍在运行)或已停止。


is_successful

is_successful(fail_on_step_fail=True, fail_condition='all')

评估管道是否成功。

  • 参数

    • fail_on_step_fail (bool) – 如果为 True(默认值),则评估管道步骤的状态以判断管道是否成功。如果为 False,则仅评估控制器

    • fail_condition (str) – 必须是以下之一:'all'(默认)、'failed' 或 'aborted'。如果为 'failed',此函数将在管道失败时返回 False,在管道中止时返回 True。如果为 'aborted',此函数将在管道中止时返回 False,在管道失败时返回 True。如果为 'all',此函数在两种情况下都将返回 False。

  • 返回类型

    bool

  • 返回

    一个布尔值,指示管道是否成功。请注意,如果管道处于运行/挂起状态,此函数将返回False


设置默认执行队列

设置默认执行队列(default_execution_queue)

如果流水线步骤未指定执行队列,则设置默认执行队列

  • 参数

    default_execution_queue (Optional[str]) – 如果没有提供执行队列,则使用的执行队列

  • 返回类型

    None


设置管道执行时间限制

设置管道执行时间限制(max_execution_minutes)

设置整个管道的最大执行时间(分钟)。传递None或0以禁用执行时间限制。

  • 参数

    max_execution_minutes (float ) – 整个管道过程的最大时间(分钟)。默认值为 None,表示没有时间限制。

  • 返回类型

    None


开始

start(queue='services', step_task_created_callback=None, step_task_completed_callback=None, wait=True)

远程启动当前管道(在选定的服务队列上)。 当前进程将停止并在远程启动。

  • 参数

    • queue – 用于启动管道的队列名称

    • step_task_created_callback (Callable ) – 回调函数,当步骤(任务)创建时调用 并在发送执行之前。允许用户在启动前修改任务。 使用 node.job 访问 ClearmlJob 对象,或使用 node.job.task 直接访问 Task 对象。 参数是传递给 ClearmlJob 的配置参数。

    如果回调返回值为False,则该节点将被跳过,依赖于此节点的DAG中的任何节点也将被跳过。

    请注意参数已经被解析, 例如 ${step1.parameters.Args/param} 已被替换为相关值。

    def step_created_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    parameters, # type: dict
    ):
    pass
    • step_task_completed_callback (Callable ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.
    def step_completed_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    ):
    pass
    • wait – If True (default), start the pipeline controller, return only after the pipeline is done (completed/aborted/failed)
  • 返回类型

    bool

  • 返回

    如果控制器启动,则为True。如果控制器未启动,则为False。


本地启动

start_locally(run_pipeline_steps_locally=False)

在本地启动当前管道,意味着管道逻辑在当前机器上运行,而不是在服务队列上。

使用 run_pipeline_steps_locally=True 你可以在本地以子进程的形式运行所有管道步骤。 注意:当在本地运行管道步骤时,它假设是本地代码执行 (即它正在运行本地代码,而不考虑管道步骤任务上的 git 提交/差异)

  • 参数

    run_pipeline_steps_locally (bool) – (默认 False) 如果为 True,则在本地作为子进程运行管道步骤(用于在本地调试管道,注意管道代码应在本地机器上可用)

  • 返回类型

    None


停止

stop(timeout=None, mark_failed=False, mark_aborted=False)

停止管道控制器和优化线程。 如果mark_failed和mark_aborted为False(默认),则将管道标记为已完成, 除非其中一个步骤失败,则将管道标记为失败。

  • 参数

    • timeout (可选 [ float ] ) – 等待优化线程退出的超时时间(分钟)。 默认值为 None,表示不等待立即终止。

    • mark_failed (bool ) – 如果为True,将管道任务标记为失败。(默认值为False)

    • mark_aborted (bool ) – 如果为True,将管道任务标记为已中止。(默认值为False)

  • 返回类型

    ()


更新执行图

update_execution_plot()

更新当前管道的桑基图

  • 返回类型

    ()


PipelineController.upload_artifact

classmethod upload_artifact(name, artifact_object, metadata=None, delete_after_upload=False, auto_pickle=None, preview=None, wait_on_upload=False, serialization_function=None)

上传(添加)一个工件到主Pipeline任务对象。 此函数可以从任何管道组件调用,以直接将工件添加到主管道任务中。

工件可以由管道执行的任何函数/任务上传,以便直接报告给管道任务本身。它也可以从主管道控制任务中调用。

如果无法定位主Pipeline任务,则引发ValueError。

当前支持的上传工件类型包括:

  • 字符串 / 路径 - 指向工件文件的路径。如果指定了通配符或文件夹,则 ClearML 会创建并上传一个 ZIP 文件。

  • 字典 - ClearML 将字典存储为 .json 文件并上传。

  • pandas.DataFrame - ClearML 将 pandas.DataFrame 存储为 .csv.gz(压缩的 CSV)文件并上传。

  • numpy.ndarray - ClearML 将 numpy.ndarray 存储为 .npz 文件并上传。

  • PIL.Image - ClearML 将 PIL.Image 存储为 .png 文件并上传。

  • 任何 - 如果使用 auto_pickle=True 调用,对象将被序列化并上传。

  • 参数

    • name (str ) – The artifact name.
    warning

    如果之前上传了同名的工件,则会被覆盖。

    • artifact_object (object ) – 工件对象。

    • metadata (dict ) – 一个包含任意元数据的键值对字典。该字典会出现在ClearML Web-App (UI)ARTIFACTS标签页中。

    • delete_after_upload (bool ) – 上传后,删除本地的工件副本

      • True - 删除工件的本地副本。

      • False - 不删除。(默认)

    • auto_pickle (bool ) – 如果为True,且artifact_object不是以下类型之一: pathlib2.Path, dict, pandas.DataFrame, numpy.ndarray, PIL.Image, url (string), local_file (string) artifact_object将被pickle并作为pickle文件artifact上传(文件扩展名为.pkl) 如果设置为None(默认),将使用sdk.development.artifacts.auto_pickle配置值。

    • 预览 (任意 ) – 工件预览

    • wait_on_upload (bool) – 是否应同步上传,强制上传完成后再继续。

    • Union [ bytes **, ** bytearray ] **] ** serialization_function (Callable [ Any , ) – 一个序列化函数,它接受一个任意类型的参数,该参数是要序列化的对象。该函数应返回一个表示序列化对象的bytes或bytearray对象。请注意,对象将立即使用此函数进行序列化,因此即使可能,也不会使用其他序列化方法(例如pandas.DataFrame.to_csv)。在使用Artifact.get方法获取此工件时,要反序列化此工件,请使用其deserialization_function参数。

    • serialization_function (可选 [ Callable [ [ Any ] *, * Union [ bytes *, * bytearray ] ] ] ) –

  • 返回类型

    bool

  • 返回

    上传的状态。

  • True - 上传成功。

  • False - 上传失败。

  • Improvement

    如果工件对象类型不受支持,则引发ValueError

  • 参数

    • name (str ) –

    • artifact_object (任意 ) –

    • metadata (可选 [ 映射 ] ) –

    • delete_after_upload (bool ) –

    • auto_pickle (可选 [ 布尔型 ] ) –

    • 预览 (可选 [ 任意 ] ) –

    • wait_on_upload (bool ) –

    • serialization_function (可选 [ Callable [ [ Any ] *, * Union [ bytes *, * bytearray ] ] ] ) –

  • 返回类型

    布尔


PipelineController.upload_model

classmethod upload_model(model_name, model_local_path, upload_uri=None)

上传(添加)一个模型到主管道任务对象。 此函数可以从任何管道组件调用,以直接将模型添加到主管道任务中。

模型文件/路径将被上传到管道任务并在模型仓库中注册。

如果无法定位主Pipeline任务,则引发ValueError。

  • 参数

    • model_name (str) – 模型名称,将出现在模型注册表中(在管道的项目中)

    • model_local_path (str) – 要上传的本地模型文件或目录的路径。 如果提供了本地目录,文件夹的内容(递归地)将被打包成一个zip文件并上传。

    • upload_uri (Optional[str]) – 模型权重上传的存储目标URI。默认值是之前使用的URI。

  • 返回类型

    OutputModel

  • 返回

    上传的OutputModel


等待

wait(timeout=None)

等待管道完成。

info

此方法不会停止管道。调用stop以终止管道。

  • 参数

    timeout (float ) – 等待管道完成的超时时间(分钟)。 如果 None,则等待直到达到超时或管道完成。

  • 返回类型

    bool

  • 返回

    如果管道完成,则为True。如果管道超时,则为False。

class automation.controller.PipelineDecorator()

创建一个新的管道控制器。新创建的对象将启动并监控新的实验。

  • 参数

    • name (str ) – 提供管道名称(如果存在主任务,它将覆盖其名称)

    • 项目 (str ) – 提供存储管道的项目(如果存在主任务,它将覆盖其项目)

    • 版本 (可选 [ str ] ) – 管道版本。此版本允许唯一标识管道模板的执行。语义版本的示例:version=’1.0.1’ , version=’23’, version=’1.2’。 如果未设置,则查找管道的最新版本并递增。如果未找到此类版本,则默认为‘1.0.0’

    • pool_frequency (float ) – 监控实验/状态的池化频率(以分钟为单位)。

    • add_pipeline_tags (bool ) – (默认: False) 如果为 True,则为由此管道创建的所有步骤(任务)添加 pipe: 标签。

    • target_project (str ) – 如果提供,所有管道步骤将被克隆到目标项目中

    • abort_on_failure (bool ) – 如果为False(默认值),失败的管道步骤不会导致管道立即停止,相反,任何未连接到(或间接连接到)失败步骤的步骤仍将被执行。尽管如此,管道本身将被标记为失败,除非失败步骤特别定义为“continue_on_fail=True”。 如果为True,任何失败的步骤都将导致管道立即中止,停止所有正在运行的步骤,并将管道标记为失败。

    • add_run_number (bool ) – 如果为True(默认),将管道的运行编号添加到管道名称中。 例如,第二次启动管道“best pipeline”时,我们将其重命名为“best pipeline #2”

    • retry_on_failure (可选 [ 联合 [ 整数 *, * 可调用 [ [ PipelineController *, * PipelineController.Node *, * 整数 ] *, * 布尔值 ] ] *] * # noqa ) – 整数(重试次数)或返回True以允许重试的回调函数

      • 整数:在节点故障的情况下,按照此参数指定的次数重试该节点。

      • 可调用:在节点失败时调用的函数。参数如下:

      PipelineController实例,失败的PipelineController.Node以及一个表示失败节点先前重试次数的整数。 如果节点应重试,则函数必须返回True,否则返回False。 如果为True,节点将重新排队,剩余的重试次数将减少1。 默认情况下,如果未指定此回调,函数将按照retry_on_failure指示的次数重试。

      def example_retry_on_failure_callback(pipeline, node, retries):
      print(node.name, ' failed')
      # allow up to 5 retries (total of 6 runs)
      return retries &lt; 5
    • docker (可选 [ str ] ) – 选择要在远程会话中执行的docker镜像

    • docker_args (可选 [ str ] ) – 添加docker参数,传递一个字符串

    • docker_bash_setup_script (可选 [ str ] ) – 添加要在设置任务环境之前在docker内部执行的bash脚本

    • packages (可选 [ 联合 [ 布尔 *, * 字符串 *, * 序列 [ 字符串 ] ] ] ) – 手动指定所需的包列表或本地 requirements.txt 文件。 示例:[“tqdm>=2.1”, “scikit-learn”] 或 “./requirements.txt” 如果未提供,包将自动添加。 使用 False 从您的 git 仓库中的 “requirements.txt” 安装需求

    • repo (可选 [ str ] ) – 可选,指定一个仓库附加到管道控制器,当远程执行时。 允许用户在指定的仓库内执行控制器,使他们能够从仓库加载模块/脚本。 注意执行工作目录将是仓库的根文件夹。 支持git仓库的url链接,以及本地仓库路径(自动转换为远程git/提交,如当前检出的那样)。 示例远程url: ‘https://github.com/user/repo.git’ 示例本地仓库副本: ‘./repo’ -> 将自动存储远程仓库url和基于本地克隆副本的提交ID 使用空字符串(“”)来禁用任何仓库的自动检测

    • repo_branch (可选 [ str ] ) – 可选,指定远程仓库分支(如果使用本地仓库路径,则忽略)

    • repo_commit (可选 [ str ] ) – 可选,指定仓库提交ID(如果使用本地仓库路径,则忽略)

    • artifact_serialization_function (Optional [ Callable [ [ Any ] *, * Union [ bytes *, * bytearray ] ] ] ) – 一个序列化函数,它接受一个任意类型的参数,该参数是要序列化的对象。该函数应返回一个表示序列化对象的 bytes 或 bytearray 对象。管道上传的所有参数/返回工件都将使用此函数进行序列化。所有相关的导入必须在此函数中完成。例如:

    def serialize(obj):
    import dill
    return dill.dumps(obj)
    • artifact_deserialization_function (Optional [ Callable [ [ bytes ] *, * Any ] ] ) – A deserialization function that takes one parameter of type bytes, which represents the serialized object. This function should return the deserialized object. All parameter/return artifacts fetched by the pipeline will be deserialized using this function. All relevant imports must be done in this function. For example:
    def deserialize(bytes_):
    import dill
    return dill.loads(bytes_)
    • output_uri (可选 [ 联合 [ str *, * bool ] ] ) – 此管道的存储/输出URL。这是输出模型和其他工件的默认位置。有关更多信息,请查看Task.init参考文档(output_uri是一个参数)。 此管道步骤的output_uri将默认为此值。

    • skip_global_imports (bool ) – 如果为True,全局导入将不会包含在步骤的执行中,否则所有全局导入将在每个步骤执行开始时以安全的方式自动导入。默认值为False

    • working_dir (可选 [ str ] ) – 启动管道的工作目录。

    • enable_local_imports (bool ) – 如果为True,允许管道步骤通过将管道控制器脚本所在的目录(sys.path[0])附加到每个步骤的PYTHONPATH来从本地文件导入。 如果为False,该目录将不会附加到PYTHONPATH。默认值为True。 在远程运行时忽略。


添加函数步骤

add_function_step(name, function, function_kwargs=None, function_return=None, project_name=None, task_name=None, task_type=None, auto_connect_frameworks=None, auto_connect_arg_parser=None, packages=None, repo=None, repo_branch=None, repo_commit=None, helper_functions=None, docker=None, docker_args=None, docker_bash_setup_script=None, parents=None, execution_queue=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, time_limit=None, continue_on_fail=False, pre_execute_callback=None, post_execute_callback=None, cache_executed_step=False, retry_on_failure=None, status_change_callback=None, tags=None, output_uri=None, draft=False, working_dir=None, continue_behaviour=None)

从函数创建任务,包括将函数输入参数包装到超参数部分作为kwargs,并将函数结果存储为命名的工件

示例:

def mock_func(a=6, b=9):
c = a*b
print(a, b, c)
return c, c**2

create_task_from_function(mock_func, function_return=['mul', 'square'])

来自其他任务的示例参数(artifact):

def mock_func(matrix_np):
c = matrix_np*matrix_np
print(matrix_np, c)
return c

create_task_from_function(
mock_func,
function_kwargs={'matrix_np': 'aabb1122.previous_matrix'},
function_return=['square_matrix']
)
  • 参数

    • name (str ) – 步骤的唯一标识。例如 stage1

    • function (Callable ) – 一个全局函数,用于转换为独立的Task

    • function_kwargs (可选 [ 字典 [ 字符串 *, * 任意 ] ] ) – 可选,提供函数参数和默认值的子集以暴露。 如果未提供,则自动获取所有函数参数和默认值 可选,从其他任务的输出工件中传递输入参数给函数。 例如,从任务ID aabbcc的工件名称answer中传递名为numpy_matrix的参数: {'numpy_matrix': 'aabbcc.answer'}

    • function_return (可选 [ 列表 [ 字符串 ] ] ) – 提供所有结果的名称列表。 如果未提供,则不会将结果存储为工件。

    • project_name (可选 [ str ] ) – 设置任务的名称。如果 base_task_id 为 None,则必须设置。

    • task_name (可选 [ str ] ) – 设置远程任务的名称,如果未提供,则使用名称参数。

    • task_type (可选 [ str ] ) – 可选,要创建的任务类型。支持的值:'training', 'testing', 'inference', 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'

    • auto_connect_frameworks (可选 [ 字典 ] ) – 控制框架的自动连接,参见 Task.init auto_connect_frameworks

    • auto_connect_arg_parser (可选 [ 字典 ] ) – 控制ArgParser的自动连接,参见Task.init auto_connect_arg_parser

    • packages (可选 [ Union [ bool *, * str *, * Sequence [ str ] ] ] ) – 手动指定所需的包列表或本地的requirements.txt文件。 示例:[“tqdm>=2.1”, “scikit-learn”] 或 “./requirements.txt” 如果未提供,将根据函数中使用的导入自动添加包。 使用False从git仓库中的“requirements.txt”安装需求

    • repo (可选 [ str ] ) – 可选,指定一个仓库附加到函数上,当远程执行时。 允许用户在指定的仓库内执行函数,从而能够从仓库加载模块/脚本 注意执行工作目录将是仓库的根文件夹。 支持git仓库的url链接,以及本地仓库路径。 示例远程url: ‘https://github.com/user/repo.git’ 示例本地仓库副本: ‘./repo’ -> 将自动存储远程仓库的url和基于本地克隆副本的提交ID

    • repo_branch (可选 [ str ] ) – 可选,指定远程仓库分支(如果使用本地仓库路径,则忽略)

    • repo_commit (可选 [ str ] ) – 可选,指定仓库提交ID(如果使用本地仓库路径,则忽略)

    • helper_functions (可选 [ 序列 [ 可调用 ] ] ) – 可选,一个辅助函数列表,用于独立的函数任务。

    • docker (可选 [ str ] ) – 选择要在远程会话中执行的docker镜像

    • docker_args (可选 [ str ] ) – 添加docker参数,传递一个字符串

    • docker_bash_setup_script (可选 [ str ] ) – 添加要在设置任务环境之前在docker内部执行的bash脚本

    • parents (可选 [ 序列 [ str ] ] ) – DAG 中父节点的可选列表。 只有在所有父节点成功执行后,管道中的当前步骤才会被发送执行。

    • execution_queue (可选 [ str ] ) – 可选,用于执行此特定步骤的队列。 如果未提供,任务将被发送到默认的执行队列,如类中所定义。

    • monitor_metrics (可选 [ 列表 [ 联合 [ 元组 [ str *, * str ] *, * 元组 [ ( str *, * str ) *, * ( str *, * str ) ] ] ] ] ) – 可选,记录管道任务步骤的指标。 格式是要记录的指标对(标题,系列)的列表:

    [(步骤指标标题, 步骤指标系列), ] 示例: [(‘test’, ‘accuracy’), ]

    或者是一个元组对的列表,用于指定在管道任务上使用的不同目标指标:

    [((步骤指标标题, 步骤指标系列), (目标指标标题, 目标指标系列)), ] 示例: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]

    • monitor_artifacts (可选 [ 列表 [ 联合 [ str *, * 元组 [ str *, * str ] ] ] ] ) – 可选,记录管道任务中步骤的工件。 提供了步骤任务中存在的工件名称列表,它们也将出现在管道本身上。 示例:[(‘processed_data’, ‘final_processed_data’), ] 或者用户也可以提供一个要监控的工件列表 (目标工件名称将与原始工件名称相同) 示例:[‘processed_data’, ]

    • monitor_models (可选 [ 列表 [ 联合 [ 字符串 *, * 元组 [ 字符串 *, * 字符串 ] ] ] ] ) – 可选,记录管道任务上步骤的输出模型。 提供步骤任务上存在的模型名称列表,它们也将出现在管道本身上。 示例:[(‘model_weights’, ‘final_model_weights’), ] 或者用户也可以提供一个要监控的模型列表 (目标模型名称将与原始模型相同) 示例:[‘model_weights’, ] 要选择最新的(字典序)模型,请使用“model_*”,或者使用“*”选择最后创建的模型 示例: [‘model_weights_*’, ]

    • time_limit (可选 [ float ] ) – 默认值为 None,表示没有时间限制。 步骤执行时间限制,如果超过此限制,任务将被中止,管道将停止并标记为失败。

    • continue_on_fail (bool ) – (已弃用,请使用 continue_behaviour 代替)。 如果为 True,失败的步骤不会导致管道停止(或标记为失败)。请注意,连接到(或间接连接到)失败步骤的步骤将被跳过。默认为 False

    • pre_execute_callback (Optional [ Callable [ [ PipelineController *, * PipelineController.Node *, * dict ] *, * bool ] *] * # noqa ) – 回调函数,在步骤(任务)创建时调用,并在发送执行之前调用。允许用户在启动前修改任务。 使用 node.job 访问 ClearmlJob 对象,或使用 node.job.task 直接访问 Task 对象。 参数是传递给 ClearmlJob 的配置参数。

    如果回调返回值为False,则该节点将被跳过,依赖于此节点的DAG中的任何节点也将被跳过。

    请注意参数已经被解析, 例如 ${step1.parameters.Args/param} 已被替换为相关值。

    def step_created_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    parameters, # type: dict
    ):
    pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController *, * PipelineController.Node ] *, * None ] *] * # noqa ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.
    def step_completed_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    ):
    pass
    • cache_executed_step (bool ) – 如果为True,在启动新步骤之前, 在更新最新配置后,检查是否已经执行了具有相同参数/代码的完全相同的任务。如果找到,则使用它而不是启动新任务。 默认值:False,始终使用base_task的新克隆副本。 注意:如果git仓库引用没有特定的提交ID,则永远不会使用该任务。

    • retry_on_failure (可选 [ 联合 [ 整数 *, * 可调用 [ [ PipelineController *, * PipelineController.Node *, * 整数 ] *, * 布尔值 ] ] *] * # noqa ) – 整数(重试次数)或返回True以允许重试的回调函数

      • 整数:在节点故障的情况下,按照此参数指定的次数重试该节点。

      • 可调用:在节点失败时调用的函数。参数如下:

      PipelineController实例,失败的PipelineController.Node以及一个表示失败节点先前重试次数的整数。 如果节点应重试,则函数必须返回True,否则返回False。 如果为True,节点将重新排队,剩余的重试次数将减少1。 默认情况下,如果未指定此回调,函数将按照retry_on_failure指示的次数重试。

      def example_retry_on_failure_callback(pipeline, node, retries):
      print(node.name, ' failed')
      # allow up to 5 retries (total of 6 runs)
      return retries &lt; 5
    • status_change_callback (可选 [ 可调用 [ [ PipelineController *, * PipelineController.Node *, * str ] *, * None ] *] * # noqa ) – 回调函数,当步骤(任务)的状态发生变化时调用。 使用 node.job 访问 ClearmlJob 对象,或使用 node.job.task 直接访问 Task 对象。 函数的签名必须如下所示:

    def status_change_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    previous_status # type: str
    ):
    pass
    • tags (可选 [ 联合 [ str *, * 序列 [ str ] ] ] ) – 特定管道步骤的标签列表。 当远程执行管道时 (即从UI启动管道/将其加入队列),此方法无效。

    • output_uri (可选 [ 联合 [ str *, * bool ] ] ) – 此步骤的存储/输出URL。这是输出模型和其他工件的默认位置。有关更多信息,请查看Task.init参考文档(output_uri是一个参数)。

    • 草稿 (可选 [ 布尔值 ] ) – (默认 False). 如果为 True,任务将作为草稿任务创建。

    • working_dir (可选 [ str ] ) – 启动脚本的工作目录。

    • continue_behaviour (可选 [ 字典 ] ) – 控制管道在步骤失败/中止后是否继续运行。可以使用布尔选项的字典来设置不同的行为。支持的选项有:

    • continue_on_fail - If True, the pipeline will continue even if the step failed.

    如果为False,管道将停止

    • continue_on_abort - If True, the pipeline will continue even if the step was aborted.

    如果为False,管道将停止

    • skip_children_on_fail - If True, the children of this step will be skipped if it failed.

    如果为False,即使此步骤失败,子步骤仍将运行。 从失败的步骤传递给其子步骤的任何参数将默认为None

    • skip_children_on_abort - If True, the children of this step will be skipped if it was aborted.

    如果为False,即使此步骤被中止,子步骤仍将运行。 从失败的步骤传递给其子步骤的任何参数将默认为None

    如果字典中不存在这些键,它们的值将默认为 True

  • 返回类型

    布尔

  • 返回

    如果成功则为真


add_parameter

add_parameter(名称, 默认值=None, 描述=None, 参数类型=None)

向管道任务添加一个参数。 该参数可以用作管道中任何步骤的输入参数。 注意所有参数将出现在PipelineController任务的高级参数 -> 管道部分 示例:pipeline.add_parameter(name=’dataset’, description=’要处理管道的数据集ID’) 然后在其中一个步骤中,我们可以使用'${pipeline.dataset}'来引用参数的值

  • 参数

    • name (str) – 参数的字符串名称。

    • default (Optional[Any]) – 默认值,将作为默认值设置(可以在UI中稍后更改)

    • 描述 (Optional[str]) – 参数的字符串描述及其在管道中的使用

    • param_type (Optional[str]) – 可选,参数类型信息(用于提示类型转换和描述)

  • 返回类型

    None


添加步骤

add_step(名称, 基础任务ID=无, 父任务=无, 参数覆盖=无, 配置覆盖=无, 任务覆盖=无, 执行队列=无, 监控指标=无, 监控工件=无, 监控模型=无, 时间限制=无, 基础任务项目=无, 基础任务名称=无, 克隆基础任务=真, 失败继续=假, 预执行回调=无, 后执行回调=无, 缓存执行步骤=假, 基础任务工厂=无, 失败重试=无, 状态变更回调=无, 递归解析参数=假, 输出URI=无, 继续行为=无)

向管道执行DAG添加一个步骤。 每个步骤必须具有唯一的名称(此名称稍后将用于寻址步骤)

  • 参数

    • name (str ) – 步骤的唯一标识。例如 stage1

    • base_task_id (可选 [ str ] ) – 用于步骤的任务ID。每次执行步骤时, 基础任务会被克隆,然后克隆的任务将被发送执行。

    • parents (可选 [ 序列 [ str ] ] ) – DAG 中父节点的可选列表。 只有在所有父节点成功执行后,管道中的当前步骤才会被发送执行。

    • parameter_override (可选 [ 映射 [ str *, * 任意 ] ] ) – 可选的参数覆盖字典。

      字典值可以使用以下形式引用先前执行的步骤 '${step_name}'。示例:

      • 工件访问 parameter_override={'Args/input_file': '${<step_name>.artifacts.<artifact_name>.url}' }

      • 模型访问(最后使用的模型) parameter_override={'Args/input_file': '${<step_name>.models.output.-1.url}' }

      • 参数访问 parameter_override={'Args/input_file': '${<step_name>.parameters.Args/input_file}' }

      • 管道任务参数(参见 Pipeline.add_parameter)parameter_override={'Args/input_file': '${pipeline.<pipeline_parameter>}' }

      • 任务ID parameter_override={'Args/input_file': '${stage3.id}' }

    • recursively_parse_parameters (bool ) – 如果为True,则递归地从parameter_override中的列表、字典或元组中解析参数。 示例:

      • parameter_override={'Args/input_file': ['${&lt;step_name&gt;.artifacts.&lt;artifact_name&gt;.url}', 'file2.txt']} will be correctly parsed.
        • parameter_override={'Args/input_file': ('${.parameters.Args/input_file}', '${.parameters.Args/input_file}')} 将被正确解析。

      • configuration_overrides (可选 [ 映射 [ str *, * 联合 [ str *, * 映射 ] ] ] ) – 可选,覆盖任务配置对象。 预期的配置对象名称和配置对象内容的字典。 示例:

    {'General': dict(key='value')} {'General': '配置文件内容'} {'OmegaConf': YAML.dumps(full_hydra_dict)}

    • task_overrides (可选 [ 映射 [ str *, * 任意 ] ] ) – 可选的任务部分覆盖字典。

      字典值可以使用以下形式引用先前执行的步骤 '${step_name}'。示例:

      • 从特定分支获取最新提交 task_overrides={'script.version_num': '', 'script.branch': 'main'}

      • 将git仓库分支匹配到上一步 task_overrides={'script.branch': '${stage1.script.branch}', 'script.version_num': ''}

      • 更改容器镜像 task_overrides={'container.image': 'nvidia/cuda:11.6.0-devel-ubuntu20.04', 'container.arguments': '--ipc=host'}

      • 将容器镜像与上一步匹配 task_overrides={'container.image': '${stage1.container.image}'}

      • 重置需求(代理将使用仓库内的“requirements.txt”) task_overrides={'script.requirements.pip': ""}

    • execution_queue (可选 [ str ] ) – 可选,用于执行此特定步骤的队列。 如果未提供,任务将被发送到默认的执行队列,如类中所定义。

    • monitor_metrics (可选 [ 列表 [ 联合 [ 元组 [ str *, * str ] *, * 元组 [ ( str *, * str ) *, * ( str *, * str ) ] ] ] ] ) – 可选,记录管道任务步骤的指标。 格式是要记录的指标对(标题,系列)的列表:

    [(步骤指标标题, 步骤指标系列), ] 示例: [(‘test’, ‘accuracy’), ]

    或者是一个元组对的列表,用于指定在管道任务上使用的不同目标指标:

    [((步骤指标标题, 步骤指标系列), (目标指标标题, 目标指标系列)), ] 示例: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]

    • monitor_artifacts (可选 [ 列表 [ 联合 [ str *, * 元组 [ str *, * str ] ] ] ] ) – 可选,记录管道任务中步骤的工件。 提供了步骤任务中存在的工件名称列表,它们也将出现在管道本身上。 示例:[(‘processed_data’, ‘final_processed_data’), ] 或者用户也可以提供一个要监控的工件列表 (目标工件名称将与原始工件名称相同) 示例:[‘processed_data’, ]

    • monitor_models (可选 [ 列表 [ 联合 [ 字符串 *, * 元组 [ 字符串 *, * 字符串 ] ] ] ] ) – 可选,记录管道任务上步骤的输出模型。 提供步骤任务上存在的模型名称列表,它们也将出现在管道本身上。 示例:[(‘model_weights’, ‘final_model_weights’), ] 或者用户也可以提供一个要监控的模型列表 (目标模型名称将与原始模型相同) 示例:[‘model_weights’, ] 要选择最新的(字典序)模型,请使用“model_*”,或者使用“*”选择最后创建的模型 示例: [‘model_weights_*’, ]

    • time_limit (可选 [ float ] ) – 默认值为 None,表示没有时间限制。 步骤执行时间限制,如果超过此限制,任务将被中止,管道将停止并标记为失败。

    • base_task_project (可选 [ str ] ) – 如果未提供 base_task_id, 则使用 base_task_project 和 base_task_name 组合来检索用于步骤的 base_task_id。

    • base_task_name (可选 [ str ] ) – 如果未提供 base_task_id, 则使用 base_task_project 和 base_task_name 组合来检索用于步骤的 base_task_id。

    • clone_base_task (bool ) – 如果为 True(默认),管道将克隆基础任务,并修改/排队克隆的任务。如果为 False,则直接使用基础任务,注意它必须处于草稿模式(已创建)。

    • continue_on_fail (bool ) – (已弃用,请使用 continue_behaviour 代替)。 如果为 True,失败的步骤不会导致管道停止(或标记为失败)。请注意,连接到(或间接连接到)失败步骤的步骤将被跳过。默认为 False

    • pre_execute_callback (Optional [ Callable [ [ PipelineController *, * PipelineController.Node *, * dict ] *, * bool ] *] * # noqa ) – 回调函数,在步骤(任务)创建时调用,并在发送执行之前调用。允许用户在启动前修改任务。 使用 node.job 访问 ClearmlJob 对象,或使用 node.job.task 直接访问 Task 对象。 参数是传递给 ClearmlJob 的配置参数。

    如果回调返回值为False,则该节点将被跳过,依赖于此节点的DAG中的任何节点也将被跳过。

    请注意参数已经被解析, 例如 ${step1.parameters.Args/param} 已被替换为相关值。

    def step_created_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    parameters, # type: dict
    ):
    pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController *, * PipelineController.Node ] *, * None ] *] * # noqa ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.
    def step_completed_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    ):
    pass
    • cache_executed_step (bool ) – 如果为True,在启动新步骤之前, 在更新最新配置后,检查是否已经执行过具有相同参数/代码的完全相同的任务。如果找到,则使用它而不是启动新任务。 默认值:False,始终使用base_task的新克隆副本。 注意:如果git仓库引用没有特定的提交ID,则永远不会使用该任务。 如果clone_base_task为False,则不会进行克隆,因此使用base_task。

    • base_task_factory (可选 [ Callable [ [ PipelineController.Node ] , * 任务 ]* ] ) – 可选,代替提供一个已存在的任务,提供一个可调用的函数来创建任务(返回任务对象)

    • retry_on_failure (可选 [ 联合 [ 整数 *, * 可调用 [ [ PipelineController *, * PipelineController.Node *, * 整数 ] *, * 布尔值 ] ] *] * # noqa ) – 整数(重试次数)或返回True以允许重试的回调函数

      • 整数:在节点故障的情况下,按照此参数指定的次数重试该节点。

      • 可调用:在节点失败时调用的函数。参数如下:

      PipelineController实例,失败的PipelineController.Node以及一个表示失败节点先前重试次数的整数。 如果节点应重试,则函数必须返回True,否则返回False。 如果为True,节点将重新排队,剩余的重试次数将减少1。 默认情况下,如果未指定此回调,函数将按照retry_on_failure指示的次数重试。

      def example_retry_on_failure_callback(pipeline, node, retries):
      print(node.name, ' failed')
      # allow up to 5 retries (total of 6 runs)
      return retries &lt; 5
    • status_change_callback (可选 [ 可调用 [ [ PipelineController *, * PipelineController.Node *, * str ] *, * None ] *] * # noqa ) – 回调函数,当步骤(任务)的状态发生变化时调用。 使用 node.job 访问 ClearmlJob 对象,或使用 node.job.task 直接访问 Task 对象。 函数的签名必须如下所示:

    def status_change_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    previous_status # type: str
    ):
    pass
    • output_uri (可选 [ 联合 [ str *, * bool ] ] ) – 此步骤的存储/输出URL。这是输出模型和其他工件的默认位置。有关更多信息,请查看Task.init参考文档(output_uri是一个参数)。

    • continue_behaviour (可选 [ 字典 ] ) – 控制管道在步骤失败/中止后是否继续运行。可以使用布尔选项的字典来设置不同的行为。支持的选项有:

    • continue_on_fail - If True, the pipeline will continue even if the step failed.

    如果为False,管道将停止

    • continue_on_abort - If True, the pipeline will continue even if the step was aborted.

    如果为False,管道将停止

    • skip_children_on_fail - If True, the children of this step will be skipped if it failed.

    如果为False,即使此步骤失败,子步骤仍将运行。 从失败的步骤传递给其子步骤的任何参数将默认为None

    • skip_children_on_abort - If True, the children of this step will be skipped if it was aborted.

    如果为False,即使此步骤被中止,子步骤仍将运行。 从失败的步骤传递给其子步骤的任何参数将默认为None

    如果字典中不存在这些键,它们的值将默认为 True

  • 返回类型

    布尔

  • 返回

    如果成功则为真


添加标签

添加标签(tags)

向此管道添加标签。旧标签不会被删除。 当远程执行管道时 (即从用户界面启动管道/将其加入队列),此方法无效。

  • 参数

    标签 (Union[Sequence[str], str]) – 此管道的标签列表。

  • 返回类型

    None


PipelineDecorator.clone

classmethod clone(pipeline_controller, name=None, comment=None, parent=None, project=None, version=None)

创建一个管道(实验)的副本(克隆)。克隆管道的状态为Draft并且可以修改。

  • 参数

    • pipeline_controller (str ) – 要克隆的管道。指定一个 PipelineController 对象或一个 ID。

    • name (str ) – 新克隆管道的名称。

    • comment (str ) – 新克隆管道的注释/描述。

    • parent (str ) – 新管道的父任务的ID。

      • 如果未指定parent,则parent设置为source_task.parent

      • 如果未指定parentsource_task.parent不可用,

    然后 parent 设置为 source_task

    • project (str ) – 创建新管道的项目名称。 如果 None,克隆将继承原始管道的项目

    • 版本 (str ) – 新克隆管道的版本。如果为 None,则克隆继承原始管道的版本

  • 返回类型

    PipelineController

  • 返回

    新克隆的PipelineController


PipelineDecorator.component

*classmethod component(_func=None, , return_values=('return_object'), name=None, cache=False, packages=None, parents=None, execution_queue=None, continue_on_fail=False, docker=None, docker_args=None, docker_bash_setup_script=None, task_type=None, auto_connect_frameworks=None, auto_connect_arg_parser=None, repo=None, repo_branch=None, repo_commit=None, helper_functions=None, monitor_metrics=None, monitor_artifacts=None, monitor_models=None, retry_on_failure=None, pre_execute_callback=None, post_execute_callback=None, status_change_callback=None, tags=None, output_uri=None, draft=False, working_dir=None, continue_behaviour=None)

远程执行的pipeline组件函数

  • 参数

    • _func – 包装函数

    • return_values (Union [ str *, * Sequence [ str ] ] ) – 提供所有结果的名称列表。 注意!如果未提供,则不会将结果存储为工件。

    • name (可选 [ str ] ) – 可选,设置管道组件任务的名称。 如果未提供,则使用包装的函数名称作为管道组件名称

    • cache (bool ) – 如果为True,在启动新步骤之前,使用最新配置更新后,检查是否已经执行过具有相同参数/代码的完全相同的任务。如果找到,则使用它而不是启动新任务。默认值:False

    • packages (可选 [ 联合 [ 布尔 *, * 字符串 *, * 序列 [ 字符串 ] ] ] ) – 手动指定所需的包列表或本地 requirements.txt 文件。 示例:[“tqdm>=2.1”, “scikit-learn”] 或 “./requirements.txt” 如果未提供,将根据包装函数内部使用的导入自动添加包。 使用 False 从您的 git 仓库中的 “requirements.txt” 安装需求

    • parents (可选 [ 列表 [ str ] ] ) – DAG 中父节点的可选列表。 只有在所有父节点成功执行后,管道中的当前步骤才会被发送执行。

    • execution_queue (可选 [ str ] ) – 可选,用于执行此特定步骤的队列。 如果未提供,任务将被发送到管道的默认执行队列

    • continue_on_fail (bool ) – (已弃用,请使用 continue_behaviour 代替)。 如果为 True,失败的步骤不会导致管道停止(或标记为失败)。请注意,连接到(或间接连接到)失败步骤的步骤将被跳过。默认为 False

    • docker (可选 [ str ] ) – 指定在远程执行管道步骤时要使用的docker镜像

    • docker_args (可选 [ str ] ) – 为远程执行添加docker执行参数 (使用单个字符串表示所有docker参数)。

    • docker_bash_setup_script (可选 [ str ] ) – 在设置任务环境之前,添加一个要在docker内部执行的bash脚本

    • task_type (可选 [ str ] ) – 可选,要创建的任务类型。支持的值:'training', 'testing', 'inference', 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'

    • auto_connect_frameworks (可选 [ 字典 ] ) – 控制框架的自动连接,参见 Task.init auto_connect_frameworks

    • auto_connect_arg_parser (可选 [ 字典 ] ) – 控制ArgParser的自动连接,参见Task.init auto_connect_arg_parser

    • repo (可选 [ str ] ) – 可选,指定一个仓库附加到函数上,以便在远程执行时使用。 允许用户在指定的仓库内执行函数,使他们能够从仓库中加载模块/脚本。 注意,执行工作目录将是仓库的根文件夹。 支持git仓库的URL链接,以及本地仓库路径(自动转换为远程git/提交,如当前检出的那样)。 示例远程URL:'https://github.com/user/repo.git' 示例本地仓库副本:'./repo' -> 将自动存储远程仓库URL和基于本地克隆副本的提交ID

    • repo_branch (可选 [ str ] ) – 可选,指定远程仓库分支(如果使用本地仓库路径,则忽略)

    • repo_commit (可选 [ str ] ) – 可选,指定仓库提交ID(如果使用本地仓库路径,则忽略)

    • helper_functions (可选 [ 序列 [ 可调用 ] ] ) – 可选,一个辅助函数的列表,用于使独立的管道步骤函数任务可用。默认情况下,管道步骤函数无法访问任何其他函数,通过在这里指定额外的函数,远程管道步骤可以调用这些额外的函数。 例如,假设我们有两个函数 parse_data() 和 load_data():[parse_data, load_data]

    • monitor_metrics (可选 [ 列表 [ 联合 [ 元组 [ str *, * str ] *, * 元组 [ ( str *, * str ) *, * ( str *, * str ) ] ] ] ] ) – 可选,自动记录步骤报告的指标也在管道任务上。 预期格式是要记录的指标(标题,系列)对的列表:

    [(步骤指标标题, 步骤指标系列), ] 示例: [(‘test’, ‘accuracy’), ]

    或者是一个元组对的列表,用于指定在管道任务上使用的不同目标指标:

    [((步骤指标标题, 步骤指标系列), (目标指标标题, 目标指标系列)), ] 示例: [[(‘test’, ‘accuracy’), (‘model’, ‘accuracy’)], ]

    • monitor_artifacts (可选 [ 列表 [ 联合 [ str *, * 元组 [ str *, * str ] ] ] ] ) – 可选,自动记录管道任务上的步骤工件。 提供了一个由步骤函数创建的工件名称列表,这些工件也将自动记录在管道任务本身上。 示例:[‘processed_data’, ] (管道任务上的目标工件名称将与原始工件名称相同) 或者,提供一个对列表 (source_artifact_name, target_artifact_name): 其中第一个字符串是组件任务上显示的工件名称, 第二个是要放在管道任务上的目标工件名称 示例:[(‘processed_data’, ‘final_processed_data’), ]

    • monitor_models (可选 [ 列表 [ 联合 [ str *, * 元组 [ str *, * str ] ] ] ] ) – 可选,自动记录管道任务中步骤的输出模型。 提供由步骤任务创建的模型名称列表,它们也将出现在管道本身上。 示例:[‘model_weights’, ] 要选择最新的(字典序)模型,请使用“model_*”,或仅使用“*”选择最后创建的模型 示例:[‘model_weights_*’, ] 或者,提供一对(source_model_name, target_model_name)的列表: 其中第一个字符串是组件任务上出现的模型名称, 第二个是要放在管道任务上的目标模型名称 示例:[(‘model_weights’, ‘final_model_weights’), ]

    • retry_on_failure (可选 [ 联合 [ 整数 *, * 可调用 [ [ PipelineController *, * PipelineController.Node *, * 整数 ] *, * 布尔值 ] ] *] * # noqa ) – 整数(重试次数)或返回True以允许重试的回调函数

      • Integer: In case of node failure, retry the node the number of times indicated by this parameter.
        • 可调用:在节点失败时调用的函数。参数如下:

      PipelineController实例,失败的PipelineController.Node和一个表示失败节点先前重试次数的整数 该函数必须返回一个布尔值:如果节点应重试则为True,否则为False。 如果为True,节点将重新排队,剩余的重试次数将减少1。 默认情况下,如果未指定此回调,函数将按照retry_on_failure指示的次数重试。

    def example_retry_on_failure_callback(pipeline, node, retries):
    print(node.name, ' failed')
    # allow up to 5 retries (total of 6 runs)
    return retries &lt; 5
    • pre_execute_callback (可选 [ 可调用 [ [ PipelineController *, * PipelineController.Node *, * 字典 ] *, * 布尔 ] *] * # noqa ) – 回调函数,在步骤(任务)创建时调用,

      并在发送执行之前。允许用户在启动前修改任务。

    使用 node.job 访问 ClearmlJob 对象,或使用 node.job.task 直接访问 Task 对象。 parameters 是传递给 ClearmlJob 的配置参数。

    如果回调返回值为False,则该节点将被跳过,依赖于此节点的DAG中的任何节点也将被跳过。

    请注意参数已经被解析, 例如 ${step1.parameters.Args/param} 已被替换为相关值。

    def step_created_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    parameters, # type: dict
    ):
    pass
    • post_execute_callback (Optional [ Callable [ [ PipelineController *, * PipelineController.Node ] *, * None ] *] * # noqa ) – Callback function, called when a step (Task) is completed and other jobs are going to be executed. Allows a user to modify the Task status after completion.
    def step_completed_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    ):
    pass
    • status_change_callback (Optional [ Callable [ [ PipelineController *, * PipelineController.Node *, * str ] *, * None ] *] * # noqa ) – Callback function, called when the status of a step (Task) changes. Use node.job to access the ClearmlJob object, or node.job.task to directly access the Task object. The signature of the function must look the following way:
    def status_change_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    previous_status # type: str
    ):
    pass
    • tags (可选 [ 联合 [ str *, * 序列 [ str ] ] ] ) – 特定管道步骤的标签列表。 当远程执行管道时 (即从UI启动管道/将其加入队列),此方法无效。

    • output_uri (可选 [ 联合 [ str *, * bool ] ] ) – 此步骤的存储/输出URL。这是输出模型和其他工件的默认位置。有关更多信息,请查看Task.init参考文档(output_uri是一个参数)。

    • 草稿 (可选 [ 布尔值 ] ) – (默认 False). 如果为 True,任务将作为草稿任务创建。

    • working_dir (可选 [ str ] ) – 启动步骤的工作目录。

    • continue_behaviour (可选 [ 字典 ] ) – 控制管道在步骤失败/中止后是否继续运行。可以使用布尔选项的字典来设置不同的行为。支持的选项有:

    • continue_on_fail - If True, the pipeline will continue even if the step failed.

    如果为False,管道将停止

    • continue_on_abort - If True, the pipeline will continue even if the step was aborted.

    如果为False,管道将停止

    • skip_children_on_fail - If True, the children of this step will be skipped if it failed.

    如果为False,即使此步骤失败,子步骤仍将运行。 从失败的步骤传递给其子步骤的任何参数将默认为None

    • skip_children_on_abort - If True, the children of this step will be skipped if it was aborted.

    如果为False,即使此步骤被中止,子步骤仍将运行。 从失败的步骤传递给其子步骤的任何参数将默认为None

    如果字典中不存在这些键,它们的值将默认为 True

  • 返回类型

    可调用的

  • 返回

    函数包装器


连接配置

connect_configuration(configuration, name=None, description=None)

将配置字典或配置文件(pathlib.Path / str)连接到PipelineController对象。 在读取配置文件之前应调用此方法。

例如,一个本地文件:

config_file = pipe.connect_configuration(config_file)
my_params = json.load(open(config_file,'rt'))

一个参数字典/列表:

my_params = pipe.connect_configuration(my_params)
  • 参数

    • 配置 (Union[Mapping, list, Path, str]) – 配置。这通常是用于模型训练过程中的配置。

      指定以下之一:

      • 一个字典/列表 - 包含配置的字典。ClearML 将配置存储在

      ClearML 服务器 (后端), 以 HOCON 格式 (类似 JSON 的格式) 可编辑。

      • 一个 pathlib2.Path 字符串 - 配置文件的路径。ClearML 存储文件的内容。

      本地路径必须是相对路径。当在远程工作节点上执行管道时,从ClearML服务器(后端)带来的内容会覆盖文件的内容。

    • name (str ) – 配置部分的名称。默认值:'General' 允许用户存储多个配置字典/文件

    • 描述 (str ) – 配置部分的描述(文本)。默认值:无

  • 返回类型

    Union[dict, Path, str]

  • 返回

    如果指定了字典,则返回一个字典。如果指定了pathlib2.Path / 字符串,则返回本地配置文件的路径。配置对象。


PipelineDecorator.create

classmethod create(project_name, task_name, repo=None, branch=None, commit=None, script=None, working_directory=None, packages=None, requirements_file=None, docker=None, docker_args=None, docker_bash_setup_script=None, argparse_args=None, force_single_script_file=False, version=None, add_run_number=True)

在系统中手动创建并填充一个新的Pipeline。 支持来自函数、装饰器和任务的pipelines。

  • 参数

    • project_name (str) – 为管道设置项目名称。

    • task_name (str) – 设置远程管道的名称。

    • repo (Optional[str]) – 要使用的仓库的远程URL,或本地git仓库的路径。 示例:'https://github.com/allegroai/clearml.git' 或 '~/project/repo'。如果指定了repo,则 还必须指定script参数

    • branch (Optional[str]) – 选择特定的仓库分支/标签(意味着从该分支获取最新的提交)

    • commit (Optional[str]) – 选择特定的提交ID使用(默认:最新提交,或与本地仓库匹配的本地提交ID)

    • script (Optional[str]) – 指定远程执行的入口脚本。当与远程git仓库一起使用时,脚本应该是仓库内的相对路径,例如:'./source/train.py'。当与本地仓库路径一起使用时,它支持直接指向本地仓库内的文件路径,例如:'~/project/source/train.py'

    • working_directory (Optional[str]) – 启动脚本的工作目录。默认值:仓库根文件夹。 相对于仓库根目录或本地文件夹。

    • packages (Union[bool, Sequence[str], None]) – 手动指定所需的包列表。示例:["tqdm>=2.1", "scikit-learn"] 或设置为 True 以根据本地安装的包自动创建需求(仓库必须是本地的)。

    • requirements_file (Union[str, Path, None]) – 指定在设置会话时要安装的requirements.txt文件。 如果未提供,则将使用存储库中的requirements.txt。

    • docker (Optional[str]) – 选择要在远程会话中执行的docker镜像

    • docker_args (Optional[str]) – 添加docker参数,传递一个单独的字符串

    • docker_bash_setup_script (Optional[str]) – 添加要在设置任务环境之前在docker内部执行的bash脚本

    • argparse_args (Optional[Sequence[Tuple[str, str]]]) – 传递给远程执行的参数,字符串对的列表(参数,值) 注意,仅在代码库本身使用 argparse.ArgumentParser 时支持。

    • force_single_script_file (bool) – 如果为True,则不自动检测本地仓库

    • 版本 (可选 [ 字符串 ] ) –

    • add_run_number (bool ) –

  • 返回类型

    PipelineController

  • 返回

    新创建的PipelineController


创建草稿

create_draft()

可选,手动创建并序列化管道任务(谨慎使用,用于手动创建多个管道)。

注意 推荐的流程是调用 pipeline.start(queue=None),这将产生类似的效果,并允许您在以后克隆/入队。

调用Pipeline.create()后,用户可以在UI中编辑管道并将其加入队列以执行。

注意:此函数应用于以编程方式创建管道以供后续使用。 要自动创建并启动管道,请调用 start() 方法。

  • 返回类型

    None


PipelineDecorator.debug_pipeline

classmethod debug_pipeline()

设置调试模式,将所有函数作为函数在本地运行(串行) 在本地运行完整的管道DAG,其中步骤作为函数执行 注意:

在本地运行DAG假设是本地代码执行(即不会克隆和应用git差异) 管道步骤作为函数执行(不会创建任务),以便于调试 J

  • 返回类型

    ()


已用时间

elapsed()

返回从控制器启动时间戳开始经过的分钟数。

  • 返回类型

    float

  • 返回

    控制器启动时间的分钟数。负值表示进程尚未启动。


PipelineDecorator.enqueue

classmethod enqueue(pipeline_controller, queue_name=None, queue_id=None, force=False)

将PipelineController加入执行队列以进行执行。

info

工作守护进程必须在队列中监听,以便工作器获取任务并执行它,请参阅ClearML文档中的“ClearML Agent”。

  • 参数

    • pipeline_controller (Union[PipelineController, str]) – 要排队的PipelineController。指定一个PipelineController对象或PipelineController ID

    • queue_name (Optional[str]) – 队列的名称。如果未指定,则必须指定 queue_id

    • queue_id (Optional[str]) – 队列的ID。如果未指定,则必须指定queue_name

    • force (bool ) – 如果为True,在将其加入队列之前,必要时重置PipelineController

  • 返回类型

    Any

  • 返回

    一个入队JSON响应。

    {
    "queued": 1,
    "updated": 1,
    "fields": {
    "status": "queued",
    "status_reason": "",
    "status_message": "",
    "status_changed": "2020-02-24T15:05:35.426770+00:00",
    "last_update": "2020-02-24T15:05:35.426770+00:00",
    "execution.queue": "2bd96ab2d9e54b578cc2fb195e52c7cf"
    }
    }
    • queued - 已排队的任务数量(一个整数或null)。

    • updated - 更新的任务数量(一个整数或null)。

    • fields

      • status - 实验的状态。

      • status_reason - 上次状态变更的原因。

      • status_message - 关于状态的信息。

      • status_changed - 最后状态更改的日期和时间(ISO 8601格式)。

      • last_update - 任务的最后更新时间,包括任务的创建、更新、更改或此任务的事件(ISO 8601格式)。

      • execution.queue - 任务入队的队列ID。null 表示未入队。


PipelineDecorator.get

classmethod get(pipeline_id=None, pipeline_project=None, pipeline_name=None, pipeline_version=None, pipeline_tags=None, shallow_search=False)

获取特定的PipelineController。如果找到多个管道控制器,则返回具有最高语义版本的管道控制器。如果未找到语义版本,则返回最近更新的管道控制器。如果未找到管道控制器,此函数将引发异常。

注意:要运行此函数返回的管道控制器,请使用 PipelineController.enqueue

  • 参数

    • pipeline_id (Optional[str]) – 请求的PipelineController ID

    • pipeline_project (Optional[str]) – 请求的 PipelineController 项目

    • pipeline_name (Optional[str]) – 请求的PipelineController名称

    • pipeline_tags (Optional[Sequence[str]]) – 请求的PipelineController标签(标签字符串列表)

    • shallow_search (bool) – 如果为True,仅搜索前500个结果(第一页)

    • pipeline_version (可选 [ str ] ) –

  • 返回类型

    ForwardRef


PipelineDecorator.get_current_pipeline

classmethod get_current_pipeline()

返回当前正在运行的管道实例

  • 返回类型

    ForwardRef


PipelineDecorator.get_logger

classmethod get_logger()

返回一个连接到管道任务的日志记录器。 该日志记录器可以被管道执行的任何函数/任务使用,以便直接向管道任务本身报告。它也可以从主管道控制任务中调用。

如果无法定位主Pipeline任务,则引发ValueError。

  • 返回类型

    Logger

  • 返回

    用于报告指标(标量、图表、调试样本等)的Logger对象


获取参数

get_parameters()

返回管道参数字典 :rtype: dict :return: 字典 str -> str

  • 返回类型

    字典


获取管道DAG

get_pipeline_dag()

返回管道执行图,DAG中的每个节点都是PipelineController.Node对象。 图本身是一个节点字典(基于节点名称的键), 每个节点都持有指向其父节点的链接(通过其唯一名称标识)

  • 返回类型

    Mapping[str, Node]

  • 返回

    执行树,作为一个嵌套字典。示例:

{
'stage1' : Node() {
name: 'stage1'
job: ClearmlJob
...
},
}

获取已处理的节点

get_processed_nodes()

返回已处理的管道节点列表,列表中的每个条目都是PipelineController.Node对象。

  • 返回类型

    Sequence[Node]

  • 返回

    已执行(不包括当前正在执行的)节点列表


获取运行中的节点

get_running_nodes()

返回当前正在运行的管道节点列表,列表中的每个条目都是PipelineController.Node对象。

  • 返回类型

    Sequence[Node]

  • 返回

    当前运行的节点列表


正在运行

is_running()

如果管道控制器正在运行,则返回True。

  • 返回类型

    bool

  • 返回

    一个布尔值,指示管道控制器是否处于活动状态(仍在运行)或已停止。


is_successful

is_successful(fail_on_step_fail=True, fail_condition='all')

评估管道是否成功。

  • 参数

    • fail_on_step_fail (bool) – 如果为 True(默认值),则评估管道步骤的状态以判断管道是否成功。如果为 False,则仅评估控制器

    • fail_condition (str) – 必须是以下之一:‘all’(默认),‘failed’ 或 ‘aborted’。如果选择‘failed’,当管道失败时,此函数将返回 False,如果管道被中止,则返回 True。如果选择‘aborted’,当管道被中止时,此函数将返回 False,如果管道失败,则返回 True。如果选择‘all’,此函数在两种情况下都将返回 False。

  • 返回类型

    bool

  • 返回

    一个布尔值,指示管道是否成功。请注意,如果管道处于运行/挂起状态,此函数将返回False


PipelineDecorator.pipeline

*classmethod pipeline(_func=None, , name, project, version=None, return_value=None, default_queue=None, pool_frequency=0.2, add_pipeline_tags=False, target_project=None, abort_on_failure=False, pipeline_execution_queue='services', multi_instance_support=False, add_run_number=True, args_map=None, start_controller_locally=False, retry_on_failure=None, docker=None, docker_args=None, docker_bash_setup_script=None, packages=None, repo=None, repo_branch=None, repo_commit=None, artifact_serialization_function=None, artifact_deserialization_function=None, output_uri=None, skip_global_imports=False, working_dir=None, enable_local_imports=True)

装饰管道逻辑函数。

  • 参数

    • name (str ) – 提供管道名称(如果存在主任务,它将覆盖其名称)

    • 项目 (str ) – 提供存储管道的项目(如果存在主任务,它将覆盖其项目)

    • 版本 (可选 [ str ] ) – 管道版本。此版本允许唯一标识管道模板的执行。语义版本的示例:version=’1.0.1’ , version=’23’, version=’1.2’。 如果未设置,则查找管道的最新版本并递增。如果未找到此类版本,则默认为‘1.0.0’

    • return_value (可选 [ str ] ) – 可选,提供一个工件名称来存储管道函数的返回对象 注意,如果未提供,管道将不会存储管道函数的返回值。

    • default_queue (可选 [ str ] ) – 默认的管道步骤队列

    • pool_frequency (float ) – 监控实验/状态的池化频率(以分钟为单位)。

    • add_pipeline_tags (bool ) – (默认: False) 如果为 True,则为由此管道创建的所有步骤(任务)添加 pipe: 标签。

    • target_project (str ) – 如果提供,所有管道步骤将被克隆到目标项目中

    • abort_on_failure (bool ) – 如果为False(默认值),失败的管道步骤不会导致管道立即停止,相反,任何未连接到(或间接连接到)失败步骤的步骤仍将被执行。尽管如此,管道本身将被标记为失败,除非失败步骤特别定义为“continue_on_fail=True”。 如果为True,任何失败的步骤都将导致管道立即中止,停止所有正在运行的步骤,并将管道标记为失败。

    • pipeline_execution_queue (Optional [ str ] ) – 远程管道执行队列(默认为‘services’队列)。 如果传入None,则在本地执行管道逻辑(管道步骤仍然在远程执行)

    • multi_instance_support (bool ) – 如果为True,允许对同一管道函数进行多次调用,每次调用都会创建一个新的管道任务。请注意,建议在“主进程”上创建一个额外的任务,作为主管道,自动收集执行图。如果multi_instance_support==’parallel’,则管道调用将并行执行,在并行情况下,函数调用返回None,要收集所有管道结果,请调用PipelineDecorator.wait_for_multi_pipelines()。默认值为False,不支持多实例管道。

    • add_run_number (bool ) – 如果为True(默认),将管道的运行编号添加到管道名称中。 例如,第二次启动管道“best pipeline”时,我们将其重命名为“best pipeline #2”

    • args_map (dict [ str *, * List [ str ] ] ) – 将参数映射到其特定的配置部分。未包含在此映射中的参数

      将默认为Args部分。例如,对于以下代码:

      @PipelineDecorator.pipeline(args_map={'sectionA':['paramA'], 'sectionB:['paramB','paramC']
      def executing_pipeline(paramA, paramB, paramC, paramD):
      pass

      参数将存储为:

      • paramA: sectionA/paramA

      • paramB: sectionB/paramB

      • paramC: sectionB/paramC

      • paramD: 参数/paramD

    • start_controller_locally (bool ) – 如果为True,则在本地机器上启动控制器。如果未调用PipelineDecorator.run_locally或PipelineDecorator.debug_pipeline,步骤将在远程运行。 默认值:False

    • retry_on_failure (可选 [ 联合 [ 整数 *, * 可调用 [ [ PipelineController *, * PipelineController.Node *, * 整数 ] *, * 布尔值 ] ] *] * # noqa ) – 整数(重试次数)或返回True以允许重试的回调函数

      • 整数:在节点故障的情况下,按照此参数指定的次数重试该节点。

      • 可调用:在节点失败时调用的函数。参数如下:

      PipelineController实例,失败的PipelineController.Node以及一个表示失败节点先前重试次数的整数。 如果节点应重试,则函数必须返回True,否则返回False。 如果为True,节点将重新排队,剩余的重试次数将减少1。 默认情况下,如果未指定此回调,函数将按照retry_on_failure指示的次数重试。

      def example_retry_on_failure_callback(pipeline, node, retries):
      print(node.name, ' failed')
      # allow up to 5 retries (total of 6 runs)
      return retries &lt; 5
    • docker (可选 [ str ] ) – 选择要在远程会话中执行的docker镜像

    • docker_args (可选 [ str ] ) – 添加docker参数,传递一个字符串

    • docker_bash_setup_script (可选 [ str ] ) – 添加要在设置任务环境之前在docker内部执行的bash脚本

    • packages (可选 [ Union [ bool *, * str *, * Sequence [ str ] ] ] ) – 手动指定所需的包列表或本地的requirements.txt文件。 示例:[“tqdm>=2.1”, “scikit-learn”] 或 “./requirements.txt” 如果未提供,将根据函数中使用的导入自动添加包。 使用False从git仓库中的“requirements.txt”安装需求

    • repo (可选 [ str ] ) – 可选,指定一个仓库附加到函数上,以便在远程执行时使用。 允许用户在指定的仓库内执行函数,使他们能够从仓库中加载模块/脚本。 注意,执行工作目录将是仓库的根文件夹。 支持git仓库的URL链接,以及本地仓库路径(自动转换为远程git/提交,如当前检出的那样)。 示例远程URL:'https://github.com/user/repo.git' 示例本地仓库副本:'./repo' -> 将自动存储远程仓库URL和基于本地克隆副本的提交ID 使用空字符串(“”)来禁用任何仓库的自动检测

    • repo_branch (可选 [ str ] ) – 可选,指定远程仓库分支(如果使用本地仓库路径,则忽略)

    • repo_commit (可选 [ str ] ) – 可选,指定仓库提交ID(如果使用本地仓库路径,则忽略)

    • artifact_serialization_function (Optional [ Callable [ [ Any ] *, * Union [ bytes *, * bytearray ] ] ] ) – 一个序列化函数,它接受一个任意类型的参数,该参数是要序列化的对象。该函数应返回一个表示序列化对象的 bytes 或 bytearray 对象。管道上传的所有参数/返回工件都将使用此函数进行序列化。所有相关的导入必须在此函数中完成。例如:

    def serialize(obj):
    import dill
    return dill.dumps(obj)
    • artifact_deserialization_function (Optional [ Callable [ [ bytes ] *, * Any ] ] ) – A deserialization function that takes one parameter of type bytes, which represents the serialized object. This function should return the deserialized object. All parameter/return artifacts fetched by the pipeline will be deserialized using this function. All relevant imports must be done in this function. For example:
    def deserialize(bytes_):
    import dill
    return dill.loads(bytes_)
    • output_uri (可选 [ 联合 [ str *, * bool ] ] ) – 此管道的存储/输出URL。这是输出模型和其他工件的默认位置。有关更多信息,请查看Task.init参考文档(output_uri是一个参数)。 此管道步骤的output_uri将默认为此值。

    • skip_global_imports (bool ) – 如果为True,全局导入将不会包含在步骤的执行中,否则所有全局导入将在每个步骤执行开始时以安全的方式自动导入。默认值为False

    • working_dir (可选 [ str ] ) – 启动管道的工作目录。

    • enable_local_imports (bool ) – 如果为True,允许管道步骤通过将管道控制器脚本所在的目录(sys.path[0])附加到每个步骤的PYTHONPATH来从本地文件导入。 如果为False,该目录将不会附加到PYTHONPATH。默认值为True。 在远程运行时忽略。

  • 返回类型

    可调用的


PipelineDecorator.run_locally

classmethod run_locally()

设置本地模式,将所有函数作为子进程在本地运行

在本地运行完整的管道DAG,其中步骤作为子进程任务执行 注意:在本地运行DAG假设本地代码执行(即不会克隆和应用git差异)

  • 返回类型

    ()


PipelineDecorator.set_default_execution_queue

classmethod set_default_execution_queue(default_execution_queue)

如果流水线步骤未指定执行队列,则设置默认执行队列

  • 参数

    default_execution_queue (Optional[str]) – 如果没有提供执行队列,则使用的执行队列

  • 返回类型

    None


设置管道执行时间限制

设置管道执行时间限制(max_execution_minutes)

设置整个管道的最大执行时间(分钟)。传递None或0以禁用执行时间限制。

  • 参数

    max_execution_minutes (float ) – 整个管道过程的最大时间(分钟)。默认值为 None,表示没有时间限制。

  • 返回类型

    None


开始

start(queue='services', step_task_created_callback=None, step_task_completed_callback=None, wait=True)

远程启动当前管道(在选定的服务队列上)。 当前进程将停止并在远程启动。

  • 参数

    • queue – 用于启动管道的队列名称

    • step_task_created_callback (Callable ) – 回调函数,当步骤(任务)创建时调用 并在发送执行之前。允许用户在启动前修改任务。 使用 node.job 访问 ClearmlJob 对象,或使用 node.job.task 直接访问 Task 对象。 参数是传递给 ClearmlJob 的配置参数。

    如果回调返回值为False,则该节点将被跳过,依赖于此节点的DAG中的任何节点也将被跳过。

    请注意参数已经被解析, 例如 ${step1.parameters.Args/param} 已被替换为相关值。

    def step_created_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    parameters, # type: dict
    ):
    pass
    • step_task_completed_callback (Callable ) – Callback function, called when a step (Task) is completed and other jobs are executed. Allows a user to modify the Task status after completion.
    def step_completed_callback(
    pipeline, # type: PipelineController,
    node, # type: PipelineController.Node,
    ):
    pass
    • wait – If True (default), start the pipeline controller, return only after the pipeline is done (completed/aborted/failed)
  • 返回类型

    bool

  • 返回

    如果控制器启动,则为True。如果控制器未启动,则为False。


本地启动

start_locally(run_pipeline_steps_locally=False)

在本地启动当前管道,意味着管道逻辑在当前机器上运行,而不是在服务队列上。

使用 run_pipeline_steps_locally=True 你可以在本地以子进程的形式运行所有管道步骤。 注意:当在本地运行管道步骤时,它假设是本地代码执行 (即它正在运行本地代码,而不考虑管道步骤任务上的 git 提交/差异)

  • 参数

    run_pipeline_steps_locally (bool) – (默认 False) 如果为 True,则在本地作为子进程运行管道步骤(用于在本地调试管道,注意管道代码应在本地机器上可用)

  • 返回类型

    None


停止

stop(timeout=None, mark_failed=False, mark_aborted=False)

停止管道控制器和优化线程。 如果mark_failed和mark_aborted为False(默认),则将管道标记为已完成, 除非其中一个步骤失败,则将管道标记为失败。

  • 参数

    • timeout (可选 [ float ] ) – 等待优化线程退出的超时时间(分钟)。 默认值为 None,表示不等待立即终止。

    • mark_failed (bool ) – 如果为True,将管道任务标记为失败。(默认值为False)

    • mark_aborted (bool ) – 如果为True,将管道任务标记为已中止。(默认值为False)

  • 返回类型

    ()


更新执行图

update_execution_plot()

更新当前管道的桑基图

  • 返回类型

    ()


PipelineDecorator.upload_artifact

classmethod upload_artifact(name, artifact_object, metadata=None, delete_after_upload=False, auto_pickle=None, preview=None, wait_on_upload=False, serialization_function=None)

上传(添加)一个工件到主Pipeline任务对象。 此函数可以从任何管道组件调用,以直接将工件添加到主管道任务中。

工件可以由管道执行的任何函数/任务上传,以便直接报告给管道任务本身。它也可以从主管道控制任务中调用。

如果无法定位主Pipeline任务,则引发ValueError。

当前支持的上传工件类型包括:

  • 字符串 / 路径 - 指向工件文件的路径。如果指定了通配符或文件夹,则 ClearML 会创建并上传一个 ZIP 文件。

  • 字典 - ClearML 将字典存储为 .json 文件并上传。

  • pandas.DataFrame - ClearML 将 pandas.DataFrame 存储为 .csv.gz(压缩的 CSV)文件并上传。

  • numpy.ndarray - ClearML 将 numpy.ndarray 存储为 .npz 文件并上传。

  • PIL.Image - ClearML 将 PIL.Image 存储为 .png 文件并上传。

  • 任何 - 如果使用 auto_pickle=True 调用,对象将被序列化并上传。

  • 参数

    • name (str ) – The artifact name.
    warning

    如果之前上传了同名的工件,则会被覆盖。

    • artifact_object (object ) – 工件对象。

    • metadata (dict ) – 一个包含任意元数据的键值对字典。该字典会出现在ClearML Web-App (UI)ARTIFACTS标签页中。

    • delete_after_upload (bool ) – 上传后,删除本地的工件副本

      • True - 删除工件的本地副本。

      • False - 不删除。(默认)

    • auto_pickle (bool ) – 如果为True,且artifact_object不是以下类型之一: pathlib2.Path, dict, pandas.DataFrame, numpy.ndarray, PIL.Image, url (string), local_file (string) artifact_object将被pickle并作为pickle文件artifact上传(文件扩展名为.pkl) 如果设置为None(默认),将使用sdk.development.artifacts.auto_pickle配置值。

    • 预览 (任意 ) – 工件预览

    • wait_on_upload (bool) – 是否应同步上传,强制上传完成后再继续。

    • Union [ bytes **, ** bytearray ] **] ** serialization_function (Callable [ Any , ) – 一个序列化函数,它接受一个任意类型的参数,该参数是要序列化的对象。该函数应返回一个表示序列化对象的bytes或bytearray对象。请注意,对象将立即使用此函数进行序列化,因此即使可能,也不会使用其他序列化方法(例如pandas.DataFrame.to_csv)。在使用Artifact.get方法获取此工件时,要反序列化此工件,请使用其deserialization_function参数。

    • serialization_function (可选 [ Callable [ [ Any ] *, * Union [ bytes *, * bytearray ] ] ] ) –

  • 返回类型

    bool

  • 返回

    上传的状态。

  • True - 上传成功。

  • False - 上传失败。

  • Improvement

    如果工件对象类型不受支持,则引发ValueError

  • 参数

    • name (str ) –

    • artifact_object (任意 ) –

    • metadata (可选 [ 映射 ] ) –

    • delete_after_upload (bool ) –

    • auto_pickle (可选 [ 布尔型 ] ) –

    • 预览 (可选 [ 任意 ] ) –

    • wait_on_upload (bool ) –

    • serialization_function (可选 [ Callable [ [ Any ] *, * Union [ bytes *, * bytearray ] ] ] ) –

  • 返回类型

    布尔


PipelineDecorator.upload_model

classmethod upload_model(model_name, model_local_path, upload_uri=None)

上传(添加)一个模型到主管道任务对象。 此函数可以从任何管道组件调用,以直接将模型添加到主管道任务中。

模型文件/路径将被上传到管道任务并在模型仓库中注册。

如果无法定位主Pipeline任务,则引发ValueError。

  • 参数

    • model_name (str) – 模型名称,将出现在模型注册表中(在管道的项目中)

    • model_local_path (str) – 要上传的本地模型文件或目录的路径。 如果提供了本地目录,文件夹的内容(递归地)将被打包成一个zip文件并上传。

    • upload_uri (Optional[str]) – 模型权重上传的存储目标URI。默认值是之前使用的URI。

  • 返回类型

    OutputModel

  • 返回

    上传的OutputModel


等待

wait(timeout=None)

等待管道完成。

info

此方法不会停止管道。调用stop以终止管道。

  • 参数

    timeout (float ) – 等待管道完成的超时时间(分钟)。 如果 None,则等待直到达到超时或管道完成。

  • 返回类型

    bool

  • 返回

    如果管道完成,则为True。如果管道超时,则为False。


PipelineDecorator.wait_for_multi_pipelines

classmethod wait_for_multi_pipelines()

等待所有后台多管道执行完成。 按调用顺序返回所有管道结果(第一个管道调用在索引0处)

  • 返回

    基于调用顺序的执行管道返回值列表。