Using MERGE INTO for CDC

We will use the MERGE INTO statement to apply the changes from the CDC source to the Iceberg table.

MERGE INTO is a powerful DML operation supported by Apache Iceberg that performs conditional updates and inserts in a single atomic operation. This syntax is commonly referred to as an "upsert": it handles inserts and updates together, merging changes efficiently.
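As a quick illustration of the upsert pattern (the database, table, and column names below are hypothetical and are not part of this lab), a basic MERGE INTO typically looks like this:

# Hypothetical upsert sketch: rows from `updates_tbl` that match an existing
# `id` in `target_tbl` are updated, and non-matching rows are inserted.
spark.sql("""
    MERGE INTO demo_db.target_tbl t
    USING demo_db.updates_tbl s
        ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET t.value = s.value
    WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value)
""")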

Below, in a single operation, we will:

  • Insert the data for 2001
  • Update the warehouse ID of the relevant 2000 records from 10 to 16
  • Delete the 1999 records whose warehouse ID is set to 9

To do this, we create a temporary view merge_table that is the union of the inserts, updates, and deletes. Each record carries an operation column indicating whether it is an insert (I), an update (U), or a delete (D).

insert_df=spark.sql(f"""
SELECT ws_order_number, ws_item_sk, ws_quantity, ws_sales_price, ws_warehouse_sk, ws_sales_time, 'I' as operation
FROM spark_catalog.{TPC_DS_DATABASE}.{TPC_DS_SALES_TABLE} where year(ws_sales_time) = 2001
"""
)

update_df=spark.sql(f"""
SELECT ws_order_number, ws_item_sk, ws_quantity, ws_sales_price, 16 AS ws_warehouse_sk, ws_sales_time, 'U' as operation  
FROM spark_catalog.{TPC_DS_DATABASE}.{TPC_DS_SALES_TABLE} where year(ws_sales_time) = 2000 AND ws_warehouse_sk = 10
"""
)

delete_df=spark.sql(f"""
SELECT ws_order_number, ws_item_sk, ws_quantity, ws_sales_price, ws_warehouse_sk, ws_sales_time, 'D' as operation  
FROM spark_catalog.{TPC_DS_DATABASE}.{TPC_DS_SALES_TABLE} where year(ws_sales_time) = 1999 AND ws_warehouse_sk = 9
"""
)


insert_count=insert_df.count()
update_count=update_df.count()
delete_count=delete_df.count()

# Union the 3 previously created dataframes into a single one
merge_df = insert_df.union(update_df).union(delete_df)

# Create a temporary view called `merge_table` from the unioned dataframe
merge_df.createOrReplaceTempView("merge_table")

print(f"Records which will be inserted: {insert_count}")
print(f"Records which will be updated: {update_count}")
print(f"Records which will be deleted: {delete_count}")

image-20250315160230087

Let's merge the merge_table temporary view into the Iceberg table. This operation may take 1-2 minutes to complete.

%%sql
MERGE INTO ${ICEBERG_DB}.${ICEBERG_SALES_TABLE} t
USING merge_table s
    ON t.ws_order_number = s.ws_order_number AND t.ws_item_sk = s.ws_item_sk
WHEN MATCHED AND s.operation = 'D' THEN DELETE
WHEN MATCHED AND s.operation = 'U' THEN UPDATE SET
    t.ws_order_number = s.ws_order_number,
    t.ws_item_sk = s.ws_item_sk,
    t.ws_quantity = s.ws_quantity,
    t.ws_sales_price = s.ws_sales_price,
    t.ws_warehouse_sk = s.ws_warehouse_sk,
    t.ws_sales_time = s.ws_sales_time
WHEN NOT MATCHED THEN INSERT (ws_order_number, ws_item_sk, ws_quantity, ws_sales_price, ws_warehouse_sk, ws_sales_time)
    VALUES (s.ws_order_number, s.ws_item_sk, s.ws_quantity, s.ws_sales_price, s.ws_warehouse_sk, s.ws_sales_time)

In a separate cell, list the table's data files:

show_tables_files(ICEBERG_SALES_TABLE)

image-20250315160406483
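show_tables_files is a helper defined earlier in this workshop. If you are following along without it, a minimal sketch that lists the table's data files through Iceberg's files metadata table could look like this (the selected columns are an assumption, not the lab's exact helper):

# Minimal sketch of a helper that lists an Iceberg table's data files
# via the `files` metadata table. Assumes ICEBERG_DB is defined as in the
# cells above; the chosen columns are illustrative only.
def show_tables_files(table_name):
    spark.sql(f"""
        SELECT file_path, record_count, file_size_in_bytes
        FROM {ICEBERG_DB}.{table_name}.files
    """).show(truncate=False)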

Verify the data for 2001:

%%sql
SELECT YEAR(ws_sales_time) AS year, COUNT(*) as records_per_year
FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
GROUP BY (YEAR(ws_sales_time))
ORDER BY year

image-20250315160449314

We can verify that for 2000 there are no longer any entries with warehouse ID 10; they are now set to 16:

%%sql
SELECT ws_warehouse_sk, COUNT(*) as records_per_warehouse
FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
WHERE YEAR(ws_sales_time) = 2000
GROUP BY ws_warehouse_sk
ORDER BY ws_warehouse_sk

image-20250315160553031

We can see in the table that there are no entries with warehouse ID 9 for 1999:

%%sql
SELECT ws_warehouse_sk, COUNT(*) as records_per_warehouse
FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
WHERE YEAR(ws_sales_time) = 1999
GROUP BY ws_warehouse_sk
ORDER BY ws_warehouse_sk

image-20250315160636879

By inspecting the summary of the latest snapshot, the one created by the merge operation, we can see:

  • A changed-partition-count of 3 (the MERGE touched the 1999, 2000, and 2001 partitions)
  • A deleted-data-files count that reflects the number of files that existed in the 1999 and 2000 partitions before the MERGE ran (the DELETE and UPDATE parts of the MERGE affected records in every data file of those two partitions).

# Retrieve the summary of the most recent snapshot (the one created by the MERGE)
import json

df = spark.sql(f"""
SELECT summary
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}.snapshots
ORDER BY committed_at DESC
""")

print(json.dumps(df.first()['summary'], sort_keys=True, indent=4))

image-20250315160708277
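If you prefer to check these numbers programmatically instead of reading the JSON dump, the summary map from the previous cell can be inspected directly. The key names below are standard Iceberg snapshot summary fields; the actual values depend on your dataset:

# Pull the latest snapshot summary (a string-to-string map) into a dict
# and print the fields discussed above; values are dataset-dependent.
latest_summary = df.first()['summary']
print("changed-partition-count:", latest_summary.get('changed-partition-count'))
print("deleted-data-files:", latest_summary.get('deleted-data-files'))
print("added-data-files:", latest_summary.get('added-data-files'))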