数据压缩 - Data Compaction

Iceberg跟踪表中的每个数据文件。更多的数据文件导致在清单文件中存储更多的元数据,而小的数据文件会导致不必要的元数据量和由于文件打开成本而降低查询效率。但更重要的是,查询将不得不解析其清单和清单列表中的大量文件,更不用说打开文件的成本了。

Iceberg允许我们以编程方式调整文件大小,使其达到查询的最佳大小。我们可以设置所需的目标文件大小,并确保读取针对快速查询性能进行了优化。通过使用<strong>rewrite_data_files</strong> 表过程来实现这一点。

让我们检查每个分区的当前文件数量及其相关大小。为此,我们使用files元数据表:

spark.sql(f"""
SELECT partition, count(*) as num_files, collect_list( ROUND((file_size_in_bytes / 1024 / 1024), 2)) as file_size_in_megabytes
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}.files
GROUP BY partition 
ORDER BY partition
"""
).show(truncate=False)

image-20250315163911055

请注意,year分区转换从1970年开始提取日期或时间戳年份(例如,year=2001 => year=31)。

让我们从属于2000年分区的一个文件中取100行。我们可以看到文件中的记录没有按ws_order_number列排序。

partition_2000_file=spark.sql(f"""
SELECT input_file_name() as file_name
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}
WHERE year(ws_sales_time) = 2000
LIMIT 1
"""
).collect()[0]["file_name"]

df=spark.read.parquet(partition_2000_file)
df.show(100)

image-20250315164031566

我们将使用rewrite_data_files过程不仅将文件打包成所需的大小,还将根据指定的列(ws_order_number)对它们进行排序。请记住,此选项会由于排序阶段而减慢重写速度。

对于Iceberg 1.4.0,我们有以下影响rewrite_data_files过程的默认值:

  • target-file-size-bytes = 536870912(512 MB):目标输出文件大小
  • min-file-size-bytes = target-file-size-bytes的75%(384 MB):低于此大小的文件将被考虑重写
  • max-file-size-bytes = target-file-size-bytes的180%(921.6 MB):高于此大小的文件将被考虑重写
  • min-input-files = 5:在一个分区内,我们需要至少这么多文件才能触发压缩(超过此数量的文件组将被重写)
  • max-concurrent-file-group-rewrite = 5:可以并行重写的文件组的最大数量
  • max-file-group-size-bytes = 107374182400(100GB):在单个文件组中应重写的最大数据量

所有这些值最终都可以使用options参数进行修改。要获取更多信息,请查看Iceberg文档中的rewrite_data_files 部分。

请注意,标记为重写(压缩)的文件组仅包括位于单个分区内的文件(文件组不跨越多个分区)。

让我们按ws_order_number列对表进行压缩排序,指定最多可以并行压缩4个文件组。此过程可能需要2-3分钟。

compaction_df=spark.sql(f"""
CALL system.rewrite_data_files(
table => '`{ICEBERG_DB}`.`{ICEBERG_SALES_TABLE}`', 
strategy => 'sort', 
sort_order => 'ws_order_number ASC NULLS LAST',
options => map('max-concurrent-file-group-rewrites', '4')
)
"""
)
compaction_df.show(truncate=False)

image-20250315164350694

下面,我们可以看到每个分区的新文件数量及其相关大小。

spark.sql(f"""
SELECT partition, count(*) as num_files, collect_list( ROUND((file_size_in_bytes / 1024 / 1024), 2)) as file_size_in_megabytes
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}.files
GROUP BY partition 
ORDER BY partition
"""
).show(truncate=False)

image-20250315164413125

请注意,只有压缩文件已被排序。例如,我们可以看到2000年分区内生成的文件中的记录是按ws_order_number列排序的。

partition_2000_file=spark.sql(f"""
SELECT input_file_name() as file_name
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}
WHERE year(ws_sales_time) = 2000
LIMIT 1
"""
).collect()[0]["file_name"]

df=spark.read.parquet(partition_2000_file)
df.show(100)

image-20250315164443539

请注意,压缩操作生成了一个类型为replace的新快照:

%%sql
SELECT * FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}.snapshots
ORDER BY committed_at

image-20250315164519791