创建 Dask 数组
内容
创建 Dask 数组¶
你可以从多种常见来源加载或存储 Dask 数组,如 HDF5、NetCDF、Zarr,或任何支持 NumPy 风格切片的格式。
|
从看起来像数组的对象创建 dask 数组。 |
|
从 dask 延迟值创建一个 dask 数组 |
|
从 npy 文件堆栈加载 dask 数组 |
|
从 zarr 存储格式加载数组 |
|
沿新轴堆叠数组 |
|
沿现有轴连接数组 |
NumPy 切片¶
|
从看起来像数组的对象创建 dask 数组。 |
许多存储格式都有 Python 项目,它们使用 NumPy 切片语法来暴露存储。这些包括 HDF5、NetCDF、BColz、Zarr、GRIB 等。例如,我们可以使用 h5py 从 HDF5 文件加载一个 Dask 数组:
>>> import h5py
>>> f = h5py.File('myfile.hdf5') # HDF5 file
>>> d = f['/data/path'] # Pointer on on-disk array
>>> d.shape # d can be very large
(1000000, 1000000)
>>> x = d[:5, :5] # We slice to get numpy arrays
给定一个如上所述的 d
对象,该对象具有 dtype
和 shape
属性,并且支持 NumPy 风格的切片,我们可以构造一个惰性的 Dask 数组:
>>> import dask.array as da
>>> x = da.from_array(d, chunks=(1000, 1000))
这个过程是完全惰性的。无论是创建 h5py 对象还是用 da.from_array
包装它,都没有加载任何数据。
随机数据¶
为了实验或基准测试,通常会创建随机数据数组。dask.array.random
模块实现了 numpy.random
模块中的大部分函数。我们列出了一些常用函数,但完整的列表请参见 数组 API:
|
从二项分布中抽取样本。 |
|
从正态(高斯)分布中随机抽取样本。 |
|
从泊松分布中抽取样本。 |
|
返回半开区间 [0.0, 1.0) 内的随机浮点数。 |
>>> import dask.array as da
>>> rng = da.random.default_rng()
>>> x = rng.random((10000, 10000), chunks=(1000, 1000))
连接与堆叠¶
|
沿新轴堆叠数组 |
|
沿现有轴连接数组 |
通常我们将数据存储在几个不同的位置,并希望将它们拼接在一起:
dask_arrays = []
for fn in filenames:
f = h5py.File(fn)
d = f['/data']
array = da.from_array(d, chunks=(1000, 1000))
dask_arrays.append(array)
x = da.concatenate(dask_arrays, axis=0) # concatenate arrays along first axis
更多信息,请参阅 拼接和堆叠 文档。
使用 dask.delayed
¶
|
从 dask 延迟值创建一个 dask 数组 |
|
沿新轴堆叠数组 |
|
沿现有轴连接数组 |
有时,NumPy 风格的数据存在于不支持 NumPy 风格切片的格式中。如果我们有一个可以生成完整数组片段的 Python 函数,我们仍然可以使用 dask.delayed 围绕这些数据构建 Dask 数组。Dask delayed 允许我们延迟一个将创建 NumPy 数组的函数调用。然后,我们可以使用 da.from_delayed
包装这个延迟对象,提供 dtype 和 shape 来生成一个单块的 Dask 数组。此外,我们可以使用之前的 stack
或 concatenate
来构建一个更大的惰性数组。
例如,考虑使用 skimage.io.imread
加载一系列图像:
import skimage.io
import dask.array as da
import dask
imread = dask.delayed(skimage.io.imread, pure=True) # Lazy version of imread
filenames = sorted(glob.glob('*.jpg'))
lazy_images = [imread(path) for path in filenames] # Lazily evaluate imread on each path
sample = lazy_images[0].compute() # load the first image (assume rest are same shape/dtype)
arrays = [da.from_delayed(lazy_image, # Construct a small Dask array
dtype=sample.dtype, # for every lazy value
shape=sample.shape)
for lazy_image in lazy_images]
stack = da.stack(arrays, axis=0) # Stack all small Dask arrays into one
通常使用 da.map_blocks
比使用 da.stack
要快得多。
import glob
import skimage.io
import numpy as np
import dask.array as da
filenames = sorted(glob.glob('*.jpg'))
def read_one_image(block_id, filenames=filenames, axis=0):
# a function that reads in one chunk of data
path = filenames[block_id[axis]]
image = skimage.io.imread(path)
return np.expand_dims(image, axis=axis)
# load the first image (assume rest are same shape/dtype)
sample = skimage.io.imread(filenames[0])
stack = da.map_blocks(
read_one_image,
dtype=sample.dtype,
chunks=((1,) * len(filenames), *sample.shape)
)
从 Dask DataFrame¶
有几种方法可以从 Dask DataFrame 创建 Dask 数组。Dask DataFrame 有一个 to_dask_array
方法:
>>> df = dask.dataframes.from_pandas(...)
>>> df.to_dask_array()
dask.array<values, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>
这与 Pandas 中的 to_numpy 函数相对应。values
属性也同样支持:
>>> df.values
dask.array<values, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>
然而,这些数组没有已知的块大小,因为 dask.dataframe 不跟踪每个分区中的行数。这意味着一些操作,如切片,将无法正确操作。
块大小可以计算:
>>> df.to_dask_array(lengths=True)
dask.array<array, shape=(100, 3), dtype=float64, chunksize=(50, 3), chunktype=numpy.ndarray>
指定 lengths=True
会触发块大小的立即计算。这使得依赖于已知块大小的下游计算(例如,切片)得以进行。
Dask DataFrame 的 to_records
方法也会返回一个 Dask 数组,但不计算形状信息:
>>> df.to_records()
dask.array<to_records, shape=(nan,), dtype=(numpy.record, [('index', '<i8'), ('x', '<f8'), ('y', '<f8'), ('z', '<f8')]), chunksize=(nan,), chunktype=numpy.ndarray>
如果你有一个将 Pandas DataFrame 转换为 NumPy 数组的函数,那么在 Dask DataFrame 上调用 map_partitions
并使用该函数将生成一个 Dask 数组:
>>> df.map_partitions(np.asarray)
dask.array<asarray, shape=(nan, 3), dtype=float64, chunksize=(nan, 3), chunktype=numpy.ndarray>
与 NumPy 数组的交互¶
Dask 数组操作将自动将 NumPy 数组转换为单块 dask 数组:
>>> x = da.sum(np.ones(5))
>>> x.compute()
5
当 NumPy 和 Dask 数组交互时,结果将是一个 Dask 数组。自动重分块规则通常会将 NumPy 数组切片为适当的 Dask 块形状:
>>> x = da.ones(10, chunks=(5,))
>>> y = np.ones(10)
>>> z = x + y
>>> z
dask.array<add, shape=(10,), dtype=float64, chunksize=(5,), chunktype=numpy.ndarray>
这些交互不仅适用于 NumPy 数组,还适用于任何具有 shape 和 dtype 属性并实现了 NumPy 切片语法的对象。
内存映射¶
内存映射可以是一种非常有效的方法来访问原始二进制数据,因为如果数据已经在文件系统缓存中,它几乎没有任何开销。对于线程调度器,从原始二进制文件创建Dask数组可以简单到 a = da.from_array(np.memmap(filename, shape=shape, dtype=dtype, mode='r'))
。
对于多进程或分布式调度器,每个数组块的内存映射应在正确的 worker 进程上创建,而不是在主进程上创建,以避免通过集群进行数据传输。这可以通过使用 dask.delayed
包装创建内存映射的函数来实现。
import numpy as np
import dask
import dask.array as da
def mmap_load_chunk(filename, shape, dtype, offset, sl):
'''
Memory map the given file with overall shape and dtype and return a slice
specified by :code:`sl`.
Parameters
----------
filename : str
shape : tuple
Total shape of the data in the file
dtype:
NumPy dtype of the data in the file
offset : int
Skip :code:`offset` bytes from the beginning of the file.
sl:
Object that can be used for indexing or slicing a NumPy array to
extract a chunk
Returns
-------
numpy.memmap or numpy.ndarray
View into memory map created by indexing with :code:`sl`,
or NumPy ndarray in case no view can be created using :code:`sl`.
'''
data = np.memmap(filename, mode='r', shape=shape, dtype=dtype, offset=offset)
return data[sl]
def mmap_dask_array(filename, shape, dtype, offset=0, blocksize=5):
'''
Create a Dask array from raw binary data in :code:`filename`
by memory mapping.
This method is particularly effective if the file is already
in the file system cache and if arbitrary smaller subsets are
to be extracted from the Dask array without optimizing its
chunking scheme.
It may perform poorly on Windows if the file is not in the file
system cache. On Linux it performs well under most circumstances.
Parameters
----------
filename : str
shape : tuple
Total shape of the data in the file
dtype:
NumPy dtype of the data in the file
offset : int, optional
Skip :code:`offset` bytes from the beginning of the file.
blocksize : int, optional
Chunk size for the outermost axis. The other axes remain unchunked.
Returns
-------
dask.array.Array
Dask array matching :code:`shape` and :code:`dtype`, backed by
memory-mapped chunks.
'''
load = dask.delayed(mmap_load_chunk)
chunks = []
for index in range(0, shape[0], blocksize):
# Truncate the last chunk if necessary
chunk_size = min(blocksize, shape[0] - index)
chunk = dask.array.from_delayed(
load(
filename,
shape=shape,
dtype=dtype,
offset=offset,
sl=slice(index, index + chunk_size)
),
shape=(chunk_size, ) + shape[1:],
dtype=dtype
)
chunks.append(chunk)
return da.concatenate(chunks, axis=0)
x = mmap_dask_array(
filename='testfile-50-50-100-100-float32.raw',
shape=(50, 50, 100, 100),
dtype=np.float32
)
存储 Dask 数组¶
|
将 dask 数组存储在类似数组的对象中,覆盖目标中的数据 |
|
在HDF5文件中存储数组 |
|
将 dask 数组写入 .npy 文件堆栈 |
|
将数组保存为 zarr 存储格式 |
|
一次计算多个 dask 集合。 |
在内存中¶
|
一次计算多个 dask 集合。 |
如果你有少量的数据,你可以调用 np.array
或 .compute()
在你的 Dask 数组上,将其转换为普通的 NumPy 数组:
>>> x = da.arange(6, chunks=3)
>>> y = x**2
>>> np.array(y)
array([0, 1, 4, 9, 16, 25])
>>> y.compute()
array([0, 1, 4, 9, 16, 25])
NumPy 风格切片¶
|
将 dask 数组存储在类似数组的对象中,覆盖目标中的数据 |
你可以将 Dask 数组存储在任何支持 NumPy 风格切片赋值的对象中,例如 h5py.Dataset
:
>>> import h5py
>>> f = h5py.File('myfile.hdf5')
>>> d = f.require_dataset('/data', shape=x.shape, dtype=x.dtype)
>>> da.store(x, d)
此外,你可以通过传递源和目标的列表,在一次计算中存储多个数组:
>>> da.store([array1, array2], [output1, output2]) # doctest: +SKIP
HDF5¶
|
在HDF5文件中存储数组 |
HDF5 足够常见,因此有一个特殊函数 to_hdf5
用于使用 h5py
将数据存储到 HDF5 文件中:
>>> da.to_hdf5('myfile.hdf5', '/y', y) # doctest: +SKIP
你可以通过传递字典,使用函数 da.to_hdf5
在一次计算中存储多个数组:
>>> da.to_hdf5('myfile.hdf5', {'/x': x, '/y': y}) # doctest: +SKIP
Zarr¶
Zarr 格式是一种按块存储的二进制数组文件格式,具有多种编码和压缩选项。由于每个块存储在单独的文件中,因此非常适合在读写(对于后者,如果 Dask 数组块与目标对齐)时进行并行访问。此外,还支持存储在 远程数据服务 中,如 S3 和 GCS。
例如,要将数据保存到本地 zarr 数据集,您可以这样做:
>>> arr.to_zarr('output.zarr')
或者保存到S3上的特定存储桶:
>>> arr.to_zarr('s3://mybucket/output.zarr', storage_option={'key': 'mykey',
'secret': 'mysecret'})
或您自己的自定义 zarr 数组:
>>> z = zarr.create((10,), dtype=float, store=zarr.ZipStore("output.zarr"))
>>> arr.to_zarr(z)
要检索这些数据,你可以使用 da.from_zarr
并传入完全相同的参数。除非另有指定,否则生成的 Dask 数组的块划分由文件的保存方式定义。
TileDB¶
TileDB 是一种二进制数组格式和存储管理器,具有可调的块划分、布局和压缩选项。TileDB 存储管理器库支持可扩展的存储后端,如 S3 API 兼容的对象存储和 HDFS,具有自动扩展功能,并支持多线程和多进程读取(一致)和写入(最终一致)。
要将数据保存到本地 TileDB 数组中:
>>> arr.to_tiledb('output.tdb')
或保存到S3上的一个存储桶中:
>>> arr.to_tiledb('s3://mybucket/output.tdb',
storage_options={'vfs.s3.aws_access_key_id': 'mykey',
'vfs.s3.aws_secret_access_key': 'mysecret'})
可以通过使用相同的URI和任何必要的参数运行 da.from_tiledb 来检索文件。
中间存储¶
|
将 dask 数组存储在类似数组的对象中,覆盖目标中的数据 |
在某些情况下,可能希望将中间结果存储在长期存储中。这与 persist
不同,后者主要用于管理 Dask 中的中间结果,这些结果不一定具有长期性。此外,它与存储最终结果不同,因为这些结果标志着 Dask 图的结束。因此,中间结果更容易在不重新加载数据的情况下重用。中间存储主要在数据需要在 Dask 外部使用时(例如在磁盘上、数据库中、云中等)有用。它可以作为长时间运行或易出错的计算的检查点。
中间存储用例与典型存储用例不同,因为返回给用户的是一个表示该存储操作结果的 Dask 数组。这通常通过将 store
函数的 return_stored
标志设置为 True
来完成。
x.store() # stores data, returns nothing
x = x.store(return_stored=True) # stores data, returns new dask array backed by that data
用户可以决定存储操作是立即发生(通过将 compute
标志设置为 True
)还是稍后发生(通过将 compute
标志设置为 False
)。在所有其他方面,这与对 store
的普通调用行为相同。以下是一些示例。
>>> import dask.array as da
>>> import zarr as zr
>>> c = (2, 2)
>>> d = da.ones((10, 11), chunks=c)
>>> z1 = zr.open_array('lazy.zarr', shape=d.shape, dtype=d.dtype, chunks=c)
>>> z2 = zr.open_array('eager.zarr', shape=d.shape, dtype=d.dtype, chunks=c)
>>> d1 = d.store(z1, compute=False, return_stored=True)
>>> d2 = d.store(z2, compute=True, return_stored=True)
这可以与上述任何其他存储策略结合使用,无论是在文档中提到的,还是针对任何专门的存储类型。
插件¶
我们可以在构建 Dask 数组时运行任意用户定义的函数。这使我们能够构建各种自定义行为,以改进调试、用户警告等。您可以将要运行的函数列表注册到全局 array_plugins=
值中:
>>> def f(x):
... print(x.nbytes)
>>> with dask.config.set(array_plugins=[f]):
... x = da.ones((10, 1), chunks=(5, 1))
... y = x.dot(x.T)
80
80
800
800
如果插件函数返回 None,那么输入的 Dask 数组将保持不变。如果插件函数返回其他值,那么该值将是构造函数的结果。
示例¶
自动计算¶
我们可能希望将一些 Dask 数组代码转换为普通的 NumPy 代码。这在某些情况下非常有用,例如,可以立即追踪到那些由于 Dask 的延迟语义而被隐藏的错误:
>>> with dask.config.set(array_plugins=[lambda x: x.compute()]):
... x = da.arange(5, chunks=2)
>>> x # this was automatically converted into a numpy array
array([0, 1, 2, 3, 4])
警告大块内容¶
如果我们希望在用户创建过大的块时发出警告:
def warn_on_large_chunks(x):
shapes = list(itertools.product(*x.chunks))
nbytes = [x.dtype.itemsize * np.prod(shape) for shape in shapes]
if any(nb > 1e9 for nb in nbytes):
warnings.warn("Array contains very large chunks")
with dask.config.set(array_plugins=[warn_on_large_chunks]):
...
结合¶
你也可以将这些插件组合成一个列表。它们将一个接一个地运行,通过它们链接结果:
with dask.config.set(array_plugins=[warn_on_large_chunks, lambda x: x.compute()]):
...