Skip to main content

TriggerScheduler

class automation.TriggerScheduler()

如果系统中发生事件,则触发任务执行。

示例:

  • 新模型已发布/标记,

  • 新数据集已创建,

  • 一般任务失败,

  • 任务指标低于/高于阈值,每X分钟提醒一次

创建一个任务触发服务

  • 参数

    • pooling_frequency_minutes (float) – 每X分钟检查新事件(默认3)

    • sync_frequency_minutes (float) – 每X分钟同步任务调度器配置。 允许通过编辑任务配置对象在运行时更改调度器

    • force_create_task_name (Optional[str]) – 可选,强制创建任务调度器服务,即使主任务初始化已经存在。

    • force_create_task_project (Optional[str]) – 可选,强制创建任务调度器服务,即使主任务初始化已经存在。


添加数据集触发器

add_dataset_trigger(schedule_task_id=None, schedule_queue=None, schedule_function=None, trigger_project=None, trigger_name=None, trigger_on_publish=None, trigger_on_tags=None, trigger_on_archive=None, trigger_required_tags=None, name=None, target_project=None, add_tag=True, single_instance=False, reuse_task=False, task_parameters=None, task_overrides=None)

为预先存在的任务或函数创建类似cron作业的调度。 在数据集存储库中的更改时触发任务/函数的执行(请注意,这不是超数据集)。 请注意,建议为触发器提供一个描述性的唯一名称。如果未提供,则使用任务ID。

注意 task_overrides 可以接受对触发器模型 ID 的引用: 示例:task_overrides={'Args/dataset_id': '${dataset.id}'}.

注意如果传递了schedule_function,请使用以下函数接口:

def schedule_function(dataset_id):
pass
  • 参数

    • schedule_task_id (Union[str, Task, None]) – 要克隆并安排执行的任务/任务ID

    • schedule_queue (Optional[str]) – 将任务放入的队列名称或ID(即调度)

    • schedule_function (Optional[Callable[[str], None]]) – 可选的,不提供要调度的任务ID,而是提供一个要调用的函数。请注意,该函数是从调度器上下文中调用的(即在调度器所在的同一台机器上运行)

    • name (Optional[str]) – 定时任务的名称或描述(如果提供则应为唯一,否则随机生成)

    • trigger_project (Optional[str]) – 仅监控来自此特定项目的数据集(非递归)

    • trigger_name (Optional[str]) – 仅在数据集名称匹配(正则表达式)时触发

    • trigger_on_publish (Optional[bool]) – 当数据集发布时触发。

    • trigger_on_tags (Optional[List[str]]) – 当列表中的所有标签都存在时触发

    • trigger_on_archive (Optional[bool]) – 当数据集被归档时触发

    • trigger_required_tags (Optional[List[str]]) – 仅在具有以下附加标签的数据集上触发(必须包括所有标签)

    • target_project (Optional[str]) – 指定目标项目以放置克隆的预定任务。

    • add_tag (Union[bool, str]) – 向执行的任务添加标签。提供特定的标签(str)或传递True(默认)以使用触发器名称作为标签

    • single_instance (bool) – 如果为True,如果前一个实例仍在运行,则不启动任务作业 (跳过直到下一个计划时间段)。默认值为False。

    • reuse_task (bool) – 如果为True,则每次重新入队相同的任务(即不克隆它),默认为False。

    • task_parameters (Optional[dict]) – 执行任务的配置参数。 例如:{'Args/batch': '12'}。注意:当 reuse_task=True 时不可用。

    • task_overrides (Optional[dict]) – 更改任务定义。 例如 {'script.version_num': None, 'script.branch': 'main'}。注意:当 reuse_task=True 时不可用

  • 返回类型

    None

  • 返回

    如果作业成功添加到调度列表中,则为True


添加模型触发器

add_model_trigger(schedule_task_id=None, schedule_queue=None, schedule_function=None, trigger_project=None, trigger_name=None, trigger_on_publish=None, trigger_on_tags=None, trigger_on_archive=None, trigger_required_tags=None, name=None, target_project=None, add_tag=True, single_instance=False, reuse_task=False, task_parameters=None, task_overrides=None)

为预先存在的任务或函数创建类似cron作业的调度。 在模型仓库发生变化时触发任务/函数的执行 请注意,建议为触发器提供一个描述性的唯一名称,如果没有提供,则使用任务ID。

注意 task_overrides 可以接受对触发器模型 ID 的引用: 示例:task_overrides={'Args/model_id': '${model.id}'} 注意如果传递了 schedule_function,请使用以下函数接口:

def schedule_function(model_id):
pass
  • 参数

    • schedule_task_id (Union[str, Task, None]) – 要克隆并安排执行的任务/任务ID

    • schedule_queue (Optional[str]) – 将任务放入的队列名称或ID(即调度)

    • schedule_function (Optional[Callable[[str], None]]) – 可选的,不提供要调度的任务ID,而是提供一个要调用的函数。请注意,该函数是从调度器上下文中调用的(即在调度器所在的同一台机器上运行)

    • name (Optional[str]) – 定时任务的名称或描述(如果提供则应为唯一,否则随机生成)

    • trigger_project (Optional[str]) – 仅监控来自此特定项目的模型(非递归)

    • trigger_name (Optional[str]) – 仅在名称匹配(正则表达式)的模型上触发

    • trigger_on_publish (Optional[bool]) – 当模型发布时触发。

    • trigger_on_tags (Optional[List[str]]) – 当列表中的所有标签都存在时触发

    • trigger_on_archive (Optional[bool]) – 当模型被归档时触发

    • trigger_required_tags (Optional[List[str]]) – 仅在具有以下附加标签的模型上触发(必须包括所有标签)

    • target_project (Optional[str]) – 指定目标项目以放置克隆的预定任务。

    • add_tag (Union[bool, str]) – 向执行的任务添加标签。提供特定的标签(str)或传递True(默认)以使用触发器名称作为标签

    • single_instance (bool) – 如果为True,如果前一个实例仍在运行,则不启动任务作业 (跳过直到下一个计划时间段)。默认值为False。

    • reuse_task (bool) – 如果为True,则每次重新入队相同的任务(即不克隆它),默认为False。

    • task_parameters (Optional[dict]) – 执行任务的配置参数。 例如:{'Args/batch': '12'} 注意:当 reuse_task=True 时不可用

    • task_overrides (Optional[dict]) – 更改任务定义。 例如 {'script.version_num': None, 'script.branch': 'main'} 注意:当 reuse_task=True 时不可用

  • 返回类型

    None

  • 返回

    如果作业成功添加到调度列表中,则为True


添加任务触发器

add_task_trigger(schedule_task_id=None, schedule_queue=None, schedule_function=None, trigger_project=None, trigger_name=None, trigger_on_tags=None, trigger_on_status=None, trigger_exclude_dev_tasks=None, trigger_on_metric=None, trigger_on_variant=None, trigger_on_threshold=None, trigger_on_sign=None, trigger_required_tags=None, name=None, target_project=None, add_tag=True, single_instance=False, reuse_task=False, task_parameters=None, task_overrides=None)

为预先存在的任务或函数创建类似cron作业的调度。 在任务发生变化时触发任务/函数的执行。 请注意,建议为触发器提供一个描述性的唯一名称,如果未提供,则使用任务ID。

注意 task_overrides 可以接受对触发器模型 ID 的引用: 示例:task_overrides={'Args/task_id': '${task.id}'} 注意如果传递了 schedule_function,请使用以下函数接口:

def schedule_function(task_id):
pass
  • 参数

    • schedule_task_id (Union[str, Task, None]) – 要克隆并安排执行的任务/任务ID

    • schedule_queue (Optional[str]) – 将任务放入的队列名称或ID(即调度)

    • schedule_function (Optional[Callable[[str], None]]) – 可选的,不提供要调度的任务ID,而是提供一个要调用的函数。请注意,该函数是从调度器上下文中调用的(即在调度器所在的同一台机器上运行)

    • name (Optional[str]) – 定时任务的名称或描述(如果提供则应为唯一,否则随机生成)

    • trigger_project (Optional[str]) – 仅监控来自此特定项目的任务(非递归)

    • trigger_name (Optional[str]) – 仅在任务名称匹配(正则表达式)时触发

    • trigger_on_tags (Optional[List[str]]) – 当列表中的所有标签都存在时触发

    • trigger_required_tags (Optional[List[str]]) – 仅在具有以下附加标签的任务上触发(必须包含所有标签)

    • trigger_on_status (Optional[List[str]]) – 在任务状态变化时触发。期望状态字符串列表,例如 [‘failed’, ‘published’]。 TaskStatusEnum: [“created”, “in_progress”, “stopped”, “closed”, “failed”, “completed”, “queued”, “published”, “publishing”, “unknown”]

    • trigger_exclude_dev_tasks (Optional[bool]) – 如果为True,则仅在由clearml-agent执行的任务上触发(而不是手动执行的任务)

    • trigger_on_metric (Optional[str]) – 当指标/变体高于/低于阈值时触发(metric=标题,variant=系列)

    • trigger_on_variant (Optional[str]) – 当指标/变体高于/低于阈值时触发(指标=标题,变体=系列)

    • trigger_on_threshold (Optional[float]) – 当指标/变体高于/低于阈值时触发(浮点数)

    • trigger_on_sign (Optional[str]) – 可能的值 “max”/”maximum” 或 “min”/”minimum”, 如果指标低于“min”或高于“maximum”则触发任务。默认值:“minimum”

    • target_project (Optional[str]) – 指定目标项目以放置克隆的预定任务。

    • add_tag (Union[bool, str]) – 向执行的任务添加标签。提供特定的标签(str)或传递True(默认)以使用触发器名称作为标签

    • single_instance (bool) – 如果为True,如果前一个实例仍在运行,则不启动任务作业 (跳过直到下一个计划时间段)。默认值为False。

    • reuse_task (bool) – 如果为True,则每次重新入队相同的任务(即不克隆它),默认为False。

    • task_parameters (Optional[dict]) – 执行任务的配置参数。 例如:{'Args/batch': '12'} 注意:当 reuse_task=True 时不可用。

    • task_overrides (Optional[dict]) – 更改任务定义。 例如 {'script.version_num': None, 'script.branch': 'main'}。注意:当 reuse_task=True 时不可用

  • 返回类型

    None

  • 返回

    如果作业成功添加到调度列表中,则为True


获取触发器

get_triggers()

返回所有触发器(模型、数据集、任务) :rtype: List[BaseTrigger] :return: 触发器对象列表

  • 返回类型

    列表[clearml.automation.trigger.BaseTrigger]


开始

start()

启动任务触发器循环(注意此函数不会返回)


远程启动

start_remotely(queue='services')

启动任务调度器循环(注意此函数不会返回)

  • 参数

    queue (str) – 运行调度程序的远程队列,默认为‘services’队列。

  • 返回类型

    None