与RDS Hive MetaStore集成

在本节中,我们将使用 EMR Serverless 运行 Spark ETL 作业,并与 RDS Hive Metastore 交互以创建表

Hive MetaStore是一个集中位置,用于存储有关表的结构信息,包括架构、分区名称和数据类型。

请注意,在EMR Serverless application 上提交作业时,MetaStore不会自动初始化。

设置提交作业的先决条件

在 EMR Serveless 中调度 Spark 作业之前,需要创建所需的 AWS 资源。为此,我们提供 AWS CloudFormation 模板来创建包含资源的堆栈。当您创建堆栈时,AWS 会在您的账户中创建以下资源:

  • 设置部署以下资源的 VPC 网络:
    • 具有一对公有子网和私有子网的 VPC 分布在两个可用区。
    • 互联网网关,在公共子网上具有默认路由。
    • 一对 NAT 网关(每个可用区一个),以及它们在私有子网中的默认路由。
    • 用于从 EMR 无服务器应用程序访问 Hive Metastore 的安全组,允许本地入站流量和所有出站流量。
  • 满足以下要求的 Amazon Simple Storage Service (Amazon S3) 存储桶:
    • 在您创建 EMR 无服务器应用程序的同一 AWS 区域中创建。
    • 全局唯一的存储桶名称
  • 初始化 EMR 发行版本 6.6.6 的 hive 元存储架构
  • 设置具有以下权限的 EMR Serverless 作业执行 IAM 角色:
    • 连接MySQL环境
    • 访问所需的 s3-buckets,
    • 承担 EMR 无服务器角色。
  • 具有以下属性的 EMR 无服务器应用程序
    • Amazon EMR 发布版本 6.6.6
    • 在应用程序中使用的 Apache Spark 运行时
  • VPC 公有子网中的 cloud9 环境,用于提交 EMR Serverless 作业

要启动 CloudFormation 堆栈,请单击启动堆栈。

启动堆栈

  1. 在“创建堆栈”页面上,选择页面底部的**“下一步” ,如下所示。**

img

  1. 在指定堆栈详细信息上选择以下内容
    • 对于堆栈名称,输入EMR-Serverless-HMS
    • 对于 EC2 密钥对,从下拉菜单中选择私钥(.pem 文件),将所有其他选项保留为默认值,然后单击下一步。

img

  1. 将“配置堆栈选项”上的所有选项保留为默认值,然后单击“下一步”
  2. 转至“审核”页面底部,选中“我确认 AWS CloudFormation 可能会使用自定义名称创建 IAM 资源”旁边的复选框,然后单击“创建堆栈”,如下所示。

img

CloudFormation 堆栈将创建研讨会所需的必要资源。

CloudFormation 堆栈大约需要 15-20 分钟才能完成。

在 CloudFormation 控制台中等待状态CREATE_COMPLETE,如下所示:

img

使用 EMR 无服务器应用程序提交 Spark 作业

  1. 导航至CloudFormation 控制台 。检查 AWS 区域并确保您选择正确的区域。
  2. 选择 Stacks,然后选择您的根堆栈名称(默认为EMR-Serverless-HMS )
  3. 选择输出,然后选择 Key Cloud9URL旁边的链接。

img

  1. 您的 AWS Cloud9 环境应类似于下面的屏幕截图(颜色方案可能看起来不同)

img

  1. 然后返回EMR-Serverless-HMS CloudFormation 模板的输出部分,复制并替换以下代码片段中相应键的****值
1
2
3
4
5
6
7
8
9
10
sudo yum install jq -y
export MariaDBHost=<Replace with EMRServerlessHiveMetastoreDB>
export JOB_ROLE_ARN=<Replace with JobRoleArn>
export S3_BUCKET=<Replace with S3Bucket>
export SPARK_APPLICATION_ID=<Replace with SparkApplicationId>
export SECRET_ID=<Replace with RDSSecretId>
export DBUSER=$(aws secretsmanager get-secret-value --secret-id $SECRET_ID | jq --raw-output '.SecretString' | jq -r .MasterUsername)
export DBPASSWORD=$(aws secretsmanager get-secret-value --secret-id $SECRET_ID | jq --raw-output '.SecretString' | jq -r .MasterUserPassword)
export JDBCDriverClass=org.mariadb.jdbc.Driver
export JDBCDriver=mariadb-connector-java.jar
  1. 替换完值后,复制代码并在Cloud9 Terminal中执行以设置环境变量。
  2. 在 Cloud9 控制台的左上角菜单中,单击File,然后选择New File并将以下代码片段复制到这个新文件中,并将文件另存为**“spark-nyctaxi.py”**。
import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

if __name__ == "__main__":

    print(len(sys.argv))
    if (len(sys.argv) != 4):
        print("Usage: spark-nyctaxi [warehouse-location] [input-folder] [output-folder]")
        sys.exit(0)
     
    print("Warehouse location: " + sys.argv[1]+"/warehouse/") 
    print("CSV folder path: " + sys.argv[2])
    print("Writing the parquet file to the folder: " + sys.argv[3])
     
    spark = SparkSession \
        .builder \
        .config("spark.sql.warehouse.dir", sys.argv[1]+"/warehouse/" ) \
        .enableHiveSupport() \
        .getOrCreate()
    
    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[2])
    
    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))
    
    updatedNYTaxi.registerTempTable("ny_taxi_table")
    
    spark.sql("SHOW DATABASES").show()
    spark.sql("CREATE DATABASE IF NOT EXISTS `hivemetastore`")
    spark.sql("DROP TABLE IF EXISTS hivemetastore.ny_taxi_parquet")
    
    updatedNYTaxi.write.option("path",sys.argv[3]).mode("overwrite").format("parquet").saveAsTable("hivemetastore.ny_taxi_parquet")
  1. 通过在 Cloud9 控制台中执行以下代码,将文件**“spark-nyctaxi.py”上传到您的 Workshop S3 存储桶。**确保 PySpark 脚本位于当前文件夹中。
1
aws s3 cp spark-nyctaxi.py "s3://${S3_BUCKET}"
  1. 通过在 Cloud9 控制台中执行以下代码,将 MariaDB 连接器上传到您的 Workshop S3 存储桶。
1
2
wget https://downloads.mariadb.com/Connectors/java/connector-java-3.0.7/mariadb-java-client-3.0.7.jar
aws s3 cp mariadb-java-client-3.0.7.jar "s3://${S3_BUCKET}/mariadb-connector-java.jar" 
  1. 在 Cloud9 控制台中安装最新版本 2 的 AWS CLI
1
2
3
4
5
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install -i ~/aws-cli
alias aws=~/aws-cli/v2/current/bin/aws
aws --version

您应该看到类似于下面的输出。请注意 aws cli 的版本是 2.xx

img

  1. 运行以下命令以在 EMR 无服务器应用程序上执行 Spark ETL 作业。S3 输出位置将作为参数传递给脚本,这作为entryPointArguments 传递
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
aws emr-serverless start-job-run \
  --application-id $SPARK_APPLICATION_ID \
  --execution-role-arn $JOB_ROLE_ARN \
  --job-driver '{
        "sparkSubmit": {
    "entryPoint": "s3://'${S3_BUCKET}'/spark-nyctaxi.py",
    "entryPointArguments":["s3://'"${S3_BUCKET}"'/","s3://aws-data-analytics-workshops/shared_datasets/tripdata/","s3://'"${S3_BUCKET}"'/taxi-data-rds-hive-metastore/"],
    "sparkSubmitParameters": "--jars s3://'${S3_BUCKET}'/'${JDBCDriver}' --conf spark.hadoop.javax.jdo.option.ConnectionDriverName='${JDBCDriverClass}' --conf spark.hadoop.javax.jdo.option.ConnectionUserName='${DBUSER}' --conf spark.hadoop.javax.jdo.option.ConnectionPassword='${DBPASSWORD}' --conf spark.hadoop.javax.jdo.option.ConnectionURL=\"jdbc:mariadb://'${MariaDBHost}':3306/hivemetastore\"  --conf spark.driver.cores=2 --conf spark.executor.memory=2G --conf spark.driver.memory=2G --conf spark.executor.cores=2"
}
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
        "s3MonitoringConfiguration": {
            "logUri": "s3://'${S3_BUCKET}'/sparklogs/"
        }
    }
}'

该作业将需要 8-10 分钟才能完成。

  1. 导航至EMR 控制台 。检查 AWS 区域并确保您选择正确的区域。
  2. 从左侧栏中选择EMR Serverless 。将弹出一个新窗口,显示 Amazon EMR 无服务器应用程序。单击管理应用程序
  3. 选择 EMR 无服务器应用程序 ID 以及您从EMR-Serverless-HMS CloudFormation 堆栈(您提交 Spark 作业的堆栈)的输出部分复制的****值。
  4. 检查您从 Cloud9 控制台提交的最新作业 ID 的状态,并等待该作业 ID 的运行状态为成功 img
  5. 导航至S3 控制台
  6. 让我们检查在 S3 存储桶中创建的输出数据,例如 emr-hms-[AWS-REGION]-[ACCOUNT-ID],即您从CloudFormation 堆栈的输出部分复制的****值。您会注意到创建了一个新文件夹taxi-data-rds-hive-metastoreimg

请随意在文件夹内浏览。您会注意到 parquet 文件已被写入。

img

  1. 返回 Cloud9 控制台,验证 MariaDB Hive Metastore 中的 Metastore 表创建。使用以下命令连接到 hivemetatsore 数据库:
1
mysql --host $MariaDBHost --user=$DBUSER --password=$DBPASSWORD hivemetastore

您可以看到包含不同元存储信息的元存储表,例如 TBLS 表中的表和 COLUMNS_V2 表中的列。

img

对 TBLS 表运行 SELECT 查询以查看我们在 Spark 程序中创建的表。

img

继续下一节。