Using a Custom Image

If you want to add your own libraries and dependencies to Spark/Hive applications, you can customize the EMR Serverless runtime image.

We will add the Pydeequ library to the runtime image. Pydeequ is an open-source library built on top of Apache Spark for writing "unit tests for data", i.e. measuring data quality in large datasets. We will use Pydeequ in a PySpark script to analyze the CSV data created in the previous section.

Environment setup

Open Cloud9 and run the following commands to download and extract the resource files:

wget https://pingfan.s3.amazonaws.com/files/custom-image-resources.zip
mkdir resources && cd resources/ && unzip ../custom-image-resources.zip

(screenshot of the command output)

Directory layout after extraction:

(screenshot: directory layout after extraction)

The Cloud9 IDE comes with 10 GB of disk space by default, which is easy to fill up while setting up the development environment. Save the following as change_size.sh and run sh change_size.sh; it increases the disk size to 20 GB:

#!/bin/bash

TOKEN=`curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600"`

# Specify the desired volume size in GiB as a command line argument. If not specified, default to 20 GiB.
SIZE=${1:-20}

# Get the ID of the environment host Amazon EC2 instance.
INSTANCEID=$(curl -H "X-aws-ec2-metadata-token: $TOKEN"  http://169.254.169.254/latest/meta-data/instance-id)
REGION=$(curl -H "X-aws-ec2-metadata-token: $TOKEN" -s http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/\(.*\)[a-z]/\1/')

# Get the ID of the Amazon EBS volume associated with the instance.
VOLUMEID=$(aws ec2 describe-instances \
  --instance-id $INSTANCEID \
  --query "Reservations[0].Instances[0].BlockDeviceMappings[0].Ebs.VolumeId" \
  --output text \
  --region $REGION)

echo $INSTANCEID
echo $REGION
echo $VOLUMEID
# Resize the EBS volume.
aws ec2 modify-volume --volume-id $VOLUMEID --size $SIZE

# Wait for the resize to finish.
while [ \
  "$(aws ec2 describe-volumes-modifications \
    --volume-id $VOLUMEID \
    --filters Name=modification-state,Values="optimizing","completed" \
    --query "length(VolumesModifications)"\
    --output text)" != "1" ]; do
sleep 1
done

# Check whether the root device is /dev/xvda (non-NVMe) or an NVMe device (/dev/nvme0n1)
if [[ -e "/dev/xvda" && $(readlink -f /dev/xvda) = "/dev/xvda" ]]
then
  # Rewrite the partition table so that the partition takes up all the space that it can.
  sudo growpart /dev/xvda 1

  # Expand the size of the file system.
  # Check if we're on Amazon Linux 2 or Amazon Linux 2023 (XFS root filesystem)
  STR=$(cat /etc/os-release)
  SUBAL2="VERSION_ID=\"2\""
  SUBAL2023="VERSION_ID=\"2023\""
  if [[ "$STR" == *"$SUBAL2"* || "$STR" == *"$SUBAL2023"* ]]
  then
    sudo xfs_growfs -d /
  else
    sudo resize2fs /dev/xvda1
  fi

else
  # Rewrite the partition table so that the partition takes up all the space that it can.
  sudo growpart /dev/nvme0n1 1

  # Expand the size of the file system.
  # Check if we're on Amazon Linux 2 or Amazon Linux 2023 (XFS root filesystem)
  STR=$(cat /etc/os-release)
  SUBAL2="VERSION_ID=\"2\""
  SUBAL2023="VERSION_ID=\"2023\""
  if [[ "$STR" == *"$SUBAL2"* || "$STR" == *"$SUBAL2023"* ]]
  then
    sudo xfs_growfs -d /
  else
    sudo resize2fs /dev/nvme0n1p1
  fi
fi

Verify that the disk expansion succeeded:

(screenshot: disk size after expansion)
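
You can also confirm the new size from the shell (a quick sanity check; this assumes the root filesystem is mounted at /):

# The root volume should now report about 20 GiB
lsblk
df -h /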

Build the Docker image

Run the following commands to set the environment variables needed to build the custom Docker image, and to copy the Python script and jar file that we will use later to the S3 bucket:

cd custom-image-resources
export ACCOUNT_ID=$(aws sts get-caller-identity --output text --query Account)
TOKEN=`curl -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600"`
export AWS_REGION=$(curl -H "X-aws-ec2-metadata-token: $TOKEN" -s http://169.254.169.254/latest/meta-data/placement/availability-zone | sed 's/\(.*\)[a-z]/\1/')
export IMAGE_URI="${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/emr-serverless-repository:latest"
aws s3 cp pydeequ_analyze.py  $S3_BUCKET/scripts/
aws s3 cp deequ-2.0.2-spark-3.3.jar  $S3_BUCKET/jars/
echo $IMAGE_URI
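
The commands above assume S3_BUCKET is still exported from the previous section (the job role JOB_ROLE_ARN used later also comes from there). To confirm the script and jar landed in the expected S3 paths:

aws s3 ls $S3_BUCKET/scripts/
aws s3 ls $S3_BUCKET/jars/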

Build the custom Docker image from the EMR Serverless base image:

docker build . -t ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/emr-serverless-repository

(screenshot: docker build output)
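
To double-check that the image exists locally before pushing it:

# The freshly built image should appear with the "latest" tag
docker images | grep emr-serverless-repository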

The Dockerfile is shown below; on top of the EMR base image it additionally installs the pydeequ library:

FROM public.ecr.aws/emr-serverless/spark/emr-6.9.0:latest AS base
USER root

RUN yum -y update

RUN pip3 install pydeequ==1.0.1

# EMRS will run the image as hadoop
USER hadoop:hadoop

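The push in the next step assumes an ECR repository named emr-serverless-repository already exists in this account and region. If it does not, create it first (note that the repository policy must also allow the EMR Serverless service to pull the image; see the EMR Serverless custom-image documentation for the exact policy):

aws ecr create-repository --repository-name emr-serverless-repository --region ${AWS_REGION}
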
Push the image to ECR:

aws ecr get-login-password --region ${AWS_REGION} | docker login --username AWS --password-stdin ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com
docker push ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/emr-serverless-repository

(screenshot: docker push output)

Create the EMR Serverless application

Substitute $IMAGE_URI for the placeholder in custom-image-application.json:

sed -i  's@<replace-image-uri>@'"$IMAGE_URI"'@g' custom-image-application.json

After the substitution the file looks like this:

{
    "name": "emr-serverless-custom-image-application",
    "releaseLabel": "emr-6.9.0",
    "type": "SPARK",
    "imageConfiguration": {
        "imageUri": "256659126572.dkr.ecr.us-east-1.amazonaws.com/emr-serverless-repository:latest"
    }
}

Create the EMR Serverless application using the custom image we just built:

export APPLICATION_ID=$(aws emr-serverless create-application  --cli-input-json file://custom-image-application.json --output text --query applicationId)
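
The application is created asynchronously. Before submitting a job you can poll its state, which should reach CREATED (with the default auto-start setting it then starts when a job is submitted):

aws emr-serverless get-application --application-id ${APPLICATION_ID} --query "application.state" --output text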

Submit a PySpark job that uses the pydeequ library to analyze the CSV data created in the previous section (this assumes JOB_ROLE_ARN, the job execution role, is still set from the earlier sections):

aws emr-serverless start-job-run --application-id ${APPLICATION_ID} --execution-role-arn ${JOB_ROLE_ARN} --name "pydeequ-analyze" --job-driver '{
    "sparkSubmit": {
        "entryPoint": "'"$S3_BUCKET"'/scripts/pydeequ_analyze.py",
        "entryPointArguments": [
            "'"$S3_BUCKET"'/"
        ],
        "sparkSubmitParameters": "--jars '"$S3_BUCKET"'/jars/deequ-2.0.2-spark-3.3.jar --conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1 --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    }
}'
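
You can also track the run from the CLI. The start-job-run call prints a jobRunId; substitute it below (shown as <job-run-id>) to poll the run state:

aws emr-serverless get-job-run --application-id ${APPLICATION_ID} --job-run-id <job-run-id> --query "jobRun.state" --output text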

On the EMR Studio page you can see the job's run status (it runs in the newly created application):

(screenshot: job run status in EMR Studio)

After the job completes, the pydeequ output appears in the S3 path, as shown below:

(screenshot: pydeequ output files in the S3 path)
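
To inspect the results directly from the CLI (the script writes its metrics to pydeequ_output/ under the bucket passed as its argument):

aws s3 ls $S3_BUCKET/pydeequ_output/
aws s3 cp $S3_BUCKET/pydeequ_output/ ./pydeequ_output/ --recursive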

In the PySpark script we use pydeequ's Size analyzer to compute the size (row count) of the CSV dataset; the result is shown below:

(screenshot: pydeequ Size analyzer result)

Appendix: pydeequ_analyze.py

import os
import sys
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import pydeequ
from pydeequ.analyzers import *



if __name__ == "__main__":
    """
        Usage: wordcount [destination path]
    """
    
    if len(sys.argv) > 1:
        output_path = sys.argv[1]
    else:
        print("S3 output location not specified; exiting")
        sys.exit(1)

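    # Create a Spark session with the Deequ Maven coordinates configured via pydeequ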
    spark = SparkSession\
        .builder\
        .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
        .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
        .appName("PydeequAnalyze")\
        .getOrCreate()

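    # Read the word-count CSV output produced in the previous section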
    df = spark.read.csv(output_path + "wordcount_output/")
    
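    # Run the Deequ Size analyzer, which computes the number of rows in the dataset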
    analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .run()

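    # Convert the analyzer metrics to a DataFrame, print them, and persist them to S3 as CSV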
    analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
    analysisResult_df.show()
    
    analysisResult_df.write.format("csv").save(output_path + "pydeequ_output/")
    
    spark.stop()