自适应扫描

背景介绍

PolarDB for PostgreSQL 提供了一款强大的分析型查询引擎——PX(Parallel eXecution),通过利用集群中多个节点的计算能力,来实现跨节点的并行查询功能。PX 可以支持顺序扫描、索引扫描等多种物理算子的跨节点并行化。其中,对顺序扫描算子,PX 提供了两种扫描模式,分别为 自适应扫描模式非自适应扫描模式

术语

  • QC:Query Coordinator,发起 PX 并行查询的进程角色。
  • PX Worker:参与 PX 跨节点并行查询的工作进程角色。
  • Worker ID:唯一标识一个 PX Worker 的编号。
  • Disk Unit ID:PX 跨节点并行扫描的最小存储单元,默认为 4MB 大小。

功能介绍

非自适应扫描

非自适应扫描模式是 PX 顺序扫描算子(Sequential Scan)的默认扫描方式。每一个参与并行查询的 PX Worker 在执行过程中都会被分配一个唯一的 Worker ID。非自适应扫描模式将会依据 Worker ID 划分数据表在物理存储上的 Disk Unit ID,从而实现每个 PX Worker 可以均匀扫描数据表在共享存储上的存储单元,所有 PX Worker 的扫描结果最终汇总形成全量的数据。

自适应扫描

在非自适应扫描模式下,扫描单元会均匀划分给每个 PX Worker。当存在个别只读节点计算资源不足的情况下,可能会导致扫描过程发生计算倾斜:用户发起的单次并行查询迟迟不能完成,查询受限于计算资源不足的节点长时间不能完成扫描任务。

PX 提供的自适应扫描模式可以解决这个问题。自适应扫描模式不再限定每个 PX Worker 扫描特定的 Disk Unit ID,而是采用 请求-响应(Request-Response)模式,通过 QC 进程与 PX Worker 进程之间的特定 RPC 通信机制,由 QC 进程负责告知每个 PX Worker 进程可以执行的扫描任务,从而消除计算倾斜的问题。

功能设计

非自适应扫描

QC 进程在发起并行查询任务时,会为每个 PX Worker 进程分配固定的 Worker ID,每个 PX Worker 进程根据 Worker ID 对存储单元 取模,只扫描其所属的特定的 Dist Unit。

non-adaptive-scan

自适应扫描

QC 进程在发起并行查询任务时,会启动 自适应扫描线程,用于接收并处理来自 PX Worker 进程的请求消息。自适应扫描线程维护了当前查询扫描任务的进度,并根据每个 PX Worker 进程的工作进度,向 PX Worker 进程分派需要扫描的 Disk Unit ID。对于需要扫描的最后一个 Disk Unit,自适应扫描线程会唤醒处于空闲状态的 PX Worker,加速最后一块 Disk Unit 的扫描过程。

adaptive-scan

消息通信机制

由于自适应扫描线程与各个 PX worker 进程之间的通信数据很少,频率不高,所以重用了已有的 QC 进程与 PX worker 进程之间的 libpq 连接进行报文通信。自适应扫描线程通过 poll 的方式在需要时同步轮询 PX Worker 进程的请求和响应。

扫描任务协调

PX Worker 进程在执行顺序扫描算子时,会首先向 QC 进程发起询问请求,将以下信息发送给 QC 端的自适应扫描线程:

  • 扫描任务的编号
  • 扫描动作(正向 / 反向扫描)
  • 扫描物理块数

自适应扫描线程在收到询问请求后,会创建扫描任务或更新扫描任务的进度。

可变颗粒度

为了减少请求带来的网络交互次数,PX 实现了可变的任务颗粒度。当扫描任务量剩余较多时,PX Worker 进程单次领取的扫描物理块数较多;当扫描任务量剩余较少时,PX Worker 进程单次领取的扫描物理块数相应减少。通过这种方法,可以平衡 网络开销负载均衡 两者之间的关系。

缓存友好

自适应扫描模式将尽量保证每个节点在多次执行并行查询任务时,能够重用 Shared Buffer 缓存,避免缓存频繁更新 / 淘汰。在实现上,自适应扫描功能会根据 集群拓扑视图 配置的节点 IP 地址信息,采用缓存绑定策略,尽量让同一个物理 Page 被同一个节点复用。

报文设计

  • PX Worker 请求报文:采用 libpq 的 'S' 协议进行通信,按照 key-value 的方式编码为字符串。

    | 内容 | 描述 | | ------------ | ------------------ | | task_id | 扫描任务编号 | | direction | 扫描方向 | | page_count | 需扫描的总物理块数 | | scan_start | 扫描起始物理块号 | | current_page | 当前扫描的物理块号 | | scan_round | 扫描的次数 |

  • 自适应扫描线程回复报文

    | 内容 | 描述 | | ---------- | ------------------ | | success | 是否成功 | | page_start | 响应的起始物理块号 | | page_end | 响应的结束物理块号 |

使用指南

创建测试表:

postgres=# CREATE TABLE t(id INT);
CREATE TABLE
postgres=# INSERT INTO t VALUES(generate_series(1,100));
INSERT 0 100

非自适应扫描

开启 PX 并行查询功能,并设置单节点并发度为 3。通过 EXPLAIN 可以看到执行计划来自 PX 优化器。由于参与测试的只读节点有两个,所以从执行计划中可以看到整体并发度为 6。

postgres=# SET polar_enable_px = 1;
SET
postgres=# SET polar_px_dop_per_node = 3;
SET
postgres=# SHOW polar_px_enable_adps;
 polar_px_enable_adps
----------------------
 off
(1 row)

postgres=# EXPLAIN SELECT * FROM t;
                                  QUERY PLAN
-------------------------------------------------------------------------------
 PX Coordinator 6:1  (slice1; segments: 6)  (cost=0.00..431.00 rows=1 width=4)
   ->  Partial Seq Scan on t  (cost=0.00..431.00 rows=1 width=4)
 Optimizer: PolarDB PX Optimizer
(3 rows)

postgres=# SELECT COUNT(*) FROM t;
 count
-------
   100
(1 row)

自适应扫描

开启自适应扫描功能的开关后,通过 EXPLAIN ANALYZE 可以看到每个 PX Worker 进程扫描的物理块号。

postgres=# SET polar_enable_px = 1;
SET
postgres=# SET polar_px_dop_per_node = 3;
SET
postgres=# SET polar_px_enable_adps = 1;
SET
postgres=# SHOW polar_px_enable_adps;
 polar_px_enable_adps
----------------------
 on
(1 row)

postgres=# SET polar_px_enable_adps_explain_analyze = 1;
SET
postgres=# SHOW polar_px_enable_adps_explain_analyze;
 polar_px_enable_adps_explain_analyze
--------------------------------------
 on
(1 row)

postgres=# EXPLAIN ANALYZE SELECT * FROM t;
                                                        QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------
 PX Coordinator 6:1  (slice1; segments: 6)  (cost=0.00..431.00 rows=1 width=4) (actual time=0.968..0.982 rows=100 loops=1)
   ->  Partial Seq Scan on t  (cost=0.00..431.00 rows=1 width=4) (actual time=0.380..0.435 rows=100 loops=1)
         Dynamic Pages Per Worker: [1]
 Planning Time: 5.571 ms
 Optimizer: PolarDB PX Optimizer
   (slice0)    Executor memory: 23K bytes.
   (slice1)    Executor memory: 14K bytes avg x 6 workers, 14K bytes max (seg0).
 Execution Time: 9.047 ms
(8 rows)

postgres=# SELECT COUNT(*) FROM t;
 count
-------
   100
(1 row)

results matching ""

    No results matching ""