关键概念#

备注

Workflows 是一个为 Ray 任务图提供强大持久性的库。如果你是 Ray 的新手,我们建议从 核心演练 开始。

DAG API#

通常,Ray 任务是立即执行的。为了提供持久性,Ray Workflows 使用惰性的 Ray DAG API 来分离任务 DAG 的定义和执行。

从 Ray 任务切换到 DAG API 很简单:只需将所有对 .remote(...) 的调用(返回对象引用)替换为对 .bind(...) 的调用(返回 DAG 节点)。Ray DAG 节点可以像普通的 Ray 任务一样组合。

然而,与 Ray 任务不同,你不能在 DAG 节点上调用 ray.get()ray.wait()。相反,需要 执行 DAG 以计算结果。

将函数组合成一个DAG:

import ray

@ray.remote
def one() -> int:
    return 1

@ray.remote
def add(a: int, b: int) -> int:
    return a + b

dag = add.bind(100, one.bind())

工作流执行#

要使用工作流执行DAG,请使用 workflow.run

from ray import workflow

# Run the workflow until it completes and returns the output
assert workflow.run(dag) == 101

# Or you can run it asynchronously and fetch the output via 'ray.get'
output_ref = workflow.run_async(dag)
assert ray.get(output_ref) == 101

一旦启动,工作流的执行将被持久化记录到存储中。在系统故障时,工作流可以在任何有权访问该存储的 Ray 集群上恢复。

在执行工作流DAG时,工作流任务在失败时会重试,但一旦它们成功完成并且结果被工作流引擎持久化,它们将永远不会再次运行。

获取工作流的结果:

# configure the storage with "ray.init" or "ray start --head --storage=<STORAGE_URI>"
# A default temporary storage is used by by the workflow if starting without
# Ray init.
ray.init(storage="/tmp/data")
assert workflow.run(dag, workflow_id="run_1") == 101
assert workflow.get_status("run_1") == workflow.WorkflowStatus.SUCCESSFUL
assert workflow.get_output("run_1") == 101
# workflow.get_output_async returns an ObjectRef.
assert ray.get(workflow.get_output_async("run_1")) == 101

对象#

工作流通过允许 Ray 对象引用传递到任务中并从任务中返回,与 Ray 对象无缝集成。对象在最初从任务返回时会被检查点化。检查点化后,对象可以通过 Ray 对象存储在内存速度下被任意数量的工作流任务共享。

在工作流中使用 Ray 对象:

import ray
from typing import List

@ray.remote
def hello():
    return "hello"

@ray.remote
def words() -> List[ray.ObjectRef]:
    # NOTE: Here it is ".remote()" instead of ".bind()", so
    # it creates an ObjectRef instead of a DAG.
    return [hello.remote(), ray.put("world")]

@ray.remote
def concat(words: List[ray.ObjectRef]) -> str:
    return " ".join([ray.get(w) for w in words])

assert workflow.run(concat.bind(words.bind())) == "hello world"

动态工作流#

工作流可以在运行时生成新任务。这是通过返回DAG的延续来实现的。延续是由函数返回并在其返回后执行的内容。延续功能使得在工作流中可以进行嵌套、循环和递归。

斐波那契递归工作流:

@ray.remote
def add(a: int, b: int) -> int:
    return a + b

@ray.remote
def fib(n: int) -> int:
    if n <= 1:
        return n
    # return a continuation of a DAG
    return workflow.continuation(add.bind(fib.bind(n - 1), fib.bind(n - 2)))

assert workflow.run(fib.bind(10)) == 55

事件#

事件是发送到工作流的外部信号。通过事件系统,工作流可以被计时器或外部事件高效地触发。

import time

# Sleep is a special type of event.
sleep_task = workflow.sleep(1)

# `wait_for_events` allows for pluggable event listeners.
event_task = workflow.wait_for_event(workflow.event_listener.TimerListener, time.time() + 2)

@ray.remote
def gather(*args):
    return args

# If a task's arguments include events, the task won't be executed until all
# of the events have occurred.
workflow.run(gather.bind(sleep_task, event_task, "hello world"))