默认情况下,Iceberg不会删除由事务创建的快照。因此,我们可以执行诸如Time Travel查询、将当前表快照回滚到先前的快照
正如我们所看到的,对于Copy on Write表,当Iceberg表发生更改时,无论是更新还是删除,与受影响记录相关的数据文件都将被复制和更新。
虽然拥有多个快照允许我们执行上述操作,但也有一些注意事项。例如,保留更多元数据文件意味着我们的存储成本也会相应增加。避免表无限增长的简单方法是建立一个与我们对Time Travel查询、回滚或CDC的要求相匹配的时间窗口,并删除早于该窗口(例如30天)的快照(以及任何相关的过时文件)。
我们将在下面看到如何使用expire_snapshots
表过程使快照过期。此过程将删除旧快照和仅由这些旧快照唯一需要的数据文件。expire_snapshots
过程永远不会删除非过期快照仍然需要的文件。
目前,我们有3个快照。两个快照是通过追加操作(INSERT)生成的,一个是通过覆盖(MERGE)生成的。
%%sql
SELECT * FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}.snapshots
ORDER BY committed_at
让我们使第一个快照过期:
expire_time_df=spark.sql(f"""
SELECT committed_at FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}.snapshots
ORDER BY committed_at
"""
)
expire_time_ts=expire_time_df.collect()[0][0]
expire_time_ts_plus1 = str(expire_time_ts + timedelta(seconds=1))
print(f"Expire Snapshots older than: {expire_time_ts_plus1}")
expire_snapshot_df=spark.sql(f"""
CALL system.expire_snapshots(table => '`{ICEBERG_DB}`.`{ICEBERG_SALES_TABLE}`', older_than => TIMESTAMP '{expire_time_ts_plus1}')
"""
)
输出数据框包含有关过期操作的统计信息,如删除的数据文件、清单文件和清单列表文件的数量。
expire_snapshot_df.show(truncate=False, vertical=True)
我们可以看到第一个快照已被删除。
%%sql
SELECT * FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}.snapshots
ORDER BY committed_at
让我们检查每个分区的当前文件数量。
show_data_files_count(ICEBERG_SALES_TABLE)
让我们使除最后一个快照之外的所有快照过期:
timenow = str(datetime.now())
expire_snapshot_df=spark.sql(f"""
CALL system.expire_snapshots(table => '`{ICEBERG_DB}`.`{ICEBERG_SALES_TABLE}`', older_than => TIMESTAMP '{timenow}', retain_last => 1)
"""
)
expire_snapshot_df.show(truncate=False, vertical=True)
我们现在可以看到只有最新的快照可用。
%%sql
SELECT * FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}.snapshots
ORDER BY committed_at
我们可以看到文件的总数现在减少到只包括当前文件(当前快照指向的文件)。
show_data_files_count(ICEBERG_SALES_TABLE)
快照过期机制可以: