本节我们将使用Athena来完成之前在EMR Studio里的Iceberg操作
Amazon Athena 提供了对 Apache Iceberg 的内置支持,因此我们可以读取和写入 Iceberg 表,而无需添加任何额外的依赖项或配置。
选择 使用 Trino SQL 查询数据,然后选择 Launch query editor。
要创建数据库,将以下查询复制到查询编辑器中并点击 运行:
create database athena_iceberg_db;
要创建 Iceberg 表,将以下查询复制到查询编辑器中,将 <your-account-id>
替换为当前账户 ID,然后点击 运行。
CREATE TABLE athena_iceberg_db.customer_iceberg (
c_customer_sk INT COMMENT 'unique id',
c_customer_id STRING,
c_first_name STRING,
c_last_name STRING,
c_email_address STRING)
LOCATION 's3://otfs-workshop-data-<your-account-id>/datasets/athena_iceberg/customer_iceberg'
TBLPROPERTIES (
'table_type'='iceberg',
'format'='PARQUET',
'write_compression'='zstd'
);
使用 DESCRIBE 查询检查表的架构和分区定义。还可以在左侧面板上选择表名称以查看表详细信息
DESCRIBE customer_iceberg;
使用 <table_name>$files
语法运行 SELECT
查询来查询 Iceberg files
表元数据。customer
表目前不包含任何数据。因此它不会显示任何文件。
SELECT * FROM "athena_iceberg_db"."customer_iceberg$files"
使用 <table_name>$manifests
语法运行 SELECT
查询来查询 Iceberg manifests
表元数据。由于表为空,以下查询将不显示任何数据。
SELECT * FROM "athena_iceberg_db"."customer_iceberg$manifests"
使用 <table_name>$snapshots
语法运行 SELECT
查询来查询 Iceberg $snapshots
表元数据。由于表为空,以下查询将不显示任何数据。
SELECT * FROM "athena_iceberg_db"."customer_iceberg$snapshots"
现在从 tpcds.prepared_customer
表中获取一些记录,并将它们插入到 customer_iceberg
表中。
INSERT INTO athena_iceberg_db.customer_iceberg
SELECT * FROM tpcds.prepared_customer
我们应该在查询结果中看到"查询成功"消息。
通过查询表来验证数据是否正确加载。将以下查询复制到查询编辑器中并点击 运行。
select * from athena_iceberg_db.customer_iceberg limit 10;
验证插入命令已将 2000000 条客户记录插入到表中。
select count(*) from athena_iceberg_db.customer_iceberg;
我们应该在查询结果中看到 2000000
。
在 S3 表位置:s3://otfs-workshop-data-<your-account-id>/datasets/athena_iceberg/customer_iceberg/
(<your-account-id>
是当前的账户 ID),将看到两个文件夹,data 和 metadata。data 文件夹保存 parquet 格式的实际数据,metadata 文件夹保存各种元数据文件。
有三种类型的元数据文件:
.metadata.json
结尾*-m*.avro
结尾snap-*.avro
每次对表进行任何更改时,都会创建一个新的元数据文件。
元数据文件夹:
数据文件夹:
查询 Iceberg 表元数据。运行以下语句列出 Iceberg 表文件。
SELECT * FROM "athena_iceberg_db"."customer_iceberg$files"
在查询结果中,将在 file_path
列中看到有关 S3 数据文件路径的详细信息(.parquet
扩展名):
运行以下语句列出 Iceberg 表的清单。
SELECT * FROM "athena_iceberg_db"."customer_iceberg$manifests"
在查询结果中,我们将在 path
列中看到有关 S3 manifest文件路径的详细信息(.avro
扩展名)。
# | path | length | partition_spec_id | added_snapshot_id | added_data_files_count | added_rows_count | existing_data_files_count | existing_rows_count | deleted_data_files_count | deleted_rows_count | partitions |
---|---|---|---|---|---|---|---|---|---|---|---|
1 | s3://otfs-workshop-data-145197526627/datasets/athena_iceberg/customer_iceberg/metadata/4a0aa1b2-3ed7-46f8-90fe-2d7804201b31-m0.avro | 7040 | 0 | 2411840078250196916 | 1 | 2000000 | 0 | 0 | 0 | 0 | [] |
运行以下语句查看 Iceberg 表的操作历史记录。
SELECT * FROM "athena_iceberg_db"."customer_iceberg$history"
# | made_current_at | snapshot_id | parent_id | is_current_ancestor |
---|---|---|---|---|
1 | 2025-03-15 01:02:20.765 UTC | 2411840078250196916 | true |
在查询结果中,我们将看到 snapshot_id
、parent_id
等。
运行以下语句查看 Iceberg 表快照详细信息。
SELECT * FROM "athena_iceberg_db"."customer_iceberg$snapshots"
# | committed_at | snapshot_id | parent_id | operation | manifest_list | summary |
---|---|---|---|---|---|---|
1 | 2025-03-15 01:02:20.765 UTC | 2411840078250196916 | append | s3://otfs-workshop-data-145197526627/datasets/athena_iceberg/customer_iceberg/metadata/snap-2411840078250196916-1-4a0aa1b2-3ed7-46f8-90fe-2d7804201b31.avro | {changed-partition-count=1, added-data-files=1, total-equality-deletes=0, added-records=2000000, trino_query_id=20250315_010214_00031_sjwnm, total-position-deletes=0, added-files-size=43960655, total-delete-files=0, total-files-size=43960655, total-records=2000000, total-data-files=1} |
在查询结果中,我们将看到 snapshot_id
、parent_id
、manifest_list
等。
运行以下查询:
select * from athena_iceberg_db.customer_iceberg
WHERE c_customer_sk = 15
注意用户 Tonya 的姓氏(c_last_name
)和电子邮件地址(c_email_address
)为 null
。
以使用 UPDATE
进行更改:
UPDATE athena_iceberg_db.customer_iceberg
SET c_last_name = 'John', c_email_address = 'johnTonya@abx.com' WHERE c_customer_sk = 15
验证 Tonya 的姓氏和电子邮件地址是否已修复:
select * from athena_iceberg_db.customer_iceberg
WHERE c_customer_sk = 15
注意 Tonya 的姓氏和电子邮件地址现在已更新:
Athena 对 UPDATE 操作使用 merge-on-read
。这意味着更新被写入单独的增量文件, 原始数据保持不变, 读取时需要实时合并基础数据和增量数据
与 copy-on-write 更新相比,merge-on-read 更新更高效,因为它不会重写整个数据文件。、
我们可以在s3中检查对应的data目录,发现新创建的文件并不是40M,而是1K左右:
使用parquet-tool检查它的内容,发现它只记录了更新的记录:
pip3 install parquet-cli
parquet-tools cat 20250315_011210_00015_4m7ed-a4a8f232-02f9-44fc-b564-77fced9df99b.parquet
[{"C_customer_id":"AAAAAAAAPAAAAAAA","C_customer_sk":15,"C_email_address":"johnTonya@abx.com","C_first_name":"Tonya","C_last_name":"John"}]
当我们读取配置了 merge-on-read 更新的 Iceberg 表时,引擎会将 Iceberg 数据文件合并,以生成表的最新视图。
CoW:读多写少的场景更合适, MoR:写多读少的场景更合适
CoW:读取快速且高效,直接读取最新文件, MoR:读取较慢,需要合并基础数据和增量数据
Athena 对 DELETE 操作使用 merge-on-read
。在运行 DELETE 语句时会创建基于位置的删除文件。
基于位置的删除通过文件和位置标识一个或多个数据文件中的已删除行。
与 copy-on-write 删除相比,merge-on-read 删除更高效,因为它不会重写整个数据文件。
当我们读取配置了 merge-on-read 删除的 Iceberg 表时,引擎会将 Iceberg 位置删除文件与数据文件合并,以生成表的最新视图。
执行以下语句:
delete from athena_iceberg_db.customer_iceberg
WHERE c_customer_sk = 15
验证 Tonya 的记录是否已从 Iceberg 表中删除:
SELECT * FROM athena_iceberg_db.customer_iceberg WHERE c_customer_sk = 15
查询结果中看到"无结果”。
Time Travel使指向特定表快照的可重现查询成为可能,并让用户轻松检查更改。
对 Iceberg 表的每次更改都会创建元数据树的独立版本,称为快照。使用以下查询,我们将看到总共 3 个快照。
查询表历史记录:
SELECT * FROM "athena_iceberg_db"."customer_iceberg$history"
order by made_current_at;
snapshot_id
列显示第一个创建的快照。snapshot_id
列显示第二个创建的快照。snapshot_id
列显示第三个(最新)创建的快照。使用第 2 行的 snapshot_id
,以查询对应于第二个快照的表状态(在执行删除操作之前)。
select * from athena_iceberg_db.customer_iceberg
FOR VERSION AS OF <snaptshot-id-v>
WHERE c_customer_sk = 15
在查询结果中,我们应该看到客户 Tonya 的记录:
或者使用 made_current_at
列时间来查询特定快照。使用第 2 行的 made_current_at
值:
select * from athena_iceberg_db.customer_iceberg
FOR TIMESTAMP AS OF TIMESTAMP '2024-04-16 17:21:49.771 UTC'
WHERE c_customer_sk = 15
Iceberg Schema更新仅是元数据更改。执行Schema更新时,不会更改任何数据文件。Iceberg 格式支持以下Schema更改:
查询数据文件。
SELECT * FROM "athena_iceberg_db"."customer_iceberg$files"
现在更改列名。我们可以使用以下 DDL 命令将 c_email_address
列名更改为 email
。
ALTER TABLE athena_iceberg_db.customer_iceberg change column c_email_address email STRING
查询应该执行没有任何错误。
让我们再次查询数据文件,以验证数据文件没有更改。
SELECT * FROM "athena_iceberg_db"."customer_iceberg$files"
注意,由于schema更改,没有创建新的数据文件。schema更改存储在元数据层。
通过执行查询 DESCRIBE customer_iceberg;
验证列已重命名。
使用以下 DDL 命令添加一个名为 c_birth_date
的新列。
ALTER TABLE athena_iceberg_db.customer_iceberg ADD COLUMNS (c_birth_date int)
通过执行查询 DESCRIBE customer_iceberg;
验证新列已添加。
运行以下查询以查看带有新列的表。请注意,对于表中当前存在的所有记录,新列的值为 null
。
SELECT *
FROM athena_iceberg_db.customer_iceberg
LIMIT 10