import numpy as np
import time
import torch
import ray
from ray import train, tune
from ray.train.horovod import HorovodTrainer
from ray.train import ScalingConfig
from ray.tune.tune_config import TuneConfig
from ray.tune.tuner import Tuner
def sq(x):
    """Evaluate the target quadratic ``x**2 - 20*x + 50`` at ``x``.

    Uses only ``*`` and ``+``, so ``x`` may be anything supporting those
    operators with floats (the training loop in this file passes a tensor).
    """
    quad_coef, lin_coef, const_term = 1.0, -20.0, 50.0
    return quad_coef * x * x + lin_coef * x + const_term
def qu(x):
    """Evaluate the target cubic ``10*x**3 + 5*x**2 - 20*x - 5`` at ``x``.

    Uses only ``*`` and ``+``, so ``x`` may be anything supporting those
    operators with floats (the training loop in this file passes a tensor).
    """
    cub_coef, quad_coef, lin_coef, const_term = 10.0, 5.0, -20.0, -5.0
    return (
        cub_coef * x * x * x
        + quad_coef * x * x
        + lin_coef * x
        + const_term
    )
class Net(torch.nn.Module):
    """Small learnable polynomial used as the model in this example.

    * ``mode == "square"``: ``x**2 + p0*x + p1`` with two learnable
      parameters.
    * any other ``mode``: ``10*x**3 + p0*x**2 + p1*x + p2`` with three
      learnable parameters.

    Args:
        mode: ``"square"`` selects the quadratic model; every other value
            selects the cubic model.
            NOTE(review): the default ``"sq"`` is not equal to ``"square"``,
            so a default-constructed ``Net`` is cubic -- confirm that this
            is intended.
    """

    def __init__(self, mode="sq"):
        super().__init__()
        if mode == "square":
            self.mode = 0
            self.param = torch.nn.Parameter(torch.FloatTensor([1.0, -1.0]))
        else:
            self.mode = 1
            self.param = torch.nn.Parameter(torch.FloatTensor([1.0, -1.0, 1.0]))

    def forward(self, x):
        """Evaluate the polynomial selected at construction time on ``x``."""
        # Compare the integer flag explicitly. ``~`` is bitwise NOT on ints
        # (~0 == -1, ~1 == -2, both truthy), so ``if ~self.mode`` would take
        # the quadratic branch for every mode.
        if self.mode == 0:
            return x * x + self.param[0] * x + self.param[1]

        return_val = 10 * x * x * x
        return_val += self.param[0] * x * x
        return_val += self.param[1] * x + self.param[2]
        return return_val
def train_loop_per_worker(config):
    """Run a short Horovod-wrapped SGD loop and report the loss each step.

    Reads ``config["mode"]``, ``config["lr"]`` and ``config["x_max"]``.
    Each of the five steps draws one random feature in ``[-x_max, x_max)``,
    computes its label with ``sq`` (mode ``"square"``) or ``qu`` (any other
    mode), takes one optimizer step on the MSE loss and calls
    ``train.report`` with ``{"loss": ...}``.

    Args:
        config: Mapping providing the ``"mode"``, ``"lr"`` and ``"x_max"``
            entries described above.
    """
    import torch
    import horovod.torch as hvd

    hvd.init()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    mode = config["mode"]
    net = Net(mode).to(device)

    base_optimizer = torch.optim.SGD(net.parameters(), lr=config["lr"])
    optimizer = hvd.DistributedOptimizer(base_optimizer)

    num_steps = 5
    print(hvd.size())

    # NumPy is seeded per rank (it drives the feature sampling below), while
    # torch gets the same fixed seed on every worker.
    np.random.seed(1 + hvd.rank())
    torch.manual_seed(1234)

    # To ensure consistent initialization across workers, broadcast the
    # model parameters and optimizer state from rank 0.
    hvd.broadcast_parameters(net.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)

    start = time.time()
    x_max = config["x_max"]

    # Neither the target function nor the loss module changes between steps.
    target_fn = sq if mode == "square" else qu
    criterion = torch.nn.MSELoss()

    for _ in range(num_steps):
        sample = np.random.rand(1) * 2 * x_max - x_max
        features = torch.Tensor(sample).to(device)
        labels = target_fn(features)

        optimizer.zero_grad()
        loss = criterion(net(features), labels)
        loss.backward()
        optimizer.step()

        time.sleep(0.1)
        train.report({"loss": loss.item()})

    total = time.time() - start
    print(f"Took {total:0.3f} s. Avg: {total / num_steps:0.3f} s.")
def tune_horovod(num_workers, num_samples, use_gpu, mode="square", x_max=1.0):
    """Tune the learning rate of ``train_loop_per_worker`` with Ray Tune.

    Builds a ``HorovodTrainer`` around ``train_loop_per_worker``, samples
    ``lr`` from ``tune.uniform(0.1, 1)``, minimises the reported ``loss``
    metric and prints the config of the best result.

    Args:
        num_workers: Forwarded to ``ScalingConfig(num_workers=...)``.
        num_samples: Forwarded to ``TuneConfig(num_samples=...)``.
        use_gpu: Forwarded to ``ScalingConfig(use_gpu=...)``.
        mode: Passed to the training loop as ``config["mode"]``.
        x_max: Passed to the training loop as ``config["x_max"]``.
    """
    scaling = ScalingConfig(
        trainer_resources={"CPU": 0},
        num_workers=num_workers,
        use_gpu=use_gpu,
    )
    trainer = HorovodTrainer(
        train_loop_per_worker=train_loop_per_worker,
        scaling_config=scaling,
        train_loop_config={"mode": mode, "x_max": x_max},
    )

    search_space = {"train_loop_config": {"lr": tune.uniform(0.1, 1)}}
    tuning = TuneConfig(mode="min", metric="loss", num_samples=num_samples)

    results = Tuner(trainer, param_space=search_space, tune_config=tuning).fit()
    best = results.get_best_result()
    print("Best hyperparameters found were: ", best.config)
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--mode", type=str, default="square", choices=["square", "cubic"]
    )
    # NOTE(review): --learning_rate is parsed but never read below; the
    # learning rate actually used is sampled by the tuner in tune_horovod.
    parser.add_argument(
        "--learning_rate", type=float, default=0.1, dest="learning_rate"
    )
    parser.add_argument("--x_max", type=float, default=1.0, dest="x_max")
    parser.add_argument("--gpu", action="store_true")
    parser.add_argument(
        "--smoke-test", action="store_true", help="Finish quickly for testing."
    )
    parser.add_argument("--num-workers", type=int, default=2)

    # parse_known_args: unrecognised command-line arguments are ignored.
    args, _ = parser.parse_known_args()

    if args.smoke_test:
        # Smoke test with 2 samples, 2 workers and 1 CPU per worker
        # (and 0 CPUs allocated to the trainer).
        ray.init(num_cpus=4)
        num_samples = 2
    else:
        num_samples = 10

    tune_horovod(
        num_workers=args.num_workers,
        num_samples=num_samples,
        use_gpu=args.gpu,
        mode=args.mode,
        x_max=args.x_max,
    )