Transactional Data Lake - Hudi


A data lake on S3 has become the default repository for many enterprises' data and a common choice for data analytics and machine learning (ML). Typically, data is continuously ingested into the data lake from multiple sources, while many analytics tools query the lake concurrently with those writes.

Support for ACID transactions, time travel, and similar capabilities on S3 data lakes has become an increasingly popular requirement, so that analytical queries can run against the lake and return consistent, up-to-date results. AWS supports several options for implementing transactional capabilities on an S3 data lake, including Apache Hudi, Apache Iceberg, AWS Lake Formation governed tables, and the open-source Delta Lake.

Traditionally, customers have used Hive or Presto as the SQL engine on top of an S3 data lake. However, neither engine by itself provides the ACID compliance needed to build a transactional data lake. A transactional data lake requires ACID transactions, concurrency control, time travel, and concurrent upsert/insert capabilities to support a wide range of use cases on petabyte-scale data.

EMR is designed to offer multiple options for building a transactional data lake. Currently, three open table formats are available on EMR: Apache Hudi, Iceberg, and Delta Lake.

The following capabilities come from enhancing a data lake with a modern table format:

  • Transaction support, with ACID guarantees for concurrent workloads
  • Full schema evolution - the ability to evolve the schema, including columns and partitions
  • Support for different workloads - access the data in the same repository with multiple tools
  • Support for upsert / delete - required for CDC and GDPR
  • Data governance - guaranteed data integrity and auditable changes

In this section we start with Hudi.

Hudi is an open-source data management framework that simplifies incremental data processing and data pipeline development by providing record-level insert, update, upsert, and delete capabilities. By efficiently managing how data is laid out in S3, Hudi enables near-real-time ingestion and updates. Hudi carefully maintains metadata about the operations performed on a dataset to help ensure that those operations are atomic and consistent.
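That commit metadata becomes visible when a Hudi table is read back with Spark: each row carries bookkeeping columns such as _hoodie_commit_time and _hoodie_record_key. A minimal read sketch, assuming the hudi_trips_cow table we create later in this section already exists at the path shown (on older Hudi versions you may need to append /* to the load path):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi_read_sketch").getOrCreate()

# Load the Hudi table directly from its S3 base path
df = spark.read.format("org.apache.hudi").load("s3://<your-bucket>/hudi/hudi_trips_cow")

# Hudi-managed metadata columns record which commit wrote each record
df.select("_hoodie_commit_time", "_hoodie_record_key", "trip_id", "destination").show(5)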

In this section, we will learn how to create a Hudi table and update its records.

Preparation

Download the file below, unzip it, and upload the folder to the S3 bucket used earlier:

https://pingfan.s3.amazonaws.com/files/Hudi-scripts.zip

After uploading:

image-20231221191313568

Apache Hudi offers two storage types for creating a Hudi table - merge on read and copy on write. Copy on write rewrites the affected data files on every update and favors read-heavy workloads, while merge on read writes changes to log files that are merged at query or compaction time and favors write-heavy workloads.
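The storage type is selected with a single write option, hoodie.datasource.write.table.type (copy on write is the default when it is omitted). A minimal sketch of how the scripts below could be switched to merge on read; the DataFrame, table name, and S3 path here are placeholders:

# The Hudi table type is controlled by one write option;
# "COPY_ON_WRITE" is the default when the option is omitted.
TABLE_TYPE_OPT_KEY = "hoodie.datasource.write.table.type"

(df.write.format("org.apache.hudi")
   .option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ")   # or "COPY_ON_WRITE"
   .option("hoodie.table.name", "hudi_trips_mor")
   .option("hoodie.datasource.write.recordkey.field", "trip_id")
   .option("hoodie.datasource.write.precombine.field", "tstamp")
   .mode("overwrite")
   .save("s3://<your-bucket>/hudi/hudi_trips_mor"))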

Copy on Write

Submit a job under the Spark application:

image-20231221191434316

Set the job name (for example hudi_copy_on_write) and select the EMRServerlessS3RuntimeRole.

Set Script Location to the hudi-cow.py uploaded above; the Script argument is the output S3 bucket, which here is the same bucket:

image-20231221191826966

The content of hudi-cow.py is as follows:

import sys
from pyspark.sql import SparkSession

if len(sys.argv) == 1:
   print('no arguments passed')
   sys.exit()


spark = SparkSession\
        .builder\
        .appName("hudi_cow")\
        .getOrCreate()


# General Constants
OUTPUT_BUCKET="s3://"+sys.argv[1]+"/hudi/hudi_trips_cow"
HUDI_FORMAT = "org.apache.hudi"
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
DELETE_OPERATION_OPT_VAL = "delete"
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
S3_CONSISTENCY_CHECK = "hoodie.consistency.check.enabled"
HUDI_CLEANER_POLICY = "hoodie.cleaner.policy"
KEEP_LATEST_COMMITS = "KEEP_LATEST_COMMITS"
KEEP_LATEST_FILE_VERSIONS = "KEEP_LATEST_FILE_VERSIONS"
HUDI_COMMITS_RETAINED = "hoodie.cleaner.commits.retained"
HUDI_FILES_RETAINED = "hoodie.cleaner.fileversions.retained"
PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
EMPTY_PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.EmptyHoodieRecordPayload"

# Hive Constants
HIVE_SYNC_ENABLED_OPT_KEY="hoodie.datasource.hive_sync.enable"
HIVE_PARTITION_FIELDS_OPT_KEY="hoodie.datasource.hive_sync.partition_fields"
HIVE_ASSUME_DATE_PARTITION_OPT_KEY="hoodie.datasource.hive_sync.assume_date_partitioning"
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY="hoodie.datasource.hive_sync.partition_extractor_class"
HIVE_TABLE_OPT_KEY="hoodie.datasource.hive_sync.table"

# Partition Constants
NONPARTITION_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.NonPartitionedExtractor"
MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.MultiPartKeysValueExtractor"
KEYGENERATOR_CLASS_OPT_KEY="hoodie.datasource.write.keygenerator.class"
NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.NonpartitionedKeyGenerator"
COMPLEX_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.ComplexKeyGenerator"
PARTITIONPATH_FIELD_OPT_KEY="hoodie.datasource.write.partitionpath.field"

#Incremental Constants
VIEW_TYPE_OPT_KEY="hoodie.datasource.query.type"
BEGIN_INSTANTTIME_OPT_KEY="hoodie.datasource.read.begin.instanttime"
VIEW_TYPE_INCREMENTAL_OPT_VAL="incremental"
END_INSTANTTIME_OPT_KEY="hoodie.datasource.read.end.instanttime"

#Bootstrap Constants
BOOTSTRAP_BASE_PATH_PROP="hoodie.bootstrap.base.path"
BOOTSTRAP_KEYGEN_CLASS="hoodie.bootstrap.keygen.class"


## Generates Data

from datetime import datetime

def get_json_data(start, count, dest):
    time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    data = [{"trip_id": i, "tstamp": time_stamp, "route_id": chr(65 + (i % 10)), "destination": dest[i%10]} for i in range(start, start + count)]
    return data

# Creates the Dataframe
def create_json_df(spark, data):
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data))



## CHANGE ME ##
config = {
    "table_name": "hudi_trips_cow",
    "primary_key": "trip_id",
    "sort_key": "tstamp"
}


dest = ["Seattle", "New York", "New Jersey", "Los Angeles", "Las Vegas", "Tucson","Washington DC","Philadelphia","Miami","San Francisco"]
df1 = create_json_df(spark, get_json_data(0, 2000000, dest))


# Bulk-insert the 2 million generated records as a Hudi copy-on-write table
# and sync the table definition to the Hive/Glue catalog so Athena can query it
(df1.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
      .option(BULK_INSERT_PARALLELISM, 10)
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .mode("Overwrite")
      .save(OUTPUT_BUCKET))

Under Spark properties, choose Edit in text and use the following configuration:

--conf spark.jars=/usr/lib/hudi/hudi-spark-bundle.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

image-20231221191914235

Submit the job and check the run status to confirm that it completes.

image-20231221192546697
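As an alternative to the console, the same job could be submitted programmatically. A sketch using boto3, assuming placeholder values for the application ID, role ARN, and bucket:

import boto3

emr = boto3.client("emr-serverless")

# Submit hudi-cow.py with the same Spark properties used in the console
response = emr.start_job_run(
    applicationId="<your-application-id>",
    executionRoleArn="arn:aws:iam::<account-id>:role/EMRServerlessS3RuntimeRole",
    name="hudi_copy_on_write",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://<your-bucket>/<path-to>/hudi-cow.py",
            "entryPointArguments": ["<your-bucket>"],
            "sparkSubmitParameters": "--conf spark.jars=/usr/lib/hudi/hudi-spark-bundle.jar "
                                     "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer",
        }
    },
)
print(response["jobRunId"])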

Now that our Hudi copy-on-write table has been created, let's verify the record count with Athena. Run the following query in the Athena query editor:

Note - EMR Serverless does not currently support interactive analytics, so we have to use another tool (Athena) to query the table.

select count(*) from default.hudi_trips_cow

image-20231221192636225

Now that we have verified our Hudi table, let's perform an upsert. First, check the records we are about to modify by running the following query in Athena:

select trip_id, route_id, tstamp, destination from default.hudi_trips_cow where trip_id between 999996 and 1000013

image-20231221192713316

Let's submit another job that changes the destination of trip IDs 1000000 through 1000009 to the same value.

As in the steps above, submit another new job, this time pointing Script Location at hudi-upsert-cow.py and keeping the other parameters the same:

image-20231221192937060

Once the job completes, verify the updated rows by running the following query in Athena again. Notice that the destination column for trip_id 1000000 through 1000009 has been updated to Boston:

select trip_id, route_id, tstamp, destination from default.hudi_trips_cow where trip_id between 999996 and 1000013

image-20231221193214324

The content of hudi-upsert-cow.py is as follows:

import sys
from pyspark.sql import SparkSession


if len(sys.argv) == 1:
   print('no arguments passed')
   sys.exit()


spark = SparkSession\
        .builder\
        .appName("hudi_cow_upsert")\
        .getOrCreate()


# General Constants
OUTPUT_BUCKET="s3://"+sys.argv[1]+"/hudi/hudi_trips_cow"
HUDI_FORMAT = "org.apache.hudi"
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
DELETE_OPERATION_OPT_VAL = "delete"
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
S3_CONSISTENCY_CHECK = "hoodie.consistency.check.enabled"
HUDI_CLEANER_POLICY = "hoodie.cleaner.policy"
KEEP_LATEST_COMMITS = "KEEP_LATEST_COMMITS"
KEEP_LATEST_FILE_VERSIONS = "KEEP_LATEST_FILE_VERSIONS"
HUDI_COMMITS_RETAINED = "hoodie.cleaner.commits.retained"
HUDI_FILES_RETAINED = "hoodie.cleaner.fileversions.retained"
PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
EMPTY_PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.EmptyHoodieRecordPayload"

# Hive Constants
HIVE_SYNC_ENABLED_OPT_KEY="hoodie.datasource.hive_sync.enable"
HIVE_PARTITION_FIELDS_OPT_KEY="hoodie.datasource.hive_sync.partition_fields"
HIVE_ASSUME_DATE_PARTITION_OPT_KEY="hoodie.datasource.hive_sync.assume_date_partitioning"
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY="hoodie.datasource.hive_sync.partition_extractor_class"
HIVE_TABLE_OPT_KEY="hoodie.datasource.hive_sync.table"

# Partition Constants
NONPARTITION_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.NonPartitionedExtractor"
MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.MultiPartKeysValueExtractor"
KEYGENERATOR_CLASS_OPT_KEY="hoodie.datasource.write.keygenerator.class"
NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.NonpartitionedKeyGenerator"
COMPLEX_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.ComplexKeyGenerator"
PARTITIONPATH_FIELD_OPT_KEY="hoodie.datasource.write.partitionpath.field"

#Incremental Constants
VIEW_TYPE_OPT_KEY="hoodie.datasource.query.type"
BEGIN_INSTANTTIME_OPT_KEY="hoodie.datasource.read.begin.instanttime"
VIEW_TYPE_INCREMENTAL_OPT_VAL="incremental"
END_INSTANTTIME_OPT_KEY="hoodie.datasource.read.end.instanttime"

#Bootstrap Constants
BOOTSTRAP_BASE_PATH_PROP="hoodie.bootstrap.base.path"
BOOTSTRAP_KEYGEN_CLASS="hoodie.bootstrap.keygen.class"


## Generates Data

from datetime import datetime

def get_json_data(start, count, dest):
    time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    data = [{"trip_id": i, "tstamp": time_stamp, "route_id": chr(65 + (i % 10)), "destination": dest[i%10]} for i in range(start, start + count)]
    return data

# Creates the Dataframe
def create_json_df(spark, data):
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data))



## CHANGE ME ##
config = {
    "table_name": "hudi_trips_cow",
    "primary_key": "trip_id",
    "sort_key": "tstamp"
}


upsert_dest = ["Boston", "Boston", "Boston", "Boston", "Boston","Boston","Boston","Boston","Boston","Boston"]
df = create_json_df(spark, get_json_data(1000000, 10, upsert_dest))


# Upsert the 10 modified records; "Append" mode with the upsert operation
# updates rows whose record key already exists instead of adding duplicates
(df.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option("hoodie.cleaner.commits.retained",2)
      .option(UPSERT_PARALLELISM, 10)
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .mode("Append")
      .save(OUTPUT_BUCKET))
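
Beyond upserts, the same DataFrame API supports record-level deletes (the CDC/GDPR case mentioned at the start of this section); the scripts above even define DELETE_OPERATION_OPT_VAL without using it. A rough, self-contained sketch of a job that would delete the ten Boston rows again, following the same pattern as hudi-upsert-cow.py (not part of this lab, shown only for illustration):

import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi_cow_delete").getOrCreate()

OUTPUT_BUCKET = "s3://" + sys.argv[1] + "/hudi/hudi_trips_cow"

# A delete only needs the record key (trip_id); the precombine field (tstamp)
# is carried along to match the table's write configuration.
keys = [{"trip_id": i, "tstamp": "2099-01-01 00:00:00"} for i in range(1000000, 1000010)]
df = spark.read.json(spark.sparkContext.parallelize(keys))

(df.write.format("org.apache.hudi")
   .option("hoodie.table.name", "hudi_trips_cow")
   .option("hoodie.datasource.write.recordkey.field", "trip_id")
   .option("hoodie.datasource.write.precombine.field", "tstamp")
   .option("hoodie.datasource.write.operation", "delete")   # record-level delete
   .mode("Append")
   .save(OUTPUT_BUCKET))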