工作流管理#

工作流ID#

每个工作流都有一个唯一的 workflow_id。默认情况下,当你调用 .run().run_async() 时,会生成一个随机 ID。建议你通过 .run(workflow_id="id") 显式地为每个工作流分配一个 ID。

如果使用先前创建的工作流ID调用 .run() ,工作流将从上一次执行处恢复。

工作流状态#

一个工作流程可以处于以下几种状态之一:

状态

描述

运行

工作流程当前正在集群中运行。

待定

工作流程已排队,等待执行。

失败

此工作流程因应用程序错误而失败。可以从失败的任务处恢复。

可恢复的

此工作流程因系统错误而失败。可以从失败的任务处恢复。

CANCELED

工作流程已取消。其结果不可用,且无法恢复。

成功

工作流程已成功执行。

单一工作流管理API#

import ray
from ray import workflow

@ray.remote
def task():
    return 3

workflow.run(task.bind(), workflow_id="workflow_id")

# Get the status of a workflow.
try:
    status = workflow.get_status(workflow_id="workflow_id")
    assert status in {
        "RUNNING", "RESUMABLE", "FAILED",
        "CANCELED", "SUCCESSFUL"}
except workflow.exceptions.WorkflowNotFoundError:
    print("Workflow doesn't exist.")

# Resume a workflow.
print(workflow.resume(workflow_id="workflow_id"))
# return is an ObjectRef which is the result of this workflow

# Cancel a workflow.
workflow.cancel(workflow_id="workflow_id")

# Delete the workflow.
workflow.delete(workflow_id="workflow_id")
3

批量工作流管理API#

# List all running workflows.
print(workflow.list_all("RUNNING"))

# List RUNNING and CANCELED workflows.
print(workflow.list_all({"RUNNING", "CANCELED"}))

# List all workflows.
print(workflow.list_all())

# Resume all resumable workflows. This won't include failed workflow
print(workflow.resume_all())

# To resume workflows including failed ones, use `include_failed=True`
print(workflow.resume_all(include_failed=True))
[("workflow_id_1", "RUNNING"), ("workflow_id_2", "RUNNING")]
[("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")]
[("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")]
[("workflow_id_1", ObjectRef), ("workflow_id_2", ObjectRef)]
[("workflow_id_1", ObjectRef), ("workflow_id_3", ObjectRef)]

重复的工作流程#

Ray Workflows 目前没有内置的任务调度器。不过,你可以很容易地使用任何外部任务调度器与你的 Ray 集群(通过 任务提交)交互,以触发工作流运行。

存储配置#

Ray Workflows 支持多种类型的存储后端,包括:

  • 本地文件系统:数据存储在本地。此选项仅适用于单节点测试,因为数据必须存储在共享文件系统(如 NFS)上才能用于多节点集群。要使用本地存储,请指定 ray.init(storage="/path/to/storage_dir")ray start --head --storage="/path/to/storage_dir"

  • S3: 这是生产环境中的一个流行选择,因为它提供了可扩展且持久的对象存储。使用 ray.init(storage="s3://bucket/path")ray start --head --storage="s3://bucket/path" 启用 S3 存储。

Ray 在内部使用 pyarrow 作为存储引擎。有关 pyarrow 支持的完整存储选项列表,请参阅 Pyarrow.fs.FileSystem 文档。

备注

如果你在使用 pyarrow 支持的存储选项时遇到问题,请确保你安装了正确版本的 pyarrow。例如,GCS(Google Cloud Storage)文件系统仅在 pyarrow >= 9.0 中受支持。

如果未指定,/tmp/ray/workflow_data 将用于临时存储。此默认设置 仅适用于单节点 Ray 集群

并发控制#

Ray Workflows 支持并发控制。你可以在执行任何工作流之前,通过 workflow.init() 来支持最大运行中的工作流和最大挂起的工作流。使用不同配置再次调用 workflow.init() 会引发错误,除非传入 None

例如,workflow.init(max_running_workflows=10, max_pending_workflows=50) 意味着最多会有10个工作流在运行,50个工作流在等待。如果在另一个驱动程序上调用不同的值将引发异常。如果它们被设置为 None,则会使用之前设置的值。

当待处理工作流数量达到最大值时提交工作流会引发 queue.Full("工作流队列已满")。获取待处理工作流的输出将被阻塞,直到工作流稍后完成运行。

待处理的工作流具有 PENDING 状态。在待处理的工作流被中断(例如,集群故障)后,它可以被恢复。当恢复被中断的工作流,这些工作流在运行和待处理状态时使用 workflow.resume_all(),运行中的工作流比待处理的工作流具有更高的优先级(即,待处理的工作流可能仍然处于待处理状态)。

备注

工作流不保证恢复的工作流按相同顺序运行。