使用 Dask 进行 Hive 分区
内容
使用 Dask 进行 Hive 分区¶
有时使用类似Hive的目录方案来编写数据集是有用的。例如,如果你的数据框包含 'year'
和 'semester'
列,基于Hive的目录结构可能看起来像下面这样:
output-path/
├── year=2022/
│ ├── semester=fall/
│ │ └── part.0.parquet
│ └── semester=spring/
│ ├── part.0.parquet
│ └── part.1.parquet
└── year=2023/
└── semester=fall/
└── part.1.parquet
使用这种自描述结构意味着 'output-path/year=2022/semester=fall/'
目录中的所有行将在 'year'
列中包含值 2022
,在 'semester'
列中包含值 'fall'
。
生成一个hive分区的数据集的主要优势是,某些IO过滤器可以通过 read_parquet()
应用,而不需要解析任何文件元数据。换句话说,当数据集已经在 'year'
列上进行了hive分区时,以下命令通常会更快。
>>> dd.read_parquet("output-path", filters=[("year", ">", 2022)])
使用Hive分区写入Parquet数据¶
Dask 的 to_parquet()
函数在使用 partition_on
选项时会自动生成一个 hive 分区的目录结构。
>>> df.to_parquet("output-path", partition_on=["year", "semester"])
>>> os.listdir("output-path")
["year=2022", "year=2023"]
>>> os.listdir("output-path/year=2022")
["semester=fall", "semester=spring"]
>>> os.listdir("output-path/year=2022/semester=spring")
['part.0.parquet', 'part.1.parquet']
重要的是要认识到,Dask 不会 聚合每个叶目录中写入的数据文件。这是因为每个DataFrame分区在执行 to_parquet()
任务图期间是独立写入的。为了写出分区 i 的数据,分区-i写入任务将对列 ["year", "semester"]
执行 groupby 操作,然后每个不同的组将使用文件名 'part.{i}.parquet'
写入相应的目录。因此,hive分区写入可能会在每个叶目录中生成大量文件(每个DataFrame分区一个文件)。
如果你的应用程序要求你为每个Hive分区生成一个单独的parquet文件,一种可能的解决方案是在调用 to_parquet()
之前对分区列进行排序或洗牌。
>>> partition_on = ["year", "semester"]
>>> df.shuffle(on=partition_on).to_parquet(partition_on=partition_on)
使用这种全局洗牌操作极其昂贵,应尽可能避免。然而,它也能保证生成最少数量的文件,这在某些情况下可能是值得的牺牲。
使用Hive分区读取Parquet数据¶
在大多数情况下,read_parquet()
会自动处理 hive 分区的数据。默认情况下,所有 hive 分区的列将被解释为分类列。
>>> ddf = dd.read_parquet("output-path", columns=["year", "semester"])
>>> ddf
Dask DataFrame Structure:
year semester
npartitions=4
category[known] category[known]
... ...
... ...
... ...
... ...
Dask Name: read-parquet, 1 graph layer
>>> ddf.compute()
year semester
0 2022 fall
1 2022 fall
2 2022 fall
3 2022 spring
4 2022 spring
5 2022 spring
6 2023 fall
7 2023 fall
定义自定义分区方案¶
可以为hive分区列指定自定义模式。然后,这些列将使用指定的类型读取,而不是作为 category 读取。
>>> schema = pa.schema([("year", pa.int16()), ("semester", pa.string())])
>>> ddf2 = dd.read_parquet(
... path,
... columns=["year", "semester"],
... dataset={"partitioning": {"flavor": "hive", "schema": schema}}
... )
Dask DataFrame Structure:
year semester
npartitions=4
int16 object
... ...
... ...
... ...
... ...
如果你的分区列中包含空值,你必须以这种方式指定分区模式。
虽然这不是必需的,但如果您需要在高基数列上进行分区,我们也建议您指定分区模式。这是因为默认的 'category'
dtype 会以一种可以显著增加 Dask 集合整体内存占用的方式跟踪已知类别。实际上,read_parquet()
已经出于同样的原因清除了其他列的“已知类别”(参见 分类变量)。
最佳实践¶
尽管Hive分区有时可以通过简化过滤来提高读取性能,但在其他情况下也可能导致性能下降和错误。
避免高基数¶
一个经验法则是避免在 float 列上进行分区,或任何包含许多唯一值(即高基数)的列。
使用 hive 分区时,用户体验不佳的最常见原因是分区列的高基数。例如,如果你尝试对一个包含数百万个唯一值的列进行分区,那么 :func:`to_parquet` 将需要生成数百万个目录。这些目录的管理可能会对文件系统造成压力,而每个目录中需要许多小文件的需求无疑会加剧这一问题。
使用简单数据类型进行分区¶
由于Hive分区数据是“自我描述”的,我们建议您避免在复杂数据类型上进行分区,并在可能的情况下选择基于整数或字符串的数据类型。如果您的数据类型不能从用于定义目录名称的字符串值中轻松推断出来,那么IO引擎可能会在解析这些值时遇到困难。
例如,直接对 datetime64
数据类型的列进行分区可能会生成如下目录名称:
output-path/
├── date=2022-01-01 00:00:00/
├── date=2022-02-01 00:00:00/
├── ...
└── date=2022-12-01 00:00:00/
这些目录名称将不会被正确解释为 datetime64
值,并且在Windows系统上甚至被视为非法。为了更可靠的行为,我们建议将此类列分解为一个或多个“简单”列。例如,可以轻松使用 'date'
来构造 'year'
、'month'
和 'day'
列(根据需要)。
在读取时聚合文件¶
警告
aggregate_files
参数目前被列为实验性功能。然而,目前没有计划在未来的版本中移除该参数或改变其行为。
由于hive分区通常会产生大量小文件,read_parquet()
的性能通常会受益于 aggregate_files
参数的正确使用。以以下数据集为例:
dataset-path/
├── region=1/
│ ├── section=a/
│ │ └── 01.parquet
│ │ └── 02.parquet
│ │ └── 03.parquet
│ ├── section=b/
│ └── └── 04.parquet
│ └── └── 05.parquet
└── region=2/
├── section=a/
│ ├── 06.parquet
│ ├── 07.parquet
│ ├── 08.parquet
如果我们为这种情况设置 aggregate_files=True
,我们是在告诉 Dask 任何 parquet 数据文件都可以被聚合到同一个输出 DataFrame 分区中。相反,如果我们指定一个分区列的名称(例如 'region'
或 'section'
),我们允许聚合任何两个共享文件路径的文件,直到并包括相应的目录名。例如,如果 aggregate_files
设置为 'section'
,04.parquet
和 05.parquet
可能会被聚合在一起,但 03.parquet
和 04.parquet
不能。然而,如果 aggregate_files
设置为 'region'
,04.parquet
可能会与 05.parquet
聚合,并且 03.parquet
可能会与 04.parquet
聚合。
使用 aggregate_files
通常会通过使 DataFrame 分区更接近 blocksize
参数指定的大小来提高性能。相比之下,默认行为可能会产生大量远小于 blocksize
的分区。