使用AWS控制台、AWS CLI
或EMR API
都可以添加EMR Step。
Step的数量没有限制,但一个群集同时支持最多256个PENDING和RUNNING状态的Step,例如安装Apache Pig、安装Hive、安装HBase等。可以并行执行步骤以缩短执行时间。
在创建集群时配置可以EMR步骤(EMR Steps)
,也可以在创建后配置。本实验将采用后面这种方式。
当完成EMR的开发工作并准备好ETL脚本时,就可以使用EMR Step来提交任务了。让我们通过EMR Step方式来运行上一节的ETL任务
将以下代码保存为spark-etl.py
(注意最后一行代码有变化),将其上传至之前创建的S3桶的files
目录下。
cat <<EOT >> spark-etl.py
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
if __name__ == "__main__":
print(len(sys.argv))
if (len(sys.argv) != 3):
print("Usage: spark-etl [input-folder] [output-folder]")
sys.exit(0)
spark = SparkSession\
.builder\
.appName("SparkETL")\
.getOrCreate()
nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])
updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))
updatedNYTaxi.printSchema()
print(updatedNYTaxi.show())
print("Total number of records: " + str(updatedNYTaxi.count()))
updatedNYTaxi.write.format("parquet").mode("overwrite").save(sys.argv[2])
EOT
aws s3 cp spark-etl.py s3://<bucket-name>/files/
现在将这个python文件以Steps的方式提交到EMR。 在EMR集群的界面上,点击Add step:
步骤类型选择Custom Jar
; JAR Location输入command-runner.jar
Arguments
部分输入以下代码,将桶名根据实际情况作替换:
spark-submit s3://<bucketname>/files/spark-etl.py s3://<bucketname>/input s3://<bucketname>/output
点击添加,在Tab页查看执行的进度:
点击查看日志,可以查看执行中过程中的stdout日志。
一旦任务执行完成,在S3的 output/ 目录下会生成文件