In this section, we will learn how to run a Spark ETL job on EMR on EKS that interacts with the AWS Glue metastore and creates a table.
In the code below, Spark reads the NY taxi trip data from Amazon S3. The script adds a column holding the current timestamp, prints the schema and the row count, writes the data to Amazon S3 in Parquet format, and finally creates a database in AWS Glue and a table in that database so the data can be queried again.
The input location, output location, and database name are passed in as arguments:
import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

if __name__ == "__main__":
    print(len(sys.argv))
    if len(sys.argv) != 4:
        print("Usage: spark-etl-glue [input-folder] [output-folder] [dbName]")
        sys.exit(1)

    # Create a Spark session with Hive support so that spark.sql()
    # statements go through the configured metastore (AWS Glue here).
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Glue integration example") \
        .enableHiveSupport() \
        .getOrCreate()

    # Read the NY taxi trip data (CSV with header) from the input location.
    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])

    # Add a column holding the current timestamp.
    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))

    updatedNYTaxi.printSchema()
    updatedNYTaxi.show()
    print("Total number of records: " + str(updatedNYTaxi.count()))

    # Write the result to the output location in Parquet format.
    updatedNYTaxi.write.parquet(sys.argv[2])

    # Register a temporary view, then create a Glue database and a table
    # pointing at the Parquet output so the data can be queried later.
    updatedNYTaxi.createOrReplaceTempView("ny_taxi_table")
    dbName = sys.argv[3]
    spark.sql("CREATE DATABASE IF NOT EXISTS " + dbName)
    spark.sql("USE " + dbName)
    spark.sql("CREATE TABLE IF NOT EXISTS ny_taxi_parquet USING PARQUET LOCATION '"
              + sys.argv[2] + "' AS SELECT * FROM ny_taxi_table")
Execute the Spark ETL Glue job on the EMR on EKS cluster. The S3 output location and the database name are passed to the script as arguments, supplied via entryPointArguments:
aws emr-containers start-job-run \
  --virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
  --name spark-etl-s3-awsglue-integration \
  --execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
  --release-label emr-5.35.0-latest \
  --job-driver '{
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/spark-etl-glue.py",
      "entryPointArguments": [
        "s3://aws-data-analytics-workshops/shared_datasets/tripdata/",
        "'"$S3_BUCKET"'/taxi-data-glue/",
        "tripdata"
      ],
      "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
    }
  }' \
  --configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        }
      }
    ],
    "monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs",
        "logStreamNamePrefix": "emr-eks-workshop"
      },
      "s3MonitoringConfiguration": {
        "logUri": "'"$S3_BUCKET"'/logs/"
      }
    }
  }'
Wait for the job to finish. Once it completes, a new output folder appears under the S3 bucket; you can check the job state and the output from the CLI, as sketched below.
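A quick way to poll the job and list the output, assuming you captured the job run id (the "id" field in the start-job-run response) into JOB_RUN_ID, and that S3_BUCKET includes the s3:// prefix as in the job definition:

# Check the job state; wait until it reports COMPLETED.
aws emr-containers describe-job-run \
  --virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
  --id ${JOB_RUN_ID} \
  --query 'jobRun.state'

# List the Parquet output written by the job.
aws s3 ls ${S3_BUCKET}/taxi-data-glue/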
Open the AWS Glue service: a new database named tripdata has been created, and it contains a new table. The same can be verified from the CLI, as sketched below.
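The database and table can also be inspected with the AWS Glue CLI (a quick check; the --query filters are just for readability):

# Confirm the database created by the job exists in the Glue Data Catalog.
aws glue get-database --name tripdata

# List the tables in the database; ny_taxi_parquet should appear.
aws glue get-tables --database-name tripdata --query 'TableList[].Name'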
From the AWS console, open Amazon Athena, check the database and table, and run a query to verify the data.
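For example, a simple count query can be run either in the Athena console or via the CLI. A sketch, assuming the result location points at a bucket you own (the athena-results/ prefix is an assumption):

aws athena start-query-execution \
  --query-string "SELECT COUNT(*) FROM ny_taxi_parquet" \
  --query-execution-context Database=tripdata \
  --result-configuration OutputLocation=${S3_BUCKET}/athena-results/

# Retrieve the result once the query finishes; the QueryExecutionId
# comes from the start-query-execution response.
aws athena get-query-results --query-execution-id <query-execution-id>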