在EKS上向EMR提交作业

在本节中,我们将介绍提交Spark 作业到EMR on EKS的三种方式。

  1. 使用aws emr-containers start-job-run命令

  2. 使用Spark Operator提交任务

  3. 使用Spark Submit提交任务

参考 https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/job-runs-main.html

start-job-run命令提交任务

  1. 获取virtual-cluster-id:从以下命令的输出中复制集群 ID。
aws emr-containers list-virtual-clusters
  1. 获取execution-role-arn的值

     aws iam get-role --role-name EMRContainers-JobExecutionRole
    

并将这些值导出到 Cloud9 终端:

export EMR_EKS_CLUSTER_ID=<virtual-cluster-id>
export EMR_EKS_EXECUTION_ARN=<arn\:aws\:iam::xxxxx\:role/EMR_EKS_Job_Execution_Role>

# 例如:
# export EMR_EKS_EXECUTION_ARN=arn:aws:iam::145197526627:role/EMRContainers-JobExecutionRole
# export EMR_EKS_CLUSTER_ID=t70uk0wzackkgxphg59nsntw1

以下命令在EMR 6.9.0上运行 Spark Pi Python 代码。通过运行以下命令提交作业:

aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-6.9.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        }
    }'

image-20240225232447150在EMR控制台查看任务:

image-20240225232535176

可以尝试使用不同的 EMR 版本运行相同的作业。以下命令在EMR 5.33.0上运行代码:

aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-5.33.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        }
    }'

pi.py代码如下:

import sys
from random import random
from operator import add

from pyspark.sql import SparkSession


if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()

由此可见EMR on EKS的强大,可以支持多版本的Spark任务同时运行;这在传统的EMR on EC2是很难做到的

使用Spark Operator提交任务

Spark Operator 是一个 Kubernetes Operator,它可以在 Kubernetes 集群中部署和管理 Spark 应用程序。它提供了一种简单的方式来创建、配置和管理 Spark 应用程序,同时提供了一些高级功能,如动态资源分配和动态扩展。Spark Operator 可以在 Kubernetes 集群中自动启动和停止 Spark 应用程序,以及监控和管理它们的运行状态。

安装spark operator

ECR-registry-account进行替换,可在 https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/docker-custom-images-tag.html#docker-custom-images-ECR 中找到,例如us-west-2895885662937

aws ecr get-login-password \
--region $AWS_REGION | helm registry login \
--username AWS \
--password-stdin <ECR-registry-account>.dkr.ecr.$AWS_REGION.amazonaws.com

image-20240225235916398

使用以下命令安装spark operator:

helm install spark-operator-demo \
  oci://895885662937.dkr.ecr.$AWS_REGION.amazonaws.com/spark-operator \
  --set emrContainers.awsRegion=$AWS_REGION \
  --version 7.0.0 \
  --namespace spark-operator \
  --create-namespace 

image-20240226000155369

上面命令创建了一个service account emr-containers-sa-spark-operator,下面我们会用这个sa来提交任务。

运行spark应用

cat <<EoF > spark-pi.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-operator
spec:
  type: Scala
  mode: cluster
  image: "895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-6.10.0:latest"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///usr/lib/spark/examples/jars/spark-examples.jar"
  sparkVersion: "3.3.1"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.3.1
    serviceAccount: emr-containers-sa-spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.3.1
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
EoF

kubectl apply -f spark-pi.yaml

一些主要的参数如下所述:

描述
metadata.name SparkApplication 的名称
metadata.namespace SparkApplication 将运行的命名空间
spec.name 编程语言 – 我们这里使用 Python
spec.mode 作业提交模式
spec.image 作业中使用的 docker 容器镜像
spec.mainApplicationFile 应用程序代码的位置、路径和名称
spec.driver.cores Spark 驱动程序的 CPU 核心数
spec.driver.memory 分配给 Spark 驱动程序的内存
spec.driver.serviceAccount Spark 驱动程序使用的服务帐户
spec.executor.cores Spark 执行器的 CPU 核心数
spec.executor.memory 分配给 Spark executor的内存
spec.executor.serviceAccount Spark 执行器使用的服务帐户

查看spark-pi-driver运行状态,等待它运行完成:

kubectl get po -n spark-operator

image-20240226000804760

查看日志,得到pi计算的值:

kubectl logs spark-pi-driver -n spark-operator

image-20240226000856980

小结

spark operator的核心组件:

  1. SparkApplication CRD
    • 这是一个 Kubernetes 自定义资源,用于定义 Spark 应用程序的配置,包括镜像、主类、参数、资源请求等。
  2. ScheduledSparkApplication CRD
    • 用于定义定时运行的 Spark 应用程序,类似于 Kubernetes 的 CronJob。
  3. Spark Operator 控制器
    • 这是一个 Kubernetes 控制器,负责监听和处理 SparkApplicationScheduledSparkApplication 的变化,提交和管理 Spark 作业。

Spark Operator 通过将 Spark 的复杂管理工作抽象成 Kubernetes 资源,大大简化了在 Kubernetes 上运行 Spark 作业的过程。它利用 Kubernetes 的强大功能,如自动扩展、资源调度和监控,提供了一种高效、灵活和可扩展的 Spark 运行环境。

使用spark submit提交任务

为了提交作业,需要使用与 EMR 中可用的 Spark 版本相匹配的 Spark 版本;还需要安装 Java。对于 Amazon EMR 6.10,需要下载 Spark 3.3 版本。

从 Apache Spark 3.3.2 下载 Spark 并解压tgz文件,这将创建文件夹spark-3.3.2-bin-hadoop3

curl -O https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz

tar zxvf spark-3.3.2-bin-hadoop3.tgz

cd spark-3.3.2-bin-hadoop3

找到eks集群endpoint URL:

aws eks describe-cluster --name <eks-name> --query cluster.endpoint

image-20240226002214239

要提交 Spark 作业,运行以下命令:

export MASTER_URL=k8s://B6BA4EA8A6F4CEFC2A5E5C6A6F9A1042.gr7.us-west-2.eks.amazonaws.com


./bin/spark-submit \
 --class org.apache.spark.examples.SparkPi \
 --master $MASTER_URL \
 --conf spark.kubernetes.container.image=895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-6.10.0:latest \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=emr-containers-sa-spark-operator \
 --deploy-mode cluster \
 --conf spark.kubernetes.namespace=spark-operator \
 local:///usr/lib/spark/examples/jars/spark-examples.jar 20 # 这个文件在容器内部

执行完成后,看到completed字样:

image-20240226003912177

查看pod:

kongpingfan:~/environment/workshops/emr-on-eks/spark-3.3.2-bin-hadoop3 $ kubectl get po -n spark-operator
NAME                                                        READY   STATUS      RESTARTS   AGE
org-apache-spark-examples-sparkpi-1b6ca48de121dd8f-driver   0/1     Completed   0          50s
spark-operator-demo-55d49d69c6-pt6dc                        1/1     Running     0          37m
spark-pi-driver                                             0/1     Completed   0          32m

查看pod日志:

kubectl logs org-apache-spark-examples-sparkpi-1b6ca48de121dd8f-driver -n spark-operator

image-20240226004054642