实时笔记本

你可以在 live session Binder 中运行此笔记本,或查看 Github 上的内容。

使用 Dask 数组的 Xarray

Xarray 数据集

Xarray 是一个开源项目和Python包,它扩展了 Pandas 的带标签数据功能到N维数组类数据集。它的API与 NumPyPandas 类似,并且在底层支持 DaskNumPy 数组。

[ ]:
%matplotlib inline

from dask.distributed import Client
import xarray as xr

启动 Dask 客户端以使用仪表板

启动 Dask 客户端是可选的。它将提供一个仪表板,这对于深入了解计算非常有用。

当你在下方创建客户端后,仪表板的链接将会变得可见。我们建议在使用笔记本的另一侧屏幕上打开它。这可能需要一些努力来安排你的窗口,但在学习时同时看到两者是非常有用的。

[ ]:
client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')
client

打开一个示例数据集

我们将使用 xarray 的一些教程数据作为此示例。通过指定块形状,xarray 将自动为 Dataset 中的每个数据变量创建 Dask 数组。在 xarray 中,Datasets 是标记数组的类似字典的容器,类似于 pandas.DataFrame。请注意,我们在指定块形状时利用了 xarray 的维度标签。

[ ]:
ds = xr.tutorial.open_dataset('air_temperature',
                              chunks={'lat': 25, 'lon': 25, 'time': -1})
ds

快速检查上面的 Dataset ,我们会注意到这个 Dataset 有三个类似于 NumPy 中轴的 维度latlontime),三个类似于 pandas.Index 对象的 坐标变量 (同样命名为 latlontime),以及一个数据变量(air)。Xarray 还将特定于数据集的元数据作为 属性 保存。

[ ]:
da = ds['air']
da

在 xarray 中的每个数据变量被称为 DataArray。这些是 xarray 中的基本标记数组对象。与 Dataset 类似,DataArrays 也有 维度坐标,这些支持其许多基于标签的操作。

[ ]:
da.data

通过 data 属性可以访问底层数据数组。在这里我们可以看到我们有一个 Dask 数组。如果这个数组是由一个 NumPy 数组支持的,这个属性将指向数组中的实际值。

使用标准 Xarray 操作

在几乎所有情况下,使用 xarray 对象的操作都是相同的,无论底层数据是存储为 Dask 数组还是 NumPy 数组。

[ ]:
da2 = da.groupby('time.month').mean('time')
da3 = da - da2
da3

当你希望结果以存储为NumPy数组的``xarray.DataArray``形式返回时,调用``.compute()``或``.load()``。

注意:load() 会就地修改 DataArray,而 compute() 返回一个新的 DataArray 对象。

如果你在上面启动了 Client(),那么在计算过程中你可能想要查看状态页面。

[ ]:
computed_da = da3.compute()
type(computed_da.data)
[ ]:
computed_da

在内存中持久化数据

如果你的数据集有足够的可用RAM,那么你可以将数据持久化在内存中。

这使得未来的计算速度大大提高。

[ ]:
da = da.persist()

时间序列操作

因为我们有一个 datetime 索引,时间序列操作可以高效地工作。这里我们演示了 xarray 的 resample 方法的使用:

[ ]:
resampled_da = da.resample(time='1w').mean('time')
resampled_da.std('time')
[ ]:
resampled_da.std('time').plot(figsize=(12, 8))

和滚动窗口操作:

[ ]:
da_smooth = da.rolling(time=30).mean()
da_smooth

由于 xarray 将其每个坐标变量存储在内存中,按标签切片是微不足道的,并且完全惰性。

[ ]:
%time da.sel(time='2013-01-01T18:00:00')
[ ]:
%time da.sel(time='2013-01-01T18:00:00').load()

自定义工作流和自动并行化

几乎所有 xarray 的内置操作都适用于 Dask 数组。如果你想使用一个未被 xarray 封装的函数,一种选择是从 xarray 对象中提取 Dask 数组(.data)并直接使用 Dask。

另一个选项是使用 xarray 的 apply_ufunc() 函数,它可以自动化尴尬并行的“映射”类型操作,其中为处理 NumPy 数组编写的函数应重复应用于包含 Dask 数组的 xarray 对象。它的工作方式类似于 dask.array.map_blocks()dask.array.blockwise(),但不需要中间层的抽象。

这里我们展示了一个使用 NumPy 操作和来自 bottleneck 的快速函数的示例,我们用它来计算斯皮尔曼等级相关系数:

[ ]:
import numpy as np
import xarray as xr
import bottleneck

def covariance_gufunc(x, y):
    return ((x - x.mean(axis=-1, keepdims=True))
            * (y - y.mean(axis=-1, keepdims=True))).mean(axis=-1)

def pearson_correlation_gufunc(x, y):
    return covariance_gufunc(x, y) / (x.std(axis=-1) * y.std(axis=-1))

def spearman_correlation_gufunc(x, y):
    x_ranks = bottleneck.rankdata(x, axis=-1)
    y_ranks = bottleneck.rankdata(y, axis=-1)
    return pearson_correlation_gufunc(x_ranks, y_ranks)

def spearman_correlation(x, y, dim):
    return xr.apply_ufunc(
        spearman_correlation_gufunc, x, y,
        input_core_dims=[[dim], [dim]],
        dask='parallelized',
        output_dtypes=[float])

在上面的例子中,我们处理了一些空气温度数据。在这个例子中,我们将使用原始空气温度数据与我们也创建的平滑版本(da_smooth)来计算斯皮尔曼相关性。为此,我们还需要提前重新分块数据。

[ ]:
corr = spearman_correlation(da.chunk({'time': -1}),
                            da_smooth.chunk({'time': -1}),
                            'time')
corr
[ ]:
corr.plot(figsize=(12, 8))