通过示例温和地介绍Ray Core#
实现一个函数在Ray Core中,以理解Ray的工作原理及其基本概念。无论是经验较少的Python程序员还是对高级任务感兴趣的人,都可以通过学习Ray Core API开始使用Python进行分布式计算。
安装Ray#
使用以下命令安装Ray:
! pip install ray
Ray Core#
通过运行以下命令启动本地集群:
import ray
ray.init()
注意输出中的以下行:
... 信息 services.py:1263 -- 可以在 http://127.0.0.1:8265 查看 Ray 仪表板
{'节点_IP_地址': '192.168.1.41',
...
'节点_ID': '...'}
这些消息表明Ray集群正常工作。在这个示例输出中,Ray仪表板的地址是http://127.0.0.1:8265
。请在输出的第一行地址访问Ray仪表板。Ray仪表板显示的信息包括可用的CPU核心数量以及当前Ray应用程序的总利用率。
这是笔记本电脑的典型输出:
{'CPU': 12.0,
'memory': 14203886388.0,
'node:127.0.0.1': 1.0,
'object_store_memory': 2147483648.0}
接下来是对Ray Core API的简要介绍,我们称之为Ray API。 Ray API建立在Python程序员熟悉的装饰器、函数和类等概念之上。 它是一个用于分布式计算的通用编程接口。 引擎处理复杂的工作,使开发人员能够在现有的Python库和系统中使用Ray。
您的第一个 Ray API 示例#
以下函数从数据库中检索和处理数据。这个虚拟的 database
是一个普通的 Python 列表,包含了 “学习 Ray” 书籍 标题中的单词。sleep
函数暂停一段时间,以模拟访问和处理数据库中数据的成本。
import time
database = [
"Learning", "Ray",
"Flexible", "Distributed", "Python", "for", "Machine", "Learning"
]
def retrieve(item):
time.sleep(item / 10.)
return item, database[item]
如果索引为5的项目需要花费半秒钟 (5 / 10.)
,那么顺序检索所有八个项目的总运行时间估计为 (0+1+2+3+4+5+6+7)/10. = 2.8
秒。
运行以下代码以获取实际时间:
def print_runtime(input_data, start_time):
print(f'Runtime: {time.time() - start_time:.2f} seconds, data:')
print(*input_data, sep="\n")
start = time.time()
data = [retrieve(item) for item in range(8)]
print_runtime(data, start)
Runtime: 2.82 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
在这个示例中,运行函数的总时间为2.82秒,但在您的计算机上可能会有所不同。 请注意,这个基本的Python版本无法同时运行该函数。
您可能会认为 Python 列表推导更高效。测得的运行时间为 2.8 秒,实际上是最坏的情况。尽管这个程序在大部分运行时间内“睡眠”,但由于全局解释器锁(GIL),它的速度很慢。
Ray 任务#
这个任务可以从并行化中受益。如果分布得非常完美,运行时间不应该比最慢的子任务多出太多,也就是说,7/10. = 0.7
秒。
要将这个示例扩展到在 Ray 上并行运行,首先使用 @ray.remote 装饰器:
import ray
@ray.remote
def retrieve_task(item):
return retrieve(item)
通过装饰器,函数 retrieve_task
成为一个 :ref:ray-remote-functions<Ray task>
_。
Ray 任务是一个函数,Ray 在与调用它的进程不同的进程中执行,并可能在不同的机器上执行。
Ray 的使用非常方便,因为您可以继续编写 Python 代码,而无需显著改变您的方法或编程风格。对检索函数使用 :func:ray.remote()<@ray.remote>
装饰器是装饰器的预期用法,并且您在这个示例中并没有修改原始代码。
要检索数据库条目并测量性能,您无需对代码做出很多更改。以下是该过程的概述:
start = time.time()
object_references = [
retrieve_task.remote(item) for item in range(8)
]
data = ray.get(object_references)
print_runtime(data, start)
2022-12-20 13:52:34,632 INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
Runtime: 2.82 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
运行并行任务需要两个小的代码修改。
要远程执行您的Ray任务,您必须使用.remote()
调用。
Ray即使在本地集群上也会异步执行远程任务。
代码片段中的object_references
列表中的项不直接包含结果。
如果您检查第一个项的Python类型,使用type(object_references[0])
,
您会看到它实际上是一个ObjectRef
。
这些对象引用对应于需要请求结果的_未来_。
调用:func:
ray.get()<ray.get(…)>`是为了请求结果。每当您在Ray任务上调用远程时,
它立即返回一个或多个对象引用。
将Ray任务视为创建对象的主要方式。
以下部分是一个例子,连接多个任务在一起,并允许
Ray在它们之间传递和解析对象。
让我们回顾一下前面的步骤。
你从一个Python函数开始,然后用@ray.remote
修饰它,使这个函数成为一个Ray任务。
你不是直接在代码中调用原始函数,而是对Ray任务调用了.remote(...)
。
最后,你使用.get(...)
从Ray集群中检索结果。
考虑将你自己的函数创建为一个Ray任务,作为额外的练习。
让我们来回顾一下使用Ray任务所带来的性能提升。 在大多数笔记本电脑上,运行时间约为0.71秒, 这稍微超过了最慢的子任务,后者为0.7秒。 您可以通过更多利用Ray的API来进一步改善程序。
对象存储#
检索定义直接从 database
访问项目。虽然这在本地 Ray 集群中运行良好,但在一个实际拥有多台计算机的集群中,它的功能如何呢?
Ray 集群有一个头节点,带有驱动程序进程,以及多个工作节点,工作进程执行任务。
在这种情况下,数据库仅在驱动程序上定义,但工作进程需要访问它以运行检索任务。
Ray 共享对象的解决方案是使用 ray.put
函数将数据放入 Ray 的分布式对象存储中。
在 retrieve_task
定义中,您可以添加一个 db
参数,以便稍后将其作为 db_object_ref
对象传递。
db_object_ref = ray.put(database)
@ray.remote
def retrieve_task(item, db):
time.sleep(item / 10.)
return item, db[item]
通过使用对象存储,您允许Ray在整个集群中管理数据访问。尽管对象存储涉及一些开销,但它对较大数据集的性能有提升。这一步对于真正的分布式环境至关重要。重新运行带有retrieve_task
函数的示例,以确认它的执行符合您的预期。
非阻塞调用#
在上一个部分中,您使用 ray.get(object_references)
来检索结果。
此调用会阻塞驱动程序进程,直到所有结果可用。
这种依赖关系可能会导致问题,特别是当每个数据库项处理需要几分钟的时间。
如果您允许驱动程序进程在等待结果时执行其他任务,并在结果完成时进行处理,而不是等待所有项目都完成,将能获得更多的效率提升。
此外,如果由于数据库连接中的死锁等问题无法检索到某个数据库项,驱动程序将会无限期挂起。
为了防止无限期挂起,在使用 wait
函数时设置合理的 timeout
值。
例如,如果您希望等待的时间少于最慢数据检索任务时间的十倍,可以使用 wait
函数在该时间过去后停止任务。
start = time.time()
object_references = [
retrieve_task.remote(item, db_object_ref) for item in range(8)
]
all_data = []
while len(object_references) > 0:
finished, object_references = ray.wait(
object_references, timeout=7.0
)
data = ray.get(finished)
print_runtime(data, start)
all_data.extend(data)
print_runtime(all_data, start)
Runtime: 0.11 seconds, data:
(0, 'Learning')
(1, 'Ray')
Runtime: 0.31 seconds, data:
(2, 'Flexible')
(3, 'Distributed')
Runtime: 0.51 seconds, data:
(4, 'Python')
(5, 'for')
Runtime: 0.71 seconds, data:
(6, 'Machine')
(7, 'Learning')
Runtime: 0.71 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')
不是直接打印结果,你可以在while
循环中使用检索到的值来启动其他工作者上的新任务。
任务依赖#
您可能想对检索到的数据执行额外的处理任务。例如,使用第一次检索任务的结果从同一数据库(可能来自不同的表)查询其他相关数据。下面的代码设置了这个后续任务,并按顺序执行 retrieve_task
和 follow_up_task
。
@ray.remote
def follow_up_task(retrieve_result):
original_item, _ = retrieve_result
follow_up_result = retrieve(original_item + 1)
return retrieve_result, follow_up_result
retrieve_refs = [retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6]]
follow_up_refs = [follow_up_task.remote(ref) for ref in retrieve_refs]
result = [print(data) for data in ray.get(follow_up_refs)]
((0, 'Learning'), (1, 'Ray'))
((2, 'Flexible'), (3, 'Distributed'))
((4, 'Python'), (5, 'for'))
((6, 'Machine'), (7, 'Learning'))
如果你对异步编程不熟悉,这个例子可能不会特别令人印象深刻。然而,仔细一看,代码能够运行实际上可能令人惊讶。代码看起来像是一个普通的Python函数,带有一些列表推导式。
follow_up_task
函数体期望输入参数retrieve_result
是一个Python元组。然而,当你使用[follow_up_task.remote(ref) for ref in retrieve_refs]
命令时,你并没有将元组传递给后续任务。相反,你正使用retrieve_refs
来传入Ray对象引用。
在幕后,Ray识别出follow_up_task
需要实际的值,因此它_自动_使用ray.get
函数来解析这些未来值。此外,Ray为所有任务创建了依赖图,并以一种尊重其依赖关系的方式执行它们。你不必显式告诉Ray何时等待先前的任务完成——它会推断执行顺序。Ray对象存储的这一特性非常有用,因为通过将对象引用传递给下一个任务,而让Ray处理其余部分,你可以避免将大型中间值复制回驱动程序。
接下来的步骤只有在专门设计用于检索信息的任务完成后才会安排。
实际上,如果 retrieve_refs
被称为 retrieve_result
,你可能不会注意到这个关键而有意的命名细微差别。Ray 使你能够专注于你的工作,而不是集群计算的技术细节。
这两个任务的依赖图如下所示:
Ray Actors#
此示例涵盖了 Ray Core 的另一个重要方面。
到目前为止,一切基本上都是一个函数。
您使用 @ray.remote
装饰器来使某些函数变为远程,但除此之外,您只使用了标准 Python。
如果您想跟踪数据库被查询的频率,可以计算检索任务的结果。然而,有没有更高效的方法呢?理想情况下,您希望以分布式的方式跟踪这一点,以处理大量数据。Ray 提供了一个解决方案,使用演员(actors),这些演员在集群上运行有状态的计算,并且可以相互通信。与使用装饰函数创建 Ray 任务类似,使用装饰的 Python 类创建 Ray 演员。因此,您可以使用 Ray 演员创建一个简单的计数器来跟踪数据库调用的次数。
@ray.remote
class DataTracker:
def __init__(self):
self._counts = 0
def increment(self):
self._counts += 1
def counts(self):
return self._counts
DataTracker类在你给它加上`ray.remote`装饰器时变成一个演员(actor)。这个演员能够跟踪状态,例如计数,其方法是Ray演员任务,你可以像调用函数一样使用`.remote()`来调用它。修改retrieve_task以结合这个演员。
@ray.remote
def retrieve_tracker_task(item, tracker, db):
time.sleep(item / 10.)
tracker.increment.remote()
return item, db[item]
tracker = DataTracker.remote()
object_references = [
retrieve_tracker_task.remote(item, tracker, db_object_ref) for item in range(8)
]
data = ray.get(object_references)
print(data)
print(ray.get(tracker.counts.remote()))
[(0, 'Learning'), (1, 'Ray'), (2, 'Flexible'), (3, 'Distributed'), (4, 'Python'), (5, 'for'), (6, 'Machine'), (7, 'Learning')]
8
如预期的那样,这个计算的结果是8。 虽然您不需要演员来执行这个计算,但这演示了一种在集群中保持状态的方法,这可能涉及多个任务。 实际上,您可以将演员传递到任何相关任务中,甚至传递到另一个演员的构造函数中。 Ray API 是灵活的,能够提供无限的可能性。 分布式 Python 工具允许状态计算的情况是比较少见的, 这对于运行复杂的分布式算法,如强化学习,特别有用。
摘要#
在这个例子中,您仅使用了六个API方法。
这些方法包括 ray.init()
用于初始化集群,@ray.remote
用于将函数和类转换为任务和演员,
ray.put()
用于将值传输到Ray的对象存储中,以及 ray.get()
用于从集群中检索对象。
此外,您还在演员方法或任务上使用了 .remote()
来在集群上执行代码,以及 ray.wait
来防止阻塞调用。
Ray API包含的不仅仅是这六个调用,但如果你刚开始,这六个调用是非常强大的。
更一般地总结一下,这些方法如下:
ray.init()
: 初始化您的 Ray 集群。传入一个地址以连接到现有集群。@ray.remote
: 将函数转换为任务,将类转换为演员。ray.put()
: 将值存入 Ray 的对象存储中。ray.get()
: 从对象存储中获取值。返回您放入的值或由任务或演员计算得到的值。.remote()
: 在您的 Ray 集群上运行演员方法或任务,并用于实例化演员。ray.wait()
: 返回两个对象引用的列表,一个是我们正在等待的已完成任务,另一个是尚未完成的任务。
想了解更多吗?#
这个例子是我们《学习 Ray》一书中 Ray Core 指南的简化版。如果你喜欢这个例子,可以查看 Ray Core 示例库 或我们 用例库 中的一些机器学习工作负载。