In this section, we will learn how to run a Spark ETL job on EMR on EKS.
In the code below, Spark reads the NY taxi trip data from Amazon S3. The script adds a timestamp column, prints the schema and the record count, and finally writes the data back to Amazon S3 in Parquet format. This last step may take some time, depending on the size of the EKS cluster.
Note that the input and output locations are passed in as arguments.
import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

if __name__ == "__main__":
    print(len(sys.argv))
    if len(sys.argv) != 3:
        print("Usage: spark-etl [input-folder] [output-folder]")
        sys.exit(1)

    spark = SparkSession\
        .builder\
        .appName("SparkETL")\
        .getOrCreate()

    # Read the CSV trip data, inferring the schema from the header row
    nyTaxi = spark.read.option("inferSchema", "true").option("header", "true").csv(sys.argv[1])

    # Add a timestamp column recording when the job processed the data
    updatedNYTaxi = nyTaxi.withColumn("current_date", lit(datetime.now()))

    updatedNYTaxi.printSchema()
    updatedNYTaxi.show()
    print("Total number of records: " + str(updatedNYTaxi.count()))

    # Write the result to the output location in Parquet format
    updatedNYTaxi.write.parquet(sys.argv[2])
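If you want to sanity-check the script before submitting it to EMR on EKS, it can also be run with a plain spark-submit (a minimal sketch, assuming a local Spark installation; reading s3:// paths locally additionally requires the hadoop-aws connector and AWS credentials, so local file paths are used here for the test):

# Run the ETL script locally; the two arguments are the input and output folders
# (the /tmp paths are placeholders for wherever your test data lives)
spark-submit spark-etl.py file:///tmp/tripdata/ file:///tmp/taxi-output/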
Run the following command to execute the Spark ETL job on the EMR on EKS cluster. The S3 input and output locations are passed to the script through the entryPointArguments parameter:
aws emr-containers start-job-run \
  --virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
  --name spark-etl \
  --execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
  --release-label emr-6.2.0-latest \
  --job-driver '{
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/spark-etl.py",
      "entryPointArguments": [
        "s3://aws-data-analytics-workshops/shared_datasets/tripdata/",
        "'"$S3_BUCKET"'/taxi-data/"
      ],
      "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
    }
  }' \
  --configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.driver.memory": "2G"
        }
      }
    ],
    "monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs",
        "logStreamNamePrefix": "emr-eks-workshop"
      },
      "s3MonitoringConfiguration": {
        "logUri": "'"$S3_BUCKET"'/logs/"
      }
    }
  }'
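start-job-run returns immediately with a job run ID, while the job itself runs asynchronously on the cluster. One way to poll its state is the describe-job-run API (a sketch; JOB_RUN_ID is an assumed variable holding the id field from the start-job-run response):

# Poll the state of the job run (PENDING, SUBMITTED, RUNNING, COMPLETED, FAILED, ...)
aws emr-containers describe-job-run \
  --virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
  --id ${JOB_RUN_ID} \
  --query 'jobRun.state'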
After the ETL job finishes, check the output in the S3 bucket:
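A quick way to list the Parquet output from the command line (assuming $S3_BUCKET is the same bucket URI used in the job above):

# List the Parquet files written by the job
aws s3 ls ${S3_BUCKET}/taxi-data/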