In this section, we will learn how to use the Apache Spark connector to run big data workloads with EMR Serverless and Redshift Serverless. We will read data from Redshift Serverless on EMR Serverless, run ETL with Spark, and load the data back into Redshift Serverless; we will also read data from S3, run ETL with Spark on EMR Serverless, and write the transformed data to Redshift Serverless.
The workshop provides an Amazon S3 resource bucket (S3Bucket) with two subfolders, script and input. The redshift-spark-demo-temp-dir prefix in this bucket serves as the temporary directory for Redshift loads.
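To confirm the bucket layout before you start, you can list its contents from the CLI. A minimal sketch, assuming `<<S3Bucket>>` is the bucket name shown in the CloudFormation outputs:

```bash
# List the workshop bucket; you should see the script/ and input/ prefixes,
# and later the redshift-spark-demo-temp-dir/ prefix once the connector uses it.
aws s3 ls s3://<<S3Bucket>>/ --recursive --human-readable
```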
**Action:** From EMR Studio, select your EMR Serverless application and click **Configure**. Make sure the application uses emr-6.10.0 or a later release and is in the STOPPED state. If your application is not on emr-6.10.0 or later, create a new one.
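If you do need a new application, it can also be created from the CLI. A minimal sketch, with an illustrative application name (the network settings described next can be added here or configured later in the console):

```bash
# Create a Spark application on a release that bundles the Redshift connector (emr-6.10.0 or later).
aws emr-serverless create-application \
  --name spark-redshift-demo \
  --type SPARK \
  --release-label emr-6.10.0 \
  --region us-east-1
```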
Under **Network connections**, select the VPC and the private subnets found on the CloudFormation outputs screen, and choose EmrServerless-sg in the Security groups section.

**Activate the sample database in Redshift Serverless**
Connect to the Redshift Serverless workgroup using the admin username, then click the **Create connection** button. The sample_data_dev database contains three sample schemas, which correspond to three sample datasets you can load into your Amazon Redshift Serverless database. Now let's load the tpcds schema data: click **Open sample notebooks** next to the tpcds schema to load it into the sample_data_dev database.
Let's verify that there are no tables yet in the public schema of the test database.
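If you prefer the command line over the query editor, the Redshift Data API can run the same check. A minimal sketch, assuming the workgroup is named redshift-spark-wg (as in the PySpark script below) and your CLI credentials are allowed to call redshift-data:

```bash
# List any tables in the public schema of the test database; an empty result means none yet.
aws redshift-data execute-statement \
  --workgroup-name redshift-spark-wg \
  --database test \
  --sql "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';" \
  --region us-east-1
# Use the Id returned above to fetch the result once the statement finishes.
aws redshift-data get-statement-result --id <statement-id> --region us-east-1
```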
The PySpark script below (load_to_redshift.py) accepts a source argument. If the source is redshift, the job reads data from the Redshift Serverless sample_data_dev database, transforms it, and writes the results back to the test database in Redshift Serverless. If the source is S3, it reads the data from S3, transforms it, and writes the transformed data to the test database in Redshift Serverless.
from pyspark.sql import SparkSession
from pyspark.sql import Column
from pyspark.sql.functions import upper, col, concat, lit, current_timestamp, current_date, count
from datetime import datetime
import sys

# Job arguments: source is either "S3" or "redshift"; accountId is the AWS account ID.
source = sys.argv[1]
accountId = sys.argv[2]


def create_redshift_context(database):
    # Connector options for the given Redshift Serverless database: JDBC URL with IAM
    # authentication, the IAM role the connector assumes, and the S3 temp directory.
    url_redshift = f"jdbc:redshift:iam://redshift-spark-wg.{accountId}.us-east-1.redshift-serverless.amazonaws.com:5439/{database}"
    aws_role = f"arn:aws:iam::{accountId}:role/AmazonRedshiftCommandsAccessRole"
    temp_dir = f"s3://emrserverless-workshop-{accountId}/redshift-spark-demo-temp-dir/"
    redshiftOptions = {
        "url": url_redshift,
        "tempdir": temp_dir,
        "aws_iam_role": aws_role
    }
    return redshiftOptions


def read_from_s3(spark):
    # Read the raw Parquet data from the input/ prefix of the workshop bucket.
    startTime = datetime.now()
    print("Process started at: ", startTime)
    as_of_date = startTime.strftime("%Y-%m-%d")
    rawDF = spark.read.parquet(f"s3://emrserverless-workshop-{accountId}/input/")
    rawDF.printSchema()
    print("Total number of records: " + str(rawDF.count()))
    return rawDF


def read_from_redshift(spark, redshiftOptions):
    # Read the tpcds customer and sales tables from Redshift Serverless and count the
    # store, catalog, and web sales per customer.
    customerDF = (
        spark.read
        .format("io.github.spark_redshift_community.spark.redshift")
        .options(**redshiftOptions)
        .option("dbtable", "tpcds.customer")
        .load()
    )
    storeSalesDF = (
        spark.read
        .format("io.github.spark_redshift_community.spark.redshift")
        .options(**redshiftOptions)
        .option("dbtable", "tpcds.store_sales")
        .load()
    )
    webSalesDF = (
        spark.read
        .format("io.github.spark_redshift_community.spark.redshift")
        .options(**redshiftOptions)
        .option("dbtable", "tpcds.web_sales")
        .load()
    )
    catalogSalesDF = (
        spark.read
        .format("io.github.spark_redshift_community.spark.redshift")
        .options(**redshiftOptions)
        .option("dbtable", "tpcds.catalog_sales")
        .load()
    )
    aggregateDF = customerDF \
        .join(storeSalesDF, customerDF.c_customer_sk == storeSalesDF.ss_customer_sk, "left") \
        .join(webSalesDF, customerDF.c_customer_sk == webSalesDF.ws_bill_customer_sk, "left") \
        .join(catalogSalesDF, customerDF.c_customer_sk == catalogSalesDF.cs_bill_customer_sk, "left") \
        .groupBy("c_customer_id") \
        .agg(count("ss_customer_sk"), count("cs_bill_customer_sk"), count("ws_bill_customer_sk")) \
        .select(col("c_customer_id"),
                col("count(ss_customer_sk)"),
                col("count(cs_bill_customer_sk)"),
                col("count(ws_bill_customer_sk)")) \
        .withColumnRenamed("count(ss_customer_sk)", "total_store_sales") \
        .withColumnRenamed("count(cs_bill_customer_sk)", "total_catalog_sales") \
        .withColumnRenamed("count(ws_bill_customer_sk)", "total_web_sales")
    return aggregateDF


def write_to_redshift(spark, dataframe, redshiftOptions, tableName, schemaName):
    # Append the DataFrame to schemaName.tableName, staging the rows as CSV in the temp directory.
    dataframe.write.format("io.github.spark_redshift_community.spark.redshift") \
        .options(**redshiftOptions) \
        .option("dbtable", f"{schemaName}.{tableName}") \
        .option("tempformat", "CSV") \
        .mode("append") \
        .save()


if __name__ == "__main__":
    spark = SparkSession.builder.appName("Redshift Spark Connector").config('spark.sql.debug.maxToStringFields', 2000).getOrCreate()
    redshiftOptionsWrite = create_redshift_context("test")
    redshiftOptionsRead = create_redshift_context("sample_data_dev")
    # Pick the read path from the source argument, then write the result to the test database:
    # S3 input -> public.nyc_taxi_trips, Redshift input -> public.total_by_customer.
    sourceDF = read_from_s3(spark) if source.upper() == "S3" else read_from_redshift(spark, redshiftOptionsRead)
    write_to_redshift(spark, sourceDF, redshiftOptionsWrite, "nyc_taxi_trips", "public") if source.upper() == "S3" else write_to_redshift(spark, sourceDF, redshiftOptionsWrite, "total_by_customer", "public")
    spark.stop()
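The job submissions below expect this script to be available at the script/ prefix of the bucket. If you modify the script, you can copy your version back to that location. A minimal sketch, assuming the file is saved locally as load_to_redshift.py:

```bash
# Upload the (edited) PySpark job to the location used as the job entryPoint.
aws s3 cp load_to_redshift.py s3://<<S3Bucket>>/script/load_to_redshift.py
```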
export JOB_ROLE_ARN=<<EmrServerlessS3RuntimeRoleARN>>
export S3_BUCKET=s3://<<S3Bucket>>
export APPLICATION_ID=<<application_id>>
export ACCOUNT_ID=<<accountId>>
Replace {ApplicationID} with the ID of the EMR Serverless application obtained from EMR Studio, and {JobExecutionArn} with the EmrServerlessJobRoleArn value from the CloudFormation template outputs.
aws emr-serverless start-job-run --application-id ${APPLICATION_ID} --region us-east-1 \
  --execution-role-arn ${JOB_ROLE_ARN} --name "Redshift-Redshift-Job" --job-driver '{
    "sparkSubmit": {
      "entryPoint": "'"$S3_BUCKET"'/script/load_to_redshift.py",
      "entryPointArguments": ["redshift","'"$ACCOUNT_ID"'"]
    }
  }' --configuration-overrides '{
    "monitoringConfiguration": {
      "s3MonitoringConfiguration": {
        "logUri": "'"${S3_BUCKET}"'/logs/"
      }
    }
  }'
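The start-job-run response includes a jobRunId for the run. If you did not capture it, you can list recent runs for the application; a sketch:

```bash
# List recent job runs to find the jobRunId and current state of the run you just submitted.
aws emr-serverless list-job-runs --application-id ${APPLICATION_ID} --region us-east-1
```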
Check the status of the job run, replacing jobRunId with the job run ID returned when you submitted the job:

aws emr-serverless get-job-run --application-id ${APPLICATION_ID} --job-run-id jobRunId --region us-east-1
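To wait for the run to finish from the shell, you can poll the job state. A minimal sketch, assuming JOB_RUN_ID holds the job run ID from the previous step:

```bash
# Poll the job state every 30 seconds until it reaches a terminal state.
while true; do
  STATE=$(aws emr-serverless get-job-run --application-id ${APPLICATION_ID} \
    --job-run-id ${JOB_RUN_ID} --region us-east-1 \
    --query 'jobRun.state' --output text)
  echo "Job state: ${STATE}"
  case "${STATE}" in
    SUCCESS|FAILED|CANCELLED) break ;;
  esac
  sleep 30
done
```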
Verify that the total_by_customer table has been created in the public schema of the test database.
Now submit the S3-to-Redshift job from the EMR Studio console. On the Submit job screen, under Runtime role, you could let EMR Serverless create a role for the job, but for this workshop we will use the role that the CloudFormation template has already created.
Enter the following details on the Submit job screen:

| Field | Value |
| --- | --- |
| Name | S3-Redshift-Job |
| Runtime role | EMRServerlessS3RuntimeRole |
| Script location S3 URI | s3://emrserverless-workshop-<AccountId>/script/load_to_redshift.py |
| Script arguments | ["S3", "<AccountId>"] |
Click **Submit job**.
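If you prefer the CLI to the console, a sketch of an equivalent submission for the S3 source, assuming the same environment variables exported earlier and that the role in JOB_ROLE_ARN matches the runtime role above:

```bash
# Submit the S3-to-Redshift job from the CLI (mirrors the Redshift-to-Redshift submission above).
aws emr-serverless start-job-run --application-id ${APPLICATION_ID} --region us-east-1 \
  --execution-role-arn ${JOB_ROLE_ARN} --name "S3-Redshift-Job" --job-driver '{
    "sparkSubmit": {
      "entryPoint": "'"$S3_BUCKET"'/script/load_to_redshift.py",
      "entryPointArguments": ["S3","'"$ACCOUNT_ID"'"]
    }
  }' --configuration-overrides '{
    "monitoringConfiguration": {
      "s3MonitoringConfiguration": {
        "logUri": "'"${S3_BUCKET}"'/logs/"
      }
    }
  }'
```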
After the job is submitted and finishes running, the run status will show Success.
Verify that the nyc_taxi_trips table has been created in the public schema of the test database, then run a SELECT query against the nyc_taxi_trips table in Redshift.
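One way to run that query without opening the Redshift query editor is the Redshift Data API. A minimal sketch (the short sleep stands in for proper status polling with describe-statement):

```bash
# Submit a SELECT against the table created by the S3 job, then fetch the rows.
STATEMENT_ID=$(aws redshift-data execute-statement \
  --workgroup-name redshift-spark-wg \
  --database test \
  --sql "SELECT * FROM public.nyc_taxi_trips LIMIT 10;" \
  --region us-east-1 --query Id --output text)
sleep 5  # wait briefly for the statement to finish
aws redshift-data get-statement-result --id ${STATEMENT_ID} --region us-east-1
```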