使用 Ray Core 的简单 MapReduce 示例#
这个示例演示了如何使用 Ray 进行一个常见的分布式计算示例——在多个文档中计算单词的出现次数。复杂性在于处理大量文本语料库,这需要多个计算节点来处理数据。 使用 Ray 实现 MapReduce 的简便性是分布式计算中的一个重要里程碑。 许多流行的大数据技术,例如 Hadoop,都是基于这一编程模型构建的,这凸显了使用 Ray Core 的影响。
MapReduce 方法分为三个阶段:
映射阶段
映射阶段应用指定的函数来转换或_映射_数据集中的元素。它生成键值对:键代表一个元素,而值是为该元素计算的指标。
为了计算文档中每个单词出现的次数,映射函数每次单词出现时输出对(word, 1)
,以表明该单词已被找到一次。洗牌阶段
洗牌阶段收集所有来自映射阶段的输出,并按键组织它们。当在多个计算节点找到相同的键时,此阶段包括在不同节点之间转移或_洗牌_数据。
如果映射阶段产生了四次(word, 1)
的出现,洗牌阶段会将该单词的所有出现放在同一个节点上。归约阶段
归约阶段聚合来自洗牌阶段的元素。
每个单词出现的总计数是每个节点上出现次数的总和。
例如,四个(word, 1)
实例组合后最终计数为word: 4
。
第一个和最后一个阶段在MapReduce名称中,但中间阶段同样至关重要。 这些阶段看起来很简单,但它们的强大之处在于可以在多台机器上并行运行。 这个图示展示了一组文档中的三个MapReduce阶段:
加载数据#
我们使用Python实现MapReduce算法来进行词频统计,并使用Ray来并行计算。
我们首先从《Python之禅》中加载一些示例数据,而《Python之禅》是Python社区的一部编码指导方针的集合。根据复活节彩蛋的传统,访问《Python之禅》的方法是在Python会话中输入import this
。
我们将《Python之禅》划分为三个独立的“文档”,通过将每一行视为一个单独的实体,然后将这些行分成三个区块。
import subprocess
zen_of_python = subprocess.check_output(["python", "-c", "import this"])
corpus = zen_of_python.split()
num_partitions = 3
chunk = len(corpus) // num_partitions
partitions = [
corpus[i * chunk: (i + 1) * chunk] for i in range(num_partitions)
]
数据映射#
为了确定映射阶段,我们需要一个映射函数来作用于每个文档。
输出是每个在文档中找到的单词对应的对 (word, 1)
。
对于以Python字符串加载的基本文本文档,过程如下:
def map_function(document):
for word in document.lower().split():
yield word, 1
我们通过使用 @ray.remote
装饰器将其标记为在 Ray 中的任务,使用 apply_map
函数处理大量文档集合。当我们调用 apply_map
时,我们将其应用于三组文档数据(num_partitions=3
)。apply_map
函数返回三个列表,每个分区一个,以便 Ray 可以重新排列映射阶段的结果并将其分发到适当的节点。
import ray
@ray.remote
def apply_map(corpus, num_partitions=3):
map_results = [list() for _ in range(num_partitions)]
for document in corpus:
for result in map_function(document):
first_letter = result[0].decode("utf-8")[0]
word_index = ord(first_letter) % num_partitions
map_results[word_index].append(result)
return map_results
对于可以存储在单台机器上的文本语料库,映射阶段并不是必要的。然而,当数据需要分布在多个节点时,映射阶段是有用的。为了并行地将映射阶段应用于语料库,我们使用对 apply_map
的远程调用,类似于之前的示例。主要的区别在于,我们希望返回三个结果(每个分区一个),这可以通过 num_returns
参数来实现。
map_results = [
apply_map.options(num_returns=num_partitions)
.remote(data, num_partitions)
for data in partitions
]
for i in range(num_partitions):
mapper_results = ray.get(map_results[i])
for j, result in enumerate(mapper_results):
print(f"Mapper {i}, return value {j}: {result[:2]}")
Mapper 0, return value 0: [(b'of', 1), (b'is', 1)]
Mapper 0, return value 1: [(b'python,', 1), (b'peters', 1)]
Mapper 0, return value 2: [(b'the', 1), (b'zen', 1)]
Mapper 1, return value 0: [(b'unless', 1), (b'in', 1)]
Mapper 1, return value 1: [(b'although', 1), (b'practicality', 1)]
Mapper 1, return value 2: [(b'beats', 1), (b'errors', 1)]
Mapper 2, return value 0: [(b'is', 1), (b'is', 1)]
Mapper 2, return value 1: [(b'although', 1), (b'a', 1)]
Mapper 2, return value 2: [(b'better', 1), (b'than', 1)]
此示例演示了如何使用 ray.get
收集驱动程序的数据。要在映射阶段之后继续执行另一个任务,您不会这样做。以下部分展示了如何高效地将所有阶段一起运行。
洗牌和数据缩减#
缩减阶段的目标是将所有来自 j
-th 返回值的对转移到同一个节点。在缩减阶段,我们创建一个字典,以汇总每个分区中所有单词的出现次数:
@ray.remote
def apply_reduce(*results):
reduce_results = dict()
for res in results:
for key, value in res:
if key not in reduce_results:
reduce_results[key] = 0
reduce_results[key] += value
return reduce_results
我们可以从每个映射器中获取第 j 个返回值,并使用以下方法将其发送到第 j 个归约器。请注意,此代码适用于无法在一台机器上容纳的大型数据集,因为我们使用 Ray 对象传递数据的引用,而不是实际数据。映射和归约阶段都可以在任何 Ray 集群上运行,并且 Ray 处理数据的洗牌。
outputs = []
for i in range(num_partitions):
outputs.append(
apply_reduce.remote(*[partition[i] for partition in map_results])
)
counts = {k: v for output in ray.get(outputs) for k, v in output.items()}
sorted_counts = sorted(counts.items(), key=lambda item: item[1], reverse=True)
for count in sorted_counts:
print(f"{count[0].decode('utf-8')}: {count[1]}")
is: 10
better: 8
than: 8
the: 6
to: 5
of: 3
although: 3
be: 3
unless: 2
one: 2
if: 2
implementation: 2
idea.: 2
special: 2
should: 2
do: 2
may: 2
a: 2
never: 2
way: 2
explain,: 2
ugly.: 1
implicit.: 1
complex.: 1
complex: 1
complicated.: 1
flat: 1
readability: 1
counts.: 1
cases: 1
rules.: 1
in: 1
face: 1
refuse: 1
one--: 1
only: 1
--obvious: 1
it.: 1
obvious: 1
first: 1
often: 1
*right*: 1
it's: 1
it: 1
idea: 1
--: 1
let's: 1
python,: 1
peters: 1
simple: 1
sparse: 1
dense.: 1
aren't: 1
practicality: 1
purity.: 1
pass: 1
silently.: 1
silenced.: 1
ambiguity,: 1
guess.: 1
and: 1
preferably: 1
at: 1
you're: 1
dutch.: 1
good: 1
are: 1
great: 1
more: 1
zen: 1
by: 1
tim: 1
beautiful: 1
explicit: 1
nested.: 1
enough: 1
break: 1
beats: 1
errors: 1
explicitly: 1
temptation: 1
there: 1
that: 1
not: 1
now: 1
never.: 1
now.: 1
hard: 1
bad: 1
easy: 1
namespaces: 1
honking: 1
those!: 1
为了深入理解如何使用Ray在多个节点之间扩展MapReduce任务,包括内存管理,请阅读关于该主题的博客文章。
总结#
这个MapReduce示例展示了Ray编程模型的灵活性。一个生产级的MapReduce实现需要更多的努力,但能够_快速_重现像这样的常见算法是非常重要的。在MapReduce的早期,约在2010年,这种范式往往是表达工作负载的唯一模型。使用Ray,任何中级Python程序员都可以访问一系列有趣的分布式计算模式。
要了解更多关于Ray以及Ray Core的信息,请参见Ray Core示例库,或我们用例库中的机器学习工作负载。这个MapReduce示例可以在”学习Ray”中找到,其中包含了更多类似的示例。