使用CLI执行任务

  1. 在AWS控制台进入S3界面。
  2. 创建一个S3桶( 随意命名 ),并在桶内创建以下文件夹:
    • files
    • logs
    • input
    • outputimage-20210430190620899
  3. 下载CSV文件(1.8MB): https://s3.amazonaws.com/aws-data-analytics-blog/emrimmersionday/tripdata.csv
  4. 将文件上传到刚刚创建S3的input目录下。
  5. 使用Cloud9 IDE登录EMR集群(参考上一节)
ssh -i <<key-pair>> hadoop@<<emr-master-public-dns-address>>
  1. 登录EMR终端后,使用命令行创建一个新文件 spark-etl.py
cat <<EOT >> spark-etl.py
import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

if __name__ == "__main__":

    print(len(sys.argv))
    if (len(sys.argv) != 3):
        print("Usage: spark-etl [input-folder] [output-folder]")
        sys.exit(0)

    spark = SparkSession\
        .builder\
        .appName("SparkETL")\
        .getOrCreate()

    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])

    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))

    updatedNYTaxi.printSchema()

    print(updatedNYTaxi.show())

    print("Total number of records: " + str(updatedNYTaxi.count()))

    updatedNYTaxi.write.parquet(sys.argv[2])
EOT

向集群提交pySpark任务。 这个Spark任务将获取S3 input目录下的数据,增加一列"current_date”,将结果以Parquet文件形式写入到S3的output目录。在Cloud9终端执行以下命令(将桶名替换):

spark-submit spark-etl.py s3://<YOUR-BUCKET>/input/ s3://<YOUR-BUCKET>/output/spark

等待约半分钟后执行完成,检查S3的output/spark目录,将会看到输出结果:

image-20210430191313586