机器学习

机器学习是一个涉及许多不同工作流程的广泛领域。本页列出了Dask可以帮助您处理ML工作负载的几种更常见的方式。

  • 超参数优化

  • 提升树

  • 批量预测

超参数优化

Optuna

对于最先进的超参数优化(HPO),我们推荐使用 Optuna 库,以及相关的 Dask 集成

在 Optuna 中,您构建一个目标函数,该函数接受一个试验对象,该对象从您在代码中定义的分布生成参数。您的目标函数最终会产生一个分数。Optuna 会根据它收到的分数智能地建议分布中的值。

def objective(trial):
    params = {
        "max_depth": trial.suggest_int("max_depth", 2, 10, step=1),
        "learning_rate": trial.suggest_float("learning_rate", 1e-8, 1.0, log=True),
        ...
    }
    model = train_model(train_data, **params)
    result = score(model, test_data)
    return result

Dask 和 Optuna 经常一起使用,通过在 Dask 调度器上并行运行许多目标函数并同步分数和参数选择。为此,我们使用 Optuna 中的 DaskStore 对象。

import optuna

storage = optuna.integration.DaskStorage()

study = optuna.create_study(
    direction="maximize",
    storage=storage,  # This makes the study Dask-enabled
)

然后我们并行运行许多优化方法。

from dask.distributed import LocalCluster, wait

cluster = LocalCluster(processes=False)  # replace this with some scalable cluster
client = cluster.get_client()

futures = [
    client.submit(study.optimize, objective, n_trials=1, pure=False) for _ in range(500)
]
wait(futures)

print(study.best_params)

有关更完整的示例,请参见此 Optuna + XGBoost 示例

Dask 未来

此外,在更简单的情况下,人们通常使用 Dask Futures 来训练具有大量参数的相同模型。Dask Futures 是一个通用 API,用于在各种输入上运行普通 Python 函数。示例如下:

from dask.distributed import LocalCluster

cluster = LocalCluster(processes=False)  # replace this with some scalable cluster
client = cluster.get_client()

def train_and_score(params: dict) -> float:
    data = load_data()
    model = make_model(**params)
    train(model)
    score = evaluate(model)
    return score

params_list = [...]
futures = [
    client.submit(train_and_score, params) for params in params_list
]
scores = client.gather(futures)
best = max(scores)

best_params = params_list[scores.index(best)]

有关更完整的示例,请参阅 未来文档

梯度提升树

流行的 GBT 库,如 XGBoost 和 LightGBM,具有原生的 Dask 支持,这使得你可以在非常大的数据集上并行训练模型。

  • XGBoost 是一个强大的梯度提升框架。

  • LightGBM 是一个强大的梯度提升框架,支持并行学习。

例如,使用 Dask DataFrame、XGBoost 和本地 Dask 集群的代码如下所示:

import dask.dataframe as dd
import xgboost as xgb
from dask.distributed import LocalCluster

df = dask.datasets.timeseries()  # Randomly generated data
# df = dd.read_parquet(...)      # In practice, you would probably read data though

train, test = df.random_split([0.80, 0.20])
X_train, y_train, X_test, y_test = ...

with LocalCluster() as cluster:
    with cluster.get_client() as client:
        d_train = xgb.dask.DaskDMatrix(client, X_train, y_train, enable_categorical=True)
        model = xgb.dask.train(
            ...
            d_train,
        )
        predictions = xgb.dask.predict(client, model, X_test)

有关更完整的示例,请参阅此 XGBoost 示例

批量预测

模型一旦训练完成,通常会希望将模型应用于大量数据。我们最常见到这种方式有两种:

  1. 使用 Dask Futures

  2. 使用 DataFrame.map_partitionsArray.map_blocks

我们将在下面展示每种方法的示例。

Dask 未来

Dask Futures 是一个通用API,允许你在Python数据上并行运行任意Python函数。将此工具应用于解决批量预测问题非常简单。

例如,当人们想要将模型应用于许多数据文件时,我们经常看到这种情况。

from dask.distributed import LocalCluster

cluster = LocalCluster(processes=False)  # replace this with some scalable cluster
client = cluster.get_client()

filenames = [...]

def predict(filename, model):
    data = load(filename)
    result = model.predict(data)
    return result

model = client.submit(load_model, path_to_model)
predictions = client.map(predict, filenames, model=model)
results = client.gather(predictions)

有关更全面的示例,请参见 计算机视觉工作负载的批量评分(视频)

Dask DataFrame

有时我们想使用更高级别的 Dask API 来处理我们的模型,比如 Dask DataFrame 或 Dask Array。这在处理记录数据时更为常见,例如,如果我们有一组患者记录,并想查看哪些患者可能患病。

import dask.dataframe as dd

df = dd.read_parquet("/path/to/my/data.parquet")

model = load_model("/path/to/my/model")

# pandas code
# predictions = model.predict(df)
# predictions.to_parquet("/path/to/results.parquet")

# Dask code
predictions = df.map_partitions(model.predict)
predictions.to_parquet("/path/to/results.parquet")

更多信息请参见 Dask DataFrame 文档