使用spark-submit执行任务

我们将演示在EMR集群上使用多种方式提交Spark ETL任务:

  • 登录到Primary使用spark-submit提交任务
  • 使用JupyterHub notebook来提交
  • 通过添加EMR Steps来实现

任务介绍

  • 从S3上读取CSV数据
  • 在数据集上新增一列当前日期
  • 将更新的数据集以Parquet格式保存到S3

数据准备

创建一个S3桶:

aws s3 mb s3://emr-workshop-xxx

往桶的input目录上传一个csv文件:

wget https://pingfan.s3.amazonaws.com/files/JC-202202-citibike-tripdata.csv
aws s3 cp JC-202202-citibike-tripdata.csv s3://emr-workshop-xxx/input/

image-20240305092657289

提交ETL任务

登录EMR Primary Node后,使用命令行创建一个新文件 spark-etl.py

cat <<EOT >> spark-etl.py
import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

if __name__ == "__main__":

    print(len(sys.argv))
    if (len(sys.argv) != 3):
        print("Usage: spark-etl [input-folder] [output-folder]")
        sys.exit(0)

    spark = SparkSession\
        .builder\
        .appName("SparkETL")\
        .getOrCreate()

    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])

    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))
    updatedNYTaxi.printSchema()
    print(updatedNYTaxi.show())
    print("Total number of records: " + str(updatedNYTaxi.count()))
    updatedNYTaxi.write.parquet(sys.argv[2])
EOT

这个Spark任务将:

  1. 获取S3 input目录下的数据
  2. 增加一列current_date
  3. 将结果以Parquet文件形式写入到S3的output目录。

执行以下命令(将桶名替换):

spark-submit spark-etl.py s3://<YOUR-BUCKET>/input/ s3://<YOUR-BUCKET>/output/spark

image-20240305093328230

等待约半分钟后执行完成,检查S3的output/spark目录,将会看到输出结果:

image-20240305093431124