接下来我们将:
EMR Serverless应用EMR Serverless应用我们先用CloudFormation创建必须的资源。进入us-east-1地区的CloudFormation页面,创建一个新的stack:

使用以下URL创建stack:
https://pingfan.s3.amazonaws.com/files/emr-serverless-cfn-v2.yml

为Stack命名为emr-workshop:

点击Next 直到最后创建stack。整个过程应该在3-5分钟创建完成,它创建了一个Cloud9,ECR仓库,Policy和Role,以及一个S3桶:

在向 EMR Serverless 提交作业之前,我们需要创建一个应用。可以创建一个或多个使用开源分析框架的应用。
要创建应用,需要指定:
在 EMR 控制台中选择 EMR Serverless:

选择Get started。单击Create and launch EMR Studio:

创建 EMR Serverless 应用:

为应用命名。使用 my-serverless-application 作为应用名称。选择 Spark,选择 emr-6.15.0。选择默认设置。创建应用:

创建application完成后,进入application,点击Submit Job:

在Runtime role部分,可以让 EMR Serverless 为作业创建角色,但我们将使用 Cloudformation 模板已创建的角色:

输入以下详细信息:
| Name | word_count |
| Runtime role | EMRServerlessS3RuntimeRole |
| Script location S3 URI | s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py |
| Script arguments | [“s3://YOUR_BUCKET/wordcount_output/"] YOUR_BUCKET 使用CloudFormation中的输出进行替换 |
点击Submit Job:

由于需要准备并启动申请,因此首次提交作业需要花费更长的时间。此外,在应用准备好接受作业之前需要配置容量。
提交作业后,最终运行状态将显示Success:

现在可以验证Job是否已将其输出写入我们在提交作业时作为参数提供的 s3 路径中。转到 s3 并查看 EMR Serverless 应用成功创建的 csv 文件:

spark已经帮我们把word的数量统计出来:

上面运行spark任务的源码如下:
import os
import sys
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
if __name__ == "__main__":
"""
Usage: wordcount [destination path]
"""
spark = SparkSession\
.builder\
.appName("WordCount")\
.getOrCreate()
output_path = None
if len(sys.argv) > 1:
output_path = sys.argv[1]
else:
print("S3 output location not specified printing top 10 results to output stream")
region = os.getenv("AWS_REGION")
text_file = spark.sparkContext.textFile("s3://" + region + ".elasticmapreduce/emr-containers/samples/wordcount/input")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], False)
counts_df = counts.toDF(["word","count"])
if output_path:
counts_df.write.mode("overwrite").csv(output_path)
print("WordCount job completed successfully. Refer output at S3 path: " + output_path)
else:
counts_df.show(10, False)
print("WordCount job completed successfully.")
spark.stop()