隐藏分区 - Hidden Partitioning

打开之前使用的EMR Studio:

image-20250315113329774

下载此notebook: https://pingfan.s3.amazonaws.com/files/emr_iceberg_advanced.ipynb

使用 Upload Files 按钮上传:

image-20250315113351862

上传后双击notebook打开它,确认右上角的内核设置为 Pyspark

等待内核进入 Idle 状态(左下角):

image-20250315113446432

隐藏分区和表维护

之前我们测试了Iceberg表的创建、删除、更新、Time Travel和Schema Evolution。现在,我们将使用包含7000万销售事件的更大表来评估Iceberg的性能效率。

我们将使用TPC-DS 的web_sales表的修改版本,称为prepared_web_sales

以下步骤和之前一样,创建了一个名为iceberg_catalog的新Spark Catalog,该目录与Glue Data Catalog兼容,并支持Iceberg表。spark.sql.catalog.<catalog_name>.warehouse属性用于设置iceberg_catalog中创建的数据库和表的默认位置。更新属性spark.sql.catalog.iceberg_catalog.warehouse,并将**<your-account-id>**占位符替换为我们当前的账户ID。

%%configure -f
{   
    "numExecutors": 3, 
    "executorMemory": "14G",
    "executorCores": 4, 
    "conf":{
         "spark.sql.catalog.iceberg_catalog.warehouse":"s3://otfs-workshop-data-<your-account-id>/datasets/emr_iceberg/",
         "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
         "spark.sql.catalog.iceberg_catalog":"org.apache.iceberg.spark.SparkCatalog",
         "spark.sql.catalog.iceberg_catalog.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
         "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
         "spark.dynamicAllocation.enabled": "false"
        }
}

更新属性BUCKET_NAME并将<your-account-id>占位符替换为我们当前的账户ID:

#Modify this variable
BUCKET_NAME = "otfs-workshop-data-<your-account-id>"

# python configs - do not modify
SERVICE = 'emr'
TPC_DS_DATABASE = "tpcds"
TPC_DS_CUSTOMER_TABLE = "prepared_customer"
TPC_DS_SALES_TABLE = "prepared_web_sales"
ICEBERG_CATALOG="iceberg_catalog"
ICEBERG_DB = f"{SERVICE}_iceberg_db"
ICEBERG_CUSTOMER_TABLE = "customer_iceberg"
ICEBERG_SALES_TABLE = "web_sales_iceberg"
ICEBERG_CATALOG_PATH=f"datasets/{SERVICE}_iceberg"
ICEBERG_DATABASE_PATH=f"{ICEBERG_CATALOG_PATH}/{ICEBERG_DB}.db"
WAREHOUSE_PATH=f"s3://{BUCKET_NAME}/{ICEBERG_CATALOG_PATH}"

# sparkmagic SQL configs - do not modify
spark.conf.set('ICEBERG_CATALOG', ICEBERG_CATALOG)
spark.conf.set('ICEBERG_DB', ICEBERG_DB)
spark.conf.set('ICEBERG_CUSTOMER_TABLE', ICEBERG_CUSTOMER_TABLE)
spark.conf.set('ICEBERG_SALES_TABLE', ICEBERG_SALES_TABLE)
spark.conf.set('TPC_DS_DATABASE', TPC_DS_DATABASE)
spark.conf.set('TPC_DS_CUSTOMER_TABLE', TPC_DS_CUSTOMER_TABLE)
spark.conf.set('TPC_DS_SALES_TABLE', TPC_DS_SALES_TABLE)

导入了一些将在整个notebook中使用的库:

import boto3
import json
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import col
from datetime import datetime, timedelta

s3_client = boto3.client('s3')
current_region = s3_client.meta.region_name
glue_client = boto3.client('glue', current_region)

def list_files_recursively(bucket_name, prefix):
    paginator = s3_client.get_paginator('list_objects_v2')
    operation_parameters = {'Bucket': bucket_name, 'Prefix': prefix}

    files_dict = {}
    for page in paginator.paginate(**operation_parameters):
        if 'Contents' in page:
            for obj in page['Contents']:
                file_key = obj['Key']
                if file_key.endswith('/'):  # It's a directory
                    relative_key = file_key[len(prefix):]  # Removing the prefix
                    if relative_key:  # Check if not an empty string
                        folders = relative_key.split('/')[:-1]  # Remove the last empty string element
                        current_dict = files_dict
                        for folder in folders:
                            if folder not in current_dict:
  
 ……

下面们配置**%%sql**魔术命令以显示每列更多字符,以便能够阅读表中的所有内容:

%%local
import pandas as pd
pd.set_option('display.max_colwidth', 10000)

切换到Iceberg目录。

%%sql
USE iceberg_catalog

创建数据库:

%%sql
CREATE DATABASE IF NOT EXISTS ${ICEBERG_DB}

隐藏分区(Hidden Partitioning)

隐藏分区是 Iceberg 表结构中的一个关键概念,指的是分区信息完全由 Iceberg 在元数据中管理,对用户和查询引擎透明。与传统的 Hive 风格分区不同,隐藏分区不会在文件路径中显式表示分区值。

Hive 分区(显式分区)

/data/table/date=2023-01-01/id=123/file.parquet

Iceberg 隐藏分区

/data/table/00000-3-8d6d60e8-d427-4400-fc90-814329312c5c-00000.parquet

分区信息存储在元数据中,不在路径中体现。优势是:

路径简洁:文件路径不再包含分区信息,避免了长路径问题

查询优化:查询引擎可直接使用元数据进行分区裁剪,无需路径解析

假设我们当前以hive格式存储销售表,按年份分区。为了编写高性能查询,终端用户必须了解表分区结构。

Iceberg的隐藏分区是对Hive方法的改进,使分区变得声明性。创建表时,我们可以使用称为分区规范 的构造来配置如何对其进行分区,如day(event_ts)。在读取时,Iceberg使用这些关系自动将查询转换为分区查询。

选择使用years(ws_sales_time)表达式按销售年份对数据进行分区:

%%sql
CREATE TABLE ${ICEBERG_DB}.${ICEBERG_SALES_TABLE} (
    ws_order_number INT,
    ws_item_sk INT,
    ws_quantity INT,
    ws_sales_price DOUBLE,
    ws_warehouse_sk INT,
    ws_sales_time TIMESTAMP
) USING iceberg
TBLPROPERTIES (
    'format-version'='2'
    )
COMMENT 'This table contains sales transaction data'
PARTITIONED BY (
    years(ws_sales_time)
)

插入数据

我们首先插入2001年之前的所有销售记录。以下是该数据的示例。

%%sql
SELECT *
FROM spark_catalog.${TPC_DS_DATABASE}.${TPC_DS_SALES_TABLE} where year(ws_sales_time) < 2001
LIMIT 10

如果上面查询执行失败,是cloudformation没有创建好这个表。

在athena中找到这个query并执行:

image-20250315121448940

执行结果如下:

image-20250315121540825

Iceberg 1.4.0默认将hash设置为分区表的write.distribution-mode 。这意味着在写入之前,数据集按分区字段进行哈希分区。这种方式确保具有相同分区值的记录被发送到同一个写入任务,同时所有写入任务的工作量尽可能均衡。 这样减少小文件问题:相同分区的数据集中写入,生成更合适大小的文件

我们现在执行两个插入操作,第一个插入1998年和1999年的数据,第二个插入2000年的数据。我们这样做是为了生成两个单独的快照,因为我们稍后需要展示Iceberg的expire_snapshots功能。

%%sql
INSERT INTO ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
SELECT *
FROM spark_catalog.${TPC_DS_DATABASE}.${TPC_DS_SALES_TABLE} where year(ws_sales_time) < 2000
%%sql
INSERT INTO ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
SELECT *
FROM spark_catalog.${TPC_DS_DATABASE}.${TPC_DS_SALES_TABLE} where year(ws_sales_time) = 2000

验证数据

首先,我们可以使用简单的select *查询验证插入。请注意,分区列years(ws_sales_time)不会在查询结果中返回。正如名称所示,Iceberg隐藏了分区信息。

%%sql
SELECT * 
FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
LIMIT 10

image-20250315122016989

打印出web_sales表的S3文件夹,以验证表是按年份分区的

show_tables_files(ICEBERG_SALES_TABLE)

image-20250315122212703

对Iceberg表执行的每个更改数据的操作都会生成表的新快照。快照表示表在特定时间点的状态。Iceberg确保表的读取者始终看到一致的快照,即使写入者同时修改表(快照隔离)。

我们可以使用SQL查询检查最新N个快照的详细信息:

SELECT * FROM iceberg_db.iceberg_table.snapshots ORDER BY committed_at DESC LIMIT {N}

检查第一个快照的摘要,我们可以看到:

  • 两个分区被更改,changed-partition-count。第一个INSERT触及了1998年和1999年的分区
  • added-data-files的数量反映了在1998年和1999年分区内创建的数据文件数量
df=spark.sql(f"""
SELECT summary
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}.snapshots
ORDER BY committed_at
""")

print(json.dumps(df.first()['summary'], sort_keys=True, indent=4))

image-20250315122438221

检查第二个快照的摘要,我们可以看到:

  • 一个分区被更改,changed-partition-count。第二个INSERT触及了2000年的分区
  • added-data-files的数量反映了在2000年分区内创建的数据文件数量
df=spark.sql(f"""
SELECT summary
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}.snapshots
ORDER BY committed_at DESC
""")

print(json.dumps(df.first()['summary'], sort_keys=True, indent=4))

image-20250315122513457

隐藏分区的Partition Purning

尽管Iceberg隐藏了查询时的分区信息,但在读取时,Iceberg使用分区规范自动将查询转换为分区查询。

以下查询对ws_sales_time字段执行范围过滤,该字段是分区字段。因此,Iceberg隐藏分区生效,自动将查询转换为分区查询,防止全表扫描:

%%sql
SELECT COUNT(DISTINCT(ws_order_number)) AS num_orders
FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
WHERE ws_sales_time >= TIMESTAMP('2000-01-01 00:00:00') AND ws_sales_time < TIMESTAMP('2000-02-01 00:00:00')

image-20250315154611916

隐藏分区让Iceberg自动避免读取不必要的分区。消费者不需要知道表是如何分区的,也不需要向查询添加额外的过滤条件。