机器学习
内容
机器学习¶
机器学习是一个涉及许多不同工作流程的广泛领域。本页列出了Dask可以帮助您处理ML工作负载的几种更常见的方式。
超参数优化
提升树
批量预测
超参数优化¶
Optuna¶
对于最先进的超参数优化(HPO),我们推荐使用 Optuna 库,以及相关的 Dask 集成。
在 Optuna 中,您构建一个目标函数,该函数接受一个试验对象,该对象从您在代码中定义的分布生成参数。您的目标函数最终会产生一个分数。Optuna 会根据它收到的分数智能地建议分布中的值。
def objective(trial):
params = {
"max_depth": trial.suggest_int("max_depth", 2, 10, step=1),
"learning_rate": trial.suggest_float("learning_rate", 1e-8, 1.0, log=True),
...
}
model = train_model(train_data, **params)
result = score(model, test_data)
return result
Dask 和 Optuna 经常一起使用,通过在 Dask 调度器上并行运行许多目标函数并同步分数和参数选择。为此,我们使用 Optuna 中的 DaskStore
对象。
import optuna
storage = optuna.integration.DaskStorage()
study = optuna.create_study(
direction="maximize",
storage=storage, # This makes the study Dask-enabled
)
然后我们并行运行许多优化方法。
from dask.distributed import LocalCluster, wait
cluster = LocalCluster(processes=False) # replace this with some scalable cluster
client = cluster.get_client()
futures = [
client.submit(study.optimize, objective, n_trials=1, pure=False) for _ in range(500)
]
wait(futures)
print(study.best_params)
有关更完整的示例,请参见此 Optuna + XGBoost 示例。
Dask 未来¶
此外,在更简单的情况下,人们通常使用 Dask Futures 来训练具有大量参数的相同模型。Dask Futures 是一个通用 API,用于在各种输入上运行普通 Python 函数。示例如下:
from dask.distributed import LocalCluster
cluster = LocalCluster(processes=False) # replace this with some scalable cluster
client = cluster.get_client()
def train_and_score(params: dict) -> float:
data = load_data()
model = make_model(**params)
train(model)
score = evaluate(model)
return score
params_list = [...]
futures = [
client.submit(train_and_score, params) for params in params_list
]
scores = client.gather(futures)
best = max(scores)
best_params = params_list[scores.index(best)]
有关更完整的示例,请参阅 未来文档。
梯度提升树¶
流行的 GBT 库,如 XGBoost 和 LightGBM,具有原生的 Dask 支持,这使得你可以在非常大的数据集上并行训练模型。
例如,使用 Dask DataFrame、XGBoost 和本地 Dask 集群的代码如下所示:
import dask.dataframe as dd
import xgboost as xgb
from dask.distributed import LocalCluster
df = dask.datasets.timeseries() # Randomly generated data
# df = dd.read_parquet(...) # In practice, you would probably read data though
train, test = df.random_split([0.80, 0.20])
X_train, y_train, X_test, y_test = ...
with LocalCluster() as cluster:
with cluster.get_client() as client:
d_train = xgb.dask.DaskDMatrix(client, X_train, y_train, enable_categorical=True)
model = xgb.dask.train(
...
d_train,
)
predictions = xgb.dask.predict(client, model, X_test)
有关更完整的示例,请参阅此 XGBoost 示例。
批量预测¶
模型一旦训练完成,通常会希望将模型应用于大量数据。我们最常见到这种方式有两种:
使用 Dask Futures
我们将在下面展示每种方法的示例。
Dask 未来¶
Dask Futures 是一个通用API,允许你在Python数据上并行运行任意Python函数。将此工具应用于解决批量预测问题非常简单。
例如,当人们想要将模型应用于许多数据文件时,我们经常看到这种情况。
from dask.distributed import LocalCluster
cluster = LocalCluster(processes=False) # replace this with some scalable cluster
client = cluster.get_client()
filenames = [...]
def predict(filename, model):
data = load(filename)
result = model.predict(data)
return result
model = client.submit(load_model, path_to_model)
predictions = client.map(predict, filenames, model=model)
results = client.gather(predictions)
有关更全面的示例,请参见 计算机视觉工作负载的批量评分(视频)。
Dask DataFrame¶
有时我们想使用更高级别的 Dask API 来处理我们的模型,比如 Dask DataFrame 或 Dask Array。这在处理记录数据时更为常见,例如,如果我们有一组患者记录,并想查看哪些患者可能患病。
import dask.dataframe as dd
df = dd.read_parquet("/path/to/my/data.parquet")
model = load_model("/path/to/my/model")
# pandas code
# predictions = model.predict(df)
# predictions.to_parquet("/path/to/results.parquet")
# Dask code
predictions = df.map_partitions(model.predict)
predictions.to_parquet("/path/to/results.parquet")
更多信息请参见 Dask DataFrame 文档。