HDFS 中的字数统计

HDFS 中的字数统计

设置

在这个例子中,我们将使用 distributedhdfs3 库来计算存储在 HDFS 中的文本文件(Enron 电子邮件数据集,6.4 GB)中的单词数量。

将文本数据从 Amazon S3 复制到集群上的 HDFS:

$ hadoop distcp s3n://AWS_SECRET_ID:AWS_SECRET_KEY@blaze-data/enron-email hdfs:///tmp/enron

其中 AWS_SECRET_IDAWS_SECRET_KEY 是有效的 AWS 凭证。

在集群上启动 distributed 调度器和工作者。

代码示例

导入 distributedhdfs3 以及本示例中使用的其他标准库:

>>> import hdfs3
>>> from collections import defaultdict, Counter
>>> from distributed import Client, progress

初始化与HDFS的连接,将 NAMENODE_HOSTNAMENAMENODE_PORT 替换为HDFS namenode的主机名和端口(默认:8020)。

>>> hdfs = hdfs3.HDFileSystem('NAMENODE_HOSTNAME', port=NAMENODE_PORT)

初始化与 distributed 客户端的连接,将 SCHEDULER_IPSCHEDULER_PORT 替换为 distributed 调度器的 IP 地址和端口。

>>> client = Client('SCHEDULER_IP:SCHEDULER_PORT')

从HDFS中的文本数据生成文件名列表:

>>> filenames = hdfs.glob('/tmp/enron/*/*')
>>> print(filenames[:5])

['/tmp/enron/edrm-enron-v2_nemec-g_xml.zip/merged.txt',
 '/tmp/enron/edrm-enron-v2_ring-r_xml.zip/merged.txt',
 '/tmp/enron/edrm-enron-v2_bailey-s_xml.zip/merged.txt',
 '/tmp/enron/edrm-enron-v2_fischer-m_xml.zip/merged.txt',
 '/tmp/enron/edrm-enron-v2_geaccone-t_xml.zip/merged.txt']

打印第一个文本文件的前1024字节:

>>> print(hdfs.head(filenames[0]))

b'Date: Wed, 29 Nov 2000 09:33:00 -0800 (PST)\r\nFrom: Xochitl-Alexis Velasc
o\r\nTo: Mark Knippa, Mike D Smith, Gerald Nemec, Dave S Laipple, Bo Barnwel
l\r\nCc: Melissa Jones, Iris Waser, Pat Radford, Bonnie Shumaker\r\nSubject:
 Finalize ECS/EES Master Agreement\r\nX-SDOC: 161476\r\nX-ZLID: zl-edrm-enro
n-v2-nemec-g-2802.eml\r\n\r\nPlease plan to attend a meeting to finalize the
 ECS/EES  Master Agreement \r\ntomorrow 11/30/00 at 1:30 pm CST.\r\n\r\nI wi
ll email everyone tomorrow with location.\r\n\r\nDave-I will also email you
the call in number tomorrow.\r\n\r\nThanks\r\nXochitl\r\n\r\n***********\r\n
EDRM Enron Email Data Set has been produced in EML, PST and NSF format by ZL
 Technologies, Inc. This Data Set is licensed under a Creative Commons Attri
bution 3.0 United States License <http://creativecommons.org/licenses/by/3.0
/us/> . To provide attribution, please cite to "ZL Technologies, Inc. (http:
//www.zlti.com)."\r\n***********\r\nDate: Wed, 29 Nov 2000 09:40:00 -0800 (P
ST)\r\nFrom: Jill T Zivley\r\nTo: Robert Cook, Robert Crockett, John Handley
, Shawna'

创建一个函数来统计每个文件中的单词数:

>>> def count_words(fn):
...     word_counts = defaultdict(int)
...     with hdfs.open(fn) as f:
...         for line in f:
...             for word in line.split():
...                 word_counts[word] += 1
...     return word_counts

在使用分布式工作进程处理所有文本文件之前,让我们先在本地测试我们的函数,通过计算第一个文本文件中的单词数量:

>>> counts = count_words(filenames[0])
>>> print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

[(b'the', 144873),
 (b'of', 98122),
 (b'to', 97202),
 (b'and', 90575),
 (b'or', 60305),
 (b'in', 53869),
 (b'a', 43300),
 (b'any', 31632),
 (b'by', 31515),
 (b'is', 30055)]

我们可以执行与第一个文本文件中计数单词相同的操作,只不过我们将使用 client.submitdistributed 工作节点上执行计算:

>>> future = client.submit(count_words, filenames[0])
>>> counts = future.result()
>>> print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

[(b'the', 144873),
 (b'of', 98122),
 (b'to', 97202),
 (b'and', 90575),
 (b'or', 60305),
 (b'in', 53869),
 (b'a', 43300),
 (b'any', 31632),
 (b'by', 31515),
 (b'is', 30055)]

我们准备使用 distributed 工作器来计算所有文本文件中的单词数量。请注意,map 操作是非阻塞的,你可以在计算运行时继续在 Python shell/notebook 中工作。

>>> futures = client.map(count_words, filenames)

我们可以在处理所有文本文件的同时检查一些 futures 的状态:

>>> len(futures)

161

>>> futures[:5]

[<Future: status: finished, key: count_words-5114ab5911de1b071295999c9049e941>,
 <Future: status: pending, key: count_words-d9e0d9daf6a1eab4ca1f26033d2714e7>,
 <Future: status: pending, key: count_words-d2f365a2360a075519713e9380af45c5>,
 <Future: status: pending, key: count_words-bae65a245042325b4c77fc8dde1acf1e>,
 <Future: status: pending, key: count_words-03e82a9b707c7e36eab95f4feec1b173>]

>>> progress(futures)

[########################################] | 100% Completed |  3min  0.2s

futures 完成读取所有文本文件并统计单词后,结果将存在于每个工作节点上。此操作在一个包含三台工作机器的集群上运行大约需要3分钟,每台机器有4个核心和16 GB内存。

请注意,由于之前的计算在Python中受GIL(全局解释器锁)的限制,我们可以通过使用 --nworkers 4 选项启动 distributed 工作进程来加速计算。

要汇总所有文本文件的词频,我们需要从 distributed 工作节点收集一些信息。为了减少从工作节点收集的数据量,我们可以定义一个函数,该函数仅返回每个文本文件的前10,000个单词。

>>> def top_items(d):
...     items = sorted(d.items(), key=lambda kv: kv[1], reverse=True)[:10000]
...     return dict(items)

然后我们可以将上一步的 futures 映射 到这个筛选函数。这是使用 futures 构建计算管道的一种便捷方式:

>>> futures2 = client.map(top_items, futures)

我们可以 gather 每个文本文件的结果删减字数数据到本地进程:

>>> results = client.gather(iter(futures2))

要汇总所有文本文件的字数,我们可以遍历 futures2 中的结果,并更新一个包含所有字数的本地字典。

>>> all_counts = Counter()
>>> for result in results:
...     all_counts.update(result)

最后,我们打印结果中单词的总数以及所有文本文件中出现频率最高的单词:

>>> print(len(all_counts))

8797842

>>> print(sorted(all_counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

[(b'0', 67218380),
 (b'the', 19586868),
 (b'-', 14123768),
 (b'to', 11893464),
 (b'N/A', 11814665),
 (b'of', 11724827),
 (b'and', 10253753),
 (b'in', 6684937),
 (b'a', 5470371),
 (b'or', 5227805)]

以下是此示例的完整Python脚本:

# word-count.py

import hdfs3
from collections import defaultdict, Counter
from distributed import Client
from distributed.diagnostics.progressbar import progress

hdfs = hdfs3.HDFileSystem('NAMENODE_HOSTNAME', port=NAMENODE_PORT)
client = Client('SCHEDULER_IP:SCHEDULER:PORT')

filenames = hdfs.glob('/tmp/enron/*/*')
print(filenames[:5])
print(hdfs.head(filenames[0]))


def count_words(fn):
    word_counts = defaultdict(int)
    with hdfs.open(fn) as f:
        for line in f:
            for word in line.split():
                word_counts[word] += 1
    return word_counts

counts = count_words(filenames[0])
print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

future = client.submit(count_words, filenames[0])
counts = future.result()
print(sorted(counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])

futures = client.map(count_words, filenames)
len(futures)
futures[:5]
progress(futures)


def top_items(d):
    items = sorted(d.items(), key=lambda kv: kv[1], reverse=True)[:10000]
    return dict(items)

futures2 = client.map(top_items, futures)
results = client.gather(iter(futures2))

all_counts = Counter()
for result in results:
    all_counts.update(result)

print(len(all_counts))

print(sorted(all_counts.items(), key=lambda k_v: k_v[1], reverse=True)[:10])