我们将演示在EMR集群上使用多种方式提交Spark ETL
任务:
JupyterHub notebook
来提交EMR Steps
来实现创建一个S3桶:
aws s3 mb s3://emr-workshop-xxx
往桶的input目录上传一个csv文件:
wget https://pingfan.s3.amazonaws.com/files/JC-202202-citibike-tripdata.csv
aws s3 cp JC-202202-citibike-tripdata.csv s3://emr-workshop-xxx/input/
登录EMR Primary Node后,使用命令行创建一个新文件 spark-etl.py:
cat <<EOT >> spark-etl.py
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
if __name__ == "__main__":
print(len(sys.argv))
if (len(sys.argv) != 3):
print("Usage: spark-etl [input-folder] [output-folder]")
sys.exit(0)
spark = SparkSession\
.builder\
.appName("SparkETL")\
.getOrCreate()
nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])
updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))
updatedNYTaxi.printSchema()
print(updatedNYTaxi.show())
print("Total number of records: " + str(updatedNYTaxi.count()))
updatedNYTaxi.write.parquet(sys.argv[2])
EOT
这个Spark任务将:
current_date
,执行以下命令(将桶名替换):
spark-submit spark-etl.py s3://<YOUR-BUCKET>/input/ s3://<YOUR-BUCKET>/output/spark
等待约半分钟后执行完成,检查S3的output/spark目录,将会看到输出结果: