从 Pandas 到 Dask 的注意事项
内容
实时笔记本
你可以在 live session 中运行这个笔记本,或者在 Github 上查看它。
从 Pandas 到 Dask 的注意事项¶
Pandas
转移到在 Dask
环境中运行时的一些关键差异。[ ]:
# since Dask is activly beeing developed - the current example is running with the below version
import dask
import dask.dataframe as dd
import pandas as pd
print(f'Dask version: {dask.__version__}')
print(f'Pandas version: {pd.__version__}')
启动 Dask 客户端以使用仪表板¶
LocalCluster
上,这还将提供一个仪表板,这对于深入了解计算非常有用。Jupyter Lab
中运行时,可以安装一个 扩展 来查看各种仪表板小部件。[ ]:
from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client = Client()
client
参见 附加集群配置的文档
创建两个用于比较的DataFrame:¶
对于 Dask
对于 Pandas,Dask 自带了一些内置数据集样本,我们将使用这个样本作为我们的示例。
[ ]:
ddf = dask.datasets.timeseries()
ddf
记住
Dask 框架
是 惰性 的,因此为了看到结果我们需要运行 compute() (或者head()
,它在底层运行 compute())
[ ]:
ddf.head(2)
为了创建一个 Pandas
数据框,我们可以使用 Dask 数据框
中的 compute()
方法。
[ ]:
pdf = ddf.compute()
print(type(pdf))
pdf.head(2)
在使用 shape 属性时,我们也可以看到 dask 的惰性。
[ ]:
print(f'Pandas shape: {pdf.shape}')
print('---------------------------')
print(f'Dask lazy shape: {ddf.shape}')
在访问所有分区之前,我们无法获取完整的形状 - 运行 len
将实现这一点
[ ]:
print(f'Dask computed shape: {len(ddf.index):,}') # expensive
从 Pandas
创建 Dask dataframe
¶
为了在现有的 Pandas dataframe
(pdf) 上利用 Dask
的功能,我们需要使用 from_pandas 方法将 Pandas dataframe
转换为 Dask dataframe
(ddf)。你必须提供用于生成 dask dataframe 的分区数量或块大小。
[ ]:
ddf2 = dask.dataframe.from_pandas(pdf, npartitions=10)
ddf2
Dask Dataframes 中的分区¶
Dask dataframe
时,我们需要提供一个 npartitions
的参数。Dask
决定如何拆分 Pandas DataFrame
并并行化计算。一个例子可以在检查 reset_index()
方法时看到:
[ ]:
pdf2 = pdf.reset_index()
# Only 1 row
pdf2.loc[0]
[ ]:
ddf2 = ddf2.reset_index()
# each partition has an index=0
ddf2.loc[0].compute()
Dask Dataframe 对比 Pandas Dataframe¶
既然我们已经有了一个 dask
(ddf) 和一个 pandas
(pdf) 数据框,我们就可以开始比较与它们的交互了。
概念转变 - 从更新到插入/删除¶
inplace=True
这样的参数。重命名列¶
使用
inplace=True
不被认为是 最佳实践。
[ ]:
# Pandas
print(pdf.columns)
# pdf.rename(columns={'id':'ID'}, inplace=True)
pdf = pdf.rename(columns={'id':'ID'})
pdf.columns
[ ]:
# Dask
print(ddf.columns)
ddf = ddf.rename(columns={'id':'ID'})
ddf.columns
数据操作¶
在操作数据时,有几个不同之处。
loc - Pandas¶
[ ]:
cond_pdf = (pdf['x']>0.5) & (pdf['x']<0.8)
pdf.loc[cond_pdf, ['y']] = pdf['y']* 100
pdf[cond_pdf].head(2)
错误¶
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
ddf[cond_ddf].head(2)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Input In [14], in <cell line: 2>()
1 cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
----> 2 ddf.loc[cond_ddf, ['y']] = ddf['y']* 100
3 ddf[cond_ddf].head(2)
TypeError: '_LocIndexer' object does not support item assignment
Dask - 使用掩码/条件¶
[ ]:
# Pandas
pdf['y'] = pdf['y'].mask(cond=cond_pdf, other=pdf['y']* 100)
pdf.head(2)
[ ]:
#Dask
cond_ddf = (ddf['x']>0.5) & (ddf['x']<0.8)
ddf['y'] = ddf['y'].mask(cond=cond_ddf, other=ddf['y']* 100)
ddf.head(2)
更多信息请参见 dask 掩码文档
元参数¶
Dask
为计算创建了一个DAG(有向无环图),因此它需要理解每个计算阶段的输出是什么。[ ]:
pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])
pdf.head(2)
[ ]:
# Dask - Warning
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])
ddf.head(2)
[ ]:
# Describe the outcome type of the calculation
meta_arg = pd.Series(object, name='initials')
[ ]:
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1], meta=meta_arg)
ddf.head(2)
[ ]:
# similar when using a function
def func(row):
if (row['x']> 0):
return row['x'] * 1000
else:
return row['y'] * -1
[ ]:
ddf['z'] = ddf.apply(func, axis=1, meta=('z', 'float'))
ddf.head(2)
地图分区¶
我们可以使用 map_partitions 方法为每个分区提供一个临时函数来运行。主要用于在
Dask
或Pandas
中未实现的函数。最后,我们可以返回一个新的
dataframe
,它需要在meta
参数中进行描述。该函数还可以包含参数。
[ ]:
import numpy as np
def func2(df, coor_x, coor_y, drop_cols):
df['dist'] = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2
+ (df[coor_y] - df[coor_y].shift())**2 )
return df.drop(drop_cols, axis=1)
ddf2 = ddf.map_partitions(func2
, coor_x='x'
, coor_y='y'
, drop_cols=['initials', 'z']
, meta=pd.DataFrame({'ID':'i8'
, 'name':str
, 'x':'f8'
, 'y':'f8'
, 'dist':'f8'}, index=[0]))
ddf2.head()
将索引转换为时间列¶
[ ]:
# Only Pandas
pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)
pdf.head(2)
[ ]:
# Dask or Pandas
ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))
# or ddf = ddf.assign(Time= dask.dataframe.to_datetime(ddf.index, format='%Y-%m-%d'). )
ddf['times'] = ddf['times'].dt.time
ddf =client.persist(ddf)
ddf.head(2)
删除列中的NA值¶
[ ]:
# no issue with regular drop columns
pdf = pdf.drop(labels=['initials'],axis=1)
ddf = ddf.drop(labels=['initials','z'],axis=1)
[ ]:
# Pandas
pdf = pdf.assign(colna = None)
# Dask
ddf = ddf.assign(colna = None)
[ ]:
pdf = pdf.dropna(axis=1, how='all')
pdf.head(2)
为了使 Dask
删除所有 na
的列,它必须使用 compute()
检查所有分区。
[ ]:
if ddf.colna.isnull().all().compute() == True: # check if all values in column are Null - expensive
ddf = ddf.drop(labels=['colna'],axis=1)
ddf.head(2)
1.4 重置索引¶
[ ]:
# Pandas
pdf =pdf.reset_index(drop=True)
pdf.head(2)
[ ]:
# Dask
ddf = ddf.reset_index()
ddf = ddf.drop(labels=['timestamp'], axis=1 )
ddf.head(2)
读取 / 保存文件¶
在使用
pandas
和dask
时,建议使用 parquet 格式。在使用
Dask
时 - 文件可以通过多个工作线程读取。大多数
kwargs
适用于读写文件,例如 ddf = dd.read_csv(‘data/pd2dd/ddf*.csv’, compression=’gzip’, header=False)。然而,有些功能是不可用的,例如
nrows
。
查看文档 (包括输出文件命名的选项)。
保存文件¶
[ ]:
from pathlib import Path
output_dir_file = Path('data/pdf_single_file.csv')
output_dir_file.parent.mkdir(parents=True, exist_ok=True)
[ ]:
%%time
# Pandas
pdf.to_csv(output_dir_file)
[ ]:
list(output_dir_file.parent.glob('*.csv'))
注意 '*'
用于允许多个文件重命名。
[ ]:
output_dask_dir = Path('data/dask_multi_files/')
output_dask_dir.mkdir(parents=True, exist_ok=True)
[ ]:
%%time
# Dask
ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)
要找到将决定输出文件数量的分区数,请使用 dask.dataframe.npartitions
要更改输出文件的数量,请使用 repartition,这是一个昂贵的操作。
[ ]:
ddf.npartitions
读取多个文件¶
对于 pandas
,可以迭代并连接文件 参见 Stack Overflow 的回答。
[ ]:
%%time
# Pandas
concat_df = pd.concat([pd.read_csv(f)
for f in list(output_dask_dir.iterdir())])
len(concat_df)
[ ]:
%%time
# Dask
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
_ddf
记住 Dask
是惰性的 - 因此它不会 真正 读取文件,直到它需要这样做…
[ ]:
%%time
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
len(_ddf)
[ ]:
# e.g.
_ddf = dd.read_csv(output_dask_dir/'ddf*.csv')
# do some filter
_ddf = client.persist(_ddf)
# do some computations
_ddf.head(2)
按组 - 自定义聚合¶
[ ]:
# prepare pandas dataframe
pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)
pdf['seconds'] = pdf.time.astype(str).str[-2:]
cols_for_demo =['name', 'ID','seconds']
pdf[cols_for_demo].head()
[ ]:
pdf_gb = pdf.groupby(pdf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [pdf_gb[att_col_gr].apply
(lambda x: list(set(x.to_list())))
for att_col_gr in gp_col]
[ ]:
%%time
df_edge_att = pdf_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
print(df_edge_att.head(2))
记住,在某些情况下
Pandas
更高效(假设你可以将所有数据加载到内存中)。
[ ]:
def set_list_att(x: dd.Series):
return list(set([item for item in x.values]))
ddf['seconds'] = ddf.times.astype(str).str[-2:]
ddf = client.persist(ddf)
ddf[cols_for_demo].head(2)
[ ]:
ddf.columns
[ ]:
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att
,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att'))
for att_col_gr in gp_col]
[ ]:
%%time
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
[ ]:
import itertools
custom_agg = dd.Aggregation(
'custom_agg',
lambda s: s.apply(set),
lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)
[ ]:
%%time
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)
调试¶
调试可能具有挑战性… 1. 在没有客户端的情况下运行代码 2. 使用仪表板分析器 3. 验证DAG的完整性
损坏的DAG¶
在这个例子中,我们展示了当DAG被破坏后,您可能需要重置计算。
[ ]:
# reset dataframe
ddf = dask.datasets.timeseries()
ddf.head(1)
[ ]:
def func_dist2(df, coor_x, coor_y):
dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())^2 # `^` <-- wrong syntax
+ (df[coor_y] - df[coor_y].shift())^2 ) # `^` <-- wrong syntax
return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
, meta=('float'))
一切都好吗?
# Results in error ddf.head() --------------------------------------------------------------------------- TypeError Traceback (most recent call last)即使函数被修正,DAG 仍然损坏
[ ]:
# Still results with an error
def func_dist2(df, coor_x, coor_y):
dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 # `**` <-- correct syntax
+ (df[coor_y] - df[coor_y].shift())**2 ) # `**` <-- correct syntax
return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
, meta=('float'))
我们需要重置数据框
[ ]:
ddf = dask.datasets.timeseries()
def func_dist2(df, coor_x, coor_y):
dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 #corrected math function
+ (df[coor_y] - df[coor_y].shift())**2 )
return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
, meta=('float'))
ddf.head(2)