打开之前使用的EMR Studio:
下载此notebook: https://pingfan.s3.amazonaws.com/files/emr_iceberg_advanced.ipynb
使用 Upload Files 按钮上传:
上传后双击notebook打开它,确认右上角的内核设置为 Pyspark
。
等待内核进入 Idle 状态(左下角):
之前我们测试了Iceberg表的创建、删除、更新、Time Travel和Schema Evolution。现在,我们将使用包含7000万销售事件的更大表来评估Iceberg的性能效率。
我们将使用TPC-DS 的web_sales表的修改版本,称为prepared_web_sales
以下步骤和之前一样,创建了一个名为iceberg_catalog的新Spark Catalog,该目录与Glue Data Catalog兼容,并支持Iceberg表。spark.sql.catalog.<catalog_name>.warehouse
属性用于设置iceberg_catalog中创建的数据库和表的默认位置。更新属性spark.sql.catalog.iceberg_catalog.warehouse
,并将**<your-account-id>**占位符替换为我们当前的账户ID。
%%configure -f
{
"numExecutors": 3,
"executorMemory": "14G",
"executorCores": 4,
"conf":{
"spark.sql.catalog.iceberg_catalog.warehouse":"s3://otfs-workshop-data-<your-account-id>/datasets/emr_iceberg/",
"spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.iceberg_catalog":"org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.iceberg_catalog.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
"spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.dynamicAllocation.enabled": "false"
}
}
更新属性BUCKET_NAME并将<your-account-id>
占位符替换为我们当前的账户ID:
#Modify this variable
BUCKET_NAME = "otfs-workshop-data-<your-account-id>"
# python configs - do not modify
SERVICE = 'emr'
TPC_DS_DATABASE = "tpcds"
TPC_DS_CUSTOMER_TABLE = "prepared_customer"
TPC_DS_SALES_TABLE = "prepared_web_sales"
ICEBERG_CATALOG="iceberg_catalog"
ICEBERG_DB = f"{SERVICE}_iceberg_db"
ICEBERG_CUSTOMER_TABLE = "customer_iceberg"
ICEBERG_SALES_TABLE = "web_sales_iceberg"
ICEBERG_CATALOG_PATH=f"datasets/{SERVICE}_iceberg"
ICEBERG_DATABASE_PATH=f"{ICEBERG_CATALOG_PATH}/{ICEBERG_DB}.db"
WAREHOUSE_PATH=f"s3://{BUCKET_NAME}/{ICEBERG_CATALOG_PATH}"
# sparkmagic SQL configs - do not modify
spark.conf.set('ICEBERG_CATALOG', ICEBERG_CATALOG)
spark.conf.set('ICEBERG_DB', ICEBERG_DB)
spark.conf.set('ICEBERG_CUSTOMER_TABLE', ICEBERG_CUSTOMER_TABLE)
spark.conf.set('ICEBERG_SALES_TABLE', ICEBERG_SALES_TABLE)
spark.conf.set('TPC_DS_DATABASE', TPC_DS_DATABASE)
spark.conf.set('TPC_DS_CUSTOMER_TABLE', TPC_DS_CUSTOMER_TABLE)
spark.conf.set('TPC_DS_SALES_TABLE', TPC_DS_SALES_TABLE)
导入了一些将在整个notebook中使用的库:
import boto3
import json
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import col
from datetime import datetime, timedelta
s3_client = boto3.client('s3')
current_region = s3_client.meta.region_name
glue_client = boto3.client('glue', current_region)
def list_files_recursively(bucket_name, prefix):
paginator = s3_client.get_paginator('list_objects_v2')
operation_parameters = {'Bucket': bucket_name, 'Prefix': prefix}
files_dict = {}
for page in paginator.paginate(**operation_parameters):
if 'Contents' in page:
for obj in page['Contents']:
file_key = obj['Key']
if file_key.endswith('/'): # It's a directory
relative_key = file_key[len(prefix):] # Removing the prefix
if relative_key: # Check if not an empty string
folders = relative_key.split('/')[:-1] # Remove the last empty string element
current_dict = files_dict
for folder in folders:
if folder not in current_dict:
……
下面们配置**%%sql**魔术命令以显示每列更多字符,以便能够阅读表中的所有内容:
%%local
import pandas as pd
pd.set_option('display.max_colwidth', 10000)
切换到Iceberg目录。
%%sql
USE iceberg_catalog
创建数据库:
%%sql
CREATE DATABASE IF NOT EXISTS ${ICEBERG_DB}
隐藏分区是 Iceberg 表结构中的一个关键概念,指的是分区信息完全由 Iceberg 在元数据中管理,对用户和查询引擎透明。与传统的 Hive 风格分区不同,隐藏分区不会在文件路径中显式表示分区值。
Hive 分区(显式分区):
/data/table/date=2023-01-01/id=123/file.parquet
Iceberg 隐藏分区:
/data/table/00000-3-8d6d60e8-d427-4400-fc90-814329312c5c-00000.parquet
分区信息存储在元数据中,不在路径中体现。优势是:
路径简洁:文件路径不再包含分区信息,避免了长路径问题
查询优化:查询引擎可直接使用元数据进行分区裁剪,无需路径解析
假设我们当前以hive格式存储销售表,按年份分区。为了编写高性能查询,终端用户必须了解表分区结构。
Iceberg的隐藏分区是对Hive方法的改进,使分区变得声明性。创建表时,我们可以使用称为分区规范
的构造来配置如何对其进行分区,如day(event_ts)
。在读取时,Iceberg使用这些关系自动将查询转换为分区查询。
选择使用years(ws_sales_time)
表达式按销售年份对数据进行分区:
%%sql
CREATE TABLE ${ICEBERG_DB}.${ICEBERG_SALES_TABLE} (
ws_order_number INT,
ws_item_sk INT,
ws_quantity INT,
ws_sales_price DOUBLE,
ws_warehouse_sk INT,
ws_sales_time TIMESTAMP
) USING iceberg
TBLPROPERTIES (
'format-version'='2'
)
COMMENT 'This table contains sales transaction data'
PARTITIONED BY (
years(ws_sales_time)
)
我们首先插入2001年之前的所有销售记录。以下是该数据的示例。
%%sql
SELECT *
FROM spark_catalog.${TPC_DS_DATABASE}.${TPC_DS_SALES_TABLE} where year(ws_sales_time) < 2001
LIMIT 10
如果上面查询执行失败,是cloudformation没有创建好这个表。
在athena中找到这个query并执行:
执行结果如下:
Iceberg 1.4.0默认将hash
设置为分区表的write.distribution-mode
。这意味着在写入之前,数据集按分区字段进行哈希分区。这种方式确保具有相同分区值的记录被发送到同一个写入任务,同时所有写入任务的工作量尽可能均衡。 这样减少小文件问题:相同分区的数据集中写入,生成更合适大小的文件
我们现在执行两个插入操作,第一个插入1998年和1999年的数据,第二个插入2000年的数据。我们这样做是为了生成两个单独的快照,因为我们稍后需要展示Iceberg的expire_snapshots
功能。
%%sql
INSERT INTO ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
SELECT *
FROM spark_catalog.${TPC_DS_DATABASE}.${TPC_DS_SALES_TABLE} where year(ws_sales_time) < 2000
%%sql
INSERT INTO ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
SELECT *
FROM spark_catalog.${TPC_DS_DATABASE}.${TPC_DS_SALES_TABLE} where year(ws_sales_time) = 2000
首先,我们可以使用简单的select *查询验证插入。请注意,分区列years(ws_sales_time)
不会在查询结果中返回。正如名称所示,Iceberg隐藏了分区信息。
%%sql
SELECT *
FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
LIMIT 10
打印出web_sales表的S3文件夹,以验证表是按年份分区的
show_tables_files(ICEBERG_SALES_TABLE)
对Iceberg表执行的每个更改数据的操作都会生成表的新快照。快照表示表在特定时间点的状态。Iceberg确保表的读取者始终看到一致的快照,即使写入者同时修改表(快照隔离)。
我们可以使用SQL查询检查最新N个快照的详细信息:
SELECT * FROM iceberg_db.iceberg_table.snapshots ORDER BY committed_at DESC LIMIT {N}
检查第一个快照的摘要,我们可以看到:
df=spark.sql(f"""
SELECT summary
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}.snapshots
ORDER BY committed_at
""")
print(json.dumps(df.first()['summary'], sort_keys=True, indent=4))
检查第二个快照的摘要,我们可以看到:
df=spark.sql(f"""
SELECT summary
FROM {ICEBERG_DB}.{ICEBERG_SALES_TABLE}.snapshots
ORDER BY committed_at DESC
""")
print(json.dumps(df.first()['summary'], sort_keys=True, indent=4))
尽管Iceberg隐藏了查询时的分区信息,但在读取时,Iceberg使用分区规范自动将查询转换为分区查询。
以下查询对ws_sales_time
字段执行范围过滤,该字段是分区字段。因此,Iceberg隐藏分区生效,自动将查询转换为分区查询,防止全表扫描:
%%sql
SELECT COUNT(DISTINCT(ws_order_number)) AS num_orders
FROM ${ICEBERG_DB}.${ICEBERG_SALES_TABLE}
WHERE ws_sales_time >= TIMESTAMP('2000-01-01 00:00:00') AND ws_sales_time < TIMESTAMP('2000-02-01 00:00:00')
隐藏分区让Iceberg自动避免读取不必要的分区。消费者不需要知道表是如何分区的,也不需要向查询添加额外的过滤条件。