接下来我们将:
EMR Serverless
应用EMR Serverless
应用我们先用CloudFormation创建必须的资源。进入us-east-1
地区的CloudFormation页面,创建一个新的stack:
使用以下URL创建stack:
https://pingfan.s3.amazonaws.com/files/emr-serverless-cfn-v2.yml
为Stack命名为emr-workshop
:
点击Next 直到最后创建stack。整个过程应该在3-5分钟创建完成,它创建了一个Cloud9,ECR仓库,Policy和Role,以及一个S3桶:
在向 EMR Serverless 提交作业之前,我们需要创建一个应用。可以创建一个或多个使用开源分析框架的应用。
要创建应用,需要指定:
在 EMR 控制台中选择 EMR Serverless:
选择Get started。单击Create and launch EMR Studio:
创建 EMR Serverless 应用:
为应用命名。使用 my-serverless-application
作为应用名称。选择 Spark,选择 emr-6.15.0。选择默认设置。创建应用:
创建application完成后,进入application,点击Submit Job
:
在Runtime role
部分,可以让 EMR Serverless 为作业创建角色,但我们将使用 Cloudformation 模板已创建的角色:
输入以下详细信息:
Name | word_count |
Runtime role | EMRServerlessS3RuntimeRole |
Script location S3 URI | s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py |
Script arguments | [“s3://YOUR_BUCKET/wordcount_output/"] YOUR_BUCKET 使用CloudFormation中的输出进行替换 |
点击Submit Job
:
由于需要准备并启动申请,因此首次提交作业需要花费更长的时间。此外,在应用准备好接受作业之前需要配置容量。
提交作业后,最终运行状态将显示Success:
现在可以验证Job是否已将其输出写入我们在提交作业时作为参数提供的 s3 路径中。转到 s3 并查看 EMR Serverless 应用成功创建的 csv 文件:
spark已经帮我们把word的数量统计出来:
上面运行spark任务的源码如下:
import os
import sys
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
if __name__ == "__main__":
"""
Usage: wordcount [destination path]
"""
spark = SparkSession\
.builder\
.appName("WordCount")\
.getOrCreate()
output_path = None
if len(sys.argv) > 1:
output_path = sys.argv[1]
else:
print("S3 output location not specified printing top 10 results to output stream")
region = os.getenv("AWS_REGION")
text_file = spark.sparkContext.textFile("s3://" + region + ".elasticmapreduce/emr-containers/samples/wordcount/input")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], False)
counts_df = counts.toDF(["word","count"])
if output_path:
counts_df.write.mode("overwrite").csv(output_path)
print("WordCount job completed successfully. Refer output at S3 path: " + output_path)
else:
counts_df.show(10, False)
print("WordCount job completed successfully.")
spark.stop()