Using Spark on Ray (RayDP)

RayDP combines your Spark and Ray clusters. It makes large-scale data processing with the PySpark API easy, and lets you use that data directly to train your TensorFlow and PyTorch models.

For more information and examples, see the RayDP GitHub page: oap-project/raydp

Installing RayDP

RayDP can be installed from PyPI and supports PySpark 3.0 and 3.1.

pip install raydp

Note

RayDP requires ray >= 1.2.0
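You can check this requirement programmatically before installing RayDP. The following is a minimal sketch that uses only the standard library (`importlib.metadata`, Python 3.8+). The helpers `parse_version` and `ray_meets_requirement` are hypothetical. The parser assumes purely numeric release strings such as `1.13.0`, and pre-release tags like `rc1` would raise `ValueError`. Versions are compared as integer tuples because plain string comparison would rank `"1.13.0"` below `"1.2.0"`.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional, Tuple

MIN_RAY = "1.2.0"  # minimum Ray version RayDP requires, per the note above


def parse_version(text: str) -> Tuple[int, ...]:
    """Turn a numeric release string such as '1.13.0' into (1, 13, 0).

    Short strings are zero-padded so that '1.2' compares equal to '1.2.0'.
    Non-numeric parts (e.g. 'rc1') raise ValueError.
    """
    parts = [int(p) for p in text.split(".")]
    parts += [0] * (3 - len(parts))
    return tuple(parts)


def ray_meets_requirement(installed: Optional[str] = None) -> bool:
    """Return True if `installed` (or the installed 'ray' package) is >= MIN_RAY."""
    if installed is None:
        try:
            installed = version("ray")
        except PackageNotFoundError:
            return False  # Ray is not installed at all
    return parse_version(installed) >= parse_version(MIN_RAY)


# Tuples compare numerically; the raw strings would not:
print("1.13.0" >= "1.2.0")              # False (character-by-character comparison)
print(ray_meets_requirement("1.13.0"))  # True
```

For version strings with pre-release or local tags, the `packaging` library's `Version` class is a more robust replacement for `parse_version`.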

Note

Java must be installed on the head node and on every worker node to run Spark.
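One quick way to confirm that a node meets this requirement is to look for a `java` executable. The sketch below uses only the Python standard library and checks only the machine it runs on, so you would need to run it on the head node and on each worker. The helper names are hypothetical. It inspects `PATH` only. Spark's launch scripts typically also honor `JAVA_HOME`, which this sketch does not check.

```python
import shutil
import subprocess
from typing import Optional


def java_path() -> Optional[str]:
    """Return the path of the `java` executable found on PATH, or None."""
    return shutil.which("java")


def java_version_banner() -> Optional[str]:
    """Return the first line of `java -version`, or None if Java is missing.

    `java -version` writes its banner to stderr rather than stdout.
    """
    path = java_path()
    if path is None:
        return None
    result = subprocess.run([path, "-version"], capture_output=True, text=True)
    lines = result.stderr.strip().splitlines()
    return lines[0] if lines else None


print("java:", java_path())
print("version:", java_version_banner())
```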

Creating a Spark Session

To create a Spark session, call raydp.init_spark. For example:

import ray
import raydp

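# Initialize Ray; use ray.init(address="auto") to join an existing cluster
ray.init()
# Start a Spark session whose executors run on the Ray cluster:
# 10 executors, each with 64 CPU cores and 256 GB of memory
spark = raydp.init_spark(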
  app_name = "example",
  num_executors = 10,
  executor_cores = 64,
  executor_memory = "256GB"
)
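RayDP runs each Spark executor on the Ray cluster with the resources given by `executor_cores` and `executor_memory`. The cluster as a whole therefore needs room for num_executors × executor_cores CPUs and num_executors × executor_memory of memory. For the example above, that is 10 × 64 = 640 CPUs and 10 × 256 GB = 2,560 GB. The sketch below does this arithmetic in plain Python. `parse_memory`, `ClusterRequest` and `total_request` are hypothetical helpers and are not part of RayDP. Treating "GB" as 2^30 bytes is an assumption of the sketch, not necessarily how RayDP interprets the string.

```python
import re
from dataclasses import dataclass

# Binary multipliers; reading "GB" as 2**30 bytes is an assumption of this sketch.
_UNITS = {"": 1, "K": 2**10, "M": 2**20, "G": 2**30, "T": 2**40}


def parse_memory(text: str) -> int:
    """Parse strings such as '256GB', '512m' or '1T' into a byte count."""
    match = re.fullmatch(r"\s*(\d+)\s*([KMGT]?)B?\s*", text.upper())
    if match is None:
        raise ValueError(f"unrecognized memory size: {text!r}")
    number, unit = match.groups()
    return int(number) * _UNITS[unit]


@dataclass
class ClusterRequest:
    cpus: int
    memory_bytes: int


def total_request(num_executors: int, executor_cores: int,
                  executor_memory: str) -> ClusterRequest:
    """Total CPUs and memory requested by all executors together."""
    return ClusterRequest(
        cpus=num_executors * executor_cores,
        memory_bytes=num_executors * parse_memory(executor_memory),
    )


req = total_request(num_executors=10, executor_cores=64, executor_memory="256GB")
print(req.cpus, req.memory_bytes // 2**30)  # 640 2560
```

To catch an oversized request early, you can compare these totals with `ray.cluster_resources()` before calling `raydp.init_spark`.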

Deep Learning with a Spark DataFrame

Training a Spark DataFrame with TensorFlow

raydp.tf.TFEstimator provides an API for training with TensorFlow.

from pyspark.sql.functions import col
df = spark.range(1, 1000)
# calculate z = x + 2y + 1000
df = df.withColumn("x", col("id")*2)\
  .withColumn("y", col("id") + 200)\
  .withColumn("z", col("x") + 2*col("y") + 1000)

from raydp.utils import random_split
train_df, test_df = random_split(df, [0.7, 0.3])

# TensorFlow code
from tensorflow import keras
input_1 = keras.Input(shape=(1,))
input_2 = keras.Input(shape=(1,))

concatenated = keras.layers.concatenate([input_1, input_2])
output = keras.layers.Dense(1)(concatenated)  # linear output: z is an unbounded regression target
model = keras.Model(inputs=[input_1, input_2],
                    outputs=output)

optimizer = keras.optimizers.Adam(0.01)
loss = keras.losses.MeanSquaredError()

from raydp.tf import TFEstimator
estimator = TFEstimator(
  num_workers=2,
  model=model,
  optimizer=optimizer,
  loss=loss,
  metrics=["mse"],
  feature_columns=["x", "y"],
  label_column="z",
  batch_size=1000,
  num_epochs=2,
  use_gpu=False,
  config={"fit_config": {"steps_per_epoch": 2}})

estimator.fit_on_spark(train_df, test_df)

tensorflow_model = estimator.get_model()

estimator.shutdown()
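Both training examples build the same toy dataset. `spark.range(1, 1000)` yields `id` values 1 through 999 because the end bound is exclusive. Substituting the column definitions shows that the target is linear in `id`: z = 2·id + 2·(id + 200) + 1000 = 4·id + 1400, so z runs from 1404 to 5396. The sketch below mirrors this data preparation in plain Python. `split_rows` is a hypothetical stand-in for `raydp.utils.random_split`, not its implementation. It normalizes the weights and assigns each row to a split by comparing one uniform draw against the cumulative weights, so the 70/30 sizes are only approximate.

```python
import random
from bisect import bisect_right
from itertools import accumulate
from typing import Dict, List, Optional, Sequence


def make_rows() -> List[Dict[str, int]]:
    """Plain-Python mirror of spark.range(1, 1000) plus the withColumn calls."""
    rows = []
    for i in range(1, 1000):  # like spark.range, the end bound is exclusive
        x = i * 2
        y = i + 200
        z = x + 2 * y + 1000
        rows.append({"id": i, "x": x, "y": y, "z": z})
    return rows


def split_rows(rows: Sequence[dict], weights: Sequence[float],
               seed: Optional[int] = None) -> List[List[dict]]:
    """Randomly assign each row to one split in proportion to `weights`."""
    total = sum(weights)
    # Cumulative boundaries, e.g. [0.7, 0.3] -> [0.7, 1.0]
    bounds = list(accumulate(w / total for w in weights))
    rng = random.Random(seed)
    splits: List[List[dict]] = [[] for _ in weights]
    for row in rows:
        idx = bisect_right(bounds, rng.random())
        splits[min(idx, len(splits) - 1)].append(row)  # guard float rounding near 1.0
    return splits


rows = make_rows()
assert all(r["z"] == 4 * r["id"] + 1400 for r in rows)
train, test = split_rows(rows, [0.7, 0.3], seed=42)
print(len(rows), rows[0]["z"], rows[-1]["z"])  # 999 1404 5396
print(len(train) + len(test))                  # 999
```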

Training a Spark DataFrame with PyTorch

Similarly, raydp.torch.TorchEstimator provides an API for training with PyTorch.

from pyspark.sql.functions import col
df = spark.range(1, 1000)
# calculate z = x + 2y + 1000
df = df.withColumn("x", col("id")*2)\
  .withColumn("y", col("id") + 200)\
  .withColumn("z", col("x") + 2*col("y") + 1000)

from raydp.utils import random_split
train_df, test_df = random_split(df, [0.7, 0.3])

# PyTorch Code
import torch
class LinearModel(torch.nn.Module):
    def __init__(self):
        super(LinearModel, self).__init__()
        self.linear = torch.nn.Linear(2, 1)

    def forward(self, x, y):
        x = torch.cat([x, y], dim=1)
        return self.linear(x)

model = LinearModel()
optimizer = torch.optim.Adam(model.parameters())
loss_fn = torch.nn.MSELoss()

def lr_scheduler_creator(optimizer, config):
    return torch.optim.lr_scheduler.MultiStepLR(
      optimizer, milestones=[150, 250, 350], gamma=0.1)

# You can use the RayDP Estimator API or libraries like Ray Train for distributed training.
from raydp.torch import TorchEstimator
estimator = TorchEstimator(
  num_workers = 2,
  model = model,
  optimizer = optimizer,
  loss = loss_fn,
  lr_scheduler_creator=lr_scheduler_creator,
  feature_columns = ["x", "y"],
  label_column = "z",
  batch_size = 1000,
  num_epochs = 2
)

estimator.fit_on_spark(train_df, test_df)

pytorch_model = estimator.get_model()

estimator.shutdown()
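The `lr_scheduler_creator` above wraps the optimizer in `torch.optim.lr_scheduler.MultiStepLR`. That scheduler multiplies the learning rate by `gamma` each time its step count reaches one of the `milestones`. The sketch below reproduces the schedule in closed form without PyTorch, assuming Adam's default initial learning rate of 0.001 (the example does not pass `lr`). The function name is hypothetical. Milestones count scheduler steps. This page does not say whether RayDP steps the scheduler once per batch or once per epoch, so check the RayDP documentation before relying on where the milestones fall.

```python
from bisect import bisect_right
from typing import Sequence


def multistep_lr(step: int, base_lr: float = 0.001,
                 milestones: Sequence[int] = (150, 250, 350),
                 gamma: float = 0.1) -> float:
    """Learning rate after `step` scheduler steps under MultiStepLR semantics.

    Every milestone that is <= step has already applied one factor of gamma.
    `milestones` must be sorted in ascending order.
    """
    return base_lr * gamma ** bisect_right(milestones, step)


for step in (0, 149, 150, 249, 250, 350, 1000):
    print(step, multistep_lr(step))
```

With these milestones, the first decay happens only after 150 scheduler steps. Keep this in mind when judging whether a short run such as `num_epochs = 2` ever reaches a lower learning rate.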