Iceberg跟踪表中的每个数据文件。更多的数据文件导致在清单文件中存储更多的元数据,而小的数据文件会导致不必要的元数据量和由于文件打开成本而降低查询效率。但更重要的是,查询将不得不解析其清单和清单列表中的大量文件,更不用说打开文件的成本了。
Iceberg允许我们以编程方式调整文件大小,使其达到查询的最佳大小。我们可以设置所需的目标文件大小,并确保读取针对快速查询性能进行了优化。通过使用<strong>rewrite_data_files</strong> 表过程来实现这一点。
让我们检查每个分区的当前文件数量及其相关大小。为此,我们使用files元数据表:
spark.sql(f"""
SELECT partition, count(*) as num_files, collect_list( ROUND((file_size_in_bytes / 1024 / 1024), 2)) as file_size_in_megabytes
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}.files
GROUP BY partition
ORDER BY partition
"""
).show(truncate=False)
请注意,year分区转换从1970年开始提取日期或时间戳年份(例如,year=2001 => year=31)。
让我们从属于2000年分区的一个文件中取100行。我们可以看到文件中的记录没有按ws_order_number
列排序。
partition_2000_file=spark.sql(f"""
SELECT input_file_name() as file_name
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}
WHERE year(ws_sales_time) = 2000
LIMIT 1
"""
).collect()[0]["file_name"]
df=spark.read.parquet(partition_2000_file)
df.show(100)
我们将使用rewrite_data_files过程不仅将文件打包成所需的大小,还将根据指定的列(ws_order_number
)对它们进行排序。请记住,此选项会由于排序阶段而减慢重写速度。
对于Iceberg 1.4.0,我们有以下影响rewrite_data_files过程的默认值:
所有这些值最终都可以使用options参数进行修改。要获取更多信息,请查看Iceberg文档中的rewrite_data_files 部分。
请注意,标记为重写(压缩)的文件组仅包括位于单个分区内的文件(文件组不跨越多个分区)。
让我们按ws_order_number
列对表进行压缩排序,指定最多可以并行压缩4个文件组。此过程可能需要2-3分钟。
compaction_df=spark.sql(f"""
CALL system.rewrite_data_files(
table => '`{ICEBERG_DB}`.`{ICEBERG_SALES_TABLE}`',
strategy => 'sort',
sort_order => 'ws_order_number ASC NULLS LAST',
options => map('max-concurrent-file-group-rewrites', '4')
)
"""
)
compaction_df.show(truncate=False)
下面,我们可以看到每个分区的新文件数量及其相关大小。
spark.sql(f"""
SELECT partition, count(*) as num_files, collect_list( ROUND((file_size_in_bytes / 1024 / 1024), 2)) as file_size_in_megabytes
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}.files
GROUP BY partition
ORDER BY partition
"""
).show(truncate=False)
请注意,只有压缩文件已被排序。例如,我们可以看到2000年分区内生成的文件中的记录是按ws_order_number
列排序的。
partition_2000_file=spark.sql(f"""
SELECT input_file_name() as file_name
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}
WHERE year(ws_sales_time) = 2000
LIMIT 1
"""
).collect()[0]["file_name"]
df=spark.read.parquet(partition_2000_file)
df.show(100)
请注意,压缩操作生成了一个类型为replace
的新快照:
%%sql
SELECT * FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}.snapshots
ORDER BY committed_at