Integrating with the Glue Metastore

In this section, we will use EMR Serverless to run a Spark ETL job that interacts with the AWS Glue metastore to create tables.

In the code below, Spark reads the nyc-taxi-trip data from S3, writes it back to S3 in Parquet format, and finally creates a database in Glue along with a table in that database so the data can be queried again. Note that the input location, output location, and database name are all passed in as arguments:

import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

if __name__ == "__main__":

    print(len(sys.argv))
    if len(sys.argv) != 4:
        print("Usage: spark-etl-glue [input-folder] [output-folder] [dbName]")
        sys.exit(1)  # exit non-zero so a bad invocation is reported as a failure

    spark = SparkSession\
        .builder\
        .appName("Python Spark SQL Glue integration example")\
        .enableHiveSupport()\
        .getOrCreate()

    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])

    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))

    updatedNYTaxi.printSchema()

    updatedNYTaxi.show()  # show() prints the rows itself and returns None

    print("Total number of records: " + str(updatedNYTaxi.count()))
    
    updatedNYTaxi.write.parquet(sys.argv[2])

    updatedNYTaxi.createOrReplaceTempView("ny_taxi_table")  # registerTempTable is deprecated

    dbName = sys.argv[3]
    spark.sql("CREATE DATABASE IF NOT EXISTS " + dbName)
    spark.sql("USE " + dbName)
    spark.sql("CREATE TABLE IF NOT EXISTS ny_taxi_parquet USING PARQUET LOCATION '" + sys.argv[2] + "' AS SELECT * FROM ny_taxi_table")

    spark.stop()

Submit the job from the EMR Serverless console:

[Screenshot: submitting a job from the EMR Serverless console]

The job parameters are as follows:

  • Name: Spark-ETL-Glue-Metastore
  • Runtime role: EMRServerlessS3RuntimeRole
  • Script location: s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/spark-etl-glue.py
  • Script arguments: ["s3://aws-data-analytics-workshops/shared_datasets/tripdata/", "$S3_BUCKET/taxi-data-glue", "tripdata"] (replace $S3_BUCKET with your own bucket)

[Screenshot: job parameters in the EMR Serverless console]

  • The S3 output location and database name are passed to the script via Script Arguments

  • When the job is submitted from the console, the Glue Catalog configuration is passed to the job automatically: --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

Click Submit job. Then click into the job and open the "Spark properties" tab to verify that this configuration was indeed passed through:

[Screenshot: Spark properties tab showing the Glue Catalog configuration]
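The same check can also be done programmatically. Below is a minimal sketch: `has_glue_catalog_conf` is an illustrative helper (not an AWS API), the sample `driver` dict mimics the `jobDriver` shape used by `emr-serverless`, and the commented-out `get_job_run` call assumes configured boto3 credentials:

```python
# Illustrative helper: check whether a job driver carries the Glue Catalog
# factory configuration. The constant mirrors the conf shown above.
GLUE_FACTORY = (
    "spark.hadoop.hive.metastore.client.factory.class="
    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
)

def has_glue_catalog_conf(job_driver: dict) -> bool:
    """Return True if sparkSubmitParameters include the Glue factory conf."""
    params = job_driver.get("sparkSubmit", {}).get("sparkSubmitParameters", "")
    return GLUE_FACTORY in params

# Sample jobDriver fragment shaped like an EMR Serverless job-run driver:
driver = {"sparkSubmit": {"sparkSubmitParameters": "--conf " + GLUE_FACTORY}}
print(has_glue_catalog_conf(driver))

# With real credentials you could fetch the driver of a submitted run instead:
# import boto3
# run = boto3.client("emr-serverless").get_job_run(
#     applicationId=APPLICATION_ID, jobRunId=JOB_RUN_ID)
# print(has_glue_catalog_conf(run["jobRun"]["jobDriver"]))
```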

If you submit the job with the CLI, you must specify this configuration yourself:

aws emr-serverless start-job-run --application-id ${APPLICATION_ID} --execution-role-arn ${JOB_ROLE_ARN} --name "Spark-ETL-Glue-Metastore-CLI" --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/spark-etl-glue.py",
            "entryPointArguments": [
          "s3://aws-data-analytics-workshops/shared_datasets/tripdata/","'"$S3_BUCKET"'/taxi-data-glue/","tripdata"
        ],
            "sparkSubmitParameters": "--conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1 --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        }
    }' --configuration-overrides '{
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {
            "logUri": "'"$S3_BUCKET"'/logs/"
        }
    }
}'
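The same job can also be started from Python with boto3 instead of the CLI. This is a sketch under the assumption that the application ID and role ARN are available as in the CLI example; `build_job_driver` is an illustrative helper, and `my-bucket` is a placeholder for your own bucket:

```python
# Illustrative helper: assemble the jobDriver payload that
# emr-serverless start_job_run expects for a Spark job.
def build_job_driver(script: str, args: list, spark_params: str) -> dict:
    """Build the sparkSubmit job driver structure."""
    return {
        "sparkSubmit": {
            "entryPoint": script,
            "entryPointArguments": args,
            "sparkSubmitParameters": spark_params,
        }
    }

driver = build_job_driver(
    "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/spark-etl-glue.py",
    ["s3://aws-data-analytics-workshops/shared_datasets/tripdata/",
     "s3://my-bucket/taxi-data-glue/",  # my-bucket is a placeholder
     "tripdata"],
    "--conf spark.hadoop.hive.metastore.client.factory.class="
    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
)

# The actual submission needs valid credentials, so it is shown commented out:
# import boto3
# boto3.client("emr-serverless").start_job_run(
#     applicationId=APPLICATION_ID,
#     executionRoleArn=JOB_ROLE_ARN,
#     name="Spark-ETL-Glue-Metastore-boto3",
#     jobDriver=driver,
# )
```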

Wait for the job to complete successfully, then check the output in the S3 bucket; you will notice that a new folder, taxi-data-glue, has been created:

[Screenshot: taxi-data-glue output folder in the S3 bucket]
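The output check can also be scripted rather than done in the console. A small sketch, assuming boto3 credentials; `keys_under_prefix` is an illustrative helper working on a plain list of object keys, and `my-bucket` is a placeholder:

```python
# Illustrative helper: filter object keys down to those under a folder prefix,
# as you would do with the keys returned by s3 list_objects_v2.
def keys_under_prefix(keys: list, prefix: str) -> list:
    """Return the object keys that live under the given folder prefix."""
    return [k for k in keys if k.startswith(prefix)]

sample = ["taxi-data-glue/part-00000.snappy.parquet", "logs/app.log"]
print(keys_under_prefix(sample, "taxi-data-glue/"))

# With credentials, feed it real keys:
# import boto3
# resp = boto3.client("s3").list_objects_v2(
#     Bucket="my-bucket", Prefix="taxi-data-glue/")
# assert resp["KeyCount"] > 0, "no output written by the job"
```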

Go to Athena, check the database and table, and run a query to verify:

[Screenshot: querying the ny_taxi_parquet table in Athena]
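The Athena validation can likewise be driven from code. A sketch, assuming boto3 credentials and an existing query-result location; `validation_query` is an illustrative helper, and the database/table names mirror the job arguments above:

```python
# Illustrative helper: build a row-count query against the table the job created.
def validation_query(db: str, table: str) -> str:
    """Return a simple COUNT(*) query for the given Glue database and table."""
    return f"SELECT COUNT(*) AS trips FROM {db}.{table}"

q = validation_query("tripdata", "ny_taxi_parquet")
print(q)

# Running it against Athena needs credentials and a result bucket,
# so the call is shown commented out ("my-bucket" is a placeholder):
# import boto3
# boto3.client("athena").start_query_execution(
#     QueryString=q,
#     QueryExecutionContext={"Database": "tripdata"},
#     ResultConfiguration={"OutputLocation": "s3://my-bucket/athena-results/"},
# )
```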