ray.data.读取_冰山#

ray.data.read_iceberg(*, table_identifier: str, row_filter: str | BooleanExpression = None, parallelism: int = -1, selected_fields: Tuple[str, ...] = ('*',), snapshot_id: int | None = None, scan_kwargs: Dict[str, str] | None = None, catalog_kwargs: Dict[str, str] | None = None, ray_remote_args: Dict[str, Any] | None = None, override_num_blocks: int | None = None) Dataset[源代码]#

从 Iceberg 表创建一个 Dataset

要读取的表是通过一个完全限定的 table_identifier 指定的。使用 PyIceberg,任何预期的行过滤器、特定字段的选择以及特定快照 ID 的选择都会被应用,并且满足查询的文件会分布在 Ray 读取任务中。输出块的数量由 override_num_blocks 决定,可以从这个接口请求,或者如果未指定则自动选择。

小技巧

有关 PyIceberg 的更多详情,请参阅 - URI: https://py.iceberg.apache.org/

示例

>>> import ray
>>> from pyiceberg.expressions import EqualTo  
>>> ds = ray.data.read_iceberg( 
...     table_identifier="db_name.table_name",
...     row_filter=EqualTo("column_name", "literal_value"),
...     catalog_kwargs={"name": "default", "type": "glue"}
... )
参数:
  • table_identifier – 完全限定的表标识符 (db_name.table_name)

  • row_filter – 一个 PyIceberg BooleanExpression 用于在读取数据 之前 过滤数据

  • parallelism – 此参数已弃用。请使用 override_num_blocks 参数。

  • selected_fields – 要从数据中读取哪些列,直接传递给 PyIceberg 的加载函数。应为字符串列名的元组。

  • snapshot_id – Iceberg 表的可选快照 ID,默认使用最新快照

  • scan_kwargs – 传递给 PyIceberg 的 Table.scan() 函数的可选参数(例如,case_sensitive、limit 等)

  • catalog_kwargs – 传递给 PyIceberg 的 catalog.load_catalog() 函数的可选参数(例如,名称、类型等)。有关函数定义,请参见 pyiceberg 目录

  • ray_remote_args – 传递给 ray.remote 的可选参数,用于读取任务

  • override_num_blocks – 覆盖所有读取任务的输出块数量。默认情况下,输出块的数量是根据输入数据大小和可用资源动态决定的,并且上限为要读取的物理文件数量。在大多数情况下,您不应手动设置此值。

返回:

Dataset 包含来自 Iceberg 表的行。