在前面的实验中,我们创建了S3存储桶,在input目录上传了tripdata.csv文件。在这一节中,将继续使用此S3存储桶, 并使用Pig脚本分析CSV数据,将其转换成TSV格式。
将以下代码保存为ny-taxi.pig
,将其上传之前创建的S3桶的files
目录下。或通过此链接直接下载:
DEFINE CSVLoader org.apache.pig.piggybank.storage.CSVLoader();
NY_TAXI = LOAD '$INPUT' USING CSVLoader(',') AS
(vendor_id:int,
lpep_pickup_datetime:chararray,
lpep_dropoff_datetime:chararray,
store_and_fwd_flag:chararray,
rate_code_id:int,
pu_location_id:int,
do_location_id:int,
passenger_count:int,
trip_distance:double,
fare_amount:double,
mta_tax:double,
tip_amount:double,
tolls_amount:double,
ehail_fee:double,
improvement_surcharge:double,
total_amount:double,
payment_type:int,
trip_type:int);
STORE NY_TAXI into '$OUTPUT' USING PigStorage('\t');
上面的Pig脚本将做以下处理:
使用CSVLoader
加载CSV并解析数据。
将结果保存到${OUTPUT}
路径,这个路径在提交step的时候指定。
现在将这个Pig文件以步骤的方式提交到EMR。 在EMR集群的界面上,点击添加步骤:
在步骤类型中,选择Pig程序
, 相关配置如下:
s3://<YOUR-BUCKET>/files/ny-taxi.pig
s3://<YOUR-BUCKET>/input/
s3://<YOUR-BUCKET>/output/pig/
在点击添加后,Pig脚本会接着被执行,在大约2分钟过后检查S3/output/pig/
目录,可以看到结果文件: