实时笔记

您可以在 live session Binder 中运行此笔记本,或查看 Github 上的内容。

数据框:读取和写入数据

Dask 数据框可以读取和存储与 Pandas 数据框相同格式的许多数据。在这个例子中,我们使用流行的 CSV 和 Parquet 格式读取和写入数据,并讨论使用这些格式的最佳实践。

[ ]:
from IPython.display import YouTubeVideo

YouTubeVideo("0eEsIA0O1iE")

启动 Dask 客户端以获取仪表板

启动 Dask 客户端是可选的。它将提供一个仪表板,这对于深入了解计算非常有用。

当你在下方创建客户端后,仪表板的链接将会变得可见。我们建议在使用笔记本的另一侧屏幕上打开它。这可能需要一些努力来安排你的窗口,但在学习时同时看到两者是非常有用的。

[ ]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=True, memory_limit='2GB')
client

创建人工数据集

首先,我们创建一个人工数据集并将其写入多个CSV文件。

你不需要理解这一部分,我们只是为笔记本的其余部分创建一个数据集。

[ ]:
import dask
df = dask.datasets.timeseries()
df
[ ]:
import os
import datetime

if not os.path.exists('data'):
    os.mkdir('data')

def name(i):
    """ Provide date for filename given index

    Examples
    --------
    >>> name(0)
    '2000-01-01'
    >>> name(10)
    '2000-01-11'
    """
    return str(datetime.date(2000, 1, 1) + i * datetime.timedelta(days=1))

df.to_csv('data/*.csv', name_function=name);

读取 CSV 文件

现在我们的数据目录中有许多CSV文件,每个文件对应2000年1月的每一天。每个CSV文件都保存了当天的时序数据。我们可以使用带有glob字符串的 dd.read_csv 函数将它们全部读取为一个逻辑数据框。

[ ]:
!ls data/*.csv | head
[ ]:
!head data/2000-01-01.csv
[ ]:
!head data/2000-01-30.csv

我们可以用 pandas.read_csv 读取一个文件,或者用 dask.dataframe.read_csv 读取多个文件。

[ ]:
import pandas as pd

df = pd.read_csv('data/2000-01-01.csv')
df.head()
[ ]:
import dask.dataframe as dd

df = dd.read_csv('data/2000-*-*.csv')
df
[ ]:
df.head()

调整 read_csv

Pandas 的 read_csv 函数有许多选项帮助你解析文件。Dask 版本在内部使用了 Pandas 函数,因此支持许多相同的选项。你可以使用 ? 操作符查看完整的文档字符串。

[ ]:
pd.read_csv?
[ ]:
dd.read_csv?

在这种情况下,我们使用 parse_dates 关键字将时间戳列解析为日期时间。这将使未来的操作更加高效。注意时间戳列的 dtype 已从 object 更改为 datetime64[ns]

[ ]:
df = dd.read_csv('data/2000-*-*.csv', parse_dates=['timestamp'])
df

进行简单的计算

每当我们操作数据框时,我们都会读取所有的CSV数据,以避免占用RAM。这对于内存使用非常高效,但每次读取所有CSV文件可能会很慢。

[ ]:
%time df.groupby('name').x.mean().compute()
[ ]:

写入 Parquet

相反,我们将数据存储在 Parquet 中,这是一种对计算机读写更高效的数据格式。

[ ]:
df.to_parquet('data/2000-01.parquet', engine='pyarrow')
[ ]:
!ls data/2000-01.parquet/

从 Parquet 读取

[ ]:
df = dd.read_parquet('data/2000-01.parquet', engine='pyarrow')
df
[ ]:
%time df.groupby('name').x.mean().compute()

只选择你计划使用的列

Parquet 是一个列存储格式,这意味着它可以高效地仅从数据集中提取几列。这很好,因为它有助于避免不必要的数据加载。

[ ]:
%%time
df = dd.read_parquet('data/2000-01.parquet', columns=['name', 'x'], engine='pyarrow')
df.groupby('name').x.mean().compute()

这里的差异并不大,但对于更大的数据集,这可以节省大量时间。

了解更多

http://docs.dask.org/en/latest/dataframe-create.html