Iceberg on Athena

本节我们将使用Athena来完成之前在EMR Studio里的Iceberg操作

Amazon Athena 提供了对 Apache Iceberg 的内置支持,因此我们可以读取和写入 Iceberg 表,而无需添加任何额外的依赖项或配置。

选择 使用 Trino SQL 查询数据,然后选择 Launch query editor

image-20250315084804483

创建 Iceberg 表

要创建数据库,将以下查询复制到查询编辑器中并点击 运行:

create database athena_iceberg_db;

image-20250315085139985


要创建 Iceberg 表,将以下查询复制到查询编辑器中,将 <your-account-id> 替换为当前账户 ID,然后点击 运行

CREATE TABLE athena_iceberg_db.customer_iceberg (
    c_customer_sk INT COMMENT 'unique id', 
    c_customer_id STRING, 
    c_first_name STRING, 
    c_last_name STRING, 
    c_email_address STRING)
LOCATION 's3://otfs-workshop-data-<your-account-id>/datasets/athena_iceberg/customer_iceberg'
TBLPROPERTIES (
  'table_type'='iceberg',
  'format'='PARQUET',
  'write_compression'='zstd'
);

image-20250315085359122


使用 DESCRIBE 查询检查表的架构和分区定义。还可以在左侧面板上选择表名称以查看表详细信息

DESCRIBE customer_iceberg;

image-20250315085614111

使用 <table_name>$files 语法运行 SELECT 查询来查询 Iceberg files 表元数据。customer 表目前不包含任何数据。因此它不会显示任何文件。

SELECT * FROM "athena_iceberg_db"."customer_iceberg$files"

image-20250315085731746

使用 <table_name>$manifests 语法运行 SELECT 查询来查询 Iceberg manifests 表元数据。由于表为空,以下查询将不显示任何数据。

SELECT * FROM "athena_iceberg_db"."customer_iceberg$manifests"

image-20250315085806145

使用 <table_name>$snapshots 语法运行 SELECT 查询来查询 Iceberg $snapshots 表元数据。由于表为空,以下查询将不显示任何数据。

SELECT * FROM "athena_iceberg_db"."customer_iceberg$snapshots"

插入数据

现在从 tpcds.prepared_customer 表中获取一些记录,并将它们插入到 customer_iceberg 表中。

INSERT INTO athena_iceberg_db.customer_iceberg
SELECT * FROM tpcds.prepared_customer 

我们应该在查询结果中看到"查询成功"消息。

image-20250315090230751

通过查询表来验证数据是否正确加载。将以下查询复制到查询编辑器中并点击 运行

select * from athena_iceberg_db.customer_iceberg limit 10;

image-20250315090309124

验证插入命令已将 2000000 条客户记录插入到表中。

select count(*) from athena_iceberg_db.customer_iceberg;

我们应该在查询结果中看到 2000000

image-20250315090333580

在 S3 表位置:s3://otfs-workshop-data-<your-account-id>/datasets/athena_iceberg/customer_iceberg/<your-account-id> 是当前的账户 ID),将看到两个文件夹,datametadatadata 文件夹保存 parquet 格式的实际数据,metadata 文件夹保存各种元数据文件。

image-20250315090458141

有三种类型的元数据文件:

  • 元数据文件,以 .metadata.json 结尾
  • manifest列表,以 *-m*.avro 结尾
  • manifest文件,格式为 snap-*.avro

每次对表进行任何更改时,都会创建一个新的元数据文件。

元数据文件夹:

image-20250315090551083

数据文件夹:

image-20250315090611822

查询 Iceberg 表元数据。运行以下语句列出 Iceberg 表文件。

SELECT * FROM "athena_iceberg_db"."customer_iceberg$files"

在查询结果中,将在 file_path 列中看到有关 S3 数据文件路径的详细信息(.parquet 扩展名):

image-20250315090721521

运行以下语句列出 Iceberg 表的清单。

SELECT * FROM "athena_iceberg_db"."customer_iceberg$manifests"

在查询结果中,我们将在 path 列中看到有关 S3 manifest文件路径的详细信息(.avro 扩展名)。

# path length partition_spec_id added_snapshot_id added_data_files_count added_rows_count existing_data_files_count existing_rows_count deleted_data_files_count deleted_rows_count partitions
1 s3://otfs-workshop-data-145197526627/datasets/athena_iceberg/customer_iceberg/metadata/4a0aa1b2-3ed7-46f8-90fe-2d7804201b31-m0.avro 7040 0 2411840078250196916 1 2000000 0 0 0 0 []

运行以下语句查看 Iceberg 表的操作历史记录。

SELECT * FROM "athena_iceberg_db"."customer_iceberg$history"
# made_current_at snapshot_id parent_id is_current_ancestor
1 2025-03-15 01:02:20.765 UTC 2411840078250196916 true

在查询结果中,我们将看到 snapshot_idparent_id 等。

运行以下语句查看 Iceberg 表快照详细信息。

SELECT * FROM "athena_iceberg_db"."customer_iceberg$snapshots"
# committed_at snapshot_id parent_id operation manifest_list summary
1 2025-03-15 01:02:20.765 UTC 2411840078250196916 append s3://otfs-workshop-data-145197526627/datasets/athena_iceberg/customer_iceberg/metadata/snap-2411840078250196916-1-4a0aa1b2-3ed7-46f8-90fe-2d7804201b31.avro {changed-partition-count=1, added-data-files=1, total-equality-deletes=0, added-records=2000000, trino_query_id=20250315_010214_00031_sjwnm, total-position-deletes=0, added-files-size=43960655, total-delete-files=0, total-files-size=43960655, total-records=2000000, total-data-files=1}

在查询结果中,我们将看到 snapshot_idparent_idmanifest_list 等。

更新记录

运行以下查询:

select * from athena_iceberg_db.customer_iceberg
WHERE c_customer_sk = 15

image-20250315091137570

注意用户 Tonya 的姓氏(c_last_name)和电子邮件地址(c_email_address)为 null

以使用 UPDATE 进行更改:

UPDATE athena_iceberg_db.customer_iceberg
SET c_last_name = 'John', c_email_address = 'johnTonya@abx.com' WHERE c_customer_sk = 15

验证 Tonya 的姓氏和电子邮件地址是否已修复:

select * from athena_iceberg_db.customer_iceberg
WHERE c_customer_sk = 15

注意 Tonya 的姓氏和电子邮件地址现在已更新:

image-20250315091233939

Athena 对 UPDATE 操作使用 merge-on-read。这意味着更新被写入单独的增量文件, 原始数据保持不变, 读取时需要实时合并基础数据和增量数据

copy-on-write 更新相比,merge-on-read 更新更高效,因为它不会重写整个数据文件。、

我们可以在s3中检查对应的data目录,发现新创建的文件并不是40M,而是1K左右:

image-20250315094707713

使用parquet-tool检查它的内容,发现它只记录了更新的记录:

pip3 install parquet-cli

parquet-tools cat 20250315_011210_00015_4m7ed-a4a8f232-02f9-44fc-b564-77fced9df99b.parquet
[{"C_customer_id":"AAAAAAAAPAAAAAAA","C_customer_sk":15,"C_email_address":"johnTonya@abx.com","C_first_name":"Tonya","C_last_name":"John"}]

当我们读取配置了 merge-on-read 更新的 Iceberg 表时,引擎会将 Iceberg 数据文件合并,以生成表的最新视图。

适用场景

CoW:读多写少的场景更合适, MoR:写多读少的场景更合适

读取性能

CoW:读取快速且高效,直接读取最新文件, MoR:读取较慢,需要合并基础数据和增量数据

从 Iceberg 表中删除行

Athena 对 DELETE 操作使用 merge-on-read。在运行 DELETE 语句时会创建基于位置的删除文件。

基于位置的删除通过文件和位置标识一个或多个数据文件中的已删除行。

copy-on-write 删除相比,merge-on-read 删除更高效,因为它不会重写整个数据文件。

当我们读取配置了 merge-on-read 删除的 Iceberg 表时,引擎会将 Iceberg 位置删除文件与数据文件合并,以生成表的最新视图。

执行以下语句:

delete from athena_iceberg_db.customer_iceberg
WHERE c_customer_sk = 15

验证 Tonya 的记录是否已从 Iceberg 表中删除:

SELECT * FROM athena_iceberg_db.customer_iceberg WHERE c_customer_sk = 15

查询结果中看到"无结果”。

Time travel

Time Travel使指向特定表快照的可重现查询成为可能,并让用户轻松检查更改。

对 Iceberg 表的每次更改都会创建元数据树的独立版本,称为快照。使用以下查询,我们将看到总共 3 个快照。

  • 初始插入操作(追加)
  • 更新操作(覆盖)
  • 删除操作(删除) 注意下面的查询针对的是历史元数据表。

查询表历史记录:

SELECT * FROM "athena_iceberg_db"."customer_iceberg$history" 
order by made_current_at;

image-20250315095744789

  • 第 1 行对应于我们执行的初始插入操作以。snapshot_id 列显示第一个创建的快照。
  • 第 2 行对应于我们执行的更新操作。snapshot_id 列显示第二个创建的快照。
  • 第 3 行对应于我们执行的删除操作。snapshot_id 列显示第三个(最新)创建的快照。

使用第 2 行的 snapshot_id,以查询对应于第二个快照的表状态(在执行删除操作之前)。

select * from athena_iceberg_db.customer_iceberg 
FOR VERSION AS OF  <snaptshot-id-v>
WHERE c_customer_sk = 15

在查询结果中,我们应该看到客户 Tonya 的记录:

image-20250315095943110

或者使用 made_current_at 列时间来查询特定快照。使用第 2 行的 made_current_at 值:

select * from athena_iceberg_db.customer_iceberg
FOR TIMESTAMP AS OF TIMESTAMP '2024-04-16 17:21:49.771 UTC'
WHERE c_customer_sk = 15

Schema Evolution

Iceberg Schema更新仅是元数据更改。执行Schema更新时,不会更改任何数据文件。Iceberg 格式支持以下Schema更改:

  • 添加 – 向表或嵌套结构添加新列。
  • 删除 – 从表或嵌套结构中删除现有列。
  • 重命名 – 重命名表中或嵌套结构中的现有列或字段。
  • 重新排序 – 更改列的顺序。
  • 类型更改 – 扩展列、结构字段、映射键、映射值或列表元素的类型。目前,Iceberg 表支持以下情况:
    • 整数到大整数
    • 浮点数到双精度
    • 增加十进制类型的精度

查询数据文件。

SELECT * FROM "athena_iceberg_db"."customer_iceberg$files"

现在更改列名。我们可以使用以下 DDL 命令将 c_email_address 列名更改为 email

ALTER TABLE athena_iceberg_db.customer_iceberg change column c_email_address email STRING

查询应该执行没有任何错误。

让我们再次查询数据文件,以验证数据文件没有更改。

SELECT * FROM "athena_iceberg_db"."customer_iceberg$files"

注意,由于schema更改,没有创建新的数据文件。schema更改存储在元数据层。

通过执行查询 DESCRIBE customer_iceberg; 验证列已重命名。

使用以下 DDL 命令添加一个名为 c_birth_date 的新列。

ALTER TABLE athena_iceberg_db.customer_iceberg ADD COLUMNS (c_birth_date int)

通过执行查询 DESCRIBE customer_iceberg; 验证新列已添加。

运行以下查询以查看带有新列的表。请注意,对于表中当前存在的所有记录,新列的值为 null

SELECT *
FROM athena_iceberg_db.customer_iceberg
LIMIT 10

image-20250315100450942