在本节中,我们将介绍提交Spark 作业到EMR on EKS
的三种方式。
使用aws emr-containers start-job-run
命令
使用Spark Operator
提交任务
使用Spark Submit
提交任务
参考 https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/job-runs-main.html
aws emr-containers list-virtual-clusters
获取execution-role-arn的值
aws iam get-role --role-name EMRContainers-JobExecutionRole
并将这些值导出到 Cloud9 终端:
export EMR_EKS_CLUSTER_ID=<virtual-cluster-id>
export EMR_EKS_EXECUTION_ARN=<arn\:aws\:iam::xxxxx\:role/EMR_EKS_Job_Execution_Role>
# 例如:
# export EMR_EKS_EXECUTION_ARN=arn:aws:iam::145197526627:role/EMRContainers-JobExecutionRole
# export EMR_EKS_CLUSTER_ID=t70uk0wzackkgxphg59nsntw1
以下命令在EMR 6.9.0上运行 Spark Pi
Python 代码。通过运行以下命令提交作业:
aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-6.9.0-latest \
--job-driver '{
"sparkSubmitJobDriver": {
"entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
"sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
}
}'
在EMR控制台查看任务:
可以尝试使用不同的 EMR 版本运行相同的作业。以下命令在EMR 5.33.0上运行代码:
aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-5.33.0-latest \
--job-driver '{
"sparkSubmitJobDriver": {
"entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
"sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
}
}'
pi.py代码如下:
import sys from random import random from operator import add from pyspark.sql import SparkSession if __name__ == "__main__": """ Usage: pi [partitions] """ spark = SparkSession\ .builder\ .appName("PythonPi")\ .getOrCreate() partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 n = 100000 * partitions def f(_): x = random() * 2 - 1 y = random() * 2 - 1 return 1 if x ** 2 + y ** 2 <= 1 else 0 count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) print("Pi is roughly %f" % (4.0 * count / n)) spark.stop()
由此可见EMR on EKS的强大,可以支持多版本的Spark任务同时运行;这在传统的EMR on EC2是很难做到的
Spark Operator 是一个 Kubernetes Operator,它可以在 Kubernetes 集群中部署和管理 Spark 应用程序。它提供了一种简单的方式来创建、配置和管理 Spark 应用程序,同时提供了一些高级功能,如动态资源分配和动态扩展。Spark Operator 可以在 Kubernetes 集群中自动启动和停止 Spark 应用程序,以及监控和管理它们的运行状态。
将ECR-registry-account
进行替换,可在 https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/docker-custom-images-tag.html#docker-custom-images-ECR 中找到,例如us-west-2
是895885662937
:
aws ecr get-login-password \
--region $AWS_REGION | helm registry login \
--username AWS \
--password-stdin <ECR-registry-account>.dkr.ecr.$AWS_REGION.amazonaws.com
使用以下命令安装spark operator:
helm install spark-operator-demo \
oci://895885662937.dkr.ecr.$AWS_REGION.amazonaws.com/spark-operator \
--set emrContainers.awsRegion=$AWS_REGION \
--version 7.0.0 \
--namespace spark-operator \
--create-namespace
上面命令创建了一个service account emr-containers-sa-spark-operator
,下面我们会用这个sa来提交任务。
cat <<EoF > spark-pi.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-pi
namespace: spark-operator
spec:
type: Scala
mode: cluster
image: "895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-6.10.0:latest"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///usr/lib/spark/examples/jars/spark-examples.jar"
sparkVersion: "3.3.1"
restartPolicy:
type: Never
volumes:
- name: "test-volume"
hostPath:
path: "/tmp"
type: Directory
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.3.1
serviceAccount: emr-containers-sa-spark
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.3.1
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
EoF
kubectl apply -f spark-pi.yaml
一些主要的参数如下所述:
键 | 描述 |
---|---|
metadata.name | SparkApplication 的名称 |
metadata.namespace | SparkApplication 将运行的命名空间 |
spec.name | 编程语言 – 我们这里使用 Python |
spec.mode | 作业提交模式 |
spec.image | 作业中使用的 docker 容器镜像 |
spec.mainApplicationFile | 应用程序代码的位置、路径和名称 |
spec.driver.cores | Spark 驱动程序的 CPU 核心数 |
spec.driver.memory | 分配给 Spark 驱动程序的内存 |
spec.driver.serviceAccount | Spark 驱动程序使用的服务帐户 |
spec.executor.cores | Spark 执行器的 CPU 核心数 |
spec.executor.memory | 分配给 Spark executor的内存 |
spec.executor.serviceAccount | Spark 执行器使用的服务帐户 |
查看spark-pi-driver运行状态,等待它运行完成:
kubectl get po -n spark-operator
查看日志,得到pi计算的值:
kubectl logs spark-pi-driver -n spark-operator
spark operator的核心组件:
SparkApplication
和 ScheduledSparkApplication
的变化,提交和管理 Spark 作业。Spark Operator 通过将 Spark 的复杂管理工作抽象成 Kubernetes 资源,大大简化了在 Kubernetes 上运行 Spark 作业的过程。它利用 Kubernetes 的强大功能,如自动扩展、资源调度和监控,提供了一种高效、灵活和可扩展的 Spark 运行环境。
为了提交作业,需要使用与 EMR 中可用的 Spark 版本相匹配的 Spark 版本;还需要安装 Java。对于 Amazon EMR 6.10,需要下载 Spark 3.3 版本。
从 Apache Spark 3.3.2 下载 Spark 并解压tgz
文件,这将创建文件夹spark-3.3.2-bin-hadoop3
:
curl -O https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
tar zxvf spark-3.3.2-bin-hadoop3.tgz
cd spark-3.3.2-bin-hadoop3
找到eks集群endpoint URL:
aws eks describe-cluster --name <eks-name> --query cluster.endpoint
要提交 Spark 作业,运行以下命令:
export MASTER_URL=k8s://B6BA4EA8A6F4CEFC2A5E5C6A6F9A1042.gr7.us-west-2.eks.amazonaws.com
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master $MASTER_URL \
--conf spark.kubernetes.container.image=895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-6.10.0:latest \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=emr-containers-sa-spark-operator \
--deploy-mode cluster \
--conf spark.kubernetes.namespace=spark-operator \
local:///usr/lib/spark/examples/jars/spark-examples.jar 20 # 这个文件在容器内部
执行完成后,看到completed字样:
查看pod:
kongpingfan:~/environment/workshops/emr-on-eks/spark-3.3.2-bin-hadoop3 $ kubectl get po -n spark-operator
NAME READY STATUS RESTARTS AGE
org-apache-spark-examples-sparkpi-1b6ca48de121dd8f-driver 0/1 Completed 0 50s
spark-operator-demo-55d49d69c6-pt6dc 1/1 Running 0 37m
spark-pi-driver 0/1 Completed 0 32m
查看pod日志:
kubectl logs org-apache-spark-examples-sparkpi-1b6ca48de121dd8f-driver -n spark-operator