通过示例温和地介绍Ray Core#

实现一个函数在Ray Core中,以理解Ray的工作原理及其基本概念。无论是经验较少的Python程序员还是对高级任务感兴趣的人,都可以通过学习Ray Core API开始使用Python进行分布式计算。

安装Ray#

使用以下命令安装Ray:

! pip install ray

Ray Core#

通过运行以下命令启动本地集群:

import ray
ray.init()

注意输出中的以下行:

... 信息 services.py:1263 -- 可以在 http://127.0.0.1:8265 查看 Ray 仪表板
{'节点_IP_地址': '192.168.1.41',
...
'节点_ID': '...'}

这些消息表明Ray集群正常工作。在这个示例输出中,Ray仪表板的地址是http://127.0.0.1:8265。请在输出的第一行地址访问Ray仪表板。Ray仪表板显示的信息包括可用的CPU核心数量以及当前Ray应用程序的总利用率。 这是笔记本电脑的典型输出:

{'CPU': 12.0,
'memory': 14203886388.0,
'node:127.0.0.1': 1.0,
'object_store_memory': 2147483648.0}

接下来是对Ray Core API的简要介绍,我们称之为Ray API。 Ray API建立在Python程序员熟悉的装饰器、函数和类等概念之上。 它是一个用于分布式计算的通用编程接口。 引擎处理复杂的工作,使开发人员能够在现有的Python库和系统中使用Ray。

您的第一个 Ray API 示例#

以下函数从数据库中检索和处理数据。这个虚拟的 database 是一个普通的 Python 列表,包含了 “学习 Ray” 书籍 标题中的单词。sleep 函数暂停一段时间,以模拟访问和处理数据库中数据的成本。

import time

database = [
    "Learning", "Ray",
    "Flexible", "Distributed", "Python", "for", "Machine", "Learning"
]


def retrieve(item):
    time.sleep(item / 10.)
    return item, database[item]

如果索引为5的项目需要花费半秒钟 (5 / 10.),那么顺序检索所有八个项目的总运行时间估计为 (0+1+2+3+4+5+6+7)/10. = 2.8 秒。 运行以下代码以获取实际时间:

def print_runtime(input_data, start_time):
    print(f'Runtime: {time.time() - start_time:.2f} seconds, data:')
    print(*input_data, sep="\n")


start = time.time()
data = [retrieve(item) for item in range(8)]
print_runtime(data, start)
Runtime: 2.82 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')

在这个示例中,运行函数的总时间为2.82秒,但在您的计算机上可能会有所不同。 请注意,这个基本的Python版本无法同时运行该函数。

您可能会认为 Python 列表推导更高效。测得的运行时间为 2.8 秒,实际上是最坏的情况。尽管这个程序在大部分运行时间内“睡眠”,但由于全局解释器锁(GIL),它的速度很慢。

Ray 任务#

这个任务可以从并行化中受益。如果分布得非常完美,运行时间不应该比最慢的子任务多出太多,也就是说,7/10. = 0.7 秒。 要将这个示例扩展到在 Ray 上并行运行,首先使用 @ray.remote 装饰器:

import ray 


@ray.remote
def retrieve_task(item):
    return retrieve(item)

通过装饰器,函数 retrieve_task 成为一个 :ref:ray-remote-functions<Ray task>_。 Ray 任务是一个函数,Ray 在与调用它的进程不同的进程中执行,并可能在不同的机器上执行。

Ray 的使用非常方便,因为您可以继续编写 Python 代码,而无需显著改变您的方法或编程风格。对检索函数使用 :func:ray.remote()<@ray.remote> 装饰器是装饰器的预期用法,并且您在这个示例中并没有修改原始代码。

要检索数据库条目并测量性能,您无需对代码做出很多更改。以下是该过程的概述:

start = time.time()
object_references = [
    retrieve_task.remote(item) for item in range(8)
]
data = ray.get(object_references)
print_runtime(data, start)
2022-12-20 13:52:34,632	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
Runtime: 2.82 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')

运行并行任务需要两个小的代码修改。 要远程执行您的Ray任务,您必须使用.remote()调用。 Ray即使在本地集群上也会异步执行远程任务。 代码片段中的object_references列表中的项不直接包含结果。 如果您检查第一个项的Python类型,使用type(object_references[0]), 您会看到它实际上是一个ObjectRef。 这些对象引用对应于需要请求结果的_未来_。 调用:func:ray.get()<ray.get(…)>`是为了请求结果。每当您在Ray任务上调用远程时, 它立即返回一个或多个对象引用。 将Ray任务视为创建对象的主要方式。 以下部分是一个例子,连接多个任务在一起,并允许 Ray在它们之间传递和解析对象。

让我们回顾一下前面的步骤。 你从一个Python函数开始,然后用@ray.remote修饰它,使这个函数成为一个Ray任务。 你不是直接在代码中调用原始函数,而是对Ray任务调用了.remote(...)。 最后,你使用.get(...)从Ray集群中检索结果。 考虑将你自己的函数创建为一个Ray任务,作为额外的练习。

让我们来回顾一下使用Ray任务所带来的性能提升。 在大多数笔记本电脑上,运行时间约为0.71秒, 这稍微超过了最慢的子任务,后者为0.7秒。 您可以通过更多利用Ray的API来进一步改善程序。

对象存储#

检索定义直接从 database 访问项目。虽然这在本地 Ray 集群中运行良好,但在一个实际拥有多台计算机的集群中,它的功能如何呢? Ray 集群有一个头节点,带有驱动程序进程,以及多个工作节点,工作进程执行任务。 在这种情况下,数据库仅在驱动程序上定义,但工作进程需要访问它以运行检索任务。 Ray 共享对象的解决方案是使用 ray.put 函数将数据放入 Ray 的分布式对象存储中。 在 retrieve_task 定义中,您可以添加一个 db 参数,以便稍后将其作为 db_object_ref 对象传递。

db_object_ref = ray.put(database)


@ray.remote
def retrieve_task(item, db):
    time.sleep(item / 10.)
    return item, db[item]

通过使用对象存储,您允许Ray在整个集群中管理数据访问。尽管对象存储涉及一些开销,但它对较大数据集的性能有提升。这一步对于真正的分布式环境至关重要。重新运行带有retrieve_task函数的示例,以确认它的执行符合您的预期。

非阻塞调用#

在上一个部分中,您使用 ray.get(object_references) 来检索结果。 此调用会阻塞驱动程序进程,直到所有结果可用。 这种依赖关系可能会导致问题,特别是当每个数据库项处理需要几分钟的时间。 如果您允许驱动程序进程在等待结果时执行其他任务,并在结果完成时进行处理,而不是等待所有项目都完成,将能获得更多的效率提升。 此外,如果由于数据库连接中的死锁等问题无法检索到某个数据库项,驱动程序将会无限期挂起。 为了防止无限期挂起,在使用 wait 函数时设置合理的 timeout 值。 例如,如果您希望等待的时间少于最慢数据检索任务时间的十倍,可以使用 wait 函数在该时间过去后停止任务。

start = time.time()
object_references = [
    retrieve_task.remote(item, db_object_ref) for item in range(8)
]
all_data = []

while len(object_references) > 0:
    finished, object_references = ray.wait(
        object_references, timeout=7.0
    )
    data = ray.get(finished)
    print_runtime(data, start)
    all_data.extend(data)

print_runtime(all_data, start)
Runtime: 0.11 seconds, data:
(0, 'Learning')
(1, 'Ray')
Runtime: 0.31 seconds, data:
(2, 'Flexible')
(3, 'Distributed')
Runtime: 0.51 seconds, data:
(4, 'Python')
(5, 'for')
Runtime: 0.71 seconds, data:
(6, 'Machine')
(7, 'Learning')
Runtime: 0.71 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')

不是直接打印结果,你可以在while循环中使用检索到的值来启动其他工作者上的新任务。

任务依赖#

您可能想对检索到的数据执行额外的处理任务。例如,使用第一次检索任务的结果从同一数据库(可能来自不同的表)查询其他相关数据。下面的代码设置了这个后续任务,并按顺序执行 retrieve_taskfollow_up_task

@ray.remote
def follow_up_task(retrieve_result):
    original_item, _ = retrieve_result
    follow_up_result = retrieve(original_item + 1)
    return retrieve_result, follow_up_result


retrieve_refs = [retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6]]
follow_up_refs = [follow_up_task.remote(ref) for ref in retrieve_refs]

result = [print(data) for data in ray.get(follow_up_refs)]
((0, 'Learning'), (1, 'Ray'))
((2, 'Flexible'), (3, 'Distributed'))
((4, 'Python'), (5, 'for'))
((6, 'Machine'), (7, 'Learning'))

如果你对异步编程不熟悉,这个例子可能不会特别令人印象深刻。然而,仔细一看,代码能够运行实际上可能令人惊讶。代码看起来像是一个普通的Python函数,带有一些列表推导式。

follow_up_task函数体期望输入参数retrieve_result是一个Python元组。然而,当你使用[follow_up_task.remote(ref) for ref in retrieve_refs]命令时,你并没有将元组传递给后续任务。相反,你正使用retrieve_refs来传入Ray对象引用。

在幕后,Ray识别出follow_up_task需要实际的值,因此它_自动_使用ray.get函数来解析这些未来值。此外,Ray为所有任务创建了依赖图,并以一种尊重其依赖关系的方式执行它们。你不必显式告诉Ray何时等待先前的任务完成——它会推断执行顺序。Ray对象存储的这一特性非常有用,因为通过将对象引用传递给下一个任务,而让Ray处理其余部分,你可以避免将大型中间值复制回驱动程序。

接下来的步骤只有在专门设计用于检索信息的任务完成后才会安排。
实际上,如果 retrieve_refs 被称为 retrieve_result,你可能不会注意到这个关键而有意的命名细微差别。Ray 使你能够专注于你的工作,而不是集群计算的技术细节。
这两个任务的依赖图如下所示:

任务依赖

Ray Actors#

此示例涵盖了 Ray Core 的另一个重要方面。 到目前为止,一切基本上都是一个函数。 您使用 @ray.remote 装饰器来使某些函数变为远程,但除此之外,您只使用了标准 Python。

如果您想跟踪数据库被查询的频率,可以计算检索任务的结果。然而,有没有更高效的方法呢?理想情况下,您希望以分布式的方式跟踪这一点,以处理大量数据。Ray 提供了一个解决方案,使用演员(actors),这些演员在集群上运行有状态的计算,并且可以相互通信。与使用装饰函数创建 Ray 任务类似,使用装饰的 Python 类创建 Ray 演员。因此,您可以使用 Ray 演员创建一个简单的计数器来跟踪数据库调用的次数。

@ray.remote
class DataTracker:
    def __init__(self):
        self._counts = 0

    def increment(self):
        self._counts += 1

    def counts(self):
        return self._counts
DataTracker类在你给它加上`ray.remote`装饰器时变成一个演员(actor)。这个演员能够跟踪状态,例如计数,其方法是Ray演员任务,你可以像调用函数一样使用`.remote()`来调用它。修改retrieve_task以结合这个演员。
@ray.remote
def retrieve_tracker_task(item, tracker, db):
    time.sleep(item / 10.)
    tracker.increment.remote()
    return item, db[item]


tracker = DataTracker.remote()

object_references = [
    retrieve_tracker_task.remote(item, tracker, db_object_ref) for item in range(8)
]
data = ray.get(object_references)

print(data)
print(ray.get(tracker.counts.remote()))
[(0, 'Learning'), (1, 'Ray'), (2, 'Flexible'), (3, 'Distributed'), (4, 'Python'), (5, 'for'), (6, 'Machine'), (7, 'Learning')]
8

如预期的那样,这个计算的结果是8。 虽然您不需要演员来执行这个计算,但这演示了一种在集群中保持状态的方法,这可能涉及多个任务。 实际上,您可以将演员传递到任何相关任务中,甚至传递到另一个演员的构造函数中。 Ray API 是灵活的,能够提供无限的可能性。 分布式 Python 工具允许状态计算的情况是比较少见的, 这对于运行复杂的分布式算法,如强化学习,特别有用。

摘要#

在这个例子中,您仅使用了六个API方法。 这些方法包括 ray.init() 用于初始化集群,@ray.remote 用于将函数和类转换为任务和演员, ray.put() 用于将值传输到Ray的对象存储中,以及 ray.get() 用于从集群中检索对象。 此外,您还在演员方法或任务上使用了 .remote() 来在集群上执行代码,以及 ray.wait 来防止阻塞调用。

Ray API包含的不仅仅是这六个调用,但如果你刚开始,这六个调用是非常强大的。
更一般地总结一下,这些方法如下:

  • ray.init(): 初始化您的 Ray 集群。传入一个地址以连接到现有集群。

  • @ray.remote: 将函数转换为任务,将类转换为演员。

  • ray.put(): 将值存入 Ray 的对象存储中。

  • ray.get(): 从对象存储中获取值。返回您放入的值或由任务或演员计算得到的值。

  • .remote(): 在您的 Ray 集群上运行演员方法或任务,并用于实例化演员。

  • ray.wait(): 返回两个对象引用的列表,一个是我们正在等待的已完成任务,另一个是尚未完成的任务。

想了解更多吗?#

这个例子是我们《学习 Ray》一书中 Ray Core 指南的简化版。如果你喜欢这个例子,可以查看 Ray Core 示例库 或我们 用例库 中的一些机器学习工作负载。