ssh -i <<key-pair>> hadoop@<<emr-master-public-dns-address>>
cat << 'EOT' > spark-etl.py
import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

if __name__ == "__main__":
    print(len(sys.argv))
    if len(sys.argv) != 3:
        print("Usage: spark-etl [input-folder] [output-folder]")
        sys.exit(1)

    spark = SparkSession\
        .builder\
        .appName("SparkETL")\
        .getOrCreate()

    # Read the CSV input, inferring column types from the data
    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])

    # Add a current_date column stamped with the job's run time
    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))

    updatedNYTaxi.printSchema()
    updatedNYTaxi.show()
    print("Total number of records: " + str(updatedNYTaxi.count()))

    # Write the result to the output folder as Parquet
    updatedNYTaxi.write.parquet(sys.argv[2])
EOT
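Note that inferSchema makes Spark scan the input once just to determine column types. If the schema is known in advance, it can be declared explicitly instead; a minimal sketch that would replace the read line in spark-etl.py, with hypothetical column names (adjust to the actual NY Taxi CSV header):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical columns; replace with the actual fields in the CSV header
schema = StructType([
    StructField("vendor_id", StringType(), True),
    StructField("trip_distance", DoubleType(), True),
])
nyTaxi = spark.read.schema(schema).option("header", "true").csv(sys.argv[1])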
Submit the PySpark job to the cluster. This Spark job reads the data in the S3 input folder, adds a "current_date" column, and writes the result as Parquet files to the S3 output folder. Run the following command in the Cloud9 terminal session connected to the EMR master (replace the bucket name with your own):
spark-submit spark-etl.py s3://<YOUR-BUCKET>/input/ s3://<YOUR-BUCKET>/output/spark
The job finishes after about half a minute. Check the output/spark folder in S3 and you will see the output:
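To sanity-check the result, the Parquet files can also be read back from a pyspark shell on the master node; a minimal sketch, assuming the same bucket placeholder (the spark session is predefined in the shell):

# Read the job output back and confirm the schema and record count
df = spark.read.parquet("s3://<YOUR-BUCKET>/output/spark/")
df.printSchema()
print("Records written: " + str(df.count()))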