数据框:读取混乱的数据
内容
实时笔记本
您可以在 live session 中运行此笔记本,或查看 on Github。
数据框:读取混乱的数据¶
在 01-data-access 示例中,我们展示了 Dask Dataframes 如何以与 Pandas Dataframes 相同的多种格式读取和存储数据。使用 Dask Dataframes 时的一个关键区别是,我们通常不是用 pandas.read_csv 这样的函数打开单个文件,而是用 dask.dataframe.read_csv 一次打开多个文件。这使我们能够将一组文件视为单个数据集。大多数情况下,这非常有效。但真实数据是混乱的,在本笔记本中,我们将探讨一种更高级的技术,将混乱的数据集引入 Dask Dataframe。
启动 Dask 客户端以使用仪表板¶
启动 Dask 客户端是可选的。它将提供一个仪表板,这对于深入了解计算非常有用。
当你在下方创建客户端后,仪表板的链接将会变得可见。我们建议在使用笔记本的另一侧屏幕上打开它。虽然安排窗口可能需要一些努力,但在学习时同时看到两者非常有用。
[ ]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=True, memory_limit='2GB')
client
创建人工数据集¶
首先,我们创建一个人工数据集并将其写入多个CSV文件。
你不需要理解这一部分,我们只是为笔记本的其余部分创建一个数据集。
[ ]:
import dask
df = dask.datasets.timeseries()
df
[ ]:
import os
import datetime
if not os.path.exists('data'):
os.mkdir('data')
def name(i):
""" Provide date for filename given index
Examples
--------
>>> name(0)
'2000-01-01'
>>> name(10)
'2000-01-11'
"""
return str(datetime.date(2000, 1, 1) + i * datetime.timedelta(days=1))
df.to_csv('data/*.csv', name_function=name, index=False);
读取 CSV 文件¶
我们现在在数据目录中有许多CSV文件,每个文件对应2000年1月份的一天。每个CSV文件保存了当天的时序数据。我们可以使用带有glob字符串的``dd.read_csv``函数将它们全部读取为一个逻辑数据框。
[ ]:
!ls data/*.csv | head
[ ]:
import dask.dataframe as dd
df = dd.read_csv('data/2000-*-*.csv')
df
[ ]:
df.head()
让我们来看看数据的一些统计信息
[ ]:
df.describe().compute()
生成一些混乱的数据¶
现在这工作得很好,在大多数情况下,dd.read_csv 或 dd.read_parquet 等是首选的将大量数据文件读入 dask 数据帧的方式,但现实世界的数据通常非常混乱,一些文件可能已损坏或格式不正确。为了模拟这种情况,我们将通过调整示例 csv 文件来创建一些假乱数据。对于文件 data/2000-01-05.csv
,我们将替换为无数据,对于文件 data/2000-01-07.csv
,我们将删除 y
列。
[ ]:
# corrupt the data in data/2000-01-05.csv
with open('data/2000-01-05.csv', 'w') as f:
f.write('')
[ ]:
# remove y column from data/2000-01-07.csv
import pandas as pd
df = pd.read_csv('data/2000-01-07.csv')
del df['y']
df.to_csv('data/2000-01-07.csv', index=False)
[ ]:
!head data/2000-01-05.csv
[ ]:
!head data/2000-01-07.csv
阅读混乱的数据¶
让我们再次尝试读取文件集合
[ ]:
df = dd.read_csv('data/2000-*-*.csv')
[ ]:
df.head()
好的,看起来它起作用了,让我们再次计算数据集的统计信息
[ ]:
df.describe().compute()
那么发生了什么?
当从一系列文件创建 dask 数据帧时,dd.read_csv 会从数据集中的前几个文件中采样以确定数据类型和可用列。由于它没有打开所有文件,因此不知道其中一些文件是否损坏。因此,df.head()
可以工作,因为它只查看第一个文件。df.describe.compute()
失败是因为 data/2000-01-05.csv
中的数据损坏。
构建一个延迟阅读器¶
为了解决这个问题,我们将使用一种更高级的技术来构建我们的 dask 数据框。这种方法也可以在读取每个文件时需要一些自定义逻辑时使用。本质上,我们将构建一个使用 pandas 和一些错误检查并返回 pandas 数据框的函数。如果我们发现一个坏的数据文件,我们将设法修复/清理数据,或者我们将返回一个与良好数据结构相同的空 pandas 数据框。
[ ]:
import numpy as np
import io
def read_data(filename):
# for this to work we need to explicitly set the datatypes of our pandas dataframe
dtypes = {'id': int, 'name': str, 'x': float, 'y': float}
try:
# try reading in the data with pandas
df = pd.read_csv(filename, dtype=dtypes)
except:
# if this fails create an empty pandas dataframe with the same dtypes as the good data
df = pd.read_csv(io.StringIO(''), names=dtypes.keys(), dtype=dtypes)
# for the case with the missing column, add a column of data with NaN's
if 'y' not in df.columns:
df['y'] = np.NaN
return df
让我们在一个好的文件和两个坏的文件上测试这个功能
[ ]:
# test function on a normal file
read_data('data/2000-01-01.csv').head()
[ ]:
# test function on the empty file
read_data('data/2000-01-05.csv').head()
[ ]:
# test function on the file missing the y column
read_data('data/2000-01-07.csv').head()
组装 dask 数据帧¶
首先,我们将 read_data
函数转换为 dask 延迟函数
[ ]:
from dask import delayed
read_data = delayed(read_data)
让我们看看这个函数现在做了什么
[ ]:
df = read_data('data/2000-01-01.csv')
df
它创建了一个延迟对象,要实际运行读取文件,我们需要运行 .compute()
[ ]:
df.compute()
现在让我们列出所有可用的csv文件
[ ]:
# loop over all the files
from glob import glob
files = glob('data/2000-*-*.csv')
files
现在我们在列表中的每个文件上运行延迟的 read_data 函数
[ ]:
df = [read_data(file) for file in files]
df
然后我们使用 dask.dataframe.from_delayed。这个函数从一个延迟对象列表创建一个 Dask DataFrame,只要每个延迟对象返回一个 pandas DataFrame。每个返回的单个 DataFrame 的结构也必须相同。
[ ]:
df = dd.from_delayed(df, meta={'id': int, 'name': str, 'x': float, 'y': float})
df
注意:我们在 meta
关键字中提供了 dtypes,以明确告诉 Dask Dataframe 期望的数据框类型。如果我们不这样做,Dask 将从第一个延迟对象中推断出这一点,如果它是一个大型 csv 文件,这可能会很慢。
现在让我们看看这是否有效¶
[ ]:
df.head()
[ ]:
df.describe().compute()
成功!¶
回顾一下,在这个例子中,我们探讨了从大量数据文件集合创建Dask Dataframe的方法。通常你会使用内置函数如 dd.read_csv
或 dd.read_parquet
来完成这个任务。有时,由于数据集中存在混乱/损坏的文件或需要进行一些自定义处理,这种方法可能不可行。
在这些情况下,您可以按照以下步骤构建一个 Dask DataFrame。
创建一个常规的 Python 函数,该函数读取数据,执行任何转换、错误检查等,并始终返回具有相同结构的 Pandas 数据框。
使用
dask.delayed
函数将此读取函数转换为延迟对象使用延迟数据读取器调用数据集中的每个文件,并将输出组装为延迟对象的列表
使用
dd.from_delayed
将延迟对象列表转换为 Dask 数据帧
这种相同的技术也可以用于其他情况。另一个例子可能是需要使用专用读取器的数据文件,或者在转换为 pandas 数据框之前需要进行多次转换的数据文件。