EMR Steps

使用AWS控制台、AWS CLI或EMR API都可以添加EMR步骤步骤的数量没有限制,但一个群集同时支持最多256个PENDING和RUNNING状态的步骤,例如安装Apache Pig、安装Hive、安装HBase等。可以并行执行步骤以缩短执行时间。

在创建集群时配置可以EMR步骤(EMR Steps),也可以在创建后配置。本实验将采用后面这种方式。

当你完成EMR的开发工作并准备好ETL脚本时,就可以使用EMR步骤来提交任务了。

让我们通过EMR Step方式来运行上一节的ETL任务


创建Python脚本

将以下代码保存为spark-etl.py(注意最后一行代码有变化),将其上传至之前创建的S3桶的files目录下。

import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

if __name__ == "__main__":

    print(len(sys.argv))
    if (len(sys.argv) != 3):
        print("Usage: spark-etl [input-folder] [output-folder]")
        sys.exit(0)

    spark = SparkSession\
        .builder\
        .appName("SparkETL")\
        .getOrCreate()

    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])

    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))

    updatedNYTaxi.printSchema()

    print(updatedNYTaxi.show())

    print("Total number of records: " + str(updatedNYTaxi.count()))

    updatedNYTaxi.write.format("parquet").mode("overwrite").save(sys.argv[2])

添加EMR Step

现在将这个python文件以步骤的方式提交到EMR。 在EMR集群的界面上,点击添加步骤

image-20210430222854643

步骤类型选择自定义JAR

JAR位置输入command-runner.jar

自变量部分输入以下代码,将桶名根据实际情况作替换:

spark-submit s3://<bucketname>/files/spark-etl.py s3://<bucketname>/input s3://<bucketname>/output

image-20210501100151221

点击添加,在步骤Tab页查看执行的进度:

image-20210501100421119

点击查看日志,可以查看执行中过程中的stdout日志。


一旦任务执行完成,在S3的**output/**目录下会生成文件:

image-20210501100747142