高级: 读写自定义文件类型#

本指南展示了如何扩展 Ray Data 以读取和写入非原生支持的文件类型。这是一个高级指南,您将使用不稳定的内部 API。

图像已经通过 read_images()write_images() API 得到支持,但这个示例展示了如何为说明目的实现它们。

从文件读取数据#

小技巧

如果你不参与 Ray Data 的贡献,你不需要创建一个 Datasource。相反,你可以调用 read_binary_files() 并使用 map() 解码文件。

读取文件的核心抽象是 FileBasedDatasource。它在 Datasource 接口的基础上提供了文件特定的功能。

要子类化 FileBasedDatasource,实现构造函数和 _read_stream

实现构造函数#

调用超类构造函数并指定您要读取的文件。可选地,指定有效的文件扩展名。Ray Data 忽略具有其他扩展名的文件。

from ray.data.datasource import FileBasedDatasource

class ImageDatasource(FileBasedDatasource):
    def __init__(self, paths: Union[str, List[str]], *, mode: str):
        super().__init__(
            paths,
            file_extensions=["png", "jpg", "jpeg", "bmp", "gif", "tiff"],
        )

        self.mode = mode  # Specify read options in the constructor

实现 _read_stream#

_read_stream 是一个生成器,它从文件中产生一个或多个数据块。

Blocks 是 Data 内部用于表示行集合的抽象。它们可以是 PyArrow 表、pandas DataFrame 或 NumPy 数组的字典。

不要直接创建块。相反,将数据行添加到 DelegatingBlockBuilder 中。

    def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
        import io
        import numpy as np
        from PIL import Image
        from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder

        data = f.readall()
        image = Image.open(io.BytesIO(data))
        image = image.convert(self.mode)

        # Each block contains one row
        builder = DelegatingBlockBuilder()
        array = np.array(image)
        item = {"image": array}
        builder.add(item)
        yield builder.build()

读取你的数据#

一旦你实现了 ImageDatasource,调用 read_datasource() 来将图像读入一个 Dataset。Ray Data 会并行读取你的文件。

import ray

ds = ray.data.read_datasource(
    ImageDatasource("s3://anonymous@ray-example-data/batoidea", mode="RGB")
)

将数据写入文件#

备注

写入接口正在积极开发中,未来可能会发生变化。如果您有功能请求,请 在 GitHub 上提交问题

写入文件的核心抽象是 RowBasedFileDatasinkBlockBasedFileDatasink。它们在 Datasink 接口的基础上提供了文件特定的功能。

如果你想为每个文件写入一行,请继承 RowBasedFileDatasink。否则,请继承 BlockBasedFileDatasink

在这个例子中,你将为每个文件写入一张图片,因此你需要继承 RowBasedFileDatasink。要继承 RowBasedFileDatasink,请实现构造函数和 write_row_to_file()

实现构造函数#

调用超类构造函数并指定要写入的文件夹。可选地,指定一个表示文件格式的字符串(例如,"png")。Ray Data 使用文件格式作为文件扩展名。

from ray.data.datasource import RowBasedFileDatasink

class ImageDatasink(RowBasedFileDatasink):
    def __init__(self, path: str, column: str, file_format: str):
        super().__init__(path, file_format=file_format)

        self.column = column
        self.file_format = file_format  # Specify write options in the constructor

实现 write_row_to_file#

write_row_to_file 将一行数据写入文件。每一行是一个字典,将列名映射到值。

    def write_row_to_file(self, row: Dict[str, Any], file: pyarrow.NativeFile):
        import io
        from PIL import Image

        # PIL can't write to a NativeFile, so we have to write to a buffer first.
        image = Image.fromarray(row[self.column])
        buffer = io.BytesIO()
        image.save(buffer, format=self.file_format)
        file.write(buffer.getvalue())

写入你的数据#

一旦你实现了 ImageDatasink ,调用 write_datasink() 将图像写入文件。Ray Data 并行写入多个文件。

ds.write_datasink(ImageDatasink("/tmp/results", column="image", file_format="png"))