We will demonstrate several ways to submit a Spark ETL job on an EMR cluster, including the command line on the primary node, EMR Steps, and a JupyterHub notebook. First, create an S3 bucket:
aws s3 mb s3://emr-workshop-xxx
Upload a CSV file to the bucket's input directory:
wget https://pingfan.s3.amazonaws.com/files/JC-202202-citibike-tripdata.csv
aws s3 cp JC-202202-citibike-tripdata.csv s3://emr-workshop-xxx/input/
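Optionally, confirm the upload succeeded by listing the input directory:

aws s3 ls s3://emr-workshop-xxx/input/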

After logging in to the EMR primary node, create a new file named spark-etl.py from the command line:
cat << 'EOT' > spark-etl.py
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

if __name__ == "__main__":
    # Expect exactly two arguments: the input folder and the output folder.
    if len(sys.argv) != 3:
        print("Usage: spark-etl [input-folder] [output-folder]")
        sys.exit(1)
    spark = SparkSession\
        .builder\
        .appName("SparkETL")\
        .getOrCreate()
    # Read the CSV data, inferring column types from the header row.
    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])
    # Add a current_date column stamped with the processing time.
    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))
    updatedNYTaxi.printSchema()
    updatedNYTaxi.show()
    print("Total number of records: " + str(updatedNYTaxi.count()))
    # Write the result to the output folder in Parquet format.
    updatedNYTaxi.write.parquet(sys.argv[2])
EOT
This Spark job reads the CSV file from the input folder, adds a current_date column containing the processing timestamp, prints the schema and record count, and writes the result to the output folder in Parquet format. Run it with the following command (replace the bucket name):
spark-submit spark-etl.py s3://<YOUR-BUCKET>/input/ s3://<YOUR-BUCKET>/output/spark

The job completes after about half a minute. Check the output/spark directory in S3 and you will see the output files:
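You can also inspect the directory from the command line; a successful Spark write leaves a _SUCCESS marker alongside the part-*.parquet files (replace the bucket name as before):

aws s3 ls s3://<YOUR-BUCKET>/output/spark/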

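As mentioned at the start, the same script can also be submitted as an EMR Step instead of running spark-submit on the primary node. Below is a minimal sketch using the AWS CLI, assuming the script is first copied to the bucket and j-XXXXXXXXXXXXX stands in for your cluster ID (both are placeholders). Note that a fresh output directory is used, since the Parquet write fails if the target path already exists:

# Upload the script so the step can reference it from S3
aws s3 cp spark-etl.py s3://<YOUR-BUCKET>/scripts/
# Submit the script as a Spark step on the running cluster
aws emr add-steps \
  --cluster-id j-XXXXXXXXXXXXX \
  --steps 'Type=Spark,Name=SparkETL,ActionOnFailure=CONTINUE,Args=[s3://<YOUR-BUCKET>/scripts/spark-etl.py,s3://<YOUR-BUCKET>/input/,s3://<YOUR-BUCKET>/output/spark-step]'

The step's progress can then be followed on the cluster's Steps tab in the EMR console.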