Transactional Data Lake - Hudi II

Merge On Read

Following the same job-submission flow as above, submit a new job named hudi_merge_on_read.

The Spark properties are:

--conf spark.jars=/usr/lib/hudi/hudi-spark-bundle.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
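
The screenshots below assume the job is submitted through the console. If you would rather script the submission, here is a minimal sketch using boto3 against EMR Serverless; the application ID, execution role ARN, bucket, and script path are placeholders for illustration, not values from this walkthrough:

import boto3

# All IDs and S3 paths below are hypothetical placeholders -- substitute your own
# EMR Serverless application, IAM job role, and bucket.
emr = boto3.client("emr-serverless")

response = emr.start_job_run(
    applicationId="00abc123example",
    executionRoleArn="arn:aws:iam::123456789012:role/EMRServerlessJobRole",
    name="hudi_merge_on_read",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://YOUR_BUCKET/scripts/hudi-mor.py",
            "entryPointArguments": ["YOUR_BUCKET"],
            "sparkSubmitParameters": (
                "--conf spark.jars=/usr/lib/hudi/hudi-spark-bundle.jar "
                "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer"
            ),
        }
    },
)
print(response["jobRunId"])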

The script to use is hudi-mor.py:

(Screenshot: job configuration with hudi-mor.py selected)

Submit the job and wait for it to complete.

It will create two tables: _ro (Read Optimized) and _rt (Real Time). See: https://aws.amazon.com/blogs/big-data/part-1-query-an-apache-hudi-dataset-in-an-amazon-s3-data-lake-with-amazon-athena-part-1-read-optimized-queries/
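
Both tables are registered in the default database by the Hive sync options in the script. If you want to confirm that from the Spark side rather than Athena, a minimal sketch (assuming the Spark session is configured to use the same Glue/Hive catalog):

# List the two tables (hudi_trips_mor_ro / hudi_trips_mor_rt) created by Hive sync
spark.sql("SHOW TABLES IN default LIKE 'hudi_trips_mor*'").show(truncate=False)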

Run the following queries in Athena to verify the data in these tables.

select count(*) from default.hudi_trips_mor_ro
select count(*) from default.hudi_trips_mor_rt

(Screenshot: Athena count(*) results for hudi_trips_mor_ro and hudi_trips_mor_rt)

Now let's update some records and see how the read optimized and real time tables behave. We will update the records with trip IDs 1000000 to 1000010. Before that, verify those records with both query types; at this point both return the same data.

select trip_id, route_id, tstamp, destination from default.hudi_trips_mor_ro where trip_id between 999996 and 1000013
select trip_id, route_id, tstamp, destination from default.hudi_trips_mor_rt where trip_id between 999996 and 1000013

(Screenshot: both tables return the same records before the update)

Now submit a job that updates the destination for trip IDs 1000000 to 1000010: submit a new job, choose hudi-upsert-mor.py as the script, and keep the other parameters the same as before:

(Screenshot: job configuration with hudi-upsert-mor.py selected)

Submit the job and wait for it to complete.

Run the following queries in Athena and you will notice that both hudi_trips_mor_rt and hudi_trips_mor_ro show the updated destination for trip_id 1000000 to 1000010 (the upsert script sets hoodie.compact.inline.max.delta.commits to 2, which lets compaction fold the change into the base files that the _ro table reads):

select trip_id, route_id, tstamp, destination from default.hudi_trips_mor_rt where trip_id between 999996 and 1000013
select trip_id, route_id, tstamp, destination from default.hudi_trips_mor_ro where trip_id between 999996 and 1000013

Queries against the read-optimized table run faster than queries against the real-time table, while the real-time query always returns the latest snapshot of the data. The read-optimized query reads only the columnar base files, whereas the real-time query merges the base files with the delta log files at query time, which is why it does more work per query but always reflects the most recent writes.

(Screenshot: query results showing the updated destination)

Note the difference in the amount of data scanned by the two queries:

(Screenshot: difference in the amount of data scanned by the two queries)
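
The same read-optimized vs. real-time distinction can be reproduced directly from Spark by reading the table path with different query types. A minimal sketch, assuming a session with the Hudi bundle on the classpath and with YOUR_BUCKET as a placeholder for the bucket passed to the scripts:

# Read-optimized query: reads only the compacted columnar base files
ro_df = (spark.read.format("org.apache.hudi")
         .option("hoodie.datasource.query.type", "read_optimized")
         .load("s3://YOUR_BUCKET/hudi/hudi_trips_mor"))

# Snapshot ("real-time") query: merges base files with the delta log files
rt_df = (spark.read.format("org.apache.hudi")
         .option("hoodie.datasource.query.type", "snapshot")
         .load("s3://YOUR_BUCKET/hudi/hudi_trips_mor"))

ro_df.filter("trip_id between 999996 and 1000013").select("trip_id", "destination").show()
rt_df.filter("trip_id between 999996 and 1000013").select("trip_id", "destination").show()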

hudi-mor.py code:

import sys
from pyspark.sql import SparkSession

if len(sys.argv) == 1:
    print('no arguments passed')
    sys.exit(1)

spark = SparkSession\
        .builder\
        .appName("hudi_mor")\
        .getOrCreate()


# General Constants
OUTPUT_BUCKET="s3://"+sys.argv[1]+"/hudi/hudi_trips_mor"
HUDI_FORMAT = "org.apache.hudi"
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
DELETE_OPERATION_OPT_VAL = "delete"
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
S3_CONSISTENCY_CHECK = "hoodie.consistency.check.enabled"
STORAGE_TYPE_OPT_KEY="hoodie.datasource.write.storage.type"
HUDI_CLEANER_POLICY = "hoodie.cleaner.policy"
KEEP_LATEST_COMMITS = "KEEP_LATEST_COMMITS"
KEEP_LATEST_FILE_VERSIONS = "KEEP_LATEST_FILE_VERSIONS"
HUDI_COMMITS_RETAINED = "hoodie.cleaner.commits.retained"
HUDI_FILES_RETAINED = "hoodie.cleaner.fileversions.retained"
PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
EMPTY_PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.EmptyHoodieRecordPayload"

# Hive Constants
HIVE_SYNC_ENABLED_OPT_KEY="hoodie.datasource.hive_sync.enable"
HIVE_PARTITION_FIELDS_OPT_KEY="hoodie.datasource.hive_sync.partition_fields"
HIVE_ASSUME_DATE_PARTITION_OPT_KEY="hoodie.datasource.hive_sync.assume_date_partitioning"
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY="hoodie.datasource.hive_sync.partition_extractor_class"
HIVE_TABLE_OPT_KEY="hoodie.datasource.hive_sync.table"

# Partition Constants
NONPARTITION_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.NonPartitionedExtractor"
MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.MultiPartKeysValueExtractor"
KEYGENERATOR_CLASS_OPT_KEY="hoodie.datasource.write.keygenerator.class"
NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.NonpartitionedKeyGenerator"
COMPLEX_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.ComplexKeyGenerator"
PARTITIONPATH_FIELD_OPT_KEY="hoodie.datasource.write.partitionpath.field"

#Incremental Constants
VIEW_TYPE_OPT_KEY="hoodie.datasource.query.type"
BEGIN_INSTANTTIME_OPT_KEY="hoodie.datasource.read.begin.instanttime"
VIEW_TYPE_INCREMENTAL_OPT_VAL="incremental"
END_INSTANTTIME_OPT_KEY="hoodie.datasource.read.end.instanttime"

#Bootstrap Constants
BOOTSTRAP_BASE_PATH_PROP="hoodie.bootstrap.base.path"
BOOTSTRAP_KEYGEN_CLASS="hoodie.bootstrap.keygen.class"


## Generates Data

from datetime import datetime

def get_json_data(start, count, dest):
    time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    data = [{"trip_id": i, "tstamp": time_stamp, "route_id": chr(65 + (i % 10)), "destination": dest[i%10]} for i in range(start, start + count)]
    return data

# Creates the Dataframe
def create_json_df(spark, data):
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data))



 ## CHANGE ME ##
config = {
    "table_name": "hudi_trips_mor",
    "primary_key": "trip_id",
    "sort_key": "tstamp"
}


dest = ["Seattle", "New York", "New Jersey", "Los Angeles", "Las Vegas", "Tucson","Washington DC","Philadelphia","Miami","San Francisco"]
df1 = create_json_df(spark, get_json_data(0, 2000000, dest))


# Bulk-insert 2 million generated rows as a MERGE_ON_READ table and sync it to the
# Hive/Glue catalog, which creates the hudi_trips_mor_ro and hudi_trips_mor_rt tables.
(df1.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
      .option(BULK_INSERT_PARALLELISM, 10)
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(STORAGE_TYPE_OPT_KEY, "MERGE_ON_READ")
      .mode("Overwrite")
      .save(OUTPUT_BUCKET))

hudi-upsert-mor.py code:

import sys
from pyspark.sql import SparkSession

if len(sys.argv) == 1:
    print('no arguments passed')
    sys.exit(1)

spark = SparkSession\
        .builder\
        .appName("hudi_cow")\
        .getOrCreate()


# General Constants
OUTPUT_BUCKET="s3://"+sys.argv[1]+"/hudi/hudi_trips_mor"
HUDI_FORMAT = "org.apache.hudi"
TABLE_NAME = "hoodie.table.name"
RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
UPSERT_OPERATION_OPT_VAL = "upsert"
DELETE_OPERATION_OPT_VAL = "delete"
BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
S3_CONSISTENCY_CHECK = "hoodie.consistency.check.enabled"
STORAGE_TYPE_OPT_KEY="hoodie.datasource.write.storage.type"
HUDI_CLEANER_POLICY = "hoodie.cleaner.policy"
KEEP_LATEST_COMMITS = "KEEP_LATEST_COMMITS"
KEEP_LATEST_FILE_VERSIONS = "KEEP_LATEST_FILE_VERSIONS"
HUDI_COMMITS_RETAINED = "hoodie.cleaner.commits.retained"
HUDI_FILES_RETAINED = "hoodie.cleaner.fileversions.retained"
PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
EMPTY_PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.EmptyHoodieRecordPayload"

# Hive Constants
HIVE_SYNC_ENABLED_OPT_KEY="hoodie.datasource.hive_sync.enable"
HIVE_PARTITION_FIELDS_OPT_KEY="hoodie.datasource.hive_sync.partition_fields"
HIVE_ASSUME_DATE_PARTITION_OPT_KEY="hoodie.datasource.hive_sync.assume_date_partitioning"
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY="hoodie.datasource.hive_sync.partition_extractor_class"
HIVE_TABLE_OPT_KEY="hoodie.datasource.hive_sync.table"

# Partition Constants
NONPARTITION_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.NonPartitionedExtractor"
MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.MultiPartKeysValueExtractor"
KEYGENERATOR_CLASS_OPT_KEY="hoodie.datasource.write.keygenerator.class"
NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.NonpartitionedKeyGenerator"
COMPLEX_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.ComplexKeyGenerator"
PARTITIONPATH_FIELD_OPT_KEY="hoodie.datasource.write.partitionpath.field"

#Incremental Constants
VIEW_TYPE_OPT_KEY="hoodie.datasource.query.type"
BEGIN_INSTANTTIME_OPT_KEY="hoodie.datasource.read.begin.instanttime"
VIEW_TYPE_INCREMENTAL_OPT_VAL="incremental"
END_INSTANTTIME_OPT_KEY="hoodie.datasource.read.end.instanttime"

#Bootstrap Constants
BOOTSTRAP_BASE_PATH_PROP="hoodie.bootstrap.base.path"
BOOTSTRAP_KEYGEN_CLASS="hoodie.bootstrap.keygen.class"


## Generates Data

from datetime import datetime

def get_json_data(start, count, dest):
    time_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    data = [{"trip_id": i, "tstamp": time_stamp, "route_id": chr(65 + (i % 10)), "destination": dest[i%10]} for i in range(start, start + count)]
    return data

# Creates the Dataframe
def create_json_df(spark, data):
    sc = spark.sparkContext
    return spark.read.json(sc.parallelize(data))



 ## CHANGE ME ##
config = {
    "table_name": "hudi_trips_mor",
    "primary_key": "trip_id",
    "sort_key": "tstamp"
}


upsert_dest = ["Boston", "Boston", "Boston", "Boston", "Boston","Boston","Boston","Boston","Boston","Boston"]
df = create_json_df(spark, get_json_data(1000000, 10, upsert_dest))


# Upsert 10 rows starting at trip_id 1000000 with destination 'Boston' into the same
# MERGE_ON_READ table and re-sync the Hive/Glue catalog;
# hoodie.compact.inline.max.delta.commits=2 sets the number of delta commits after
# which compaction is triggered.
(df.write.format(HUDI_FORMAT)
      .option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
      .option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
      .option(TABLE_NAME, config['table_name'])
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .option(UPSERT_PARALLELISM, 10)
      .option(HIVE_TABLE_OPT_KEY,config['table_name'])
      .option("hoodie.compact.inline.max.delta.commits",2)
      .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
      .option(STORAGE_TYPE_OPT_KEY, "MERGE_ON_READ")
      .mode("Append")
      .save(OUTPUT_BUCKET))
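
As a closing note, both scripts define incremental-query constants (VIEW_TYPE_OPT_KEY, BEGIN_INSTANTTIME_OPT_KEY) that are never used. A minimal sketch of how they could be used to pull only changed records, assuming it is appended to hudi-upsert-mor.py so the constants and OUTPUT_BUCKET are already in scope:

# Incremental query: return only records written after the given instant time.
# "0" means "from the beginning"; replace it with the instant time of the initial
# bulk insert (visible in the .hoodie timeline) to see only the upserted rows.
incr_df = (spark.read.format(HUDI_FORMAT)
           .option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL)
           .option(BEGIN_INSTANTTIME_OPT_KEY, "0")
           .load(OUTPUT_BUCKET))
incr_df.select("trip_id", "destination", "tstamp").show(20, truncate=False)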