If you want to add your own libraries and dependencies to a Spark/Hive application, you can build a custom EMR Serverless runtime image.

We will add the Pydeequ library to the runtime image. Pydeequ is an open-source library built on top of Apache Spark for unit testing data and measuring data quality in large datasets. We will use Pydeequ in a PySpark script to analyze the CSV data created in the previous section.
Open Cloud9 and run the following commands to download and extract the resources:
wget https://pingfan.s3.amazonaws.com/files/custom-image-resources.zip
mkdir resources && cd resources/ && unzip ../custom-image-resources.zip
The directory layout after extraction:
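A sketch of the expected layout, with file names inferred from the commands used later in this section (the actual archive may contain additional files):
resources/
└── custom-image-resources/
    ├── Dockerfile
    ├── custom-image-application.json
    ├── deequ-2.0.2-spark-3.3.jar
    └── pydeequ_analyze.py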
The Cloud9 IDE comes with 10 GB of disk space by default, which is easy to fill up while setting up the development environment. Save the following as change_size.sh and run sh change_size.sh; it increases the disk size to 20 GB:
#!/bin/bash
TOKEN=`curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600"`
# Specify the desired volume size in GiB as a command line argument. If not specified, default to 20 GiB.
SIZE=${1:-20}
# Get the ID of the environment host Amazon EC2 instance.
INSTANCEID=$(curl -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/instance-id)
REGION=$(curl -H "X-aws-ec2-metadata-token: $TOKEN" -s http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/\(.*\)[a-z]/\1/')
# Get the ID of the Amazon EBS volume associated with the instance.
VOLUMEID=$(aws ec2 describe-instances \
--instance-id $INSTANCEID \
--query "Reservations[0].Instances[0].BlockDeviceMappings[0].Ebs.VolumeId" \
--output text \
--region $REGION)
echo $INSTANCEID
echo $REGION
echo $VOLUMEID
# Resize the EBS volume.
aws ec2 modify-volume --volume-id $VOLUMEID --size $SIZE
# Wait for the resize to finish.
while [ \
"$(aws ec2 describe-volumes-modifications \
--volume-id $VOLUMEID \
--filters Name=modification-state,Values="optimizing","completed" \
--query "length(VolumesModifications)"\
--output text)" != "1" ]; do
sleep 1
done
#Check if we're on an NVMe filesystem
if [[ -e "/dev/xvda" && $(readlink -f /dev/xvda) = "/dev/xvda" ]]
then
# Rewrite the partition table so that the partition takes up all the space that it can.
sudo growpart /dev/xvda 1
# Expand the size of the file system.
  # Check whether we're on Amazon Linux 2 or Amazon Linux 2023 (both use an XFS root filesystem)
  STR=$(cat /etc/os-release)
  SUB2="VERSION_ID=\"2\""
  SUB2023="VERSION_ID=\"2023\""
  if [[ "$STR" == *"$SUB2"* || "$STR" == *"$SUB2023"* ]]
then
sudo xfs_growfs -d /
else
sudo resize2fs /dev/xvda1
fi
else
# Rewrite the partition table so that the partition takes up all the space that it can.
sudo growpart /dev/nvme0n1 1
# Expand the size of the file system.
  # Check whether we're on Amazon Linux 2 or Amazon Linux 2023 (both use an XFS root filesystem)
  STR=$(cat /etc/os-release)
  SUB2="VERSION_ID=\"2\""
  SUB2023="VERSION_ID=\"2023\""
  if [[ "$STR" == *"$SUB2"* || "$STR" == *"$SUB2023"* ]]
then
sudo xfs_growfs -d /
else
sudo resize2fs /dev/nvme0n1p1
fi
fi
Verify that the disk resize succeeded:
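For example, a minimal check with df (the root filesystem should now report roughly 20 GB):
df -h /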
Run the following commands to set the environment variables needed to build the custom Docker image, and to copy the Python script and JAR file used later to the S3 bucket:
cd custom-image-resources
export ACCOUNT_ID=$(aws sts get-caller-identity --output text --query Account)
TOKEN=`curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600"`
export AWS_REGION=$(curl -H "X-aws-ec2-metadata-token: $TOKEN" -s http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/\(.*\)[a-z]/\1/')
export IMAGE_URI="${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/emr-serverless-repository:latest"
aws s3 cp pydeequ_analyze.py $S3_BUCKET/scripts/
aws s3 cp deequ-2.0.2-spark-3.3.jar $S3_BUCKET/jars/
echo $IMAGE_URI
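Note that $S3_BUCKET (and $JOB_ROLE_ARN, used when submitting the job later) are assumed to have been exported in a previous section. If you are working in a fresh terminal, re-export them first; the values below are placeholders, not actual names:
export S3_BUCKET=s3://<your-bucket-name>
export JOB_ROLE_ARN=arn:aws:iam::<account-id>:role/<your-emr-serverless-job-role>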
Build a custom Docker image from the EMR Serverless base image:
docker build . -t ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/emr-serverless-repository
The Dockerfile is shown below; it installs the pydeequ library on top of the EMR base image:
FROM public.ecr.aws/emr-serverless/spark/emr-6.9.0:latest AS base
USER root
RUN yum -y update
RUN pip3 install pydeequ==1.0.1
# EMRS will run the image as hadoop
USER hadoop:hadoop
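Before pushing, you can optionally run a local sanity check to confirm pydeequ was installed into the image (a minimal sketch; it overrides the image's default entrypoint to call python3 directly):
docker run --rm --entrypoint python3 \
  ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/emr-serverless-repository \
  -c "import pydeequ; print(pydeequ.__name__)"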
Push the image to ECR:
aws ecr get-login-password --region ${AWS_REGION} | docker login --username AWS --password-stdin ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com
docker push ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/emr-serverless-repository
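If the push fails because the emr-serverless-repository repository does not exist in your account yet, create it first (a minimal sketch using default repository settings):
aws ecr create-repository --repository-name emr-serverless-repository --region ${AWS_REGION}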
Substitute the value of $IMAGE_URI into custom-image-application.json (replacing the <replace-image-uri> placeholder):
sed -i 's@<replace-image-uri>@'"$IMAGE_URI"'@g' custom-image-application.json
After the substitution, the file looks like this:
{
    "name": "emr-serverless-custom-image-application",
    "releaseLabel": "emr-6.9.0",
    "type": "SPARK",
    "imageConfiguration": {
        "imageUri": "256659126572.dkr.ecr.us-east-1.amazonaws.com/emr-serverless-repository:latest"
    }
}
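EMR Serverless also needs permission to pull the image from this ECR repository. The console can grant this when you create the application; if you need to do it from the CLI, the following is a sketch of a repository policy based on the documented requirement (emr-serverless.amazonaws.com service principal with BatchGetImage, DescribeImages and GetDownloadUrlForLayer):
cat > ecr-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "EmrServerlessCustomImageSupport",
      "Effect": "Allow",
      "Principal": { "Service": "emr-serverless.amazonaws.com" },
      "Action": [
        "ecr:BatchGetImage",
        "ecr:DescribeImages",
        "ecr:GetDownloadUrlForLayer"
      ]
    }
  ]
}
EOF
aws ecr set-repository-policy --repository-name emr-serverless-repository --policy-text file://ecr-policy.json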
Create an EMR Serverless application using the custom image we just built:
export APPLICATION_ID=$(aws emr-serverless create-application --cli-input-json file://custom-image-application.json --output text --query applicationId)
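Optionally, confirm that the application was created (a minimal check; a newly created application should be in the CREATED state):
aws emr-serverless get-application --application-id ${APPLICATION_ID} --query "application.state" --output text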
Submit a PySpark job that uses the pydeequ library to analyze the CSV data created in the previous section:
aws emr-serverless start-job-run --application-id ${APPLICATION_ID} --execution-role-arn ${JOB_ROLE_ARN} --name "pydeequ-analyze" --job-driver '{
"sparkSubmit": {
"entryPoint": "'"$S3_BUCKET"'/scripts/pydeequ_analyze.py",
"entryPointArguments": [
"'"$S3_BUCKET"'/"
],
"sparkSubmitParameters": "--jars '"$S3_BUCKET"'/jars/deequ-2.0.2-spark-3.3.jar --conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1 --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
}
}'
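You can also track the job from the CLI instead of the console. A sketch using list-job-runs to find the job run ID and get-job-run to check its state (<job-run-id> is a placeholder for the ID returned by the list command):
# List job runs for this application along with their states.
aws emr-serverless list-job-runs --application-id ${APPLICATION_ID} \
  --query "jobRuns[].[id,name,state]" --output table
# Check the state of a specific run.
aws emr-serverless get-job-run --application-id ${APPLICATION_ID} \
  --job-run-id <job-run-id> --query "jobRun.state" --output text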
On the EMR Studio page, you can see the job's status (it runs in the newly created application):
After the job completes, you can see the pydeequ output under the S3 path, as shown below:
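For example, the output prefix can be listed directly (the path matches what the script writes at the end):
aws s3 ls $S3_BUCKET/pydeequ_output/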
In the PySpark script, we use pydeequ's Size analyzer to compute the size of the CSV dataset. The script is shown below:
import os
import sys

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

import pydeequ
from pydeequ.analyzers import *

if __name__ == "__main__":
    """
    Usage: pydeequ_analyze [destination path]
    """
    if len(sys.argv) > 1:
        output_path = sys.argv[1]
    else:
        print("S3 output location not specified")
        sys.exit(-1)

    spark = SparkSession\
        .builder\
        .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
        .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
        .appName("WordCount")\
        .getOrCreate()

    # Read the CSV output produced by the word count job in the previous section.
    df = spark.read.csv(output_path + "wordcount_output/")

    # Run the Size analyzer to count the rows of the dataset.
    analysisResult = AnalysisRunner(spark) \
        .onData(df) \
        .addAnalyzer(Size()) \
        .run()

    # Convert the metrics to a DataFrame, print them, and write them back to S3.
    analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
    analysisResult_df.show()
    analysisResult_df.write.format("csv").save(output_path + "pydeequ_output/")

    spark.stop()
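For reference, the metrics DataFrame printed by show() has the columns entity, instance, name and value; the Size analyzer produces a single row of the following shape (the value shown is a placeholder, not an actual result):
+-------+--------+----+------+
| entity|instance|name| value|
+-------+--------+----+------+
|Dataset|       *|Size|<rows>|
+-------+--------+----+------+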