Dask 包
内容
实时笔记本
您可以在 live session 中运行此笔记本,或查看 Github 上的内容。
Dask 包¶
Dask Bag 实现了对 Python 对象集合的操作,如 map
、filter
、groupby
和聚合。它通过使用 Python 迭代器在并行和小内存中完成这些操作。它类似于 itertools 的并行版本,或者是 PySpark RDD 的 Pythonic 版本。
Dask Bags 常用于对日志文件、JSON记录或其他用户定义的Python对象进行简单的预处理。
完整的API文档可在此处获取:http://docs.dask.org/en/latest/bag-api.html
启动 Dask 客户端以使用仪表板¶
启动 Dask 客户端是可选的。它将提供一个仪表板,这对于深入了解计算非常有用。
当你在下方创建客户端后,仪表板的链接将会变得可见。我们建议在使用笔记本的另一侧屏幕上打开它。虽然安排窗口可能需要一些努力,但在学习时同时看到两者非常有用。
[ ]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1)
client
创建随机数据¶
我们创建一组随机的记录数据,并将其作为多个 JSON 文件存储到磁盘上。这将作为本笔记本的数据。
[ ]:
import dask
import json
import os
os.makedirs('data', exist_ok=True) # Create data/ directory
b = dask.datasets.make_people() # Make records of people
b.map(json.dumps).to_textfiles('data/*.json') # Encode as JSON, write to disk
读取 JSON 数据¶
现在我们有了一些JSON数据在文件中,让我们用Dask Bag和Python JSON模块来看看它。
[ ]:
!head -n 2 data/0.json
[ ]:
import dask.bag as db
import json
b = db.read_text('data/*.json').map(json.loads)
b
[ ]:
b.take(2)
映射, 过滤, 聚合¶
我们可以通过过滤出仅感兴趣的某些记录、对其应用函数来处理数据,并将这些结果汇总为一个总值来处理这些数据。
[ ]:
b.filter(lambda record: record['age'] > 30).take(2) # Select only people over 30
[ ]:
b.map(lambda record: record['occupation']).take(2) # Select the occupation field
[ ]:
b.count().compute() # Count total number of records
链式计算¶
通常在一个管道中完成这些步骤的许多操作,只在最后调用 compute
或 take
。
[ ]:
result = (b.filter(lambda record: record['age'] > 30)
.map(lambda record: record['occupation'])
.frequencies(sort=True)
.topk(10, key=1))
result
与所有懒惰的 Dask 集合一样,我们需要调用 compute
来实际评估我们的结果。前面示例中使用的 take
方法也类似于 compute
,并且也会触发计算。
[ ]:
result.compute()
转换与存储¶
有时我们希望如上所述计算聚合,但有时我们希望将结果存储到磁盘以供将来分析。为此,我们可以使用 to_textfiles
和 json.dumps
等方法,或者我们可以转换为 Dask Dataframes 并使用它们的存储系统,我们将在下一节中看到更多相关内容。
[ ]:
(b.filter(lambda record: record['age'] > 30) # Select records of interest
.map(json.dumps) # Convert Python objects to text
.to_textfiles('data/processed.*.json')) # Write to local disk
转换为 Dask 数据框¶
Dask Bags 适用于读取初始数据,进行一些预处理,然后将其传递给其他更高效的形式,如 Dask Dataframes。Dask Dataframes 内部使用 Pandas,因此在数值数据上可以更快,并且具有更复杂的算法。
然而,Dask Dataframes 也期望数据以扁平列的形式组织。它对嵌套的JSON数据支持不太好(Bag在这方面更合适)。
在这里,我们创建一个函数来展平我们的嵌套数据结构,将其映射到我们的记录中,然后将其转换为Dask数据帧。
[ ]:
b.take(1)
[ ]:
def flatten(record):
return {
'age': record['age'],
'occupation': record['occupation'],
'telephone': record['telephone'],
'credit-card-number': record['credit-card']['number'],
'credit-card-expiration': record['credit-card']['expiration-date'],
'name': ' '.join(record['name']),
'street-address': record['address']['address'],
'city': record['address']['city']
}
b.map(flatten).take(1)
[ ]:
df = b.map(flatten).to_dataframe()
df.head()
我们现在可以执行与之前相同的计算,但现在使用的是 Pandas 和 Dask 数据帧。
[ ]:
df[df.age > 30].occupation.value_counts().nlargest(10).compute()
了解更多¶
您可能对以下链接感兴趣:
dask 教程,笔记本 02,以获得更深入的介绍。