使用 Dask DataFrames 加载和保存数据

你可以从各种数据存储格式(如CSV、HDF、Apache Parquet等)创建Dask DataFrame。对于大多数格式,这些数据可以存在于各种存储系统中,包括本地磁盘、网络文件系统(NFS)、Hadoop分布式文件系统(HDFS)、Google云存储和Amazon S3(HDF除外,它仅在类POSIX文件系统上可用)。

有关 dask.dataframe 的范围、用途和限制的更多信息,请参阅 DataFrame 概述页面,以及 DataFrame 最佳实践 以获取更多提示和常见问题的解决方案。

API

以下函数提供了在 Dask DataFrame、文件格式以及其他 Dask 或 Python 集合之间进行转换的访问。

文件格式:

read_csv(urlpath[, blocksize, ...])

将 CSV 文件读入 Dask.DataFrame

read_parquet(path[, columns, filters, ...])

将 Parquet 文件读取到 Dask DataFrame 中

read_hdf(pattern, key[, start, stop, ...])

将 HDF 文件读取到 Dask DataFrame 中

read_orc(path[, engine, columns, index, ...])

从 ORC 文件中读取数据框

read_json(url_path[, orient, lines, ...])

从一组JSON文件创建一个数据框

read_sql_table(table_name, con, index_col[, ...])

将 SQL 数据库表读取到 DataFrame 中。

read_sql_query(sql, con, index_col[, ...])

将 SQL 查询读取到 DataFrame 中。

read_sql(sql, con, index_col, **kwargs)

将 SQL 查询或数据库表读取到 DataFrame 中。

read_table(urlpath[, blocksize, ...])

将分隔文件读入 Dask.DataFrame

read_fwf(urlpath[, blocksize, ...])

将固定宽度的文件读入 Dask.DataFrame

from_array(x[, chunksize, columns, meta])

将任何可切片数组读入 Dask 数据框

to_csv(df, filename[, single_file, ...])

将 Dask DataFrame 存储为 CSV 文件

to_parquet(df, path[, engine, compression, ...])

将 Dask.dataframe 存储到 Parquet 文件

to_hdf(df, path, key[, mode, append, ...])

将 Dask DataFrame 存储到分层数据格式 (HDF) 文件

to_sql(df, name, uri[, schema, if_exists, ...])

将 Dask 数据框存储到 SQL 表中

Dask 集合:

from_delayed(dfs[, meta, divisions, prefix, ...])

从许多 Dask Delayed 对象创建 Dask DataFrame。

from_dask_array(x[, columns, index, meta])

从 Dask 数组创建一个 Dask DataFrame。

from_map(func, *iterables[, args, meta, ...])

从自定义函数映射创建一个 DataFrame 集合。

dask.bag.core.Bag.to_dataframe([meta, ...])

从 Dask Bag 创建 Dask Dataframe。

DataFrame.to_delayed([optimize_graph])

转换为一个 dask.delayed 对象列表,每个分区一个。

to_records(df)

从 Dask Dataframe 创建 Dask Array

to_bag(df[, index, format])

从 Dask DataFrame 创建 Dask Bag

Pandas:

from_pandas()

从 Pandas DataFrame 构建 Dask DataFrame

DataFrame.from_dict(data, *, npartitions[, ...])

从 Python 字典构建 Dask DataFrame

其他文件格式:

创建

从CSV读取

你可以使用 read_csv() 将一个或多个 CSV 文件读取到 Dask DataFrame 中。它支持使用 globstrings 一次性加载多个文件:

>>> df = dd.read_csv('myfiles.*.csv')

你可以使用 blocksize 参数将单个大文件拆分:

>>> df = dd.read_csv('largefile.csv', blocksize=25e6)  # 25MB chunks

更改 blocksize 参数将改变分区数量(参见关于 分区 的解释)。使用 Dask DataFrames 时的一个经验法则是保持分区大小在 100MB 以下。

从 Parquet 读取

同样地,你可以使用 read_parquet() 来读取一个或多个 Parquet 文件。你可以读取单个 Parquet 文件:

>>> df = dd.read_parquet("path/to/mydata.parquet")

或者是一个本地 Parquet 文件的目录:

>>> df = dd.read_parquet("path/to/my/parquet/")

有关使用 Parquet 文件的更多详细信息,包括提示和最佳实践,请参阅 Dask 数据帧和 Parquet 的文档。

从云存储读取

Dask可以从多种数据存储中读取数据,包括云对象存储。你可以通过在常用数据访问函数(如 dd.read_csv)中使用的路径前加上协议(如 s3://)来实现这一点:

>>> df = dd.read_csv('s3://bucket/path/to/data-*.csv')
>>> df = dd.read_parquet('gcs://bucket/path/to/data-*.parq')

对于像 Amazon S3 或 Google Cloud Storage 这样的远程系统,您可能需要提供凭证。这些通常存储在一个配置文件中,但在某些情况下,您可能希望通过 storage_options 参数将特定于存储的选项传递给存储后端:

>>> df = dd.read_csv('s3://bucket-name/my-data-*.csv',
...                  storage_options={'anon': True})
>>> df = dd.read_parquet('gs://dask-nyc-taxi/yellowtrip.parquet',
...                      storage_options={'token': 'anon'})

请参阅有关连接到 Amazon S3Google Cloud Storage 的文档。

从函数映射

对于上述函数未涵盖的情况,但可以通过简单的 map 操作捕获的情况,from_map() 可能是最方便的 DataFrame 创建方法。例如,此 API 可用于通过将片段映射到 DataFrame 分区,将任意 PyArrow Dataset 对象转换为 DataFrame 集合:

>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset("hive_data_path", format="orc", partitioning="hive")
>>> fragments = dataset.get_fragments()
>>> func = lambda frag: frag.to_table().to_pandas()
>>> df = dd.from_map(func, fragments)

Dask 延迟

Dask delayed 在简单的 map 操作不足以捕捉数据布局的复杂性时特别有用。它允许你通过任意的 Python 函数调用来构建 Dask DataFrames,这对于处理自定义数据格式或在加载数据时嵌入特定逻辑非常有帮助。请参阅 使用 dask.delayed 与集合的文档

存储

本地编写文件

您可以在本地保存文件,假设每个工作节点都可以访问相同的文件系统。工作节点可以位于同一台机器上,或者可以挂载网络文件系统,并在每个工作节点上引用相同的路径位置。请参阅关于 本地访问数据 的文档。

写入远程位置

Dask 可以写入多种数据存储,包括云对象存储。例如,你可以将 dask.dataframe 写入 Azure 存储 blob,如下所示:

>>> d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
>>> df = dd.from_pandas(pd.DataFrame(data=d), npartitions=2)
>>> dd.to_parquet(df=df,
...               path='abfs://CONTAINER/FILE.parquet'
...               storage_options={'account_name': 'ACCOUNT_NAME',
...                                'account_key': 'ACCOUNT_KEY'}

请参阅 如何连接到远程数据的指南