Integrating with the AWS Glue Metastore

In this section, we will learn how to run a Spark ETL job on EMR on EKS that interacts with the AWS Glue metastore and creates a table.

In the code below, Spark reads the NY taxi trip data from Amazon S3. The script adds a current_date column holding the current timestamp, prints the schema and row count, and writes the data to Amazon S3 in Parquet format. Finally, it creates a database in AWS Glue and a table in that database so the data can be queried again.

The input location, output location, and database name are passed in as arguments:

import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

if __name__ == "__main__":

    print(len(sys.argv))
    if len(sys.argv) != 4:
        print("Usage: spark-etl-glue [input-folder] [output-folder] [dbName]")
        sys.exit(1)

    spark = SparkSession\
        .builder\
        .appName("Python Spark SQL Glue integration example")\
        .enableHiveSupport()\
        .getOrCreate()

    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])

    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))

    updatedNYTaxi.printSchema()
    updatedNYTaxi.show()
    print("Total number of records: " + str(updatedNYTaxi.count()))
    updatedNYTaxi.write.parquet(sys.argv[2])

    updatedNYTaxi.createOrReplaceTempView("ny_taxi_table")
    dbName = sys.argv[3]
    spark.sql("CREATE database if not exists " + dbName)
    spark.sql("USE " + dbName)
    
    spark.sql("CREATE table if not exists ny_taxi_parquet USING PARQUET LOCATION '" + sys.argv[2] + "' AS SELECT * from ny_taxi_table ")
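For reference, the two SQL statements the script issues can be built with a small helper like the one below (a sketch; the function name is ours, not part of the workshop script). Note that since the database name and location are interpolated directly into DDL strings, in practice they should be validated before use:

```python
def build_glue_ddl(db_name: str, table_name: str, location: str):
    """Return the CREATE DATABASE and CTAS statements the ETL script issues.

    db_name and location come straight from the job arguments, so callers
    should validate them before interpolating into SQL.
    """
    create_db = f"CREATE DATABASE IF NOT EXISTS {db_name}"
    # CTAS with an explicit LOCATION: the table data lives at the given
    # S3 prefix and the table definition is registered in the Glue catalog.
    ctas = (
        f"CREATE TABLE IF NOT EXISTS {table_name} "
        f"USING PARQUET LOCATION '{location}' "
        f"AS SELECT * FROM ny_taxi_table"
    )
    return create_db, ctas
```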

Submit the Spark ETL Glue job to the EMR on EKS cluster. The S3 output location and the database name are passed to the script as entryPointArguments:

aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-etl-s3-awsglue-integration \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-5.35.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/spark-etl-glue.py",
        "entryPointArguments": [
          "s3://aws-data-analytics-workshops/shared_datasets/tripdata/","'"$S3_BUCKET"'/taxi-data-glue/","tripdata"
        ],
        "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        }
    }' \
--configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
         }
      }
    ], 
    "monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs", 
        "logStreamNamePrefix": "emr-eks-workshop"
      }, 
      "s3MonitoringConfiguration": {
        "logUri": "'"$S3_BUCKET"'/logs/"
      }
    }
}'
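The same submission can also be made from Python with boto3's emr-containers client. The sketch below only builds the request dictionary that mirrors the CLI call above (the argument values are placeholders, and the CloudWatch portion of monitoringConfiguration is omitted for brevity); it would then be submitted with `boto3.client("emr-containers").start_job_run(**request)`:

```python
def build_start_job_run_request(cluster_id, name, role_arn, entry_point,
                                entry_point_args, s3_log_uri):
    """Build kwargs for boto3 emr-containers start_job_run.

    Mirrors the CLI call above: the Glue catalog is enabled by pointing
    the Hive metastore client factory at AWSGlueDataCatalogHiveClientFactory.
    """
    return {
        "virtualClusterId": cluster_id,
        "name": name,
        "executionRoleArn": role_arn,
        "releaseLabel": "emr-5.35.0-latest",
        "jobDriver": {
            "sparkSubmitJobDriver": {
                "entryPoint": entry_point,
                "entryPointArguments": entry_point_args,
                "sparkSubmitParameters": (
                    "--conf spark.executor.instances=2 "
                    "--conf spark.executor.memory=2G "
                    "--conf spark.executor.cores=2 "
                    "--conf spark.driver.cores=1"
                ),
            }
        },
        "configurationOverrides": {
            "applicationConfiguration": [{
                "classification": "spark-defaults",
                "properties": {
                    "spark.hadoop.hive.metastore.client.factory.class":
                        "com.amazonaws.glue.catalog.metastore."
                        "AWSGlueDataCatalogHiveClientFactory",
                },
            }],
            "monitoringConfiguration": {
                "s3MonitoringConfiguration": {"logUri": s3_log_uri},
            },
        },
    }
```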

Wait for the job to finish; a new directory appears under the S3 output location:

(screenshot: new output directory under the S3 bucket)

In the Glue console, a new database named tripdata has been created:

(screenshot: the tripdata database in AWS Glue)

Inside it, a table has been created:

(screenshot: the ny_taxi_parquet table in the tripdata database)

Querying the data

Open Amazon Athena from the AWS console, check the database and table, and run a query to verify the data:

(screenshot: query results in the Athena console)
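Because the table is registered in the Glue catalog, the same validation query can also be submitted programmatically via Athena's API. The sketch below only builds the kwargs for boto3's `athena` client `start_query_execution` call (the result-output bucket is an assumption; Athena requires one, or a workgroup with an output location configured):

```python
def build_athena_validation_query(db_name: str, table_name: str,
                                  output_s3: str):
    """Build kwargs for boto3 athena start_query_execution.

    Counts the rows in the Glue-catalog table written by the ETL job;
    output_s3 is where Athena stages the query results.
    """
    return {
        "QueryString": f"SELECT COUNT(*) FROM {db_name}.{table_name}",
        "QueryExecutionContext": {"Database": db_name},
        "ResultConfiguration": {"OutputLocation": output_s3},
    }
```

The request would then be run with `boto3.client("athena").start_query_execution(**kwargs)` and the results fetched with `get_query_results` once the execution succeeds.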