In this section, we will use EMR Serverless to run a Spark ETL job and interact with the AWS Glue Metastore to create tables.
In the code below, Spark reads the nyc-taxi-trip data from S3. The script writes the data back to S3 in Parquet format, then creates a database in Glue and a table in that database so the data can be queried later. Note that the input location, output location, and database name are all passed in as arguments:
import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

if __name__ == "__main__":
    # Expect exactly three arguments: input folder, output folder, database name.
    print(len(sys.argv))
    if len(sys.argv) != 4:
        print("Usage: spark-etl-glue [input-folder] [output-folder] [dbName]")
        sys.exit(1)

    # enableHiveSupport() lets Spark SQL talk to the configured metastore
    # (the Glue Data Catalog on EMR Serverless).
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Glue integration example") \
        .enableHiveSupport() \
        .getOrCreate()

    # Read the CSV input and stamp each row with the processing time.
    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])
    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))
    updatedNYTaxi.printSchema()
    updatedNYTaxi.show()
    print("Total number of records: " + str(updatedNYTaxi.count()))

    # Write the result to S3 in Parquet format.
    updatedNYTaxi.write.parquet(sys.argv[2])

    # Register a temp view, then create the Glue database and an external
    # Parquet table pointing at the output location.
    updatedNYTaxi.createOrReplaceTempView("ny_taxi_table")
    dbName = sys.argv[3]
    spark.sql("CREATE DATABASE IF NOT EXISTS " + dbName)
    spark.sql("USE " + dbName)
    spark.sql("CREATE TABLE IF NOT EXISTS ny_taxi_parquet USING PARQUET LOCATION '"
              + sys.argv[2] + "' AS SELECT * FROM ny_taxi_table")
    spark.stop()
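For reference, the script takes three positional arguments: input folder, output folder, and database name. A local test invocation might look like the following sketch (placeholder bucket; running outside EMR Serverless would also require a Spark installation wired to the Glue catalog):

spark-submit spark-etl-glue.py \
    s3://aws-data-analytics-workshops/shared_datasets/tripdata/ \
    s3://<your-bucket>/taxi-data-glue/ \
    tripdata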
Submit the job from the EMR Serverless console.
The job parameters are as follows:
| Parameter | Value |
| --- | --- |
| Name | Spark-ETL-Glue-Metastore |
| Runtime role | EMRServerlessS3RuntimeRole |
| Script location | s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/spark-etl-glue.py |
| Script arguments | ["s3://aws-data-analytics-workshops/shared_datasets/tripdata/", "$S3_BUCKET/taxi-data-glue", "tripdata"] (replace $S3_BUCKET with your own bucket) |
The S3 output location and the database name are passed to the script through the Script arguments field.
When the job is submitted from the console, the Glue Catalog configuration is automatically passed to the job: --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
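The same property can also be set programmatically on the SparkSession builder. The following is a minimal sketch, only needed when the platform does not inject the setting for you, and it assumes the Glue client factory classes are on the classpath:

from pyspark.sql import SparkSession

# Minimal sketch: point the Hive metastore client at the Glue Data Catalog.
# On EMR Serverless the console injects the equivalent --conf automatically.
spark = SparkSession.builder \
    .appName("glue-catalog-example") \
    .config("spark.hadoop.hive.metastore.client.factory.class",
            "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
    .enableHiveSupport() \
    .getOrCreate()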
Click Submit Job. Then click into the job and open the "Spark properties" tab to verify that this configuration was indeed passed through.
If you submit the job with the CLI, you need to specify this configuration yourself:
aws emr-serverless start-job-run \
    --application-id ${APPLICATION_ID} \
    --execution-role-arn ${JOB_ROLE_ARN} \
    --name "Spark-ETL-Glue-Metastore-CLI" \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/spark-etl-glue.py",
            "entryPointArguments": [
                "s3://aws-data-analytics-workshops/shared_datasets/tripdata/",
                "'"$S3_BUCKET"'/taxi-data-glue/",
                "tripdata"
            ],
            "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1 --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "'"$S3_BUCKET"'/logs/"
            }
        }
    }'
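start-job-run returns a jobRunId, which you can use to poll the run state from the CLI (a sketch; set JOB_RUN_ID to the value returned by the command above):

aws emr-serverless get-job-run \
    --application-id ${APPLICATION_ID} \
    --job-run-id ${JOB_RUN_ID} \
    --query 'jobRun.state' \
    --output text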
Wait for the job to complete successfully, then check the output in your S3 bucket; you will see that a new folder, taxi-data-glue, has been created:
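You can also list the output from the CLI (assuming $S3_BUCKET holds the s3:// URI of your bucket, as in the command above):

aws s3 ls ${S3_BUCKET}/taxi-data-glue/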
Go to Athena, check the database and table, and run a query to verify the data:
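For example, the following query counts the rows in the table the job created (database tripdata from the script arguments, table ny_taxi_parquet from the script):

SELECT COUNT(*) FROM tripdata.ny_taxi_parquet;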