实时笔记本

您可以在 live session Binder 中运行此笔记本,或查看 Github 上的内容。

Dask 包

Dask Bag 实现了对 Python 对象集合的操作,如 mapfiltergroupby 和聚合。它通过使用 Python 迭代器在并行和小内存中完成这些操作。它类似于 itertools 的并行版本,或者是 PySpark RDD 的 Pythonic 版本。

Dask Bags 常用于对日志文件、JSON记录或其他用户定义的Python对象进行简单的预处理。

完整的API文档可在此处获取:http://docs.dask.org/en/latest/bag-api.html

启动 Dask 客户端以使用仪表板

启动 Dask 客户端是可选的。它将提供一个仪表板,这对于深入了解计算非常有用。

当你在下方创建客户端后,仪表板的链接将会变得可见。我们建议在使用笔记本的另一侧屏幕上打开它。虽然安排窗口可能需要一些努力,但在学习时同时看到两者非常有用。

[ ]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1)
client

创建随机数据

我们创建一组随机的记录数据,并将其作为多个 JSON 文件存储到磁盘上。这将作为本笔记本的数据。

[ ]:
import dask
import json
import os

os.makedirs('data', exist_ok=True)              # Create data/ directory

b = dask.datasets.make_people()                 # Make records of people
b.map(json.dumps).to_textfiles('data/*.json')   # Encode as JSON, write to disk

读取 JSON 数据

现在我们有了一些JSON数据在文件中,让我们用Dask Bag和Python JSON模块来看看它。

[ ]:
!head -n 2 data/0.json
[ ]:
import dask.bag as db
import json

b = db.read_text('data/*.json').map(json.loads)
b
[ ]:
b.take(2)

映射, 过滤, 聚合

我们可以通过过滤出仅感兴趣的某些记录、对其应用函数来处理数据,并将这些结果汇总为一个总值来处理这些数据。

[ ]:
b.filter(lambda record: record['age'] > 30).take(2)  # Select only people over 30
[ ]:
b.map(lambda record: record['occupation']).take(2)  # Select the occupation field
[ ]:
b.count().compute()  # Count total number of records

链式计算

通常在一个管道中完成这些步骤的许多操作,只在最后调用 computetake

[ ]:
result = (b.filter(lambda record: record['age'] > 30)
           .map(lambda record: record['occupation'])
           .frequencies(sort=True)
           .topk(10, key=1))
result

与所有懒惰的 Dask 集合一样,我们需要调用 compute 来实际评估我们的结果。前面示例中使用的 take 方法也类似于 compute,并且也会触发计算。

[ ]:
result.compute()

转换与存储

有时我们希望如上所述计算聚合,但有时我们希望将结果存储到磁盘以供将来分析。为此,我们可以使用 to_textfilesjson.dumps 等方法,或者我们可以转换为 Dask Dataframes 并使用它们的存储系统,我们将在下一节中看到更多相关内容。

[ ]:
(b.filter(lambda record: record['age'] > 30)  # Select records of interest
  .map(json.dumps)                            # Convert Python objects to text
  .to_textfiles('data/processed.*.json'))     # Write to local disk

转换为 Dask 数据框

Dask Bags 适用于读取初始数据,进行一些预处理,然后将其传递给其他更高效的形式,如 Dask Dataframes。Dask Dataframes 内部使用 Pandas,因此在数值数据上可以更快,并且具有更复杂的算法。

然而,Dask Dataframes 也期望数据以扁平列的形式组织。它对嵌套的JSON数据支持不太好(Bag在这方面更合适)。

在这里,我们创建一个函数来展平我们的嵌套数据结构,将其映射到我们的记录中,然后将其转换为Dask数据帧。

[ ]:
b.take(1)
[ ]:
def flatten(record):
    return {
        'age': record['age'],
        'occupation': record['occupation'],
        'telephone': record['telephone'],
        'credit-card-number': record['credit-card']['number'],
        'credit-card-expiration': record['credit-card']['expiration-date'],
        'name': ' '.join(record['name']),
        'street-address': record['address']['address'],
        'city': record['address']['city']
    }

b.map(flatten).take(1)
[ ]:
df = b.map(flatten).to_dataframe()
df.head()

我们现在可以执行与之前相同的计算,但现在使用的是 Pandas 和 Dask 数据帧。

[ ]:
df[df.age > 30].occupation.value_counts().nlargest(10).compute()

了解更多

您可能对以下链接感兴趣: