ray.data.DataIterator.to_tf#

DataIterator.to_tf(feature_columns: str | List[str], label_columns: str | List[str], *, additional_columns: str | None | List[str] = None, prefetch_batches: int = 1, batch_size: int = 1, drop_last: bool = False, local_shuffle_buffer_size: int | None = None, local_shuffle_seed: int | None = None, feature_type_spec: tf.TypeSpec | Dict[str, tf.TypeSpec] = None, label_type_spec: tf.TypeSpec | Dict[str, tf.TypeSpec] = None, additional_type_spec: tf.TypeSpec | None | Dict[str, tf.TypeSpec] = None) tf.data.Dataset[源代码]#

返回此数据集上的 TF 数据集。

警告

如果你的数据集包含不规则张量,此方法会报错。为防止错误,请 调整你的张量大小

示例

>>> import ray
>>> ds = ray.data.read_csv(
...     "s3://anonymous@air-example-data/iris.csv"
... )
>>> it = ds.iterator(); it
DataIterator(Dataset(
   num_rows=?,
   schema={
      sepal length (cm): double,
      sepal width (cm): double,
      petal length (cm): double,
      petal width (cm): double,
      target: int64
   }
))

如果你的模型接受单个张量作为输入,请指定单个特征列。

>>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target")
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

如果你的模型接受字典作为输入,请指定一个特征列的列表。

>>> it.to_tf(["sepal length (cm)", "sepal width (cm)"], "target")
<_OptionsDataset element_spec=({'sepal length (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), 'sepal width (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal width (cm)')}, TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

如果你的数据集包含多个特征,但你的模型接受单个张量作为输入,请使用 Concatenator 来合并特征。

>>> from ray.data.preprocessors import Concatenator
>>> preprocessor = Concatenator(output_column_name="features", exclude="target")
>>> it = preprocessor.transform(ds).iterator()
>>> it
DataIterator(Concatenator
+- Dataset(
      num_rows=?,
      schema={
         sepal length (cm): double,
         sepal width (cm): double,
         petal length (cm): double,
         petal width (cm): double,
         target: int64
      }
   ))
>>> it.to_tf("features", "target")
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

如果你的模型接受不同类型、形状或名称的张量作为输入,请指定类型规范。如果未指定类型规范,它们会根据迭代器的模式自动推断。

>>> import tensorflow as tf
>>> it.to_tf(
...     feature_columns="features",
...     label_columns="target",
...     feature_type_spec=tf.TensorSpec(shape=(None, 4), dtype=tf.float32, name="features"),
...     label_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="label")
... )
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float32, name='features'), TensorSpec(shape=(None,), dtype=tf.float32, name='label'))>

如果你的模型除了特征和标签外还接受额外的元数据,请指定一个额外的列或一个额外列的列表。一个常见的用例是在数据样本中包含样本权重,并使用 tf.keras.Model.fit 训练 tf.keras.Model

>>> ds = ds.add_column("sample weights", lambda df: 1)
>>> it = ds.iterator()
>>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target", additional_columns="sample weights")
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.int64, name='sample weights'))>

如果你的模型接受不同类型、形状或名称的附加元数据,请指定附加列的类型规范。

>>> it.to_tf(
...     feature_columns="sepal length (cm)",
...     label_columns="target",
...     additional_columns="sample weights",
...     additional_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="weight")
... )
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.float32, name='weight'))>
参数:
  • feature_columns – 对应于模型输入的列。如果这是一个字符串,输入数据是一个张量。如果这是一个列表,输入数据是一个 dict ,它将列名映射到它们的张量表示。

  • label_columns – 对应于模型目标的列。如果这是一个字符串,目标数据是一个张量。如果这是一个列表,目标数据是一个 dict ,它将列名映射到它们的张量表示。

  • additional_columns – 对应于样本权重或其他元数据的列。如果这是一个字符串,权重数据是一个张量。如果这是一个列表,权重数据是一个 dict ,它将列名映射到它们的张量表示。

  • prefetch_batches – 要预取的批次数量,超过当前批次。如果设置为大于0,将使用一个单独的线程池来将对象获取到本地节点,格式化批次,并应用collate_fn。默认为1。

  • batch_size – 记录批次大小。默认为 1。

  • drop_last – 设置为 True 以丢弃最后一个不完整的批次,如果数据集大小不能被批次大小整除。如果为 False 且数据集大小不能被批次大小整除,则最后一个批次将会较小。默认为 False。

  • local_shuffle_buffer_size – 如果非空,数据将使用本地内存中的随机洗牌缓冲区进行随机洗牌,并且此值将作为本地内存中随机洗牌缓冲区中必须存在的最小行数,以便生成一个批次。当没有更多的行可以添加到缓冲区时,缓冲区中剩余的行将被排空。此缓冲区大小必须大于或等于 batch_size,因此在使用本地洗牌时也必须指定 batch_size

  • local_shuffle_seed – 用于本地随机洗牌的种子。

  • feature_type_specfeature_columnstf.TypeSpec。如果只有一列,指定一个 tf.TypeSpec。如果有多个列,指定一个 dict,将列名映射到它们的 tf.TypeSpec。默认是 None,自动推断每列的类型。

  • label_type_speclabel_columnstf.TypeSpec。如果只有一列,指定一个 tf.TypeSpec。如果有多个列,指定一个 dict,将列名映射到它们的 tf.TypeSpec。默认是 None,自动推断每一列的类型。

  • additional_type_specadditional_columnstf.TypeSpec。如果只有一列,指定一个 tf.TypeSpec。如果有多个列,指定一个 dict,将列名映射到它们的 tf.TypeSpec。默认是 None,自动推断每一列的类型。

返回:

一个产生输入和目标的 tf.data.Dataset

PublicAPI (测试版): 此API目前处于测试阶段,在成为稳定版本之前可能会发生变化。