我们将在EMR主节点上使用Docker部署PostgreSQL数据库,配置Trino的PostgreSQL连接器,插入示例数据,并执行Trino联合查询(跨Hive和PostgreSQL)。
流程如下:
在EMR主节点上使用Docker部署PostgreSQL数据库
在PostgreSQL中创建示例数据
配置Trino的PostgreSQL连接器
在Hive中创建示例数据
使用Trino执行跨数据源的联合查询
分析查询执行计划
首先,通过SSH连接到EMR主节点。EMR上默认已经安装了Docker,但如果是以ec2-user或hadoop用户身份登录,默认没有执行docker命令的权限,需要先将该用户添加到docker组:
# Add the current login user (hadoop or ec2-user) to the docker group so it
# can run docker without sudo. "$USER" is expanded by the calling shell
# before sudo runs, so it names the logged-in user rather than root.
sudo usermod -aG docker "$USER"
# Log out and back in so the new group membership takes effect.
使用Docker部署PostgreSQL容器:
# Create a dedicated Docker network for the container to join.
docker network create trino-net
# Start PostgreSQL 13 in the background, attached to trino-net, with port
# 5432 published on the host. The POSTGRES_* variables set the superuser
# name, its password and the initial database.
docker run \
    --detach \
    --name postgres-trino \
    --network trino-net \
    --publish 5432:5432 \
    --env POSTGRES_USER=postgres \
    --env POSTGRES_PASSWORD=postgres \
    --env POSTGRES_DB=testdb \
    postgres:13
验证PostgreSQL容器是否正在运行:
# List running containers and keep only the lines that mention postgres-trino.
docker ps | grep postgres-trino
连接到PostgreSQL容器并创建示例数据:
# Install the PostgreSQL 17 client package (skip if psql is already present).
sudo yum -y install postgresql17.x86_64
# Open a psql session against testdb on localhost as user postgres;
# PGPASSWORD supplies the password non-interactively.
PGPASSWORD=postgres psql --host=localhost --username=postgres --dbname=testdb
在PostgreSQL中创建示例表和数据:
-- Schema that holds the demo tables.
CREATE SCHEMA IF NOT EXISTS sales;

-- Customers; customer_id is generated by a SERIAL sequence.
CREATE TABLE sales.customers (
    customer_id       SERIAL PRIMARY KEY,
    name              VARCHAR(100),
    email             VARCHAR(100),
    registration_date DATE
);

-- Product catalogue with a fixed-point price.
CREATE TABLE sales.products (
    product_id SERIAL PRIMARY KEY,
    name       VARCHAR(100),
    category   VARCHAR(50),
    price      DECIMAL(10, 2)
);

-- Orders; each row points at the customer that placed it.
CREATE TABLE sales.orders (
    order_id     SERIAL PRIMARY KEY,
    customer_id  INTEGER REFERENCES sales.customers (customer_id),
    order_date   DATE,
    total_amount DECIMAL(10, 2)
);

-- Five sample customers.
INSERT INTO sales.customers
    (name, email, registration_date)
VALUES
    ('John Doe',       'john.doe@example.com',   DATE '2023-01-15'),
    ('Jane Smith',     'jane.smith@example.com', DATE '2023-02-20'),
    ('Robert Johnson', 'robert.j@example.com',   DATE '2023-03-05'),
    ('Emily Davis',    'emily.d@example.com',    DATE '2023-03-15'),
    ('Michael Brown',  'michael.b@example.com',  DATE '2023-04-10');

-- Five sample products in two categories.
INSERT INTO sales.products
    (name, category, price)
VALUES
    ('Laptop Pro',          'Electronics',     1299.99),
    ('Smartphone X',        'Electronics',      799.99),
    ('Coffee Maker',        'Home Appliances',   89.99),
    ('Wireless Headphones', 'Electronics',      149.99),
    ('Blender',             'Home Appliances',   79.99);

-- Six sample orders. The customer_id values assume the customers above
-- received ids 1-5, which holds when the table was freshly created.
INSERT INTO sales.orders
    (customer_id, order_date, total_amount)
VALUES
    (1, DATE '2023-05-10', 1299.99),
    (2, DATE '2023-05-15',  799.99),
    (3, DATE '2023-05-20',   89.99),
    (4, DATE '2023-06-05',  149.99),
    (1, DATE '2023-06-10',   79.99),
    (2, DATE '2023-06-15', 1299.99);

-- Read everything back to confirm the inserts.
SELECT customer_id, name, email, registration_date
FROM sales.customers;

SELECT product_id, name, category, price
FROM sales.products;

SELECT order_id, customer_id, order_date, total_amount
FROM sales.orders;

-- Leave psql.
\q
创建PostgreSQL连接器配置文件:
# Edit the PostgreSQL catalog properties file with elevated privileges.
sudo vi /etc/trino/conf/catalog/postgresql.properties
添加以下配置内容:
# Trino catalog definition backed by the PostgreSQL connector.
connector.name=postgresql
# JDBC URL of the testdb database on port 5432.
# NOTE(review): localhost is resolved on whichever Trino node opens the
# connection - presumably only correct on the node that runs the PostgreSQL
# container; confirm before using this on a multi-node cluster.
connection-url=jdbc:postgresql://localhost:5432/testdb
# Credentials; same values as POSTGRES_USER / POSTGRES_PASSWORD given to docker run.
connection-user=postgres
connection-password=postgres
下载PostgreSQL JDBC驱动:
# Make sure the target directory exists (no-op when it already does).
sudo mkdir --parents /usr/lib/trino/lib/plugin/postgresql
# Fetch the PostgreSQL JDBC driver jar (42.5.0) into that directory.
# NOTE(review): Trino's PostgreSQL connector plugin normally bundles its own
# JDBC driver - confirm this download is actually required on this EMR release.
sudo wget --directory-prefix=/usr/lib/trino/lib/plugin/postgresql/ https://jdbc.postgresql.org/download/postgresql-42.5.0.jar
重启Trino服务以应用新配置:
# Restart the trino-server systemd unit so the new configuration is applied.
sudo systemctl restart trino-server
现在,让我们在Hive中创建一些示例数据,以便后续进行联合查询:
# Switch to the hadoop user.
sudo su hadoop
# Start the Hive CLI.
hive
在Hive中创建示例表和数据:
-- Database for the Hive-side demo tables.
CREATE DATABASE IF NOT EXISTS retail;
-- Make retail the current database for the statements below.
USE retail;

-- Stock level per product and warehouse.
CREATE TABLE IF NOT EXISTS inventory (
    product_id   INT,
    warehouse_id INT,
    quantity     INT,
    last_updated DATE
);

-- Six stock rows spread over warehouses 101 and 102.
INSERT INTO TABLE inventory VALUES
    (1, 101, 50, '2023-06-01'),
    (2, 101, 30, '2023-06-01'),
    (3, 101, 20, '2023-06-01'),
    (4, 102, 40, '2023-06-02'),
    (5, 102, 25, '2023-06-02'),
    (1, 102, 10, '2023-06-02');

-- Warehouse lookup table.
CREATE TABLE IF NOT EXISTS warehouses (
    warehouse_id INT,
    name         STRING,
    location     STRING
);

-- The two warehouses referenced by the inventory rows.
INSERT INTO TABLE warehouses VALUES
    (101, 'Central Warehouse', 'New York'),
    (102, 'West Coast Warehouse', 'Los Angeles');

-- Read both tables back to confirm the inserts.
SELECT product_id, warehouse_id, quantity, last_updated
FROM inventory;

SELECT warehouse_id, name, location
FROM warehouses;

-- Leave the Hive CLI.
quit;
现在,我们可以使用Trino CLI进行联合查询,跨PostgreSQL和Hive数据源:
# Start the Trino CLI against the server at localhost:8889.
trino-cli --server localhost:8889
在Trino中执行查询:
-- Catalogs known to this Trino server.
SHOW CATALOGS;
-- Check the postgresql catalog: list its schemas, then the tables in sales.
SHOW SCHEMAS IN postgresql;
SHOW TABLES IN postgresql.sales;
先将/etc/trino/conf/config.properties中的node-scheduler.include-coordinator改为true,然后重启Trino服务(sudo systemctl restart trino-server)使配置生效。修改后的配置如下:
# Print the Trino config file; the lines below are its contents, with
# node-scheduler.include-coordinator set to true.
sudo cat /etc/trino/conf/config.properties
coordinator=true
node-scheduler.include-coordinator=true
discovery.uri=http://ip-172-31-37-81.us-west-2.compute.internal:8889
http-server.threads.max=500
discovery-server.enabled=true
再次登录到Trino CLI(trino-cli --server localhost:8889),继续执行以下查询:
-- Read a few rows through the postgresql catalog.
-- (Trino SQL comments start with "--"; a leading "#" is a syntax error.)
SELECT * FROM postgresql.sales.customers LIMIT 5;
-- Check the hive catalog: list schemas, list tables in retail, sample inventory.
SHOW SCHEMAS FROM hive;
SHOW TABLES FROM hive.retail;
SELECT * FROM hive.retail.inventory LIMIT 5;
-- Federated join: each inventory row from Hive, enriched with product
-- details from PostgreSQL and the warehouse name from Hive.
SELECT
    prod.product_id,
    prod.name AS product_name,
    prod.category,
    prod.price,
    inv.warehouse_id,
    wh.name AS warehouse_name,
    inv.quantity,
    inv.last_updated
FROM postgresql.sales.products AS prod
INNER JOIN hive.retail.inventory AS inv
    ON prod.product_id = inv.product_id
INNER JOIN hive.retail.warehouses AS wh
    ON inv.warehouse_id = wh.warehouse_id;
-- Order count and total spend per customer. Both tables live in the
-- postgresql catalog, so this query does not cross data sources.
-- The LEFT JOIN keeps customers that have no orders; COUNT(o.order_id)
-- yields 0 for them, and COALESCE turns their NULL sum into 0 as well.
SELECT
    c.customer_id,
    c.name AS customer_name,
    COUNT(o.order_id) AS order_count,
    COALESCE(SUM(o.total_amount), 0) AS total_spent
FROM postgresql.sales.customers AS c
LEFT JOIN postgresql.sales.orders AS o
    ON c.customer_id = o.customer_id
GROUP BY
    c.customer_id,
    c.name
-- customer_id breaks ties so the row order is deterministic.
ORDER BY
    total_spent DESC,
    customer_id;
-- Federated aggregate: per product category and warehouse location, the
-- number of distinct products, units in stock, average price and stock value.
SELECT
    prod.category,
    wh.location,
    COUNT(DISTINCT prod.product_id) AS unique_products,
    SUM(inv.quantity) AS total_quantity,
    AVG(prod.price) AS avg_price,
    SUM(prod.price * inv.quantity) AS inventory_value
FROM postgresql.sales.products AS prod
INNER JOIN hive.retail.inventory AS inv
    ON prod.product_id = inv.product_id
INNER JOIN hive.retail.warehouses AS wh
    ON inv.warehouse_id = wh.warehouse_id
GROUP BY
    prod.category,
    wh.location
ORDER BY
    inventory_value DESC;
分析查询计划:
-- Show the plan Trino builds for a two-table join across the postgresql
-- and hive catalogs.
EXPLAIN
SELECT
    prod.product_id,
    prod.name AS product_name,
    inv.quantity
FROM postgresql.sales.products AS prod
INNER JOIN hive.retail.inventory AS inv
    ON prod.product_id = inv.product_id;
-- Show the plan for the three-table join with grouping and a distinct count.
EXPLAIN
SELECT
    prod.category,
    wh.location,
    COUNT(DISTINCT prod.product_id) AS unique_products,
    SUM(inv.quantity) AS total_quantity
FROM postgresql.sales.products AS prod
INNER JOIN hive.retail.inventory AS inv
    ON prod.product_id = inv.product_id
INNER JOIN hive.retail.warehouses AS wh
    ON inv.warehouse_id = wh.warehouse_id
GROUP BY
    prod.category,
    wh.location;

-- Leave the Trino CLI.
exit;
清理资源:
# Stop the PostgreSQL container, then remove it.
docker container stop postgres-trino
docker container rm postgres-trino
# Remove the Docker network that was created for it.
docker network rm trino-net