控制台将Spark Job提交到 EMR Serverless

接下来我们将:

  • 为 Spark 创建 EMR Serverless应用
  • 将 Spark 作业提交到 EMR Serverless应用
  • 为 Hive 创建 EMR Serverless应用
  • 将 Hive 作业提交到 EMR Serverless应用
  • 使用 Spark UI 进行监控和调试

CloudFormation创建资源

我们先用CloudFormation创建必须的资源。进入us-east-1地区的CloudFormation页面,创建一个新的stack:

image-20231221064809395

使用以下URL创建stack:

https://pingfan.s3.amazonaws.com/files/emr-serverless-cfn-v2.yml

image-20231221063004041

为Stack命名为emr-workshop:

image-20231221063019005

点击Next 直到最后创建stack。整个过程应该在3-5分钟创建完成,它创建了一个Cloud9,ECR仓库,Policy和Role,以及一个S3桶:

image-20231221064910782

控制台将 Spark 作业提交到 EMR Serverless

在向 EMR Serverless 提交作业之前,我们需要创建一个应用。可以创建一个或多个使用开源分析框架的应用。

要创建应用,需要指定:

  • 要使用的开源框架(例如,Apache Spark 或 Apache Hive)
  • 开源框架版本的EMR 版本(例如EMR 版本 6.4对应Apache Spark 3.1.2)
  • 应用的名称

在 EMR 控制台中选择 EMR Serverless:

image-20231221065206622

选择Get started。单击Create and launch EMR Studio:

image-20231221065353850

创建 EMR Serverless 应用:

image-20231221065517138

为应用命名。使用 my-serverless-application 作为应用名称。选择 Spark,选择 emr-6.15.0。选择默认设置。创建应用:

image-20231221065825285

创建application完成后,进入application,点击Submit Job

image-20231221065958488

Runtime role部分,可以让 EMR Serverless 为作业创建角色,但我们将使用 Cloudformation 模板已创建的角色:

image-20231221070057961

输入以下详细信息:

Name word_count
Runtime role EMRServerlessS3RuntimeRole
Script location S3 URI s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py
Script arguments [“s3://YOUR_BUCKET/wordcount_output/"] YOUR_BUCKET 使用CloudFormation中的输出进行替换

点击Submit Job

image-20231221070458033

由于需要准备并启动申请,因此首次提交作业需要花费更长的时间。此外,在应用准备好接受作业之前需要配置容量。

提交作业后,最终运行状态将显示Success

image-20231221070724206

现在可以验证Job是否已将其输出写入我们在提交作业时作为参数提供的 s3 路径中。转到 s3 并查看 EMR Serverless 应用成功创建的 csv 文件:

image-20231221070853202

spark已经帮我们把word的数量统计出来:

image-20231221071044604

word_count.py

上面运行spark任务的源码如下:

import os
import sys
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

if __name__ == "__main__":
    """
        Usage: wordcount [destination path]
    """
    spark = SparkSession\
        .builder\
        .appName("WordCount")\
        .getOrCreate()

    output_path = None

    if len(sys.argv) > 1:
        output_path = sys.argv[1]
    else:
        print("S3 output location not specified printing top 10 results to output stream")

    region = os.getenv("AWS_REGION")
    text_file = spark.sparkContext.textFile("s3://" + region  + ".elasticmapreduce/emr-containers/samples/wordcount/input")
    counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], False)
    counts_df = counts.toDF(["word","count"])

    if output_path:
        counts_df.write.mode("overwrite").csv(output_path)
        print("WordCount job completed successfully. Refer output at S3 path: " + output_path)
    else:
        counts_df.show(10, False)
        print("WordCount job completed successfully.")

    spark.stop()