在本节中,我们将使用 EMR Serverless 运行 Spark ETL 作业,并与 RDS Hive Metastore 交互以创建表
Hive MetaStore是一个集中位置,用于存储有关表的结构信息,包括架构、分区名称和数据类型。
请注意,在EMR Serverless application 上提交作业时,MetaStore不会自动初始化。
在 EMR Serveless 中调度 Spark 作业之前,需要创建所需的 AWS 资源。为此,我们提供 AWS CloudFormation 模板来创建包含资源的堆栈。当您创建堆栈时,AWS 会在您的账户中创建以下资源:
要启动 CloudFormation 堆栈,请单击启动堆栈。
CloudFormation 堆栈将创建研讨会所需的必要资源。
CloudFormation 堆栈大约需要 15-20 分钟才能完成。
在 CloudFormation 控制台中等待状态CREATE_COMPLETE,如下所示:
1
2
3
4
5
6
7
8
9
10
sudo yum install jq -y
export MariaDBHost=<Replace with EMRServerlessHiveMetastoreDB>
export JOB_ROLE_ARN=<Replace with JobRoleArn>
export S3_BUCKET=<Replace with S3Bucket>
export SPARK_APPLICATION_ID=<Replace with SparkApplicationId>
export SECRET_ID=<Replace with RDSSecretId>
export DBUSER=$(aws secretsmanager get-secret-value --secret-id $SECRET_ID | jq --raw-output '.SecretString' | jq -r .MasterUsername)
export DBPASSWORD=$(aws secretsmanager get-secret-value --secret-id $SECRET_ID | jq --raw-output '.SecretString' | jq -r .MasterUserPassword)
export JDBCDriverClass=org.mariadb.jdbc.Driver
export JDBCDriver=mariadb-connector-java.jar
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
if __name__ == "__main__":
print(len(sys.argv))
if (len(sys.argv) != 4):
print("Usage: spark-nyctaxi [warehouse-location] [input-folder] [output-folder]")
sys.exit(0)
print("Warehouse location: " + sys.argv[1]+"/warehouse/")
print("CSV folder path: " + sys.argv[2])
print("Writing the parquet file to the folder: " + sys.argv[3])
spark = SparkSession \
.builder \
.config("spark.sql.warehouse.dir", sys.argv[1]+"/warehouse/" ) \
.enableHiveSupport() \
.getOrCreate()
nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[2])
updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))
updatedNYTaxi.registerTempTable("ny_taxi_table")
spark.sql("SHOW DATABASES").show()
spark.sql("CREATE DATABASE IF NOT EXISTS `hivemetastore`")
spark.sql("DROP TABLE IF EXISTS hivemetastore.ny_taxi_parquet")
updatedNYTaxi.write.option("path",sys.argv[3]).mode("overwrite").format("parquet").saveAsTable("hivemetastore.ny_taxi_parquet")
1
aws s3 cp spark-nyctaxi.py "s3://${S3_BUCKET}"
1
2
wget https://downloads.mariadb.com/Connectors/java/connector-java-3.0.7/mariadb-java-client-3.0.7.jar
aws s3 cp mariadb-java-client-3.0.7.jar "s3://${S3_BUCKET}/mariadb-connector-java.jar"
1
2
3
4
5
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install -i ~/aws-cli
alias aws=~/aws-cli/v2/current/bin/aws
aws --version
您应该看到类似于下面的输出。请注意 aws cli 的版本是 2.xx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
aws emr-serverless start-job-run \
--application-id $SPARK_APPLICATION_ID \
--execution-role-arn $JOB_ROLE_ARN \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://'${S3_BUCKET}'/spark-nyctaxi.py",
"entryPointArguments":["s3://'"${S3_BUCKET}"'/","s3://aws-data-analytics-workshops/shared_datasets/tripdata/","s3://'"${S3_BUCKET}"'/taxi-data-rds-hive-metastore/"],
"sparkSubmitParameters": "--jars s3://'${S3_BUCKET}'/'${JDBCDriver}' --conf spark.hadoop.javax.jdo.option.ConnectionDriverName='${JDBCDriverClass}' --conf spark.hadoop.javax.jdo.option.ConnectionUserName='${DBUSER}' --conf spark.hadoop.javax.jdo.option.ConnectionPassword='${DBPASSWORD}' --conf spark.hadoop.javax.jdo.option.ConnectionURL=\"jdbc:mariadb://'${MariaDBHost}':3306/hivemetastore\" --conf spark.driver.cores=2 --conf spark.executor.memory=2G --conf spark.driver.memory=2G --conf spark.executor.cores=2"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {
"logUri": "s3://'${S3_BUCKET}'/sparklogs/"
}
}
}'
该作业将需要 8-10 分钟才能完成。
请随意在文件夹内浏览。您会注意到 parquet 文件已被写入。
1
mysql --host $MariaDBHost --user=$DBUSER --password=$DBPASSWORD hivemetastore
您可以看到包含不同元存储信息的元存储表,例如 TBLS 表中的表和 COLUMNS_V2 表中的列。
对 TBLS 表运行 SELECT 查询以查看我们在 Spark 程序中创建的表。
继续下一节。