高级: 读写自定义文件类型#
本指南展示了如何扩展 Ray Data 以读取和写入非原生支持的文件类型。这是一个高级指南,您将使用不稳定的内部 API。
图像已经通过 read_images()
和 write_images()
API 得到支持,但这个示例展示了如何为说明目的实现它们。
从文件读取数据#
小技巧
如果你不参与 Ray Data 的贡献,你不需要创建一个 Datasource
。相反,你可以调用 read_binary_files()
并使用 map()
解码文件。
读取文件的核心抽象是 FileBasedDatasource
。它在 Datasource
接口的基础上提供了文件特定的功能。
要子类化 FileBasedDatasource
,实现构造函数和 _read_stream
。
实现构造函数#
调用超类构造函数并指定您要读取的文件。可选地,指定有效的文件扩展名。Ray Data 忽略具有其他扩展名的文件。
from ray.data.datasource import FileBasedDatasource
class ImageDatasource(FileBasedDatasource):
def __init__(self, paths: Union[str, List[str]], *, mode: str):
super().__init__(
paths,
file_extensions=["png", "jpg", "jpeg", "bmp", "gif", "tiff"],
)
self.mode = mode # Specify read options in the constructor
实现 _read_stream
#
_read_stream
是一个生成器,它从文件中产生一个或多个数据块。
Blocks 是 Data 内部用于表示行集合的抽象。它们可以是 PyArrow 表、pandas DataFrame 或 NumPy 数组的字典。
不要直接创建块。相反,将数据行添加到 DelegatingBlockBuilder 中。
def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
import io
import numpy as np
from PIL import Image
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
data = f.readall()
image = Image.open(io.BytesIO(data))
image = image.convert(self.mode)
# Each block contains one row
builder = DelegatingBlockBuilder()
array = np.array(image)
item = {"image": array}
builder.add(item)
yield builder.build()
读取你的数据#
一旦你实现了 ImageDatasource
,调用 read_datasource()
来将图像读入一个 Dataset
。Ray Data 会并行读取你的文件。
import ray
ds = ray.data.read_datasource(
ImageDatasource("s3://anonymous@ray-example-data/batoidea", mode="RGB")
)
将数据写入文件#
备注
写入接口正在积极开发中,未来可能会发生变化。如果您有功能请求,请 在 GitHub 上提交问题。
写入文件的核心抽象是 RowBasedFileDatasink
和 BlockBasedFileDatasink
。它们在 Datasink
接口的基础上提供了文件特定的功能。
如果你想为每个文件写入一行,请继承 RowBasedFileDatasink
。否则,请继承 BlockBasedFileDatasink
。
在这个例子中,你将为每个文件写入一张图片,因此你需要继承 RowBasedFileDatasink
。要继承 RowBasedFileDatasink
,请实现构造函数和 write_row_to_file()
。
实现构造函数#
调用超类构造函数并指定要写入的文件夹。可选地,指定一个表示文件格式的字符串(例如,"png"
)。Ray Data 使用文件格式作为文件扩展名。
from ray.data.datasource import RowBasedFileDatasink
class ImageDatasink(RowBasedFileDatasink):
def __init__(self, path: str, column: str, file_format: str):
super().__init__(path, file_format=file_format)
self.column = column
self.file_format = file_format # Specify write options in the constructor
实现 write_row_to_file
#
write_row_to_file
将一行数据写入文件。每一行是一个字典,将列名映射到值。
def write_row_to_file(self, row: Dict[str, Any], file: pyarrow.NativeFile):
import io
from PIL import Image
# PIL can't write to a NativeFile, so we have to write to a buffer first.
image = Image.fromarray(row[self.column])
buffer = io.BytesIO()
image.save(buffer, format=self.file_format)
file.write(buffer.getvalue())
写入你的数据#
一旦你实现了 ImageDatasink
,调用 write_datasink()
将图像写入文件。Ray Data 并行写入多个文件。
ds.write_datasink(ImageDatasink("/tmp/results", column="image", file_format="png"))