现在向Iceberg表插入一些客户数据。以下是源表的数据。
%%sql
SELECT *
FROM spark_catalog.${TPC_DS_DATABASE}.${TPC_DS_CUSTOMER_TABLE}
LIMIT 10
插入数据:
%%sql
INSERT INTO ${ICEBERG_DB}.${ICEBERG_CUSTOMER_TABLE}
SELECT *
FROM spark_catalog.${TPC_DS_DATABASE}.${TPC_DS_CUSTOMER_TABLE}
可以看到有200万客户记录被插入到表中:
%%sql
SELECT COUNT(*)
FROM ${ICEBERG_DB}.${ICEBERG_CUSTOMER_TABLE}
再次打印表结构, 将看到两个文件夹,data和metadata,其中data文件夹包含parquet格式的实际数据,metadata文件夹包含各种元数据文件。
有三种类型的元数据文件:
-m*.avro
结尾。每次对表进行任何更改时,都会创建一个新的metadata文件
show_tables_files(ICEBERG_CUSTOMER_TABLE)
可以看到Glue Catalog中的当前元数据指针已更新,指向新的元数据文件。
show_current_metadata_pointer(ICEBERG_CUSTOMER_TABLE)
现在有了第一个Iceberg表。让我们看看如何从这个表中查询数据。
Iceberg支持使用Spark SQL和DataFrames进行查询
%%sql
SELECT * FROM ${ICEBERG_DB}.${ICEBERG_CUSTOMER_TABLE} LIMIT 10
# 使用DataFrame查询
spark.table(f"{ICEBERG_DB}.{ICEBERG_CUSTOMER_TABLE}").limit(10).show()
进行一些数据清洗。假设一个客户错误地输入了姓氏和电子邮件这两个字段为null,我们来修复它。
可以使用UPDATE
进行更改。UPDATE查询接受一个过滤器来匹配要更新的行。
注意Tonya的姓氏和电子邮件地址为null:
%%sql
SELECT * FROM ${ICEBERG_DB}.${ICEBERG_CUSTOMER_TABLE} WHERE c_customer_sk = 15
%%sql
UPDATE ${ICEBERG_DB}.${ICEBERG_CUSTOMER_TABLE} SET c_last_name = 'John', c_email_address = 'johnTonya@abx.com' WHERE c_customer_sk = 15
注意Tonya的姓氏和电子邮件地址已修复。
%%sql
SELECT * FROM ${ICEBERG_DB}.${ICEBERG_CUSTOMER_TABLE} WHERE c_customer_sk = 15
检查这个UPDATE操作如何影响Iceberg的数据层。由于实验使用默认的写时复制(Copy on Write)更改格式,当对特定行或多行进行删除或更新时,包含这些行的数据文件会被复制,新版本具有更新后的行。
由于上述更改,创建了一个新的parquet文件。可以通过不同的LastModified时间戳识别该新文件, 这代表了写时复制更改风格。
show_data_files(ICEBERG_CUSTOMER_TABLE)
Tonya根据她的GDPR权利, 想删除她的帐号。现在需要删除她的记录。
可以使用DELETE FROM
更改
%%sql
DELETE FROM ${ICEBERG_DB}.${ICEBERG_CUSTOMER_TABLE} WHERE c_customer_sk = 15
注意Tonya的记录已从Iceberg表中删除。
%%sql
SELECT * FROM ${ICEBERG_DB}.${ICEBERG_CUSTOMER_TABLE} WHERE c_customer_sk = 15
检查如何影响Iceberg的数据层。如果删除操作匹配表的整个分区,Iceberg将仅执行元数据的删除。如果匹配表的单个行,则Iceberg将仅重写受影响的数据文件。
同样,由于使用默认的Copy on Write行级更改格式,当对特定行或多行进行删除或更新时,包含这些行的数据文件会被复制,新版本具有更新后的行。
由于上述更改,创建了一个新的parquet文件:
show_data_files(ICEBERG_CUSTOMER_TABLE)
注意它们的size都接近: