使用 Ray Core 的简单 MapReduce 示例#

这个示例演示了如何使用 Ray 进行一个常见的分布式计算示例——在多个文档中计算单词的出现次数。复杂性在于处理大量文本语料库,这需要多个计算节点来处理数据。 使用 Ray 实现 MapReduce 的简便性是分布式计算中的一个重要里程碑。 许多流行的大数据技术,例如 Hadoop,都是基于这一编程模型构建的,这凸显了使用 Ray Core 的影响。

MapReduce 方法分为三个阶段:

  1. 映射阶段
    映射阶段应用指定的函数来转换或_映射_数据集中的元素。它生成键值对:键代表一个元素,而值是为该元素计算的指标。
    为了计算文档中每个单词出现的次数,映射函数每次单词出现时输出对 (word, 1),以表明该单词已被找到一次。

  2. 洗牌阶段
    洗牌阶段收集所有来自映射阶段的输出,并按键组织它们。当在多个计算节点找到相同的键时,此阶段包括在不同节点之间转移或_洗牌_数据。
    如果映射阶段产生了四次 (word, 1) 的出现,洗牌阶段会将该单词的所有出现放在同一个节点上。

  3. 归约阶段
    归约阶段聚合来自洗牌阶段的元素。
    每个单词出现的总计数是每个节点上出现次数的总和。
    例如,四个 (word, 1) 实例组合后最终计数为 word: 4

第一个和最后一个阶段在MapReduce名称中,但中间阶段同样至关重要。 这些阶段看起来很简单,但它们的强大之处在于可以在多台机器上并行运行。 这个图示展示了一组文档中的三个MapReduce阶段:

简单的 Map Reduce

加载数据#

我们使用Python实现MapReduce算法来进行词频统计,并使用Ray来并行计算。 我们首先从《Python之禅》中加载一些示例数据,而《Python之禅》是Python社区的一部编码指导方针的集合。根据复活节彩蛋的传统,访问《Python之禅》的方法是在Python会话中输入import this。 我们将《Python之禅》划分为三个独立的“文档”,通过将每一行视为一个单独的实体,然后将这些行分成三个区块。

import subprocess
zen_of_python = subprocess.check_output(["python", "-c", "import this"])
corpus = zen_of_python.split()

num_partitions = 3
chunk = len(corpus) // num_partitions
partitions = [
    corpus[i * chunk: (i + 1) * chunk] for i in range(num_partitions)
]

数据映射#

为了确定映射阶段,我们需要一个映射函数来作用于每个文档。 输出是每个在文档中找到的单词对应的对 (word, 1)。 对于以Python字符串加载的基本文本文档,过程如下:

def map_function(document):
    for word in document.lower().split():
        yield word, 1

我们通过使用 @ray.remote 装饰器将其标记为在 Ray 中的任务,使用 apply_map 函数处理大量文档集合。当我们调用 apply_map 时,我们将其应用于三组文档数据(num_partitions=3)。apply_map 函数返回三个列表,每个分区一个,以便 Ray 可以重新排列映射阶段的结果并将其分发到适当的节点。

import ray

@ray.remote
def apply_map(corpus, num_partitions=3):
    map_results = [list() for _ in range(num_partitions)]
    for document in corpus:
        for result in map_function(document):
            first_letter = result[0].decode("utf-8")[0]
            word_index = ord(first_letter) % num_partitions
            map_results[word_index].append(result)
    return map_results

对于可以存储在单台机器上的文本语料库,映射阶段并不是必要的。然而,当数据需要分布在多个节点时,映射阶段是有用的。为了并行地将映射阶段应用于语料库,我们使用对 apply_map 的远程调用,类似于之前的示例。主要的区别在于,我们希望返回三个结果(每个分区一个),这可以通过 num_returns 参数来实现。

map_results = [
    apply_map.options(num_returns=num_partitions)
    .remote(data, num_partitions)
    for data in partitions
]

for i in range(num_partitions):
    mapper_results = ray.get(map_results[i])
    for j, result in enumerate(mapper_results):
        print(f"Mapper {i}, return value {j}: {result[:2]}")
Mapper 0, return value 0: [(b'of', 1), (b'is', 1)]
Mapper 0, return value 1: [(b'python,', 1), (b'peters', 1)]
Mapper 0, return value 2: [(b'the', 1), (b'zen', 1)]
Mapper 1, return value 0: [(b'unless', 1), (b'in', 1)]
Mapper 1, return value 1: [(b'although', 1), (b'practicality', 1)]
Mapper 1, return value 2: [(b'beats', 1), (b'errors', 1)]
Mapper 2, return value 0: [(b'is', 1), (b'is', 1)]
Mapper 2, return value 1: [(b'although', 1), (b'a', 1)]
Mapper 2, return value 2: [(b'better', 1), (b'than', 1)]

此示例演示了如何使用 ray.get 收集驱动程序的数据。要在映射阶段之后继续执行另一个任务,您不会这样做。以下部分展示了如何高效地将所有阶段一起运行。

洗牌和数据缩减#

缩减阶段的目标是将所有来自 j-th 返回值的对转移到同一个节点。在缩减阶段,我们创建一个字典,以汇总每个分区中所有单词的出现次数:

@ray.remote
def apply_reduce(*results):
    reduce_results = dict()
    for res in results:
        for key, value in res:
            if key not in reduce_results:
                reduce_results[key] = 0
            reduce_results[key] += value

    return reduce_results

我们可以从每个映射器中获取第 j 个返回值,并使用以下方法将其发送到第 j 个归约器。请注意,此代码适用于无法在一台机器上容纳的大型数据集,因为我们使用 Ray 对象传递数据的引用,而不是实际数据。映射和归约阶段都可以在任何 Ray 集群上运行,并且 Ray 处理数据的洗牌。

outputs = []
for i in range(num_partitions):
    outputs.append(
        apply_reduce.remote(*[partition[i] for partition in map_results])
    )

counts = {k: v for output in ray.get(outputs) for k, v in output.items()}

sorted_counts = sorted(counts.items(), key=lambda item: item[1], reverse=True)
for count in sorted_counts:
    print(f"{count[0].decode('utf-8')}: {count[1]}")
is: 10
better: 8
than: 8
the: 6
to: 5
of: 3
although: 3
be: 3
unless: 2
one: 2
if: 2
implementation: 2
idea.: 2
special: 2
should: 2
do: 2
may: 2
a: 2
never: 2
way: 2
explain,: 2
ugly.: 1
implicit.: 1
complex.: 1
complex: 1
complicated.: 1
flat: 1
readability: 1
counts.: 1
cases: 1
rules.: 1
in: 1
face: 1
refuse: 1
one--: 1
only: 1
--obvious: 1
it.: 1
obvious: 1
first: 1
often: 1
*right*: 1
it's: 1
it: 1
idea: 1
--: 1
let's: 1
python,: 1
peters: 1
simple: 1
sparse: 1
dense.: 1
aren't: 1
practicality: 1
purity.: 1
pass: 1
silently.: 1
silenced.: 1
ambiguity,: 1
guess.: 1
and: 1
preferably: 1
at: 1
you're: 1
dutch.: 1
good: 1
are: 1
great: 1
more: 1
zen: 1
by: 1
tim: 1
beautiful: 1
explicit: 1
nested.: 1
enough: 1
break: 1
beats: 1
errors: 1
explicitly: 1
temptation: 1
there: 1
that: 1
not: 1
now: 1
never.: 1
now.: 1
hard: 1
bad: 1
easy: 1
namespaces: 1
honking: 1
those!: 1

为了深入理解如何使用Ray在多个节点之间扩展MapReduce任务,包括内存管理,请阅读关于该主题的博客文章

总结#

这个MapReduce示例展示了Ray编程模型的灵活性。一个生产级的MapReduce实现需要更多的努力,但能够_快速_重现像这样的常见算法是非常重要的。在MapReduce的早期,约在2010年,这种范式往往是表达工作负载的唯一模型。使用Ray,任何中级Python程序员都可以访问一系列有趣的分布式计算模式。

要了解更多关于Ray以及Ray Core的信息,请参见Ray Core示例库,或我们用例库中的机器学习工作负载。这个MapReduce示例可以在”学习Ray”中找到,其中包含了更多类似的示例。