EMR Steps

  • 使用AWS控制台、AWS CLIEMR API都可以添加EMR Step

  • Step的数量没有限制,但一个群集同时支持最多256个PENDING和RUNNING状态的Step,例如安装Apache Pig、安装Hive、安装HBase等。可以并行执行步骤以缩短执行时间。

  • 在创建集群时配置可以EMR步骤(EMR Steps),也可以在创建后配置。本实验将采用后面这种方式。

当完成EMR的开发工作并准备好ETL脚本时,就可以使用EMR Step来提交任务了。让我们通过EMR Step方式来运行上一节的ETL任务

创建Python脚本

将以下代码保存为spark-etl.py(注意最后一行代码有变化),将其上传至之前创建的S3桶的files目录下。

cat <<EOT >> spark-etl.py
import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

if __name__ == "__main__":

    print(len(sys.argv))
    if (len(sys.argv) != 3):
        print("Usage: spark-etl [input-folder] [output-folder]")
        sys.exit(0)

    spark = SparkSession\
        .builder\
        .appName("SparkETL")\
        .getOrCreate()

    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])

    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))

    updatedNYTaxi.printSchema()

    print(updatedNYTaxi.show())

    print("Total number of records: " + str(updatedNYTaxi.count()))

    updatedNYTaxi.write.format("parquet").mode("overwrite").save(sys.argv[2])	
EOT
aws s3 cp spark-etl.py s3://<bucket-name>/files/

添加EMR Step

现在将这个python文件以Steps的方式提交到EMR。 在EMR集群的界面上,点击Add step

image-20240306150112423

步骤类型选择Custom Jar; JAR Location输入command-runner.jar

Arguments部分输入以下代码,将桶名根据实际情况作替换:

spark-submit s3://<bucketname>/files/spark-etl.py s3://<bucketname>/input s3://<bucketname>/output

image-20240306150343561

点击添加,在Tab页查看执行的进度:

image-20240306150603915

点击查看日志,可以查看执行中过程中的stdout日志。

image-20240306150928372

image-20240306150954006

一旦任务执行完成,在S3的 output/ 目录下会生成文件