Recently I have been busy with some escalations involving lateral joins in Greenplum. While studying the related code and issues, I found that many problems intertwine. These are common issues in an MPP database.
A more recent blog post covers this topic better: Interconnect Deadlock Problem in Greenplum.
Basic Knowledge
Motions, Slices and Gangs
A plan in Greenplum is a distributed plan: it introduces Motion plannodes to implement the data communication needed by a distributed algorithm. Every Motion plannode cuts the plan into pieces: the piece below it sends data into the Motion, and the piece above it receives data from the Motion. These pieces are called slices in Greenplum (the concept is very similar to stages in Spark). Each slice is executed by a gang (a group of distributed processes). Gangs communicate with each other over the network.

The picture above shows the plan tree of the query select * from t1 join t2 where t1.a = t2.a. Each part with the same background color is a slice.
Execution of Motion
A Motion plannode is executed in two different slices. One slice invokes execMotionSender, and the other slice invokes execMotionUnsortedReceiver (there is another function, execMotionSortedReceiver, which we ignore in this discussion).
The sender has to get an ACK from the receiver before its send buffer can be reused. If the send buffer is full and no ACK arrives, the sending slice will wait.
If the subplan of a Motion sender cannot provide any more tuples, the Motion sender sends an EOS to each Motion receiver in the upper slice.
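To make the flow-control rule concrete, below is a minimal sketch of a sender whose buffer slots can only be reused after an ACK. This is not Greenplum's actual interconnect code; MotionSendState, motion_try_send, motion_got_ack and SEND_SLOTS are made-up names for illustration.

/* Minimal sketch, not Greenplum's interconnect code: a sender with a bounded
 * buffer that may only reuse a slot after the receiver has ACKed it. */
#include <stdbool.h>
#include <stdio.h>

#define SEND_SLOTS 4                /* hypothetical buffer capacity */

typedef struct MotionSendState
{
    int in_flight;                  /* tuples sent but not yet ACKed */
} MotionSendState;

/* returns true if the tuple could be handed to the network layer */
static bool motion_try_send(MotionSendState *s)
{
    if (s->in_flight >= SEND_SLOTS)
        return false;               /* buffer full: caller must wait for an ACK */
    s->in_flight++;
    return true;
}

/* an ACK from the receiver frees one slot for reuse */
static void motion_got_ack(MotionSendState *s)
{
    if (s->in_flight > 0)
        s->in_flight--;
}

int main(void)
{
    MotionSendState s = {0};

    for (int i = 0; i < 6; i++)
        printf("send %d: %s\n", i, motion_try_send(&s) ? "ok" : "blocked, waiting for ACK");
    motion_got_ack(&s);
    printf("after one ACK: %s\n", motion_try_send(&s) ? "ok" : "blocked");
    return 0;
}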
Motion deadlock
Theory Analysis
Deadlock happens when you have multiple processes and each of them may wait for another. As stated in the section above, a process in a gang can be waiting for others to send either data or an ACK. This creates a risk of deadlock. Let me use an example to demonstrate it.
The following example is based on gpdb's master branch (top commit 5a1676); the optimizer used is the Postgres planner.
gpadmin=# create table t1 (a int, b int) distributed randomly;
CREATE TABLE
gpadmin=# create table t2 (a int, b int) distributed randomly;
CREATE TABLE
gpadmin=# set enable_hashjoin = false;
SET
gpadmin=# set enable_nestloop = true;
SET
gpadmin=# explain (costs off) select * from t1 join t2 on t1.a = t2.a;
                            QUERY PLAN
------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Nested Loop
         Join Filter: (t1.a = t2.a)
         ->  Redistribute Motion 3:3  (slice2; segments: 3)
               Hash Key: t1.a
               ->  Seq Scan on t1
         ->  Materialize
               ->  Redistribute Motion 3:3  (slice3; segments: 3)
                     Hash Key: t2.a
                     ->  Seq Scan on t2
The main part of the above plan is a nested loop join, and both its outer (left) and inner (right) subtrees contain a Motion. First, let's list all the processes in the cluster:
- Join slice: A0 A1 A2 (this gang contains 3 processes in different segments)
- Outer slice: B0 B1 B2 (this gang scans table t1 in different segments)
- Inner slice: C0 C1 C2 (this gang scans table t2 in different segments)
The execution of a nested loop join in Postgres is (a small sketch in code follows this list):
- Fetch a tuple from the outer plan.
- Fetch a tuple from the inner plan and check whether the join qual is satisfied:
- if so, emit a join tuple;
- if not, go to 2.
- If there are no more tuples from the inner plan, go to 1.
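Here is that loop as a minimal, self-contained C sketch (it joins two small integer arrays; the real executor pulls tuples from subplans instead). The point that matters for the deadlock analysis is that an outer tuple is in hand before the inner side is scanned.

/* Self-contained sketch of the nested-loop order described above: one outer
 * tuple is fetched first, then the inner side is scanned for matches. */
#include <stddef.h>
#include <stdio.h>

static int outer_rel[] = {1, 2, 3};     /* stand-in for the outer subplan */
static int inner_rel[] = {2, 3, 4};     /* stand-in for the inner subplan */
#define NOUTER (sizeof(outer_rel) / sizeof(outer_rel[0]))
#define NINNER (sizeof(inner_rel) / sizeof(inner_rel[0]))

int main(void)
{
    for (size_t o = 0; o < NOUTER; o++)         /* 1. fetch an outer tuple */
    {
        for (size_t i = 0; i < NINNER; i++)     /* 2. fetch inner tuples */
        {
            if (outer_rel[o] == inner_rel[i])   /*    check the join qual */
                printf("join tuple: %d\n", outer_rel[o]);   /* emit a match */
        }
        /* inner side exhausted: back to step 1 for the next outer tuple */
    }
    return 0;
}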
Now consider the following scenario:
- A0 has already fetched a tuple from the outer plan and is waiting for the inner plan to send it a tuple. Suppose C1 and C2 have finished their jobs (or there is no data of t2 on seg1 and seg2); that means A0 is in fact waiting for C0 to send data to it: A0 -> C0.
- C0's buffer is full of data destined for A1, and A1 does not send any ACK to C0 (A1's interconnect thread's buffer is full, so it does not receive data from C0 and thus does not send an ACK back): C0 -> A1.
- A1 is about to fetch a tuple from the outer plan and is waiting for B0 (suppose B1 and B2 have finished their jobs, or seg1 and seg2 contain no data of t1): A1 -> B0.
- B0's buffer is full of data destined for A0, and A0 does not send any ACK to B0 (A0's interconnect thread's buffer is full, so it does not receive data from B0 and thus does not send an ACK back): B0 -> A0.
So the motion deadlock happens: A0 -> C0 -> A1 -> B0 -> A0.
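The four waiting edges form a cycle in the wait-for graph, which is exactly what a deadlock is. A tiny self-contained check of that cycle (process names and edges are taken from the scenario above):

/* Self-contained illustration: the waiting edges above form a cycle in the
 * wait-for graph, A0 -> C0 -> A1 -> B0 -> A0, so none of them can proceed. */
#include <stdio.h>

#define NPROC 4
static const char *name[NPROC] = {"A0", "C0", "A1", "B0"};
/* waits_for[i] is the index of the process that process i is waiting for */
static const int waits_for[NPROC] = {1, 2, 3, 0};

int main(void)
{
    int cur = 0;                            /* start from A0 */

    for (int step = 0; step < NPROC; step++)
    {
        printf("%s -> ", name[cur]);
        cur = waits_for[cur];
    }
    printf("%s : a cycle, hence a deadlock\n", name[cur]);
    return 0;
}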
Practical Case
The above analysis is pure theory, so can we manually create such a deadlock? To demonstrate it, we have to hack some code:
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 0b99bfeec72..ff526719842 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -993,6 +993,9 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
 		best_path->outerjoinpath->motionHazard)
 		((Join *) plan)->prefetch_joinqual = true;
 
+	((Join *) plan)->prefetch_inner = false;
+	((Join *) plan)->prefetch_joinqual = false;
+
 	/*
 	 * If there are any pseudoconstant clauses attached to this node, insert a
 	 * gating Result node that evaluates the pseudoconstants as one-time
Let me build a case step by step:
- A0 -> C0: A0 must have already got one outer tuple, and A0 must block other processes, so A0's receive buffer should be filled with unwanted data (not the inner plan's data). Let's use B1 to fill its buffer, so B1 must hold enough data and all of it destined for A0.
- C0 -> A1: A1's receive buffer must also be filled with unwanted data (the inner plan's data, since A1 has not got its first outer tuple yet). Let's use C1 to fill A1's buffer, so C1 must hold enough data and all of it destined for A1.
- A1 -> B0: B0 simply cannot send data to A1.
- B0 -> A0: A0's receive buffer is filled with B1's data, and B0's send buffer is full of data destined for A0 with no ACK coming back.
create table t1(a int, b int, c int, d int) distributed randomly;
create table t2(a int, b int, c int, d int) distributed randomly;
-- an integer value of 1 will be redistributed to seg1
-- an integer value of 2 will be redistributed to seg0
-- use utility mode to log in to seg0 and insert (2,2,2,2) into t1
-- use utility mode to log in to seg1 and insert (2,2,2,2) into t1
-- use utility mode to log in to seg1 and insert (1,1,1,1) into t1
select * from t1, t2 where t1.a = t2.a; -- this will lead to deadlock!
And the ps command output is:
gpadmin 68499 7000, gpadmin gpadmin [local] con6 cmd18 SELECT
gpadmin 68524 7003, gpadmin gpadmin 127.0.0.1(31990) con6 seg1 cmd19 slice1 MPPEXEC SELECT
gpadmin 68525 7004, gpadmin gpadmin 127.0.0.1(45726) con6 seg2 cmd19 slice1 MPPEXEC SELECT
gpadmin 68526 7002, gpadmin gpadmin 127.0.0.1(60546) con6 seg0 cmd19 slice1 MPPEXEC SELECT
gpadmin 68594 7002, gpadmin gpadmin 127.0.0.1(60576) con6 seg0 cmd19 slice2 MPPEXEC SELECT
gpadmin 68595 7003, gpadmin gpadmin 127.0.0.1(32020) con6 seg1 cmd19 slice2 MPPEXEC SELECT
gpadmin 68597 7004, gpadmin gpadmin 127.0.0.1(45756) con6 seg2 idle
gpadmin 68600 7003, gpadmin gpadmin 127.0.0.1(32026) con6 seg1 cmd19 slice3 MPPEXEC SELECT
gpadmin 68601 7002, gpadmin gpadmin 127.0.0.1(60582) con6 seg0 idle
gpadmin 68602 7004, gpadmin gpadmin 127.0.0.1(45762) con6 seg2 idle
And if you remove the hack patch above, everything works fine.
Summary of Motion Deadlock
Based on the implementation of Greenplum's interconnect and its distributed execution engine, if both subtrees of a join plan contain Motions, there is a risk of motion deadlock: gangs execute asynchronously, and the interconnect buffers may be filled with "unwanted data".
Prefetch Inner
Greenplum is popular around the world, and many big companies run it 24*7. In practical environments, however, Greenplum does not actually suffer from this motion deadlock risk.
Q: So what is the technique to avoid this?
A: Prefetch inner plan.
In the section above, only by hacking the code to force prefetch_inner to false could I manually construct a motion deadlock case.
The plannode Join contains two new fields added by Greenplum (prefetch_inner and prefetch_joinqual); we ignore prefetch_joinqual in this blog and focus on prefetch_inner.
typedef struct Join
{
    Plan        plan;
    JoinType    jointype;
    List       *joinqual;           /* JOIN quals (in addition to plan.qual) */
    bool        prefetch_inner;     /* to avoid deadlock in MPP */
    bool        prefetch_joinqual;  /* to avoid deadlock in MPP */
} Join;
I highly recommend reading the comment on the function src/backend/cdb/cdbllize.c:motion_sanity_walker. Let me paste it here and discuss it in detail.
/*
* Special handling for branch points because there is a possibility of a
* deadlock if there are Motions in both branches and one side is not
* first pre-fetched.
*
* The deadlock occurs because, when the buffers on the Send side of a
* Motion are completely filled with tuples, it blocks waiting for an ACK.
* Without prefetch_inner, the Join node reads one tuple from the outer
* side first and then starts retrieving tuples from the inner side -
* either to build a hash table (in case of HashJoin) or for joining (in
* case of MergeJoin and NestedLoopJoin).
*
* Thus we can end up with 4 processes infinitely waiting for each other :
*
* A : Join slice that already retrieved an outer tuple, and is waiting
*     for inner tuples from D.
* B : Join slice that is still waiting for the first outer tuple from C.
* C : Outer slice whose buffer is full sending tuples to A and is blocked
*     waiting for an ACK from A.
* D : Inner slice that is full sending tuples to B and is blocked waiting
*     for an ACK from B.
*
* A cannot ACK C because it is waiting to finish retrieving inner tuples
* from D. B cannot ACK D because it is waiting for it's first outer tuple
* from C before accepting any inner tuples. This forms a circular
* dependency resulting in a deadlock : C -> A -> D -> B -> C.
*
* We avoid this by pre-fetching all the inner tuples in such cases and
* materializing them in some fashion, before moving on to outer_tuples.
* This effectively breaks the cycle and prevents deadlock.
*
* Details:
* https://groups.google.com/a/greenplum.org/forum/#!msg/gpdb-dev/gMa1tW0x_fk/wuzvGXBaBAAJ
*/
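To make the fix concrete, here is a rough, self-contained sketch of how prefetch_inner changes the order of operations. It is not the real ExecNestLoop code; exec_join, inner_store and the small arrays are illustrative stand-ins for the executor and the Motion receivers.

/* Rough sketch, not the real executor: how prefetch_inner reorders work so
 * that the inner Motion is fully drained before the first outer tuple. */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

static int outer_data[] = {1, 2, 3};    /* stand-in for the outer Motion */
static int inner_data[] = {2, 3, 4};    /* stand-in for the inner Motion */
#define NOUTER (sizeof(outer_data) / sizeof(outer_data[0]))
#define NINNER (sizeof(inner_data) / sizeof(inner_data[0]))

static void exec_join(bool prefetch_inner)
{
    int    inner_store[NINNER];         /* local materialization of the inner side */
    size_t stored = 0;

    if (prefetch_inner)
    {
        /* Drain the inner side completely *before* asking for any outer tuple.
         * After this loop the inner senders are done and ACKed, so they can no
         * longer be part of a wait cycle with the outer side. */
        for (size_t i = 0; i < NINNER; i++)
            inner_store[stored++] = inner_data[i];
    }

    for (size_t o = 0; o < NOUTER; o++)     /* now fetch outer tuples */
    {
        if (!prefetch_inner && stored == 0)
        {
            /* Without prefetch the inner side is first read here, only after an
             * outer tuple is already in hand: the ordering that allows deadlock. */
            for (size_t i = 0; i < NINNER; i++)
                inner_store[stored++] = inner_data[i];
        }
        for (size_t i = 0; i < stored; i++)
            if (outer_data[o] == inner_store[i])
                printf("join tuple: %d\n", outer_data[o]);
    }
}

int main(void)
{
    exec_join(true);                    /* the safe, prefetching order */
    return 0;
}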
Prefetching an inner plan that contains outerParams
Postgres does not have this prefetch logic. Greenplum has it, but it does not fully consider every corner case. If the inner plan contains outerParam references, how can you prefetch an inner plan that depends on those outer params? At that point the outer params have not yet been stored in the econtext, and all the econtext contains is NULL pointers.
If we are lucky (I am not sure whether it is lucky or unlucky), the Datum's type is integer, so we do not interpret it as a pointer; we just take it as zero and nothing crashes. But what if these Datums have to be interpreted as pointers? Then the system dereferences a null pointer and PANICs.
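The following minimal sketch (not actual Postgres code; Datum here is just a typedef for illustration) shows why an unset integer param is harmless while an unset pass-by-reference param crashes: a Datum is a machine word, and zero reads as the value 0 for an int4 but as a NULL pointer for a composite type. The last line dereferences that NULL pointer and crashes, mirroring the PANIC described above.

/* Sketch, not actual Postgres code: why a zero-filled param "works" for an
 * int4 but crashes for a pass-by-reference type such as the composite
 * mytype used later in this post.  A Datum is just a machine word. */
#include <stdint.h>
#include <stdio.h>

typedef uintptr_t Datum;                            /* simplified stand-in */

typedef struct { int32_t x; int32_t y; } mytype;    /* a composite type */

int main(void)
{
    Datum unset = 0;                    /* the outer param was never stored */

    int32_t as_int = (int32_t) unset;   /* pass-by-value: just the value 0 */
    printf("int4 param: %d\n", as_int);

    mytype *as_ptr = (mytype *) unset;      /* pass-by-reference: a NULL pointer */
    printf("(mt_col).x: %d\n", as_ptr->x);  /* NULL dereference: crash/PANIC */
    return 0;
}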
I ran the ICW test cases and found several statements that prefetch an inner plan with outerParams. Let me show one example and demonstrate how we can make it PANIC.
The following case does not use the lateral syntax. It is a test case from the PR "Ensure that Motion nodes in parameterized plans are not rescanned. #6776" and lives in src/test/regress/sql/bfv_joins.sql.
create table a (i int4);
create table b (i int4);
create table c (i int4, j int4);
insert into a select g from generate_series(1,1) g;
insert into b select g from generate_series(1,1) g;
insert into c select g, g from generate_series(1, 100) g;
create index on c (i,j);
-- In order to get the plan we want, Index Scan on 'c' must appear
-- much cheaper than a Seq Scan. In order to keep this test quick and small,
-- we don't want to actually create a huge table, so cheat a little and
-- force the stats to make it look big to the planner.
set allow_system_table_mods = on;
update pg_class set reltuples=10000000 where oid ='c'::regclass;
set enable_hashjoin=off;
set enable_mergejoin=off;
set enable_nestloop=on;
-- the plan should look something like this:
--
--                                 QUERY PLAN
-- ---------------------------------------------------------------------------
--  Gather Motion 3:1  (slice1; segments: 3)
--    ->  Nested Loop [1]
--          ->  Broadcast Motion 3:3  (slice2; segments: 3)
--                ->  Seq Scan on b
--          ->  Materialize [6]
--                ->  Nested Loop [2]
--                      Join Filter: (b.i = a.i)
--                      ->  Materialize [5]
--                            ->  Broadcast Motion 3:3  (slice3; segments: 3) [3]
--                                  ->  Seq Scan on a
--                      ->  Index Only Scan using c_i_j_idx on c
--                            Index Cond: (j = (a.i + b.i)) [4]
--  Optimizer: Postgres query optimizer
-- (14 rows)
--
-- The crucial parts are:
--
-- * Nested Loop join on the inner side of another Nested Loop join [1], [2]
--
-- * Motion on the outer side of the inner Nested Loop join (the Broadcast
-- Motion on top of "Seq Scan on a" [3])
--
-- * An Index scan in the innermost path, which uses an executor parameter
-- from the outermost path ("b.i", in the Index Cond) [4]
--
-- There must be a Materialize node on top of the "Broadcast Motion -> Seq Scan"
-- path [5]. Otherwise, when the outermost scan on 'b' produces a new row, and
-- the outer Nested Loop calls Rescan on its inner side, the Motion node would
-- be rescanned. Note that the Materialize node at [6] does *not* shield the
-- Motion node from rescanning! That Materialize node is rescanned, when the
-- executor parameter 'b.i' changes.
explain (costs off) select * from a, b, c where b.i = a.i and (a.i + b.i) = c.j;
The above is the whole test case, and the plan looks like this:
                                QUERY PLAN
---------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  Nested Loop
         ->  Broadcast Motion 3:3  (slice2; segments: 3)
               ->  Seq Scan on b
         ->  Materialize
               ->  Nested Loop
                     Join Filter: (b.i = a.i)
                     ->  Materialize
                           ->  Broadcast Motion 3:3  (slice3; segments: 3)
                                 ->  Seq Scan on a
                     ->  Index Only Scan using c_i_j_idx on c
                           Index Cond: (j = (a.i + b.i))
The above nested loop plan needs prefetch because both its left and right subtrees contain Motions, but the join filter of the inner nested loop depends on the outer param b.i. This can be verified by using gdb to read the nestParams field and the prefetch_inner field of the top nested loop plannode.
Yet this case has run successfully for a long time and never fails. That is because the type of b.i is integer, so the zero Datum is never interpreted as a pointer. With a small modification, the system will PANIC due to a null pointer dereference. The idea is to use a user-defined type.
create type mytype as (x int, y int);
alter table b add column mt_col mytype;
gpadmin=# explain verbose select a.*, b.i, c.* from a, b, c where (mt_col).x > a.i and (a.i + b.i) = c.j;
                                                  QUERY PLAN
-------------------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=0.19..765906.70 rows=24 width=16)
   Output: a.i, b.i, c.i, c.j
   ->  Nested Loop  (cost=0.19..765906.23 rows=8 width=16)
         Output: a.i, b.i, c.i, c.j
         ->  Broadcast Motion 3:3  (slice2; segments: 3)  (cost=0.00..1.05 rows=1 width=36)
               Output: b.i, b.mt_col
               ->  Seq Scan on public.b  (cost=0.00..1.01 rows=1 width=36)
                     Output: b.i, b.mt_col
         ->  Materialize  (cost=0.19..255301.71 rows=2 width=12)
               Output: a.i, c.i, c.j
               ->  Nested Loop  (cost=0.19..255301.69 rows=2 width=12)
                     Output: a.i, c.i, c.j
                     Join Filter: ((b.mt_col).x > a.i)
                     ->  Materialize  (cost=0.00..1.06 rows=1 width=4)
                           Output: a.i
                           ->  Broadcast Motion 3:3  (slice3; segments: 3)  (cost=0.00..1.05 rows=1 width=4)
                                 Output: a.i
                                 ->  Seq Scan on public.a  (cost=0.00..1.01 rows=1 width=4)
                                       Output: a.i
                     ->  Index Only Scan using c_i_j_idx on public.c  (cost=0.19..85100.20 rows=1 width=8)
                           Output: c.i, c.j
                           Index Cond: (c.j = (a.i + b.i))
 Optimizer: Postgres query optimizer
gpadmin=# select a.*, b.i, c.* from a, b, c where (mt_col).x > a.i and (a.i + b.i) = c.j;
ERROR: Error on receive from seg2 slice2 127.0.1.1:7004 pid=116028: server closed the connection unexpectedly
Part of the coredump’s stack is
(gdb) bt
#0 raise (sig=<optimized out>) at ../sysdeps/unix/sysv/linux/raise.c:51
#1 0x00005572d3c9941c in StandardHandlerForSigillSigsegvSigbus_OnMainThread (processName=0x5572d3fa0d92 "Segment process", postgres_signal_arg=11) at elog.c:5689
#2 0x00005572d3ae7e71 in CdbProgramErrorHandler (postgres_signal_arg=11) at postgres.c:3495
#3 <signal handler called>
#4 0x00005572d3c9e92f in pg_detoast_datum (datum=0x0) at fmgr.c:2204
#5 0x00005572d388486d in ExecEvalFieldSelect (fstate=0x5572d59fe558, econtext=0x5572d59fda80, isNull=0x5572d59fe0f0 "", isDone=0x0) at execQual.c:4648
#6 0x00005572d387f566 in ExecMakeFunctionResultNoSets (fcache=0x5572d59fdd40, econtext=0x5572d59fda80, isNull=0x7ffc7e3654c6 "", isDone=0x0) at execQual.c:2080
#7 0x00005572d38802ae in ExecEvalOper (fcache=0x5572d59fdd40, econtext=0x5572d59fda80, isNull=0x7ffc7e3654c6 "", isDone=0x0) at execQual.c:2545
#8 0x00005572d38894e5 in ExecQual (qual=0x5572d59fe6d0, econtext=0x5572d59fda80, resultForNull=0 '\000') at execQual.c:6346
#9 0x00005572d388f259 in ExecPrefetchJoinQual (node=0x5572d59fd910) at execUtils.c:1186
#10 0x00005572d38b6cea in ExecNestLoop_guts (node=0x5572d59fd910) at nodeNestloop.c:150
#11 0x00005572d38b71bb in ExecNestLoop (node=0x5572d59fd910) at nodeNestloop.c:352
#12 0x00005572d387ab1f in ExecProcNode (node=0x5572d59fd910) at execProcnode.c:1011
#13 0x00005572d38ad74a in ExecMaterial (node=0x5572d59fd600) at nodeMaterial.c:138
#14 0x00005572d387ab5e in ExecProcNode (node=0x5572d59fd600) at execProcnode.c:1026
#15 0x00005572d38b6c15 in ExecNestLoop_guts (node=0x5572d59fd138) at nodeNestloop.c:111
#16 0x00005572d38b71bb in ExecNestLoop (node=0x5572d59fd138) at nodeNestloop.c:352
#17 0x00005572d387ab1f in ExecProcNode (node=0x5572d59fd138) at execProcnode.c:1011
#18 0x00005572d38d5fad in execMotionSender (node=0x5572d59fccd8) at nodeMotion.c:271
#19 0x00005572d38d5e98 in ExecMotion (node=0x5572d59fccd8) at nodeMotion.c:235
#20 0x00005572d387ac45 in ExecProcNode (node=0x5572d59fccd8) at execProcnode.c:1070
Thoughts on Solutions
The simplest fix, without thinking too deeply, is to add a check during planning and throw an error when we are about to prefetch an inner plan that references outer params.
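A minimal sketch of that check follows, under the assumption that we can walk the inner subtree for outer-param references; the structures and the helper contains_outer_params are simplified, hypothetical stand-ins, not existing Greenplum functions.

/* Sketch of the simple planning-time check; the structures and the helper
 * contains_outer_params() are hypothetical, simplified stand-ins. */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct Plan
{
    bool references_outer_params;       /* pretend result of a subtree walk */
} Plan;

typedef struct Join
{
    Plan *innerPlan;
    bool  prefetch_inner;
} Join;

/* Hypothetical helper: a real patch would walk the inner subtree looking for
 * executor params that are supplied by the outer side of the join. */
static bool contains_outer_params(const Plan *subtree)
{
    return subtree->references_outer_params;
}

static void check_prefetch_safety(const Join *join)
{
    if (join->prefetch_inner && contains_outer_params(join->innerPlan))
    {
        /* in the real planner this would be an ereport(ERROR, ...) */
        fprintf(stderr, "cannot prefetch an inner plan that references outer params\n");
        exit(EXIT_FAILURE);
    }
}

int main(void)
{
    Plan inner = { .references_outer_params = true };
    Join join  = { .innerPlan = &inner, .prefetch_inner = true };

    check_prefetch_safety(&join);       /* this plan would be rejected */
    return 0;
}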
But can we do better? My current line of thought starts with asking why we prefetch at all: to avoid motion deadlock. But what is the root of the issue? I think the point of prefetching is to materialize the Motions, so that rescanning the inner side no longer goes through the interconnect. Since params cannot be passed across a Motion, materializing only the Motions should be safe. And once that is done, the inner plan can return anything during the prefetch, because it will not affect the final result.
I still have to understand every detail of the interconnect and the Materialize plannode. After that, I will come up with a better solution.