实时笔记本

您可以在 live session Binder 中运行此笔记本,或查看 on Github

数据框:读取混乱的数据

01-data-access 示例中,我们展示了 Dask Dataframes 如何以与 Pandas Dataframes 相同的多种格式读取和存储数据。使用 Dask Dataframes 时的一个关键区别是,我们通常不是用 pandas.read_csv 这样的函数打开单个文件,而是用 dask.dataframe.read_csv 一次打开多个文件。这使我们能够将一组文件视为单个数据集。大多数情况下,这非常有效。但真实数据是混乱的,在本笔记本中,我们将探讨一种更高级的技术,将混乱的数据集引入 Dask Dataframe。

启动 Dask 客户端以使用仪表板

启动 Dask 客户端是可选的。它将提供一个仪表板,这对于深入了解计算非常有用。

当你在下方创建客户端后,仪表板的链接将会变得可见。我们建议在使用笔记本的另一侧屏幕上打开它。虽然安排窗口可能需要一些努力,但在学习时同时看到两者非常有用。

[ ]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=True, memory_limit='2GB')
client

创建人工数据集

首先,我们创建一个人工数据集并将其写入多个CSV文件。

你不需要理解这一部分,我们只是为笔记本的其余部分创建一个数据集。

[ ]:
import dask
df = dask.datasets.timeseries()
df
[ ]:
import os
import datetime

if not os.path.exists('data'):
    os.mkdir('data')

def name(i):
    """ Provide date for filename given index

    Examples
    --------
    >>> name(0)
    '2000-01-01'
    >>> name(10)
    '2000-01-11'
    """
    return str(datetime.date(2000, 1, 1) + i * datetime.timedelta(days=1))

df.to_csv('data/*.csv', name_function=name, index=False);

读取 CSV 文件

我们现在在数据目录中有许多CSV文件,每个文件对应2000年1月份的一天。每个CSV文件保存了当天的时序数据。我们可以使用带有glob字符串的``dd.read_csv``函数将它们全部读取为一个逻辑数据框。

[ ]:
!ls data/*.csv | head
[ ]:
import dask.dataframe as dd

df = dd.read_csv('data/2000-*-*.csv')
df
[ ]:
df.head()

让我们来看看数据的一些统计信息

[ ]:
df.describe().compute()

生成一些混乱的数据

现在这工作得很好,在大多数情况下,dd.read_csv 或 dd.read_parquet 等是首选的将大量数据文件读入 dask 数据帧的方式,但现实世界的数据通常非常混乱,一些文件可能已损坏或格式不正确。为了模拟这种情况,我们将通过调整示例 csv 文件来创建一些假乱数据。对于文件 data/2000-01-05.csv,我们将替换为无数据,对于文件 data/2000-01-07.csv,我们将删除 y 列。

[ ]:
# corrupt the data in data/2000-01-05.csv
with open('data/2000-01-05.csv', 'w') as f:
    f.write('')
[ ]:
# remove y column from data/2000-01-07.csv
import pandas as pd
df = pd.read_csv('data/2000-01-07.csv')
del df['y']
df.to_csv('data/2000-01-07.csv', index=False)
[ ]:
!head data/2000-01-05.csv
[ ]:
!head data/2000-01-07.csv

阅读混乱的数据

让我们再次尝试读取文件集合

[ ]:
df = dd.read_csv('data/2000-*-*.csv')
[ ]:
df.head()

好的,看起来它起作用了,让我们再次计算数据集的统计信息

[ ]:
df.describe().compute()

那么发生了什么?

当从一系列文件创建 dask 数据帧时,dd.read_csv 会从数据集中的前几个文件中采样以确定数据类型和可用列。由于它没有打开所有文件,因此不知道其中一些文件是否损坏。因此,df.head() 可以工作,因为它只查看第一个文件。df.describe.compute() 失败是因为 data/2000-01-05.csv 中的数据损坏。

构建一个延迟阅读器

为了解决这个问题,我们将使用一种更高级的技术来构建我们的 dask 数据框。这种方法也可以在读取每个文件时需要一些自定义逻辑时使用。本质上,我们将构建一个使用 pandas 和一些错误检查并返回 pandas 数据框的函数。如果我们发现一个坏的数据文件,我们将设法修复/清理数据,或者我们将返回一个与良好数据结构相同的空 pandas 数据框。

[ ]:
import numpy as np
import io

def read_data(filename):

    # for this to work we need to explicitly set the datatypes of our pandas dataframe
    dtypes = {'id': int, 'name': str, 'x': float, 'y': float}
    try:
        # try reading in the data with pandas
        df = pd.read_csv(filename, dtype=dtypes)
    except:
        # if this fails create an empty pandas dataframe with the same dtypes as the good data
        df = pd.read_csv(io.StringIO(''), names=dtypes.keys(), dtype=dtypes)

    # for the case with the missing column, add a column of data with NaN's
    if 'y' not in df.columns:
        df['y'] = np.NaN

    return df

让我们在一个好的文件和两个坏的文件上测试这个功能

[ ]:
# test function on a normal file
read_data('data/2000-01-01.csv').head()
[ ]:
# test function on the empty file
read_data('data/2000-01-05.csv').head()
[ ]:
# test function on the file missing the y column
read_data('data/2000-01-07.csv').head()

组装 dask 数据帧

首先,我们将 read_data 函数转换为 dask 延迟函数

[ ]:
from dask import delayed
read_data = delayed(read_data)

让我们看看这个函数现在做了什么

[ ]:
df = read_data('data/2000-01-01.csv')
df

它创建了一个延迟对象,要实际运行读取文件,我们需要运行 .compute()

[ ]:
df.compute()

现在让我们列出所有可用的csv文件

[ ]:
# loop over all the files
from glob import glob
files = glob('data/2000-*-*.csv')
files

现在我们在列表中的每个文件上运行延迟的 read_data 函数

[ ]:
df = [read_data(file) for file in files]
df

然后我们使用 dask.dataframe.from_delayed。这个函数从一个延迟对象列表创建一个 Dask DataFrame,只要每个延迟对象返回一个 pandas DataFrame。每个返回的单个 DataFrame 的结构也必须相同。

[ ]:
df = dd.from_delayed(df, meta={'id': int, 'name': str, 'x': float, 'y': float})
df

注意:我们在 meta 关键字中提供了 dtypes,以明确告诉 Dask Dataframe 期望的数据框类型。如果我们不这样做,Dask 将从第一个延迟对象中推断出这一点,如果它是一个大型 csv 文件,这可能会很慢。

现在让我们看看这是否有效

[ ]:
df.head()
[ ]:
df.describe().compute()

成功!

回顾一下,在这个例子中,我们探讨了从大量数据文件集合创建Dask Dataframe的方法。通常你会使用内置函数如 dd.read_csvdd.read_parquet 来完成这个任务。有时,由于数据集中存在混乱/损坏的文件或需要进行一些自定义处理,这种方法可能不可行。

在这些情况下,您可以按照以下步骤构建一个 Dask DataFrame。

  1. 创建一个常规的 Python 函数,该函数读取数据,执行任何转换、错误检查等,并始终返回具有相同结构的 Pandas 数据框。

  2. 使用 dask.delayed 函数将此读取函数转换为延迟对象

  3. 使用延迟数据读取器调用数据集中的每个文件,并将输出组装为延迟对象的列表

  4. 使用 dd.from_delayed 将延迟对象列表转换为 Dask 数据帧

这种相同的技术也可以用于其他情况。另一个例子可能是需要使用专用读取器的数据文件,或者在转换为 pandas 数据框之前需要进行多次转换的数据文件。