工作流管理#
工作流ID#
每个工作流都有一个唯一的 workflow_id
。默认情况下,当你调用 .run()
或 .run_async()
时,会生成一个随机 ID。建议你通过 .run(workflow_id="id")
显式地为每个工作流分配一个 ID。
如果使用先前创建的工作流ID调用 .run()
,工作流将从上一次执行处恢复。
工作流状态#
一个工作流程可以处于以下几种状态之一:
状态 |
描述 |
---|---|
运行 |
工作流程当前正在集群中运行。 |
待定 |
工作流程已排队,等待执行。 |
失败 |
此工作流程因应用程序错误而失败。可以从失败的任务处恢复。 |
可恢复的 |
此工作流程因系统错误而失败。可以从失败的任务处恢复。 |
CANCELED |
工作流程已取消。其结果不可用,且无法恢复。 |
成功 |
工作流程已成功执行。 |
单一工作流管理API#
import ray
from ray import workflow
@ray.remote
def task():
return 3
workflow.run(task.bind(), workflow_id="workflow_id")
# Get the status of a workflow.
try:
status = workflow.get_status(workflow_id="workflow_id")
assert status in {
"RUNNING", "RESUMABLE", "FAILED",
"CANCELED", "SUCCESSFUL"}
except workflow.exceptions.WorkflowNotFoundError:
print("Workflow doesn't exist.")
# Resume a workflow.
print(workflow.resume(workflow_id="workflow_id"))
# return is an ObjectRef which is the result of this workflow
# Cancel a workflow.
workflow.cancel(workflow_id="workflow_id")
# Delete the workflow.
workflow.delete(workflow_id="workflow_id")
3
批量工作流管理API#
# List all running workflows.
print(workflow.list_all("RUNNING"))
# List RUNNING and CANCELED workflows.
print(workflow.list_all({"RUNNING", "CANCELED"}))
# List all workflows.
print(workflow.list_all())
# Resume all resumable workflows. This won't include failed workflow
print(workflow.resume_all())
# To resume workflows including failed ones, use `include_failed=True`
print(workflow.resume_all(include_failed=True))
[("workflow_id_1", "RUNNING"), ("workflow_id_2", "RUNNING")]
[("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")]
[("workflow_id_1", "RUNNING"), ("workflow_id_2", "CANCELED")]
[("workflow_id_1", ObjectRef), ("workflow_id_2", ObjectRef)]
[("workflow_id_1", ObjectRef), ("workflow_id_3", ObjectRef)]
重复的工作流程#
Ray Workflows 目前没有内置的任务调度器。不过,你可以很容易地使用任何外部任务调度器与你的 Ray 集群(通过 任务提交)交互,以触发工作流运行。
存储配置#
Ray Workflows 支持多种类型的存储后端,包括:
本地文件系统:数据存储在本地。此选项仅适用于单节点测试,因为数据必须存储在共享文件系统(如 NFS)上才能用于多节点集群。要使用本地存储,请指定
ray.init(storage="/path/to/storage_dir")
或ray start --head --storage="/path/to/storage_dir"
。S3: 这是生产环境中的一个流行选择,因为它提供了可扩展且持久的对象存储。使用
ray.init(storage="s3://bucket/path")
或ray start --head --storage="s3://bucket/path"
启用 S3 存储。
Ray 在内部使用 pyarrow 作为存储引擎。有关 pyarrow 支持的完整存储选项列表,请参阅 Pyarrow.fs.FileSystem 文档。
备注
如果你在使用 pyarrow 支持的存储选项时遇到问题,请确保你安装了正确版本的 pyarrow。例如,GCS(Google Cloud Storage)文件系统仅在 pyarrow >= 9.0 中受支持。
如果未指定,/tmp/ray/workflow_data
将用于临时存储。此默认设置 仅适用于单节点 Ray 集群。
并发控制#
Ray Workflows 支持并发控制。你可以在执行任何工作流之前,通过 workflow.init()
来支持最大运行中的工作流和最大挂起的工作流。使用不同配置再次调用 workflow.init()
会引发错误,除非传入 None
。
例如,workflow.init(max_running_workflows=10, max_pending_workflows=50)
意味着最多会有10个工作流在运行,50个工作流在等待。如果在另一个驱动程序上调用不同的值将引发异常。如果它们被设置为 None
,则会使用之前设置的值。
当待处理工作流数量达到最大值时提交工作流会引发 queue.Full("工作流队列已满")
。获取待处理工作流的输出将被阻塞,直到工作流稍后完成运行。
待处理的工作流具有 PENDING
状态。在待处理的工作流被中断(例如,集群故障)后,它可以被恢复。当恢复被中断的工作流,这些工作流在运行和待处理状态时使用 workflow.resume_all()
,运行中的工作流比待处理的工作流具有更高的优先级(即,待处理的工作流可能仍然处于待处理状态)。
备注
工作流不保证恢复的工作流按相同顺序运行。