使用 Ray Core 进行批量预测#

备注

要获取有关大数据集的批量推断的更高级 API,请参见 使用 Ray Data 进行批量推断。此示例适用于希望对数据分片和执行有更多控制的用户。

批量预测是使用训练好的模型为一组观察结果生成预测的过程。它包含以下元素:

  • 输入数据集:这是一个用于生成预测的观察结果集合。数据通常存储在外部存储系统中,如 S3、HDFS 或数据库,并且可能很大。

  • 机器学习模型:这是一个训练好的机器学习模型,通常也存储在外部存储系统中。

  • 预测:这些是将机器学习模型应用于观察结果时的输出。预测通常会写回到存储系统中。

使用 Ray,您可以为大数据集构建可扩展的批量预测,并提供高预测吞吐量。Ray Data 提供了一个 用于离线批量推断的更高级 API,并具有内置优化。不过,为了获得更多控制,您可以使用更低级的 Ray Core API。此示例通过将数据集拆分为不相交的分片,并使用 Ray 任务或 Ray Actor 在 Ray 集群中并行执行,演示了 Ray Core 的批量推断。

基于任务的批量预测#

使用Ray任务,您可以以以下方式构建批量预测程序:

  1. 加载模型

  2. 启动Ray任务,每个任务接收模型和输入数据集的一个分片

  3. 每个工作节点在分配的分片上执行预测,并输出结果

让我们以2009年的纽约出租车数据为例。假设我们有这样一个简单的模型:

import pandas as pd
import numpy as np

def load_model():
    # 一个虚拟模型。
    def model(batch: pd.DataFrame) -> pd.DataFrame:
        # 虚拟负载,以便复制模型时实际复制一些数据
        # 跨节点。
        model.payload = np.zeros(100_000_000)
        return pd.DataFrame({"score": batch["passenger_count"] % 2 == 0})
    
    return model

数据集有12个文件(每个月一个),因此我们可以自然地让每个Ray任务处理一个文件。通过获取模型和一个输入数据集的片段(即一个单独的文件),我们可以定义一个用于预测的Ray远程任务:

import pyarrow.parquet as pq
import ray

@ray.remote
def make_prediction(model, shard_path):
    df = pq.read_table(shard_path).to_pandas()
    result = model(df)

    # 写出预测结果。
    # 注意:除非驱动程序需要进一步处理,
    # 结果(不仅仅是简单地写入存储系统),
    # 建议采用远程任务执行方式,因为这样可以避免
    # 使驾驶员感到拥挤或负担过重。
    # ...

    # 在这里,我们仅返回此示例中结果的大小。
    return len(result)

驱动程序为整个输入数据集启动所有任务。

# 12个文件,每个文件对应一个远程任务。
input_files = [
        f"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
        f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet"
        for i in range(12)
]

# 只需将模型一次放入本地对象存储中,然后传递
# 参考远程任务。
model = load_model()
model_ref = ray.put(model)

result_refs = []

# 启动所有预测任务。
for file in input_files:
    # 通过传递模型引用和分片文件来启动预测任务。
    # 注意:如果你传递的是模型本身,那将是非常低效的。
    # 类似于make_prediction.remote(model, file),为了传递模型
    # 对于远程节点,每个任务可能会调用ray.put(model),这可能会导致过载。
    # 本地对象存储导致磁盘空间不足错误。
    result_refs.append(make_prediction.remote(model_ref, file))

results = ray.get(result_refs)

# 我们来检查预测输出的大小。
for r in results:
    print("Prediction output size:", r)

为了不使集群过载并导致OOM,我们可以通过为任务设置适当的资源需求来控制并行度,关于这种设计模式的详细信息请参见 模式:使用资源来限制并发运行的任务数量。例如,如果您能够轻松地估算从外部存储加载的数据在内存中的大小,您可以通过指定每个任务所需的内存量来控制并行度,例如,使用make_prediction.options(memory=100*1023*1025).remote(model_ref, file)启动任务。Ray将自动处理,确保调度到某个节点的任务不会超过该节点的总内存。

小技巧

为了避免重复将相同的模型存储到对象存储中(这可能导致驱动节点的磁盘空间不足),使用 ray.put() 将模型存储一次,然后将引用传递到其他地方。

小技巧

为了避免驱动节点的拥堵或过载,最好让每个任务输出预测结果(而不是将结果返回给驱动节点,因为这样实际上只是将其写入存储系统)。

基于演员的批量预测#

在上述解决方案中,每个 Ray 任务在开始执行预测之前都必须从驱动节点获取模型。如果模型的大小很大,这将是一个显著的开销成本。我们可以通过使用 Ray 演员来优化这一点,演职人员只需获取一次模型,并将其重用于分配给该演员的所有任务。

首先,我们定义一个可调用类,该类具有一个接口(即构造函数)用于加载/缓存模型,另一个接口用于接收文件并执行预测。

import pandas as pd
import pyarrow.parquet as pq
import ray

@ray.remote
class BatchPredictor:
    def __init__(self, model):
        self.model = model
        
    def predict(self, shard_path):
        df = pq.read_table(shard_path).to_pandas()
        result =self.model(df)

        # 写出预测结果。
        # 注意:除非驱动程序需要进一步处理,
        # 结果(不仅仅是简单地写入存储系统),
        # 建议使用远程任务输出,因为它可以避免
        # 使驾驶员感到拥挤或负担过重。
        # ...

        # 在这里,我们仅返回此示例中结果的大小。
        return len(result)

构造函数每个演员工作者只调用一次。我们使用 ActorPool 来管理一组可以接收预测请求的演员。

from ray.util.actor_pool import ActorPool

model = load_model()
model_ref = ray.put(model)
num_actors = 4
actors = [BatchPredictor.remote(model_ref) for _ in range(num_actors)]
pool = ActorPool(actors)
input_files = [
        f"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
        f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet"
        for i in range(12)
]
for file in input_files:
    pool.submit(lambda a, v: a.predict.remote(v), file)
while pool.has_next():
    print("Prediction output size:", pool.get_next())

请注意,ActorPool 的大小是固定的,这与基于任务的方法不同,后者的并行任务数量可以是动态的(只要不超过 max_in_flight_tasks)。要实现自动缩放的 actor 池,您需要使用 Ray 数据批量预测

使用GPU进行批量预测#

如果您的集群有GPU节点,并且您的预测器可以利用这些GPU,您可以通过指定num_gpus将任务或演员定向到那些GPU节点。Ray将按照相应的方式将它们调度到GPU节点上。在节点上,您需要将模型移动到GPU。以下是Torch模型的一个示例。

import torch

@ray.remote(num_gpus=1)
def make_torch_prediction(model: torch.nn.Module, shard_path):
    # 将模型移动到GPU。
    model.to(torch.device("cuda"))
    inputs = pq.read_table(shard_path).to_pandas().to_numpy()

    results = []
    # 对于输入中的每个张量:
    # 结果.append(模型(张量))
    #
    # 直接在任务中写出结果,而不是返回
    # 除非必须,否则不要连接到驱动节点,以避免拥堵/过载。
    # 驱动节点。
    # ...

    # 这里我们只返回简单的/轻量级的元信息。
    return len(results)

常见问题解答#

如何在Ray集群中高效加载和传递大型模型?#

推荐的方式是(以基于任务的批量预测为例,基于演员的方式相同):

  1. 让驱动程序加载模型(例如,从存储系统中)。

  2. 让驱动程序使用 ray.put(model) 将模型存储到对象存储中;并且

  3. 在启动远程任务时将模型的相同引用传递给每个远程任务。 远程任务将在开始执行预测之前,从驱动程序的对象存储中获取模型到其本地对象存储。

请注意,如果您跳过步骤2,直接将模型(而不是引用)传递给远程任务,这样做是非常低效的。如果模型很大且任务很多,可能会导致驱动节点的磁盘崩溃。

# GOOD: the model will be stored to driver's object store only once
model = load_model()
model_ref = ray.put(model)
for file in input_files:
    make_prediction.remote(model_ref, file)

# BAD: the same model will be stored to driver's object store repeatedly for each task
model = load_model()
for file in input_files:
    make_prediction.remote(model, file)

有关更多详细信息,请查看 反模式:重复按值传递相同的大参数会损害性能

如何提高GPU利用率?#

为了保持GPU的繁忙状态,可以关注以下几点:

  • 在同一GPU节点上调度多个任务(如果它有多个GPU):如果同一节点上有多个GPU,而单个任务无法使用所有GPU,则可以将多个任务指派给该节点。这由Ray自动处理,例如,如果您指定num_gpus=1而该节点上有4个GPU,Ray将向该节点调度4个任务,前提是有足够的任务且没有其他资源限制。

  • 使用基于演员的方法:如上所述,基于演员的方法更有效,因为它为多个任务重用模型初始化,因此节点将花费更多时间在实际工作负载上。