We will use the MERGE INTO statement to apply the changes coming from a CDC source to the Iceberg table.

MERGE INTO is a powerful DML operation supported by Apache Iceberg that performs conditional updates and inserts in a single atomic operation. This syntax is commonly referred to as an "upsert": it handles inserts and updates at the same time, which makes it an efficient way to merge changes.
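As a schematic illustration of the pattern, a MERGE statement combines a join condition with WHEN MATCHED / WHEN NOT MATCHED clauses. The following is a minimal sketch with hypothetical table and column names (target_table, source_changes, id, value), not the tables used in this workshop:

# Schematic MERGE INTO example; table and column names are hypothetical.
spark.sql("""
    MERGE INTO target_table t
    USING source_changes s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET t.value = s.value                    -- existing keys are updated
    WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value)   -- new keys are inserted
""")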
Below, in a single operation, we will insert, update, and delete records.

To do this, we create a temporary view merge_table, which is the union of the inserts, updates, and deletes. Each record carries an operation column indicating whether it is an insert (I), an update (U), or a delete (D).
# Records from 2001 that will be inserted into the Iceberg table.
insert_df = spark.sql(f"""
SELECT ws_order_number, ws_item_sk, ws_quantity, ws_sales_price, ws_warehouse_sk, ws_sales_time, 'I' AS operation
FROM spark_catalog.{TPC_DS_DATABASE}.{TPC_DS_SALES_TABLE}
WHERE year(ws_sales_time) = 2001
""")

# Records from 2000 in warehouse 10 that will be updated to warehouse 16.
update_df = spark.sql(f"""
SELECT ws_order_number, ws_item_sk, ws_quantity, ws_sales_price, 16 AS ws_warehouse_sk, ws_sales_time, 'U' AS operation
FROM spark_catalog.{TPC_DS_DATABASE}.{TPC_DS_SALES_TABLE}
WHERE year(ws_sales_time) = 2000 AND ws_warehouse_sk = 10
""")

# Records from 1999 in warehouse 9 that will be deleted.
delete_df = spark.sql(f"""
SELECT ws_order_number, ws_item_sk, ws_quantity, ws_sales_price, ws_warehouse_sk, ws_sales_time, 'D' AS operation
FROM spark_catalog.{TPC_DS_DATABASE}.{TPC_DS_SALES_TABLE}
WHERE year(ws_sales_time) = 1999 AND ws_warehouse_sk = 9
""")

insert_count = insert_df.count()
update_count = update_df.count()
delete_count = delete_df.count()

# Union the three DataFrames into a single one.
merge_df = insert_df.union(update_df).union(delete_df)

# Create a temporary view called `merge_table` from the unioned DataFrame.
merge_df.createOrReplaceTempView("merge_table")

print(f"Records which will be inserted: {insert_count}")
print(f"Records which will be updated: {update_count}")
print(f"Records which will be deleted: {delete_count}")
Now let's merge the merge_table temporary view into the Iceberg table. The operation may take 1-2 minutes to complete.
%%sql
MERGE INTO ${ICEBERG_DB}.${ICEBERG_SALES_TABLE} t
USING merge_table s
ON t.ws_order_number = s.ws_order_number AND t.ws_item_sk = s.ws_item_sk
WHEN MATCHED AND s.operation = 'D' THEN DELETE
WHEN MATCHED AND s.operation = 'U' THEN UPDATE SET
    t.ws_order_number = s.ws_order_number,
    t.ws_item_sk = s.ws_item_sk,
    t.ws_quantity = s.ws_quantity,
    t.ws_sales_price = s.ws_sales_price,
    t.ws_warehouse_sk = s.ws_warehouse_sk,
    t.ws_sales_time = s.ws_sales_time
WHEN NOT MATCHED THEN INSERT
    (ws_order_number, ws_item_sk, ws_quantity, ws_sales_price, ws_warehouse_sk, ws_sales_time)
    VALUES (s.ws_order_number, s.ws_item_sk, s.ws_quantity, s.ws_sales_price, s.ws_warehouse_sk, s.ws_sales_time)
show_tables_files(ICEBERG_SALES_TABLE)
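show_tables_files is a helper assumed to be defined earlier in the notebook. A minimal sketch of what such a helper could look like (an assumption, not the original implementation), based on Iceberg's files metadata table:

# Hypothetical sketch of show_tables_files: list the table's data files via the
# Iceberg `files` metadata table. The real helper in this workshop may differ.
def show_tables_files(table_name):
    spark.sql(f"""
        SELECT file_path, record_count, file_size_in_bytes
        FROM {ICEBERG_DB}.{table_name}.files
    """).show(truncate=False)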
Let's verify the inserted data for 2001:
%%sql
SELECT YEAR(ws_sales_time) AS year, COUNT(*) as records_per_year
FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
GROUP BY (YEAR(ws_sales_time))
ORDER BY year
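Assuming the table contained no 2001 rows before the merge, the 2001 count should match insert_count from the staging step; a minimal programmatic check:

# Compare the number of 2001 rows in the Iceberg table with the staged insert count.
rows_2001 = spark.sql(f"""
    SELECT COUNT(*) AS cnt
    FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}
    WHERE year(ws_sales_time) = 2001
""").first()['cnt']
print(f"2001 rows: {rows_2001} (expected: {insert_count})")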
We can verify that for the year 2000 there are no longer any entries with warehouse ID 10; they have been set to 16 instead:
%%sql
SELECT ws_warehouse_sk, COUNT(*) as records_per_warehouse
FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
WHERE YEAR(ws_sales_time) = 2000
GROUP BY ws_warehouse_sk
ORDER BY ws_warehouse_sk
We can also see that the table contains no entries for 1999 with warehouse ID 9:
%%sql
SELECT ws_warehouse_sk, COUNT(*) as records_per_warehouse
FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
WHERE YEAR(ws_sales_time) = 1999
GROUP BY ws_warehouse_sk
ORDER BY ws_warehouse_sk
By examining the summary of the latest snapshot, which was created by the merge operation, we can see what the merge changed:
import json

# Fetch the summary of the most recent snapshot from the Iceberg snapshots metadata table.
df = spark.sql(f"""
SELECT summary
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}.snapshots
ORDER BY committed_at DESC
LIMIT 1
""")
print(json.dumps(df.first()['summary'], sort_keys=True, indent=4))
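The summary is a map of string properties. As an optional follow-up, a minimal sketch that pulls out a few of the commonly present counters (exact keys may vary by Iceberg version, so they are read defensively):

# Read selected counters from the snapshot summary map; keys such as 'added-records'
# and 'deleted-records' are standard Iceberg summary properties, but we fall back to
# 'n/a' in case a key is absent in this Iceberg version.
summary = df.first()['summary']
for key in ['added-records', 'deleted-records', 'added-data-files', 'deleted-data-files', 'total-records']:
    print(f"{key}: {summary.get(key, 'n/a')}")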