简单的并行模型选择#

小技巧

对于生产级的分布式超参数调优实现,使用 Ray Tune,这是一个使用Ray的Actor API构建的可扩展超参数调优库。

在此示例中,我们将演示如何快速编写一个超参数调优脚本,该脚本并行评估一组超参数。

该脚本将演示如何使用Ray API的两个重要部分:使用ray.remote定义远程函数,以及使用ray.wait等待它们的结果准备就绪。

../../_images/hyperparameter.png

设置:依赖项#

首先,导入一些依赖项并定义生成随机超参数和检索数据的函数。

import os
import numpy as np
from filelock import FileLock

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

import ray

ray.init()

# 尝试的随机超参数集的数量。
num_evaluations = 10


# 生成随机超参数的函数。
def generate_hyperparameters():
    return {
        "learning_rate": 10 ** np.random.uniform(-5, 1),
        "batch_size": np.random.randint(1, 100),
        "momentum": np.random.uniform(0, 1),
    }


def get_data_loaders(batch_size):
    mnist_transforms = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    )

    # 我们在这里添加了文件锁,因为多个工作进程会希望
    # 下载数据,这可能会导致覆盖,因为
    # DataLoader 不是线程安全的。
    with FileLock(os.path.expanduser("~/data.lock")):
        train_loader = torch.utils.data.DataLoader(
            datasets.MNIST(
                "~/data", train=True, download=True, transform=mnist_transforms
            ),
            batch_size=batch_size,
            shuffle=True,
        )
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST("~/data", train=False, transform=mnist_transforms),
        batch_size=batch_size,
        shuffle=True,
    )
    return train_loader, test_loader

配置:定义神经网络#

我们定义了一个小型神经网络用于训练。此外,我们还创建了用于训练和测试该神经网络的方法。

class ConvNet(nn.Module):
    """简单的双层卷积神经网络。"""

    def __init__(self):
        super(ConvNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
        self.fc = nn.Linear(192, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 3))
        x = x.view(-1, 192)
        x = self.fc(x)
        return F.log_softmax(x, dim=1)


def train(model, optimizer, train_loader, device=torch.device("cpu")):
    """通过一次数据遍历优化模型。

    截取前1024个样本以简化训练过程。
    """
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        if batch_idx * len(data) > 1024:
            return
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()


def test(model, test_loader, device=torch.device("cpu")):
    """检查模型的验证准确性。

    为了简化,仅截取前512个样本进行验证。
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(test_loader):
            if batch_idx * len(data) > 512:
                break
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    return correct / total

评估超参数#

对于给定的配置,之前创建的神经网络将进行训练并返回模型的准确性。这些训练过的网络将被测试其准确性,以找到最佳的超参数组合。

@ray.remote 装饰器定义了一个远程进程。

@ray.remote
def evaluate_hyperparameters(config):
    model = ConvNet()
    train_loader, test_loader = get_data_loaders(config["batch_size"])
    optimizer = optim.SGD(
        model.parameters(), lr=config["learning_rate"], momentum=config["momentum"]
    )
    train(model, optimizer, train_loader)
    return test(model, test_loader)

随机生成超参数的同步评估#

我们将为我们的神经网络创建多个随机超参数集,并在并行中进行评估。

# 记录最佳超参数和最佳准确率。
best_hyperparameters = None
best_accuracy = 0
# 一个包含我们所有实验对象引用的列表
# 已启动但尚未处理。
remaining_ids = []
# 一个将实验对象引用映射到其超参数的字典。
# 用于该实验的超参数。
hyperparameters_mapping = {}

启动异步并行任务以评估不同的超参数。accuracy_id 是一个 ObjectRef,作为远程任务的句柄。它在任务完成后用于获取任务的结果。

# 随机生成一组超参数并启动任务对其进行评估。
for i in range(num_evaluations):
    hyperparameters = generate_hyperparameters()
    accuracy_id = evaluate_hyperparameters.remote(hyperparameters)
    remaining_ids.append(accuracy_id)
    hyperparameters_mapping[accuracy_id] = hyperparameters

处理每个超参数及其对应的准确率,按照完成的顺序存储具有最佳准确率的超参数。

# 按任务完成的顺序获取并打印结果。
while remaining_ids:
    # 使用ray.wait获取首个完成任务的对象引用。
    done_ids, remaining_ids = ray.wait(remaining_ids)
    # 默认情况下只有一个返回结果。
    result_id = done_ids[0]

    hyperparameters = hyperparameters_mapping[result_id]
    accuracy = ray.get(result_id)
    print(
        """We achieve accuracy {:.3}% with
        learning_rate: {:.2}
        batch_size: {}
        momentum: {:.2}
      """.format(
            100 * accuracy,
            hyperparameters["learning_rate"],
            hyperparameters["batch_size"],
            hyperparameters["momentum"],
        )
    )
    if accuracy > best_accuracy:
        best_hyperparameters = hyperparameters
        best_accuracy = accuracy

# 记录表现最佳的超参数组合。
print(
    """Best accuracy over {} trials was {:.3} with
      learning_rate: {:.2}
      batch_size: {}
      momentum: {:.2}
      """.format(
        num_evaluations,
        100 * best_accuracy,
        best_hyperparameters["learning_rate"],
        best_hyperparameters["batch_size"],
        best_hyperparameters["momentum"],
    )
)