保存数据#

Ray Data 允许你将数据保存到文件或其他 Python 对象中。

本指南向您展示如何:

将数据写入文件#

Ray Data 写入本地磁盘和云存储。

将数据写入本地磁盘#

要将您的 Dataset 保存到本地磁盘,请调用类似 Dataset.write_parquet 的方法,并使用 local:// 方案指定一个本地目录。

警告

如果你的集群包含多个节点并且你没有使用 local://,Ray Data 会将数据的不同分区写入不同的节点。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

ds.write_parquet("local:///tmp/iris/")

要写入除 Parquet 以外的格式,请阅读 输入/输出参考

将数据写入云存储#

要将您的 Dataset 保存到云存储,请使用您的云服务提供商对所有节点进行身份验证。然后,调用类似 Dataset.write_parquet 的方法,并指定一个带有适当方案的URI。URI可以指向存储桶或文件夹。

要写入除 Parquet 以外的格式,请阅读 输入/输出参考

要将数据保存到 Amazon S3,请指定一个带有 s3:// 方案的 URI。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

ds.write_parquet("s3://my-bucket/my-folder")

Ray Data 依赖 PyArrow 进行与 Amazon S3 的认证。有关如何配置您的凭证以兼容 PyArrow 的更多信息,请参阅他们的 S3 文件系统文档

要保存数据到 Google Cloud Storage,请安装 Google Cloud Storage 的文件系统接口

pip install gcsfs

然后,创建一个 GCSFileSystem 并指定一个带有 gcs:// 方案的URI。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

filesystem = gcsfs.GCSFileSystem(project="my-google-project")
ds.write_parquet("gcs://my-bucket/my-folder", filesystem=filesystem)

Ray Data 依赖 PyArrow 进行与 Google Cloud Storage 的认证。有关如何配置您的凭证以兼容 PyArrow 的更多信息,请参阅他们的 GCS 文件系统文档

要将数据保存到 Azure Blob 存储,请安装 Azure-Datalake Gen1 和 Gen2 存储的文件系统接口

pip install adlfs

然后,创建一个 AzureBlobFileSystem 并指定一个带有 az:// 方案的URI。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

filesystem = adlfs.AzureBlobFileSystem(account_name="azureopendatastorage")
ds.write_parquet("az://my-bucket/my-folder", filesystem=filesystem)

Ray Data 依赖 PyArrow 进行 Azure Blob 存储的认证。有关如何配置您的凭证以兼容 PyArrow 的更多信息,请参阅他们的 fsspec 兼容文件系统文档

将数据写入NFS#

要将您的 Dataset 保存到 NFS 文件系统,请调用类似 Dataset.write_parquet 的方法,并指定一个挂载目录。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

ds.write_parquet("/mnt/cluster_storage/iris")

要写入除 Parquet 以外的格式,请阅读 输入/输出参考

更改输出文件的数量#

当你调用写入方法时,Ray Data 会将你的数据写入多个文件。要控制输出文件的数量,请配置 num_rows_per_file

备注

num_rows_per_file 是一个提示,不是一个严格的限制。Ray Data 可能会向每个文件写入或多或少的行。

import os
import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
ds.write_csv("/tmp/few_files/", num_rows_per_file=75)

print(os.listdir("/tmp/few_files/"))
['0_000001_000000.csv', '0_000000_000000.csv', '0_000002_000000.csv']

将数据集转换为其他 Python 库#

将数据集转换为 pandas#

要将 Dataset 转换为 pandas DataFrame,请调用 Dataset.to_pandas()。您的数据必须适合头节点上的内存。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

df = ds.to_pandas()
print(df)
     sepal length (cm)  sepal width (cm)  ...  petal width (cm)  target
0                  5.1               3.5  ...               0.2       0
1                  4.9               3.0  ...               0.2       0
2                  4.7               3.2  ...               0.2       0
3                  4.6               3.1  ...               0.2       0
4                  5.0               3.6  ...               0.2       0
..                 ...               ...  ...               ...     ...
145                6.7               3.0  ...               2.3       2
146                6.3               2.5  ...               1.9       2
147                6.5               3.0  ...               2.0       2
148                6.2               3.4  ...               2.3       2
149                5.9               3.0  ...               1.8       2
<BLANKLINE>
[150 rows x 5 columns]

将数据集转换为分布式DataFrame#

Ray Data 与分布式数据处理框架如 DaskSparkModinMars 相互操作。

要将 Dataset 转换为 Dask DataFrame,请调用 Dataset.to_dask()

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

df = ds.to_dask()

要将 Dataset 转换为 Spark DataFrame,请调用 Dataset.to_spark()

import ray
import raydp

spark = raydp.init_spark(
    app_name = "example",
    num_executors = 1,
    executor_cores = 4,
    executor_memory = "512M"
)

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
df = ds.to_spark(spark)

要将 Dataset 转换为 Modin DataFrame,请调用 Dataset.to_modin()

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

mdf = ds.to_modin()

要将 Dataset 从 Mars DataFrame 转换,请调用 Dataset.to_mars()

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

mdf = ds.to_mars()