Spark ETL Job

In this section, we will learn how to run a Spark ETL job on EMR on EKS.

In the code below, Spark reads the NY taxi trip data from Amazon S3. The script adds a timestamp column, prints the schema and the row count, and finally writes the data back to Amazon S3 in Parquet format. The last step may take some time, depending on the size of the EKS cluster.

Note that the input and output locations are passed in as arguments.

import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

if __name__ == "__main__":

    # Expect exactly two arguments: the input and output S3 folders.
    if len(sys.argv) != 3:
        print("Usage: spark-etl [input-folder] [output-folder]")
        sys.exit(1)

    spark = SparkSession\
        .builder\
        .appName("SparkETL")\
        .getOrCreate()

    # Read the CSV trip data, inferring the schema from the header row.
    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])
    # Add a timestamp column recording when the job processed the data.
    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))
    updatedNYTaxi.printSchema()
    updatedNYTaxi.show()
    print("Total number of records: " + str(updatedNYTaxi.count()))
    # Write the result back to S3 in Parquet format.
    updatedNYTaxi.write.parquet(sys.argv[2])
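Before submitting to the cluster, the script can be smoke-tested locally with `spark-submit`. This is a sketch, not part of the workshop: it assumes a local Spark installation, and both paths below are placeholders for a small CSV sample and an output directory.

```shell
# Hypothetical local run of the ETL script (assumes a local Spark install;
# the sample input and output paths are placeholders, not workshop assets).
spark-submit spark-etl.py ./sample-tripdata/ ./parquet-out/
```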

Run the following command to execute the Spark ETL job on the EMR on EKS cluster. The S3 input and output locations are passed to the script through the `entryPointArguments` parameter:

aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-etl \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-6.2.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/spark-etl.py",
        "entryPointArguments": ["s3://aws-data-analytics-workshops/shared_datasets/tripdata/",
          "'"$S3_BUCKET"'/taxi-data/"
        ],
        "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        }
    }' \
    --configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.driver.memory":"2G"
         }
      }
    ], 
    "monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs", 
        "logStreamNamePrefix": "emr-eks-workshop"
      }, 
      "s3MonitoringConfiguration": {
        "logUri": "'"$S3_BUCKET"'/logs/"
      }
    }
}'
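The `start-job-run` call returns immediately with a job-run `id`; the job itself runs asynchronously. One way to follow it from the shell is sketched below. It assumes `EMR_EKS_CLUSTER_ID` is still exported, and that picking the first `jobRuns` entry named `spark-etl` finds the run you just submitted (the ordering of `list-job-runs` results is an assumption here).

```shell
# Look up the id of a job run named "spark-etl" via a JMESPath --query filter.
JOB_RUN_ID=$(aws emr-containers list-job-runs \
    --virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
    --query 'jobRuns[?name==`spark-etl`]|[0].id' --output text)

# Print the current state (e.g. SUBMITTED, RUNNING, COMPLETED, FAILED).
aws emr-containers describe-job-run \
    --virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
    --id ${JOB_RUN_ID} \
    --query 'jobRun.state' --output text
```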

After the ETL job finishes, check the output in the S3 bucket.
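For example, the output can be listed from the command line (assuming `S3_BUCKET` is still exported as in the submit command above; the presence of a `_SUCCESS` marker object indicates Spark committed the write):

```shell
# List the Parquet files the job wrote; look for a _SUCCESS marker object.
aws s3 ls ${S3_BUCKET}/taxi-data/
```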
