Data lakes on S3 have become the default repository for much of today's enterprise data and a common choice for data analytics and machine learning (ML). Typically, data is continuously ingested into the data lake from multiple sources and queried by many analytics tools concurrently with ongoing transactions.
Support for ACID transactions, time travel, and similar capabilities on S3 data lakes has become an increasingly popular requirement, so that analytical queries return consistent and up-to-date results. AWS supports several options for implementing transactional capabilities on an S3 data lake, including Apache Hudi, Apache Iceberg, AWS Lake Formation governed tables, and the open-source Delta Lake.
Traditionally, customers have used Hive or Presto as the SQL engine on top of an S3 data lake. However, neither engine natively provides the ACID compliance needed to build a transactional data lake. A transactional data lake requires ACID transactions, concurrency control, time travel, and concurrent upsert/insert capabilities to support a wide range of use cases on petabyte-scale data.
EMR is designed to offer multiple options for building a transactional data lake. Currently, three open table formats are available on EMR: Apache Hudi, Apache Iceberg, and Delta Lake.
Enhancing a data lake with a modern table format adds capabilities such as:
- upsert / delete, required for CDC and GDPR use cases
In this section, we start with Hudi.
Hudi is an open-source data management framework that simplifies incremental data processing and data pipeline development by providing record-level insert, update, upsert, and delete capabilities. By efficiently managing how data is laid out in S3, Hudi enables near-real-time ingestion and updates. Hudi carefully maintains metadata about the operations performed on a dataset to help ensure that those operations are atomic and consistent.
In this section, we will learn how to create a Hudi table and update its records.
Download the file below, unzip it, and upload the folder to the S3 bucket created earlier:
https://pingfan.s3.amazonaws.com/files/Hudi-scripts.zip
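If you prefer to script this step instead of doing it by hand, the following rough sketch downloads the archive and copies its files to S3. The bucket name and prefix are placeholders, not values from this workshop; adjust them to your own bucket.

import io
import zipfile
import urllib.request
import boto3

BUCKET = "your-bucket-name"   # placeholder: the S3 bucket created earlier
PREFIX = "Hudi-scripts"       # placeholder: prefix (folder) to upload into
ZIP_URL = "https://pingfan.s3.amazonaws.com/files/Hudi-scripts.zip"

# Download the zip into memory and upload each contained file to S3
s3 = boto3.client("s3")
with urllib.request.urlopen(ZIP_URL) as resp:
    archive = zipfile.ZipFile(io.BytesIO(resp.read()))

for name in archive.namelist():
    if name.endswith("/"):
        continue  # skip directory entries
    s3.put_object(Bucket=BUCKET, Key=f"{PREFIX}/{name.split('/')[-1]}", Body=archive.read(name))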
After uploading:
Apache Hudi provides two table types for creating a Hudi table: merge on read (MOR) and copy on write (COW).
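The scripts below use copy on write, which is Hudi's default table type, so they do not set it explicitly. For reference, the table type is selected through a write option roughly like the sketch below; this is illustrative only, and the exact option key may vary with the Hudi version bundled in your EMR release.

# Not used in hudi-cow.py below: copy on write is the default table type.
TABLE_TYPE_OPT_KEY = "hoodie.datasource.write.table.type"
COPY_ON_WRITE_TABLE_TYPE_OPT_VAL = "COPY_ON_WRITE"
MERGE_ON_READ_TABLE_TYPE_OPT_VAL = "MERGE_ON_READ"

# e.g. add to the writer chain:
#   .option(TABLE_TYPE_OPT_KEY, MERGE_ON_READ_TABLE_TYPE_OPT_VAL)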
Submit a job under the Spark application:
Set the job name (for example, hudi_copy_on_write) and choose the EMRServerlessS3RuntimeRole role.
Set Script location to the hudi-cow.py uploaded above; Script arguments is the output S3 bucket, which here is set to the same bucket:
The contents of hudi-cow.py are as follows:
import sys
from pyspark.sql import SparkSession
if len(sys.argv) == 1:
    print('no arguments passed')
    sys.exit()
spark = SparkSession \
    .builder \
    .appName("hudi_cow") \
    .getOrCreate()
# General Constants
OUTPUT_BUCKET="s3://"+sys.argv[1]+"/hudi/hudi_trips_cow"
HUDI_FORMAT = "org.apache.hudi"
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
DELETE_OPERATION_OPT_VAL = "delete"
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
S3_CONSISTENCY_CHECK = "hoodie.consistency.check.enabled"
HUDI_CLEANER_POLICY = "hoodie.cleaner.policy"
KEEP_LATEST_COMMITS = "KEEP_LATEST_COMMITS"
KEEP_LATEST_FILE_VERSIONS = "KEEP_LATEST_FILE_VERSIONS"
HUDI_COMMITS_RETAINED = "hoodie.cleaner.commits.retained"
HUDI_FILES_RETAINED = "hoodie.cleaner.fileversions.retained"
PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
EMPTY_PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.EmptyHoodieRecordPayload"
# Hive Constants
HIVE_SYNC_ENABLED_OPT_KEY="hoodie.datasource.hive_sync.enable"
HIVE_PARTITION_FIELDS_OPT_KEY="hoodie.datasource.hive_sync.partition_fields"
HIVE_ASSUME_DATE_PARTITION_OPT_KEY="hoodie.datasource.hive_sync.assume_date_partitioning"
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY="hoodie.datasource.hive_sync.partition_extractor_class"
HIVE_TABLE_OPT_KEY="hoodie.datasource.hive_sync.table"
# Partition Constants
NONPARTITION_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.NonPartitionedExtractor"
MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.MultiPartKeysValueExtractor"
KEYGENERATOR_CLASS_OPT_KEY="hoodie.datasource.write.keygenerator.class"
NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.NonpartitionedKeyGenerator"
COMPLEX_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.ComplexKeyGenerator"
PARTITIONPATH_FIELD_OPT_KEY="hoodie.datasource.write.partitionpath.field"
#Incremental Constants
VIEW_TYPE_OPT_KEY="hoodie.datasource.query.type"
BEGIN_INSTANTTIME_OPT_KEY="hoodie.datasource.read.begin.instanttime"
VIEW_TYPE_INCREMENTAL_OPT_VAL="incremental"
END_INSTANTTIME_OPT_KEY="hoodie.datasource.read.end.instanttime"
#Bootstrap Constants
BOOTSTRAP_BASE_PATH_PROP="hoodie.bootstrap.base.path"
BOOTSTRAP_KEYGEN_CLASS="hoodie.bootstrap.keygen.class"
## Generates Data
from datetime import datetime
def get_json_data(start, count, dest):
    time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    data = [{"trip_id": i, "tstamp": time_stamp, "route_id": chr(65 + (i % 10)), "destination": dest[i%10]} for i in range(start, start + count)]
    return data

# Creates the Dataframe
def create_json_df(spark, data):
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data))
## CHANGE ME ##
config = {
    "table_name": "hudi_trips_cow",
    "primary_key": "trip_id",
    "sort_key": "tstamp"
}
dest = ["Seattle", "New York", "New Jersey", "Los Angeles", "Las Vegas", "Tucson","Washington DC","Philadelphia","Miami","San Francisco"]
# Generate 2,000,000 trip records and bulk-insert them into the Hudi copy-on-write table
df1 = create_json_df(spark, get_json_data(0, 2000000, dest))
(df1.write.format(HUDI_FORMAT)
    .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
    .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
    .option(TABLE_NAME, config['table_name'])
    .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
    .option(BULK_INSERT_PARALLELISM, 10)
    .option(HIVE_TABLE_OPT_KEY, config['table_name'])
    .option(HIVE_SYNC_ENABLED_OPT_KEY, "true")
    .mode("Overwrite")
    .save(OUTPUT_BUCKET))
Under Spark properties, choose Edit in text and use the following configuration:
--conf spark.jars=/usr/lib/hudi/hudi-spark-bundle.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
Submit the job and check its run status until it completes.
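The same job can also be submitted programmatically with boto3 instead of the console. A rough sketch is shown below; the application ID, role ARN, bucket name, and script path are placeholders that you would replace with your own values.

import boto3

APPLICATION_ID = "<emr-serverless-application-id>"  # placeholder
ROLE_ARN = "arn:aws:iam::123456789012:role/EMRServerlessS3RuntimeRole"  # placeholder
BUCKET = "your-bucket-name"  # placeholder: same bucket used for the scripts and output

emr = boto3.client("emr-serverless")
response = emr.start_job_run(
    applicationId=APPLICATION_ID,
    executionRoleArn=ROLE_ARN,
    name="hudi_copy_on_write",
    jobDriver={
        "sparkSubmit": {
            # Assumed script location: wherever you uploaded hudi-cow.py
            "entryPoint": f"s3://{BUCKET}/Hudi-scripts/hudi-cow.py",
            "entryPointArguments": [BUCKET],
            "sparkSubmitParameters": (
                "--conf spark.jars=/usr/lib/hudi/hudi-spark-bundle.jar "
                "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer"
            ),
        }
    },
)
print(response["jobRunId"])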
Now that our Hudi copy-on-write table has been created, let's verify the record count in Athena. Run the following query in the Athena query editor; it should return 2,000,000, the number of rows bulk-inserted by hudi-cow.py:
Note: EMR Serverless does not currently support interactive analysis, so we have to use another service for ad-hoc queries.
select count(*) from default.hudi_trips_cow
Now that we have verified our Hudi table, let's perform an upsert operation. First, inspect the records we are about to modify by running the following query in Athena:
select trip_id, route_id, tstamp, destination from default.hudi_trips_cow where trip_id between 999996 and 1000013
Let's submit another job that changes the destination of trip IDs 1000000 through 1000009 to the same value.
As in the steps above, submit another new job, this time selecting hudi-upsert-cow.py as the script and keeping the other parameters the same:
Once the job completes, verify the updated rows by running the following query in Athena again. Notice that the destination column for trip_id 1000000 through 1000009 has been updated to Boston:
select trip_id, route_id, tstamp, destination from default.hudi_trips_cow where trip_id between 999996 and 1000013
The contents of hudi-upsert-cow.py are as follows:
import sys
from pyspark.sql import SparkSession
if len(sys.argv) == 1:
    print('no arguments passed')
    sys.exit()
spark = SparkSession \
    .builder \
    .appName("hudi_cow_upsert") \
    .getOrCreate()
# General Constants
OUTPUT_BUCKET="s3://"+sys.argv[1]+"/hudi/hudi_trips_cow"
HUDI_FORMAT = "org.apache.hudi"
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
DELETE_OPERATION_OPT_VAL = "delete"
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
S3_CONSISTENCY_CHECK = "hoodie.consistency.check.enabled"
HUDI_CLEANER_POLICY = "hoodie.cleaner.policy"
KEEP_LATEST_COMMITS = "KEEP_LATEST_COMMITS"
KEEP_LATEST_FILE_VERSIONS = "KEEP_LATEST_FILE_VERSIONS"
HUDI_COMMITS_RETAINED = "hoodie.cleaner.commits.retained"
HUDI_FILES_RETAINED = "hoodie.cleaner.fileversions.retained"
PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
EMPTY_PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.EmptyHoodieRecordPayload"
# Hive Constants
HIVE_SYNC_ENABLED_OPT_KEY="hoodie.datasource.hive_sync.enable"
HIVE_PARTITION_FIELDS_OPT_KEY="hoodie.datasource.hive_sync.partition_fields"
HIVE_ASSUME_DATE_PARTITION_OPT_KEY="hoodie.datasource.hive_sync.assume_date_partitioning"
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY="hoodie.datasource.hive_sync.partition_extractor_class"
HIVE_TABLE_OPT_KEY="hoodie.datasource.hive_sync.table"
# Partition Constants
NONPARTITION_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.NonPartitionedExtractor"
MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.MultiPartKeysValueExtractor"
KEYGENERATOR_CLASS_OPT_KEY="hoodie.datasource.write.keygenerator.class"
NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.NonpartitionedKeyGenerator"
COMPLEX_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.ComplexKeyGenerator"
PARTITIONPATH_FIELD_OPT_KEY="hoodie.datasource.write.partitionpath.field"
#Incremental Constants
VIEW_TYPE_OPT_KEY="hoodie.datasource.query.type"
BEGIN_INSTANTTIME_OPT_KEY="hoodie.datasource.read.begin.instanttime"
VIEW_TYPE_INCREMENTAL_OPT_VAL="incremental"
END_INSTANTTIME_OPT_KEY="hoodie.datasource.read.end.instanttime"
#Bootstrap Constants
BOOTSTRAP_BASE_PATH_PROP="hoodie.bootstrap.base.path"
BOOTSTRAP_KEYGEN_CLASS="hoodie.bootstrap.keygen.class"
## Generates Data
from datetime import datetime
def get_json_data(start, count, dest):
    time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    data = [{"trip_id": i, "tstamp": time_stamp, "route_id": chr(65 + (i % 10)), "destination": dest[i%10]} for i in range(start, start + count)]
    return data

# Creates the Dataframe
def create_json_df(spark, data):
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data))
## CHANGE ME ##
config = {
    "table_name": "hudi_trips_cow",
    "primary_key": "trip_id",
    "sort_key": "tstamp"
}
upsert_dest = ["Boston", "Boston", "Boston", "Boston", "Boston", "Boston", "Boston", "Boston", "Boston", "Boston"]
# Generate 10 updated records (trip_id 1000000-1000009) and upsert them into the existing table
df = create_json_df(spark, get_json_data(1000000, 10, upsert_dest))
(df.write.format(HUDI_FORMAT)
    .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
    .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
    .option(TABLE_NAME, config['table_name'])
    .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
    .option("hoodie.cleaner.commits.retained", 2)
    .option(UPSERT_PARALLELISM, 10)
    .option(HIVE_TABLE_OPT_KEY, config['table_name'])
    .option(HIVE_SYNC_ENABLED_OPT_KEY, "true")
    .mode("Append")
    .save(OUTPUT_BUCKET))
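Both scripts also define incremental-query constants (VIEW_TYPE_OPT_KEY, BEGIN_INSTANTTIME_OPT_KEY, and so on) that are not exercised in this section. As a rough sketch of how they could be used, for example appended to hudi-upsert-cow.py, an incremental read that returns only records written after a given commit time looks roughly like this:

# Sketch only: read records committed after begin_time from the same table path.
# "000" starts from the earliest commit; substitute a real commit timestamp
# (visible in the _hoodie_commit_time column) to narrow the window.
begin_time = "000"

incremental_df = (spark.read.format(HUDI_FORMAT)
    .option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL)
    .option(BEGIN_INSTANTTIME_OPT_KEY, begin_time)
    .load(OUTPUT_BUCKET))

incremental_df.select("trip_id", "destination", "_hoodie_commit_time").show(20, truncate=False)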