提高计算性能#
特征工程是一个计算成本较高的任务。虽然Featuretools针对特征计算具有合理的默认设置,但根据数据集和问题特定考虑,有许多内置方法可以提高计算性能。
减少唯一截止时间的数量#
Featuretools创建的特征矩阵中的每一行都是在特定的截止时间计算的,该时间代表实体集中任何数据框中的数据可以用于计算特征的最后时间点。因此,计算会产生在计算中找到允许数据子集的开销。
Note
Featuretools is very precise in how it deals with time. For more information, see 处理时间.
如果有许多独特的截止时间,通常值得弄清楚如何减少数量。可以通过手动确定哪些独特时间对于预测问题是必要的,或者可以使用 近似方法 自动完成。
并行特征计算#
通过并行化特征计算过程,通常可以提高计算性能。有几种不同的方法可以用于使用 Featuretools 执行并行特征计算。下面提供了最常用方法的概述。
简单并行特征计算#
如果使用pandas的EntitySet
,Featuretools可以选择在多个核心上计算特征。控制并行性的最简单方法是指定n_jobs
参数:
fm = ft.calculate_feature_matrix(features=features,
entityset=entityset,
cutoff_time=cutoff_time,
n_jobs=2,
verbose=True)
上述命令将启动2个进程并行计算特征矩阵的块。每个进程都会收到自己的entityset副本,因此内存使用量将与并行进程数量成正比。由于entityset必须被复制到每个进程,因此在计算开始之前需要执行此操作,会有一些开销。为了避免在对calculate_feature_matrix
进行连续调用时出现这种开销,请阅读下面关于使用持久集群的部分。
调整块大小#
默认情况下,Featuretools同时计算具有相同截止时间的行。chunk_size
参数限制了将被分组然后一起计算的最大行数。如果使用并行处理进行计算,默认的块大小设置为1 / n_jobs
,以确保计算可以分布在可用的工作进程中。通常,这种行为效果很好,但如果只有很少的唯一截止时间,可能会导致更高的峰值内存使用量(由于存储在内存中的更多中间计算)或有限的并行性(如果块数少于n_jobs)。通过设置chunk_size
,我们可以在调用ft.dfs
或ft.calculate_feature_matrix
时限制每个组中的最大行数为特定数量或整体数据的百分比:
# 每个块最多使用100行
feature_matrix, features_list = ft.dfs(entityset=es,
target_dataframe_name="customers",
chunk_size=100)
我们还可以将块大小设置为总行数的百分比:
# 每个块最多使用总行数的5%
feature_matrix, features_list = ft.dfs(entityset=es,
target_dataframe_name="customers",
chunk_size=.05)
使用持久集群#
在幕后,Featuretools使用Dask的分布式调度程序来实现多进程。当您仅指定n_jobs
参数时,将为特定特征矩阵计算创建一个集群,并在计算完成后销毁它。这样做的一个缺点是,每次计算特征矩阵时,entityset都必须再次传输到工作进程。为了避免这种情况,我们希望在调用之间重用相同的集群。方法是首先创建一个集群,并告诉featuretools使用它的dask_kwargs
参数:
import featuretools as ft
from dask.distributed import LocalCluster
cluster = LocalCluster()
fm_1 = ft.calculate_feature_matrix(features=features_1,
entityset=entityset,
cutoff_time=cutoff_time,
dask_kwargs={'cluster': cluster},
verbose=True)
'cluster'
的值可以是实际的集群对象,也可以是集群调度程序地址的字符串。下面的调用也可以工作。这第二个特征矩阵计算不需要重新将entityset数据发送到工作进程,因为数据已经保存在集群上。
fm_2 = ft.calculate_feature_matrix(features=features_2,
entityset=entityset,
cutoff_time=cutoff_time,
dask_kwargs={'cluster': cluster.scheduler.address},
verbose=True)
Note
When using a persistent cluster, Featuretools publishes a copy of the EntitySet
to the cluster the first time it calculates a feature matrix. Based on the EntitySet
’s metadata the cluster will reuse it for successive computations. This means if two EntitySets
have the same metadata but different row values (e.g. new data is added to the EntitySet
), Featuretools won’t recopy the second EntitySet
in later calls. A simple way to avoid this scenario is to use a unique EntitySet
id.
使用分布式仪表板#
Dask.distributed具有基于Web的诊断仪表板,可用于分析工作节点和任务的状态。它还可以用于跟踪内存使用情况或可视化任务运行时间。可以在这里找到有关Web界面的详细描述。仪表板需要额外的Python包bokeh才能正常工作。安装bokeh后,当创建LocalCluster时,默认情况下将启动Web界面。使用n_jobs
时Featuretools创建的集群不会自动启用Web界面。为了启用Web界面,必须在dask_kwargs
中指定要启动主Web界面的端口:python3fm = ft.calculate_feature_matrix(features=features, entityset=entityset, cutoff_time=cutoff_time, n_jobs=2, dask_kwargs={'diagnostics_port': 8787} verbose=True)
通过数据分区进行并行计算#
作为Featuretools并行化的替代方案,可以对数据进行分区,并使用Dask或Apache Spark与PySpark在多个核心或集群上运行特征计算。对于大型pandas EntitySet
,这种方法可能是必要的,因为当前的并行实现会将整个EntitySet
发送到每个工作节点,这可能会耗尽工作节点的内存。Dask和Spark允许Featuretools在单台机器上的多个核心或集群上扩展。
当不需要整个数据集来计算给定实例集的特征时,我们可以将数据分割成独立的分区,并在每个分区上进行计算。例如,假设我们正在为客户计算特征,这些特征可能是“该邮政编码中的其他客户数量”或“该邮政编码中其他客户的平均年龄”。在这种情况下,我们可以按邮政编码加载数据进行分区。只要在计算时我们有该邮政编码的所有数据,我们就可以为一部分客户计算所有特征。这种方法的一个示例可以在预测下一个购买演示笔记本中看到。在这个示例中,我们按客户对数据进行分区,并且在任何给定时间只将固定数量的客户加载到内存中。我们可以很容易地使用Dask实现这一点,Dask也可以用于将计算扩展到一组计算机上。类似地,可以使用类似Spark的框架。另一个使用Dask对数据进行分区以在多个核心或集群上分发的示例可以在Featuretools on Dask笔记本中看到。这种方法在Feature Labs工程博客上的使用Dask并行特征工程文章中有详细介绍。Dask允许在单台计算机上的多个核心或集群上进行简单扩展。要使用Apache Spark和PySpark进行类似的分区和分发实现,请参考在Spark上进行特征工程笔记本。该实现展示了如何在使用Spark作为分布式框架的EC2实例集群上进行特征工程。