实时笔记本

你可以在 live session Binder 中运行这个笔记本,或者在 Github 上查看它。

从 Pandas 到 Dask 的注意事项

本笔记本重点介绍了将代码从 Pandas 转移到在 Dask 环境中运行时的一些关键差异。
大多数问题都有一个指向 Dask 文档 的链接,以获取更多信息。
[ ]:
# since Dask is activly beeing developed - the current example is running with the below version
import dask
import dask.dataframe as dd
import pandas as pd
print(f'Dask version: {dask.__version__}')
print(f'Pandas version: {pd.__version__}')

启动 Dask 客户端以使用仪表板

启动 Dask 客户端是可选的。在这个例子中,我们运行在 LocalCluster 上,这还将提供一个仪表板,这对于深入了解计算非常有用。
当你创建一个客户端时(如下所示),仪表板的链接将会变得可见。
Jupyter Lab 中运行时,可以安装一个 扩展 来查看各种仪表板小部件。
[ ]:
from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client = Client()
client

参见 附加集群配置的文档

创建两个用于比较的DataFrame:

  1. 对于 Dask

  2. 对于 Pandas,Dask 自带了一些内置数据集样本,我们将使用这个样本作为我们的示例。

[ ]:
ddf = dask.datasets.timeseries()
ddf
  • 记住 Dask 框架惰性 的,因此为了看到结果我们需要运行 compute() (或者 head() ,它在底层运行 compute())

[ ]:
ddf.head(2)

为了创建一个 Pandas 数据框,我们可以使用 Dask 数据框 中的 compute() 方法。

[ ]:
pdf = ddf.compute()
print(type(pdf))
pdf.head(2)

在使用 shape 属性时,我们也可以看到 dask 的惰性

[ ]:
print(f'Pandas shape: {pdf.shape}')
print('---------------------------')
print(f'Dask lazy shape: {ddf.shape}')

在访问所有分区之前,我们无法获取完整的形状 - 运行 len 将实现这一点

[ ]:
print(f'Dask computed shape: {len(ddf.index):,}')  # expensive

Pandas 创建 Dask dataframe

为了在现有的 Pandas dataframe (pdf) 上利用 Dask 的功能,我们需要使用 from_pandas 方法将 Pandas dataframe 转换为 Dask dataframe (ddf)。你必须提供用于生成 dask dataframe 的分区数量或块大小。

[ ]:
ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)
ddf2

Dask Dataframes 中的分区

请注意,当我们创建一个 Dask dataframe 时,我们需要提供一个 npartitions 的参数。
分区的数量将帮助 Dask 决定如何拆分 Pandas DataFrame 并并行化计算。
每个分区是一个 独立的 数据框。更多信息请参阅 分区文档

一个例子可以在检查 reset_index() 方法时看到:

[ ]:
pdf2 = pdf.reset_index()
# Only 1 row
pdf2.loc[0]
[ ]:
ddf2 = ddf2.reset_index()
# each partition has an index=0
ddf2.loc[0].compute()

Dask Dataframe 对比 Pandas Dataframe

既然我们已经有了一个 dask (ddf) 和一个 pandas (pdf) 数据框,我们就可以开始比较与它们的交互了。

概念转变 - 从更新到插入/删除

Dask 不更新 - 因此没有像 Pandas 中存在的 inplace=True 这样的参数。
更多详情请参见 github 上的 issue#653

重命名列

  • 使用 inplace=True 不被认为是 最佳实践

[ ]:
# Pandas
print(pdf.columns)
# pdf.rename(columns={'id':'ID'}, inplace=True)
pdf = pdf.rename(columns={'id':'ID'})
pdf.columns
# Dask - Error # ddf.rename(columns={'id':'ID'}, inplace=True) # ddf.columns ''' python --------------------------------------------------------------------------- TypeError Traceback (most recent call last) in 1 # Dask - Error ----> 2 ddf.rename(columns={'id':'ID'}, inplace=True) 3 ddf.columns TypeError: rename() got an unexpected keyword argument 'inplace' '''
[ ]:
# Dask
print(ddf.columns)
ddf = ddf.rename(columns={'id':'ID'})
ddf.columns

数据操作

在操作数据时,有几个不同之处。

loc - Pandas

[ ]:
cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)
pdf.loc[cond_pdf, ['y']] = pdf['y']* 100
pdf[cond_pdf].head(2)

错误

cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
ddf[cond_ddf].head(2)

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [14], in <cell line: 2>()
      1 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
----> 2 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
      3 ddf[cond_ddf].head(2)

TypeError: '_LocIndexer' object does not support item assignment

Dask - 使用掩码/条件

[ ]:
# Pandas
pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)
pdf.head(2)
[ ]:
#Dask
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)
ddf.head(2)

更多信息请参见 dask 掩码文档

元参数

Dask 的一个关键特性是引入了 meta 参数。
> meta 是计算输出名称/类型的处方
由于 Dask 为计算创建了一个DAG(有向无环图),因此它需要理解每个计算阶段的输出是什么。
更多信息请参见 元数据文档
[ ]:
pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])
pdf.head(2)
[ ]:
# Dask - Warning
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])
ddf.head(2)
[ ]:
# Describe the outcome type of the calculation
meta_arg = pd.Series(object, name='initials')
[ ]:
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta=meta_arg)
ddf.head(2)
[ ]:
# similar when using a function
def func(row):
    if (row['x']> 0):
        return row['x'] * 1000
    else:
        return row['y'] * -1
[ ]:
ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))
ddf.head(2)

地图分区

  • 我们可以使用 map_partitions 方法为每个分区提供一个临时函数来运行。主要用于在 DaskPandas 中未实现的函数。

  • 最后,我们可以返回一个新的 dataframe,它需要在 meta 参数中进行描述。该函数还可以包含参数。

[ ]:
import numpy as np
def func2(df, coor_x, coor_y, drop_cols):
    df['dist'] =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2
                           +  (df[coor_y] - df[coor_y].shift())**2 )
    return df.drop(drop_cols, axis=1)

ddf2 = ddf.map_partitions(func2
                          , coor_x='x'
                          , coor_y='y'
                          , drop_cols=['initials', 'z']
                          , meta=pd.DataFrame({'ID':'i8'
                                              , 'name':str
                                              , 'x':'f8'
                                              , 'y':'f8'
                                              , 'dist':'f8'}, index=[0]))
ddf2.head()

将索引转换为时间列

[ ]:
# Only Pandas
pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)
pdf.head(2)
[ ]:
# Dask or Pandas
ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))
# or  ddf = ddf.assign(Time= dask.dataframe.to_datetime(ddf.index, format='%Y-%m-%d'). )
ddf['times'] = ddf['times'].dt.time
ddf =client.persist(ddf)
ddf.head(2)

删除列中的NA值

[ ]:
# no issue with regular drop columns
pdf = pdf.drop(labels=['initials'],axis=1)
ddf = ddf.drop(labels=['initials','z'],axis=1)
[ ]:
# Pandas
pdf = pdf.assign(colna = None)
# Dask
ddf = ddf.assign(colna = None)
[ ]:
pdf = pdf.dropna(axis=1, how='all')
pdf.head(2)

为了使 Dask 删除所有 na 的列,它必须使用 compute() 检查所有分区。

[ ]:
if ddf.colna.isnull().all().compute() == True:   # check if all values in column are Null -  expensive
    ddf = ddf.drop(labels=['colna'],axis=1)
ddf.head(2)

1.4 重置索引

[ ]:
# Pandas
pdf =pdf.reset_index(drop=True)
pdf.head(2)
[ ]:
# Dask
ddf = ddf.reset_index()
ddf = ddf.drop(labels=['timestamp'], axis=1 )
ddf.head(2)

读取 / 保存文件

  • 在使用 pandasdask 时,建议使用 parquet 格式

  • 在使用 Dask 时 - 文件可以通过多个工作线程读取。

  • 大多数 kwargs 适用于读写文件,例如 ddf = dd.read_csv(‘data/pd2dd/ddf*.csv’, compression=’gzip’, header=False)。

  • 然而,有些功能是不可用的,例如 nrows

查看文档 (包括输出文件命名的选项)。

保存文件

[ ]:
from pathlib import Path
output_dir_file = Path('data/pdf_single_file.csv')
output_dir_file.parent.mkdir(parents=True, exist_ok=True)
[ ]:
%%time
# Pandas
pdf.to_csv(output_dir_file)
[ ]:
list(output_dir_file.parent.glob('*.csv'))

注意 '*' 用于允许多个文件重命名。

[ ]:
output_dask_dir = Path('data/dask_multi_files/')
output_dask_dir.mkdir(parents=True, exist_ok=True)
[ ]:
%%time
# Dask
ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)

要找到将决定输出文件数量的分区数,请使用 dask.dataframe.npartitions

要更改输出文件的数量,请使用 repartition,这是一个昂贵的操作。

[ ]:
ddf.npartitions

读取多个文件

对于 pandas ,可以迭代并连接文件 参见 Stack Overflow 的回答

[ ]:
%%time
# Pandas
concat_df = pd.concat([pd.read_csv(f)
                       for f in list(output_dask_dir.iterdir())])
len(concat_df)
[ ]:
%%time
# Dask
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
_ddf

记住 Dask 是惰性的 - 因此它不会 真正 读取文件,直到它需要这样做…

[ ]:
%%time
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
len(_ddf)
## 考虑使用 client.persist() 由于 Dask 是惰性的 - 它可能会再次运行 整个 图/DAG,即使它已经在之前的单元格中运行了部分计算。因此,使用 persist 来将结果保留在内存中
更多信息可以在这篇 stackoverflow 问题 中阅读,或者在 这篇帖子 中查看示例。
当在脚本(而不是Jupyter笔记本)中运行包含代码循环的代码时,也应使用此概念。
[ ]:
# e.g.
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
# do some filter
_ddf = client.persist(_ddf)
# do some computations
_ddf.head(2)

按组 - 自定义聚合

除了仓库中的 groupby 笔记本示例 之外 -
这是另一个尝试消除使用 groupby.apply 的示例。
在这个例子中,我们将列分组为唯一的列表。
[ ]:
# prepare pandas dataframe
pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)
pdf['seconds'] = pdf.time.astype(str).str[-2:]
cols_for_demo =['name', 'ID','seconds']
pdf[cols_for_demo].head()
[ ]:
pdf_gb = pdf.groupby(pdf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [pdf_gb[att_col_gr].apply
               (lambda x: list(set(x.to_list())))
               for att_col_gr in gp_col]
[ ]:
%%time
df_edge_att = pdf_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
print(df_edge_att.head(2))
  • 记住,在某些情况下 Pandas 更高效(假设你可以将所有数据加载到内存中)。

[ ]:
def set_list_att(x: dd.Series):
        return list(set([item for item in x.values]))
ddf['seconds'] = ddf.times.astype(str).str[-2:]
ddf = client.persist(ddf)
ddf[cols_for_demo].head(2)
[ ]:
ddf.columns
[ ]:
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att
                ,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att'))
               for att_col_gr in gp_col]
[ ]:
%%time
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
我们可以做得更好…
使用 dask 自定义聚合 被认为更好
[ ]:
import itertools
custom_agg = dd.Aggregation(
    'custom_agg',
    lambda s: s.apply(set),
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)
[ ]:
%%time
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
        df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)

调试

调试可能具有挑战性… 1. 在没有客户端的情况下运行代码 2. 使用仪表板分析器 3. 验证DAG的完整性

损坏的DAG

在这个例子中,我们展示了当DAG被破坏后,您可能需要重置计算。

[ ]:
# reset dataframe
ddf = dask.datasets.timeseries()
ddf.head(1)
[ ]:
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())^2    # `^` <-- wrong syntax
                     +  (df[coor_y] - df[coor_y].shift())^2 )  # `^` <-- wrong syntax
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))

一切都好吗?

# Results in error ddf.head() --------------------------------------------------------------------------- TypeError Traceback (most recent call last) in 1 # returns an error because of ^2 (needs to be **2) ----> 2 ddf.head() c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\dataframe\core.py in head(self, n, npartitions, compute) 898 899 if compute: --> 900 result = result.compute() 901 return result 902 c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\base.py in compute(self, **kwargs) 154 dask.base.compute 155 """ --> 156 (result,) = compute(self, traverse=False, **kwargs) 157 return result 158 pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop() pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop() TypeError: unsupported operand type(s) for ^: 'float' and 'bool'
  • 即使函数被修正,DAG 仍然损坏

[ ]:
# Still results with an error
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2  # `**` <-- correct syntax
                     +  (df[coor_y] - df[coor_y].shift())**2 )  # `**` <-- correct syntax
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
# Still Results in error ddf.head() --------------------------------------------------------------------------- TypeError Traceback (most recent call last) in 1 # returns an error because of ^2 (needs to be **2) ----> 2 ddf.head() c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\dataframe\core.py in head(self, n, npartitions, compute) 898 899 if compute: --> 900 result = result.compute() 901 return result 902 c:\users\jsber\.virtualenvs\dask-examples-3r4mgfnb\lib\site-packages\dask\base.py in compute(self, **kwargs) 154 dask.base.compute 155 """ --> 156 (result,) = compute(self, traverse=False, **kwargs) 157 return result 158 pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop() pandas\_libs\ops.pyx in pandas._libs.ops.vec_binop() TypeError: unsupported operand type(s) for ^: 'float' and 'bool'

我们需要重置数据框

[ ]:
ddf = dask.datasets.timeseries()
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2    #corrected math function
                     +  (df[coor_y] - df[coor_y].shift())**2 )
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
ddf.head(2)