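Querying Across Data Sources

In this section we deploy a PostgreSQL database with Docker on the EMR primary node, configure Trino's PostgreSQL connector, insert sample data, and run Trino federated queries that span Hive and PostgreSQL.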

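The steps are:

  1. Deploy a PostgreSQL database with Docker on the EMR primary node

  2. Create sample data in PostgreSQL

  3. Configure Trino's PostgreSQL connector

  4. Create sample data in Hive

  5. Run federated queries across both data sources with Trino

  6. Analyze the query execution plans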

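First, connect to the EMR primary node over SSH. Docker is already installed on EMR by default, but if you log in as ec2-user or hadoop, that user is not allowed to run docker by default, so add it to the docker group first:

sudo usermod -aG docker hadoop  # or ec2-user
# Log out and back in for the group change to take effect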
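
To confirm that the group change has reached your current session, a small check like the following can help. This is a minimal sketch: the helper name in_current_groups is made up for illustration, and it only inspects the groups of the shell that runs it.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed if the current session belongs to the given group.
in_current_groups() {
  local group="$1"
  # "id -nG" prints the group names of the current process, separated by spaces
  id -nG | tr ' ' '\n' | grep -qx -- "$group"
}

if in_current_groups docker; then
  echo "docker group membership is active in this session"
else
  echo "not in the docker group yet; run usermod and log in again"
fi
```

If the second message appears right after running usermod, the session simply predates the change; a fresh SSH login should pick it up.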


Deploy the PostgreSQL container

Deploy the PostgreSQL container with Docker:

# Create a network to make later connections easier
docker network create trino-net

# Run the PostgreSQL container
docker run --name postgres-trino \
  --network trino-net \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_USER=postgres \
  -e POSTGRES_DB=testdb \
  -p 5432:5432 \
  -d postgres:13

Verify that the PostgreSQL container is running:

docker ps | grep postgres-trino
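
A running container does not necessarily mean the database inside it already accepts connections. The sketch below retries a readiness probe a few times. The retry helper is a hypothetical name introduced here; pg_isready is the standard PostgreSQL client utility, assumed to be available inside the official postgres image.

```shell
#!/usr/bin/env bash
# Hypothetical helper: run a command up to N times, sleeping between attempts.
# Usage: retry <attempts> <delay-seconds> <command> [args...]
retry() {
  local attempts="$1" delay="$2"
  shift 2
  local i
  for ((i = 1; i <= attempts; i++)); do
    if "$@"; then
      return 0
    fi
    # Do not sleep after the final failed attempt
    if ((i < attempts)); then
      sleep "$delay"
    fi
  done
  return 1
}

# Probe the database inside the container (skipped when docker is not installed).
if command -v docker >/dev/null 2>&1; then
  retry 10 1 docker exec postgres-trino pg_isready -U postgres -d testdb \
    || echo "PostgreSQL did not become ready in time" >&2
fi
```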


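Create sample data

Connect to PostgreSQL from the host through the published port 5432 and create the sample data:

# Install the PostgreSQL client (if not already installed)
sudo yum install -y postgresql17.x86_64

# Connect to PostgreSQL
PGPASSWORD=postgres psql -h localhost -U postgres -d testdb

Create the sample tables and data in PostgreSQL:

-- Create the sample tables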
CREATE SCHEMA IF NOT EXISTS sales;

CREATE TABLE sales.customers (
  customer_id SERIAL PRIMARY KEY,
  name VARCHAR(100),
  email VARCHAR(100),
  registration_date DATE
);

CREATE TABLE sales.products (
  product_id SERIAL PRIMARY KEY,
  name VARCHAR(100),
  category VARCHAR(50),
  price DECIMAL(10, 2)
);

CREATE TABLE sales.orders (
  order_id SERIAL PRIMARY KEY,
  customer_id INTEGER REFERENCES sales.customers(customer_id),
  order_date DATE,
  total_amount DECIMAL(10, 2)
);

-- Insert sample data
INSERT INTO sales.customers (name, email, registration_date)
VALUES 
  ('John Doe', 'john.doe@example.com', '2023-01-15'),
  ('Jane Smith', 'jane.smith@example.com', '2023-02-20'),
  ('Robert Johnson', 'robert.j@example.com', '2023-03-05'),
  ('Emily Davis', 'emily.d@example.com', '2023-03-15'),
  ('Michael Brown', 'michael.b@example.com', '2023-04-10');

INSERT INTO sales.products (name, category, price)
VALUES 
  ('Laptop Pro', 'Electronics', 1299.99),
  ('Smartphone X', 'Electronics', 799.99),
  ('Coffee Maker', 'Home Appliances', 89.99),
  ('Wireless Headphones', 'Electronics', 149.99),
  ('Blender', 'Home Appliances', 79.99);

INSERT INTO sales.orders (customer_id, order_date, total_amount)
VALUES 
  (1, '2023-05-10', 1299.99),
  (2, '2023-05-15', 799.99),
  (3, '2023-05-20', 89.99),
  (4, '2023-06-05', 149.99),
  (1, '2023-06-10', 79.99),
  (2, '2023-06-15', 1299.99);

-- Verify the data
SELECT * FROM sales.customers;
SELECT * FROM sales.products;
SELECT * FROM sales.orders;

-- Exit PostgreSQL
\q


Configure Trino's PostgreSQL connector

Create the PostgreSQL connector configuration file:

sudo vi /etc/trino/conf/catalog/postgresql.properties

Add the following configuration:

connector.name=postgresql
connection-url=jdbc:postgresql://localhost:5432/testdb
connection-user=postgres
connection-password=postgres
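
Note that localhost only works when the node running the query is the one hosting the container. On a cluster with worker nodes, the workers open their own JDBC connections, so the URL would need a host name that every node can resolve. The sketch below renders the same catalog file for an arbitrary host. The function name render_pg_catalog is hypothetical, and using this node's host name is an assumption about your network setup.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print a PostgreSQL catalog definition for a given host.
# Usage: render_pg_catalog <host> [port] [database]
render_pg_catalog() {
  local host="$1" port="${2:-5432}" db="${3:-testdb}"
  cat <<EOF
connector.name=postgresql
connection-url=jdbc:postgresql://${host}:${port}/${db}
connection-user=postgres
connection-password=postgres
EOF
}

# Preview the file using this node's host name instead of localhost.
render_pg_catalog "$(hostname -f 2>/dev/null || hostname)"

# To install it (assumption: same catalog path as above):
#   render_pg_catalog "$(hostname -f)" | sudo tee /etc/trino/conf/catalog/postgresql.properties
```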

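Download the PostgreSQL JDBC driver. Trino's PostgreSQL connector plugin normally bundles its own driver, so this step may be unnecessary on your EMR release: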

# Create the plugin directory (if it does not exist)
sudo mkdir -p /usr/lib/trino/lib/plugin/postgresql

# Download the PostgreSQL JDBC driver
sudo wget https://jdbc.postgresql.org/download/postgresql-42.5.0.jar -P /usr/lib/trino/lib/plugin/postgresql/


Restart the Trino service to apply the new configuration:

sudo systemctl restart trino-server
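
Trino takes a little while to come back after a restart, and queries issued too early fail. One way to check is the coordinator's /v1/info endpoint, whose JSON response includes a "starting" flag. The sketch below assumes that response shape holds on your Trino version and uses the port 8889 that appears later in this section. The helper name trino_info_ready is made up for illustration.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed when a /v1/info JSON body reports "starting": false.
trino_info_ready() {
  printf '%s' "$1" | grep -Eq '"starting"[[:space:]]*:[[:space:]]*false'
}

# Query the local coordinator (skipped when curl is not installed).
if command -v curl >/dev/null 2>&1; then
  info="$(curl -s --max-time 5 http://localhost:8889/v1/info || true)"
  if trino_info_ready "$info"; then
    echo "Trino is ready"
  else
    echo "Trino is still starting or not reachable"
  fi
fi
```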

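Create sample data in Hive

Now let's create some sample data in Hive for the federated queries that follow:

# Switch to the hadoop user
sudo su hadoop
# Start the Hive CLI
hive

Create the sample tables and data in Hive:

-- Create the database
CREATE DATABASE IF NOT EXISTS retail;

-- Switch to the database
USE retail;

-- Create the sample table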
CREATE TABLE IF NOT EXISTS inventory (
  product_id INT,
  warehouse_id INT,
  quantity INT,
  last_updated DATE
);

-- Insert sample data
INSERT INTO inventory VALUES 
  (1, 101, 50, '2023-06-01'),
  (2, 101, 30, '2023-06-01'),
  (3, 101, 20, '2023-06-01'),
  (4, 102, 40, '2023-06-02'),
  (5, 102, 25, '2023-06-02'),
  (1, 102, 10, '2023-06-02');

-- Create a second table
CREATE TABLE IF NOT EXISTS warehouses (
  warehouse_id INT,
  name STRING,
  location STRING
);

-- Insert data
INSERT INTO warehouses VALUES 
  (101, 'Central Warehouse', 'New York'),
  (102, 'West Coast Warehouse', 'Los Angeles');

-- Verify the data
SELECT * FROM inventory;
SELECT * FROM warehouses;

-- Exit Hive
exit;


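Run federated queries with Trino

Now we can use the Trino CLI to run federated queries across the PostgreSQL and Hive data sources:

# Start the Trino CLI
trino-cli --server localhost:8889

Run the following in Trino:

-- List all catalogs
SHOW CATALOGS;

-- Verify the PostgreSQL connector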
SHOW SCHEMAS FROM postgresql;
SHOW TABLES FROM postgresql.sales;


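First, in /etc/trino/conf/config.properties, set node-scheduler.include-coordinator to true, which lets the coordinator node also process query work, and restart trino-server so the change takes effect: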

sudo cat /etc/trino/conf/config.properties

coordinator=true
node-scheduler.include-coordinator=true
discovery.uri=http://ip-172-31-37-81.us-west-2.compute.internal:8889
http-server.threads.max=500
discovery-server.enabled=true
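
Rather than reading the whole file, the single setting can be pulled out with a short helper. This is a minimal sketch: get_property is a hypothetical name, and it only handles plain key=value lines without spaces around the equals sign.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the value of a key from a key=value properties file.
# If the key appears more than once, the last occurrence wins.
get_property() {
  local file="$1" key="$2"
  awk -F= -v k="$key" '
    $1 == k { v = substr($0, length(k) + 2) }  # everything after "key="
    END     { print v }
  ' "$file"
}

conf=/etc/trino/conf/config.properties
if [ -r "$conf" ]; then
  echo "include-coordinator = $(get_property "$conf" node-scheduler.include-coordinator)"
fi
```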

Start the Trino CLI again and query the tables:

-- Query the data in PostgreSQL
SELECT * FROM postgresql.sales.customers LIMIT 5;


-- Verify the Hive connector
SHOW SCHEMAS FROM hive;
SHOW TABLES FROM hive.retail;
SELECT * FROM hive.retail.inventory LIMIT 5;


-- Federated query: find in-stock products and their details
SELECT 
  p.product_id,
  p.name AS product_name,
  p.category,
  p.price,
  i.warehouse_id,
  w.name AS warehouse_name,
  i.quantity,
  i.last_updated
FROM 
  postgresql.sales.products p
JOIN 
  hive.retail.inventory i ON p.product_id = i.product_id
JOIN 
  hive.retail.warehouses w ON i.warehouse_id = w.warehouse_id;

-- Order count and total spend per customer (this query reads only the PostgreSQL catalog)
SELECT 
  c.customer_id,
  c.name AS customer_name,
  COUNT(o.order_id) AS order_count,
  SUM(o.total_amount) AS total_spent
FROM 
  postgresql.sales.customers c
LEFT JOIN 
  postgresql.sales.orders o ON c.customer_id = o.customer_id
GROUP BY 
  c.customer_id, c.name
ORDER BY 
  total_spent DESC;

-- More complex federated query: inventory per product category at each warehouse location
SELECT 
  p.category,
  w.location,
  COUNT(DISTINCT p.product_id) AS unique_products,
  SUM(i.quantity) AS total_quantity,
  AVG(p.price) AS avg_price,
  SUM(p.price * i.quantity) AS inventory_value
FROM 
  postgresql.sales.products p
JOIN 
  hive.retail.inventory i ON p.product_id = i.product_id
JOIN 
  hive.retail.warehouses w ON i.warehouse_id = w.warehouse_id
GROUP BY 
  p.category, w.location
ORDER BY 
  inventory_value DESC;
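
As a sanity check on the last query, the same join and aggregation can be reproduced outside Trino on the sample rows inserted earlier. The sketch below covers only total_quantity and inventory_value, not unique_products or avg_price. The function name inventory_summary and the pipe-delimited layout are choices made for this illustration.

```shell
#!/usr/bin/env bash
# Sample rows copied from the INSERT statements in this section.
# products: product_id|category|price
products='1|Electronics|1299.99
2|Electronics|799.99
3|Home Appliances|89.99
4|Electronics|149.99
5|Home Appliances|79.99'

# inventory: product_id|warehouse_id|quantity
inventory='1|101|50
2|101|30
3|101|20
4|102|40
5|102|25
1|102|10'

# warehouses: warehouse_id|location
warehouses='101|New York
102|Los Angeles'

# Join the three row sets and aggregate by (category, location).
# Output columns: category|location|total_quantity|inventory_value
inventory_summary() {
  {
    # Tag each row with its source so one awk pass can tell them apart
    printf '%s\n' "$products"   | sed 's/^/P|/'
    printf '%s\n' "$warehouses" | sed 's/^/W|/'
    printf '%s\n' "$inventory"  | sed 's/^/I|/'
  } | awk -F'|' '
    $1 == "P" { category[$2] = $3; price[$2] = $4 }
    $1 == "W" { location[$2] = $3 }
    $1 == "I" {
      key = category[$2] "|" location[$3]
      qty[key]   += $4
      value[key] += price[$2] * $4
    }
    END {
      for (key in qty) printf "%s|%d|%.2f\n", key, qty[key], value[key]
    }
  ' | sort -t'|' -k4,4nr   # highest inventory value first, like ORDER BY ... DESC
}

inventory_summary
```

The top row should be Electronics in New York with a quantity of 80 and a value of 88999.20 (1299.99 × 50 + 799.99 × 30), which is what the Trino query is expected to report for that group.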


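Analyze query plans with EXPLAIN

Prefix a query with EXPLAIN to see how Trino plans it without running it:

-- Execution plan for the simple federated query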
EXPLAIN 
SELECT 
  p.product_id,
  p.name AS product_name,
  i.quantity
FROM 
  postgresql.sales.products p
JOIN 
  hive.retail.inventory i ON p.product_id = i.product_id;

-- Execution plan for the complex federated query
EXPLAIN 
SELECT 
  p.category,
  w.location,
  COUNT(DISTINCT p.product_id) AS unique_products,
  SUM(i.quantity) AS total_quantity
FROM 
  postgresql.sales.products p
JOIN 
  hive.retail.inventory i ON p.product_id = i.product_id
JOIN 
  hive.retail.warehouses w ON i.warehouse_id = w.warehouse_id
GROUP BY 
  p.category, w.location;

-- Exit the Trino CLI
exit;
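
Beyond plain EXPLAIN, Trino also accepts EXPLAIN (TYPE DISTRIBUTED), which shows how the plan is split into fragments, and EXPLAIN ANALYZE, which actually runs the query and reports runtime statistics. The sketch below builds these statements and runs one from the shell through the CLI's --execute option. The helper name explain_sql is hypothetical, and the server address is the one used earlier in this section.

```shell
#!/usr/bin/env bash
# Hypothetical helper: wrap a query in an EXPLAIN statement of the given kind.
# kind is LOGICAL, DISTRIBUTED or ANALYZE (ANALYZE executes the query).
explain_sql() {
  local kind="$1" query="$2"
  case "$kind" in
    ANALYZE)
      printf 'EXPLAIN ANALYZE %s' "$query" ;;
    LOGICAL|DISTRIBUTED)
      printf 'EXPLAIN (TYPE %s) %s' "$kind" "$query" ;;
    *)
      echo "unknown EXPLAIN kind: $kind" >&2
      return 1 ;;
  esac
}

query='SELECT p.product_id, p.name AS product_name, i.quantity
FROM postgresql.sales.products p
JOIN hive.retail.inventory i ON p.product_id = i.product_id'

# Run the distributed plan non-interactively (skipped when trino-cli is not installed).
if command -v trino-cli >/dev/null 2>&1; then
  trino-cli --server localhost:8889 --execute "$(explain_sql DISTRIBUTED "$query")"
fi
```

In the distributed plan, the scans of the PostgreSQL table and the Hive table should show up as separate table scan nodes, which makes it easier to see where each side of the join is read.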


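Clean up resources

Stop and remove the PostgreSQL container, then delete the Docker network:

# Stop and remove the PostgreSQL container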
docker stop postgres-trino
docker rm postgres-trino

# Delete the Docker network
docker network rm trino-net