关键概念#
备注
Workflows 是一个为 Ray 任务图提供强大持久性的库。如果你是 Ray 的新手,我们建议从 核心演练 开始。
DAG API#
通常,Ray 任务是立即执行的。为了提供持久性,Ray Workflows 使用惰性的 Ray DAG API 来分离任务 DAG 的定义和执行。
从 Ray 任务切换到 DAG API 很简单:只需将所有对 .remote(...)
的调用(返回对象引用)替换为对 .bind(...)
的调用(返回 DAG 节点)。Ray DAG 节点可以像普通的 Ray 任务一样组合。
然而,与 Ray 任务不同,你不能在 DAG 节点上调用 ray.get()
或 ray.wait()
。相反,需要 执行 DAG 以计算结果。
将函数组合成一个DAG:
import ray
@ray.remote
def one() -> int:
return 1
@ray.remote
def add(a: int, b: int) -> int:
return a + b
dag = add.bind(100, one.bind())
工作流执行#
要使用工作流执行DAG,请使用 workflow.run
:
from ray import workflow
# Run the workflow until it completes and returns the output
assert workflow.run(dag) == 101
# Or you can run it asynchronously and fetch the output via 'ray.get'
output_ref = workflow.run_async(dag)
assert ray.get(output_ref) == 101
一旦启动,工作流的执行将被持久化记录到存储中。在系统故障时,工作流可以在任何有权访问该存储的 Ray 集群上恢复。
在执行工作流DAG时,工作流任务在失败时会重试,但一旦它们成功完成并且结果被工作流引擎持久化,它们将永远不会再次运行。
获取工作流的结果:
# configure the storage with "ray.init" or "ray start --head --storage=<STORAGE_URI>"
# A default temporary storage is used by by the workflow if starting without
# Ray init.
ray.init(storage="/tmp/data")
assert workflow.run(dag, workflow_id="run_1") == 101
assert workflow.get_status("run_1") == workflow.WorkflowStatus.SUCCESSFUL
assert workflow.get_output("run_1") == 101
# workflow.get_output_async returns an ObjectRef.
assert ray.get(workflow.get_output_async("run_1")) == 101
对象#
工作流通过允许 Ray 对象引用传递到任务中并从任务中返回,与 Ray 对象无缝集成。对象在最初从任务返回时会被检查点化。检查点化后,对象可以通过 Ray 对象存储在内存速度下被任意数量的工作流任务共享。
在工作流中使用 Ray 对象:
import ray
from typing import List
@ray.remote
def hello():
return "hello"
@ray.remote
def words() -> List[ray.ObjectRef]:
# NOTE: Here it is ".remote()" instead of ".bind()", so
# it creates an ObjectRef instead of a DAG.
return [hello.remote(), ray.put("world")]
@ray.remote
def concat(words: List[ray.ObjectRef]) -> str:
return " ".join([ray.get(w) for w in words])
assert workflow.run(concat.bind(words.bind())) == "hello world"
动态工作流#
工作流可以在运行时生成新任务。这是通过返回DAG的延续来实现的。延续是由函数返回并在其返回后执行的内容。延续功能使得在工作流中可以进行嵌套、循环和递归。
斐波那契递归工作流:
@ray.remote
def add(a: int, b: int) -> int:
return a + b
@ray.remote
def fib(n: int) -> int:
if n <= 1:
return n
# return a continuation of a DAG
return workflow.continuation(add.bind(fib.bind(n - 1), fib.bind(n - 2)))
assert workflow.run(fib.bind(10)) == 55
事件#
事件是发送到工作流的外部信号。通过事件系统,工作流可以被计时器或外部事件高效地触发。
import time
# Sleep is a special type of event.
sleep_task = workflow.sleep(1)
# `wait_for_events` allows for pluggable event listeners.
event_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 2)
@ray.remote
def gather(*args):
return args
# If a task's arguments include events, the task won't be executed until all
# of the events have occurred.
workflow.run(gather.bind(sleep_task, event_task, "hello world"))