In this section, we introduce three ways to submit Spark jobs to EMR on EKS:
Using the aws emr-containers start-job-run command
Using the Spark Operator to submit jobs
Using spark-submit to submit jobs
Reference: https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/job-runs-main.html
Get the virtual cluster ID:
aws emr-containers list-virtual-clusters
Get the value of execution-role-arn:
aws iam get-role --role-name EMRContainers-JobExecutionRole
Then export these values in the Cloud9 terminal:
export EMR_EKS_CLUSTER_ID=<virtual-cluster-id>
export EMR_EKS_EXECUTION_ARN=<arn:aws:iam::xxxxx:role/EMR_EKS_Job_Execution_Role>
# For example:
# export EMR_EKS_EXECUTION_ARN=arn:aws:iam::145197526627:role/EMRContainers-JobExecutionRole
# export EMR_EKS_CLUSTER_ID=t70uk0wzackkgxphg59nsntw1
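If you prefer to capture these values directly instead of copying them by hand, a sketch using the CLI's --query option (this assumes the role name EMRContainers-JobExecutionRole from above, and that only one virtual cluster exists; adjust the index or add a filter otherwise):

```shell
# Take the id of the first (assumed only) virtual cluster.
export EMR_EKS_CLUSTER_ID=$(aws emr-containers list-virtual-clusters \
  --query "virtualClusters[0].id" --output text)

# Take the ARN of the job execution role created earlier.
export EMR_EKS_EXECUTION_ARN=$(aws iam get-role \
  --role-name EMRContainers-JobExecutionRole \
  --query Role.Arn --output text)

echo $EMR_EKS_CLUSTER_ID $EMR_EKS_EXECUTION_ARN
```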
The following command runs the Spark Pi Python code on EMR 6.9.0. Submit the job by running:
aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-6.9.0-latest \
--job-driver '{
"sparkSubmitJobDriver": {
"entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
"sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
}
}'
View the job in the EMR console:
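Besides the console, you can also check job status from the CLI with the list-job-runs subcommand (the --query projection below is one possible choice):

```shell
# List recent job runs in the virtual cluster with their current states
# (e.g. PENDING, SUBMITTED, RUNNING, COMPLETED, FAILED).
aws emr-containers list-job-runs \
  --virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
  --query "jobRuns[].{id:id,name:name,state:state}" \
  --output table
```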
You can try running the same job with a different EMR release. The following command runs the code on EMR 5.33.0:
aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-5.33.0-latest \
--job-driver '{
"sparkSubmitJobDriver": {
"entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
"sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
}
}'
The pi.py code is as follows:
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()
The Spark Operator is a Kubernetes operator that deploys and manages Spark applications on a Kubernetes cluster. It provides a simple way to create, configure, and manage Spark applications, along with advanced features such as dynamic resource allocation and scaling. The Spark Operator can automatically start and stop Spark applications in the cluster, and monitor and manage their running state.
Replace <ECR-registry-account> with the registry account ID for your region, which can be found at https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/docker-custom-images-tag.html#docker-custom-images-ECR; for example, the account for us-west-2 is 895885662937:
aws ecr get-login-password \
--region $AWS_REGION | helm registry login \
--username AWS \
--password-stdin <ECR-registry-account>.dkr.ecr.$AWS_REGION.amazonaws.com
Install the Spark Operator with the following command:
helm install spark-operator-demo \
oci://895885662937.dkr.ecr.$AWS_REGION.amazonaws.com/spark-operator \
--set emrContainers.awsRegion=$AWS_REGION \
--version 7.0.0 \
--namespace spark-operator \
--create-namespace
The command above creates a service account named emr-containers-sa-spark-operator, which we will use below to submit jobs.
cat <<EoF > spark-pi.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-operator
spec:
  type: Scala
  mode: cluster
  image: "895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-6.10.0:latest"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///usr/lib/spark/examples/jars/spark-examples.jar"
  sparkVersion: "3.3.1"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.3.1
    serviceAccount: emr-containers-sa-spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.3.1
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
EoF
kubectl apply -f spark-pi.yaml
Check the status of spark-pi-driver and wait for it to complete:
kubectl get po -n spark-operator
Check the logs to get the computed value of pi:
kubectl logs spark-pi-driver -n spark-operator
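The operator also records progress on the SparkApplication object itself. Assuming the spark-pi manifest above, you can inspect it with:

```shell
# Show the application's current state as tracked by the Spark Operator.
kubectl get sparkapplication spark-pi -n spark-operator

# More detail: driver pod name, submission attempts, application state, events.
kubectl describe sparkapplication spark-pi -n spark-operator
```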
To submit jobs with spark-submit, you need a local Spark installation whose version matches the Spark version available in EMR; Java must also be installed. For Amazon EMR 6.10, download a Spark 3.3 release.
Download Spark 3.3.2 from Apache Spark and extract the tgz file, which creates the folder spark-3.3.2-bin-hadoop3:
curl -O https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
tar zxvf spark-3.3.2-bin-hadoop3.tgz
cd spark-3.3.2-bin-hadoop3
Find the EKS cluster endpoint URL:
aws eks describe-cluster --name <eks-name> --query cluster.endpoint
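The endpoint is returned as an https:// URL, while spark-submit expects a k8s:// master. A sketch that derives MASTER_URL from the command above (EKS_CLUSTER_NAME is a hypothetical name; substitute your own cluster name):

```shell
# Hypothetical cluster name; replace with your actual EKS cluster name.
EKS_CLUSTER_NAME=my-eks-cluster

# Fetch the API server endpoint and swap the https:// scheme for k8s://.
ENDPOINT=$(aws eks describe-cluster --name $EKS_CLUSTER_NAME \
  --query cluster.endpoint --output text)
export MASTER_URL="k8s://${ENDPOINT#https://}"
echo $MASTER_URL
```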
To submit the Spark job, run the following commands:
export MASTER_URL=k8s://B6BA4EA8A6F4CEFC2A5E5C6A6F9A1042.gr7.us-west-2.eks.amazonaws.com
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master $MASTER_URL \
--conf spark.kubernetes.container.image=895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-6.10.0:latest \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=emr-containers-sa-spark-operator \
--deploy-mode cluster \
--conf spark.kubernetes.namespace=spark-operator \
local:///usr/lib/spark/examples/jars/spark-examples.jar 20 # this file lives inside the container
When execution finishes, you will see the word Completed:
Check the pods:
kongpingfan:~/environment/workshops/emr-on-eks/spark-3.3.2-bin-hadoop3 $ kubectl get po -n spark-operator
NAME READY STATUS RESTARTS AGE
org-apache-spark-examples-sparkpi-1b6ca48de121dd8f-driver 0/1 Completed 0 50s
spark-operator-demo-55d49d69c6-pt6dc 1/1 Running 0 37m
spark-pi-driver 0/1 Completed 0 32m
Check the pod logs:
kubectl logs org-apache-spark-examples-sparkpi-1b6ca48de121dd8f-driver -n spark-operator
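When you are done experimenting, you can clean up the resources created in this section; a sketch assuming the names used above:

```shell
# Delete the SparkApplication; the operator removes its driver/executor pods.
kubectl delete -f spark-pi.yaml

# Uninstall the operator Helm release and remove its namespace.
helm uninstall spark-operator-demo -n spark-operator
kubectl delete namespace spark-operator
```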