Submitting Jobs to EMR on EKS

In this section, we cover three ways to submit Spark jobs to EMR on EKS:

  1. Using the aws emr-containers start-job-run command

  2. Using the Spark Operator

  3. Using spark-submit

See https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/job-runs-main.html

Submitting a job with the start-job-run command

  1. Get the virtual-cluster-id: copy the cluster ID from the output of the following command.
aws emr-containers list-virtual-clusters
  2. Get the value of execution-role-arn:

     aws iam get-role --role-name EMRContainers-JobExecutionRole
    

Then export these values in your Cloud9 terminal:

export EMR_EKS_CLUSTER_ID=<virtual-cluster-id>
export EMR_EKS_EXECUTION_ARN=<arn:aws:iam::xxxxx:role/EMR_EKS_Job_Execution_Role>

# For example:
# export EMR_EKS_EXECUTION_ARN=arn:aws:iam::145197526627:role/EMRContainers-JobExecutionRole
# export EMR_EKS_CLUSTER_ID=t70uk0wzackkgxphg59nsntw1
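
If you prefer not to copy these values by hand, you can capture them directly with the AWS CLI --query option. A minimal sketch, assuming a single virtual cluster and the role name used above:

# Grab the first virtual cluster ID and the execution role ARN in one step
export EMR_EKS_CLUSTER_ID=$(aws emr-containers list-virtual-clusters \
  --query "virtualClusters[0].id" --output text)
export EMR_EKS_EXECUTION_ARN=$(aws iam get-role \
  --role-name EMRContainers-JobExecutionRole \
  --query "Role.Arn" --output text)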

The following command runs the Spark Pi Python example on EMR 6.9.0. Submit the job by running:

aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-6.9.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        }
    }'

View the job in the EMR console:
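
You can also check job status from the CLI instead of the console. A sketch; substitute the job run ID returned by start-job-run:

# List recent job runs for the virtual cluster
aws emr-containers list-job-runs --virtual-cluster-id ${EMR_EKS_CLUSTER_ID}

# Or inspect a single run by its ID
aws emr-containers describe-job-run \
  --virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
  --id <job-run-id>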


You can try running the same job with a different EMR release. The following command runs the code on EMR 5.33.0:

aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-5.33.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        }
    }'

The code for pi.py is as follows:

import sys
from random import random
from operator import add

from pyspark.sql import SparkSession


if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()
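
If you want to sanity-check pi.py before submitting it, you can run it locally. A minimal sketch, assuming pyspark is installed via pip (the pip package bundles a local Spark runtime):

pip install pyspark

# Save the code above as pi.py, then run it with 10 partitions in local mode
python pi.py 10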

Submitting a job with the Spark Operator

The Spark Operator is a Kubernetes operator that deploys and manages Spark applications on a Kubernetes cluster. It provides a simple way to create, configure, and manage Spark applications, along with advanced features such as dynamic resource allocation and scaling. The Spark Operator can automatically start and stop Spark applications on the cluster, and it monitors and manages their runtime state.

Install the Spark Operator

Replace <ECR-registry-account> with the registry account for your Region, which you can find at https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/docker-custom-images-tag.html#docker-custom-images-ECR. For example, us-west-2 maps to 895885662937.

aws ecr get-login-password \
--region $AWS_REGION | helm registry login \
--username AWS \
--password-stdin <ECR-registry-account>.dkr.ecr.$AWS_REGION.amazonaws.com


Install the Spark Operator with the following command:

helm install spark-operator-demo \
  oci://895885662937.dkr.ecr.$AWS_REGION.amazonaws.com/spark-operator \
  --set emrContainers.awsRegion=$AWS_REGION \
  --version 7.0.0 \
  --namespace spark-operator \
  --create-namespace 
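
To verify the installation, check the Helm release and confirm that the operator pod is running:

helm list -n spark-operator
kubectl get pods -n spark-operator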


The command above creates a service account named emr-containers-sa-spark-operator, which we will use below to submit jobs.
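
You can confirm that the service account exists:

kubectl get serviceaccounts -n spark-operator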

Run a Spark application

cat <<EoF > spark-pi.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-operator
spec:
  type: Scala
  mode: cluster
  image: "895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-6.10.0:latest"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///usr/lib/spark/examples/jars/spark-examples.jar"
  sparkVersion: "3.3.1"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.3.1
    serviceAccount: emr-containers-sa-spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.3.1
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
EoF

kubectl apply -f spark-pi.yaml
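
Besides watching the pods, you can query the SparkApplication resource itself; the operator updates its status field as the job progresses:

kubectl get sparkapplication spark-pi -n spark-operator

# For detailed status and events
kubectl describe sparkapplication spark-pi -n spark-operator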

Check the status of the spark-pi-driver pod and wait for it to complete:

kubectl get po -n spark-operator


Check the logs to see the computed value of pi:

kubectl logs spark-pi-driver -n spark-operator
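
To pull out just the result, you can grep the driver log (the Scala SparkPi example prints a line starting with "Pi is roughly"):

kubectl logs spark-pi-driver -n spark-operator | grep "Pi is roughly"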


Submitting a job with spark-submit

To submit a job this way, you need a local Spark distribution whose version matches the Spark version available in EMR, and Java must be installed. For Amazon EMR 6.10, download Spark 3.3.
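
If Java is not already available in your environment, check for it and install it first. A sketch, assuming Amazon Linux on Cloud9; the package name may differ on other distributions:

# Install Corretto 11 only if no java binary is found
java -version || sudo yum install -y java-11-amazon-corretto-headless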

Download Spark 3.3.2 from the Apache Spark archive and extract the tgz file; this creates the folder spark-3.3.2-bin-hadoop3:

curl -O https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz

tar zxvf spark-3.3.2-bin-hadoop3.tgz

cd spark-3.3.2-bin-hadoop3

Find the EKS cluster endpoint URL:

aws eks describe-cluster --name <eks-name> --query cluster.endpoint


To submit the Spark job, run the following commands:

export MASTER_URL=k8s://B6BA4EA8A6F4CEFC2A5E5C6A6F9A1042.gr7.us-west-2.eks.amazonaws.com
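
Alternatively, you can derive MASTER_URL from the describe-cluster output instead of pasting the endpoint by hand (a sketch; replace <eks-name> with your cluster name):

export MASTER_URL=k8s://$(aws eks describe-cluster \
  --name <eks-name> --query cluster.endpoint --output text)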


./bin/spark-submit \
 --class org.apache.spark.examples.SparkPi \
 --master $MASTER_URL \
 --conf spark.kubernetes.container.image=895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-6.10.0:latest \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=emr-containers-sa-spark-operator \
 --deploy-mode cluster \
 --conf spark.kubernetes.namespace=spark-operator \
 local:///usr/lib/spark/examples/jars/spark-examples.jar 20 # this jar is inside the container

Once the job finishes, you will see Completed in the output:


Check the pods:

kongpingfan:~/environment/workshops/emr-on-eks/spark-3.3.2-bin-hadoop3 $ kubectl get po -n spark-operator
NAME                                                        READY   STATUS      RESTARTS   AGE
org-apache-spark-examples-sparkpi-1b6ca48de121dd8f-driver   0/1     Completed   0          50s
spark-operator-demo-55d49d69c6-pt6dc                        1/1     Running     0          37m
spark-pi-driver                                             0/1     Completed   0          32m

Check the pod logs:

kubectl logs org-apache-spark-examples-sparkpi-1b6ca48de121dd8f-driver -n spark-operator
