Recently I saw a GitHub issue about motion deadlock. We have had a bunch of PRs fixing related issues before, and now it comes up again. I feel it is time to look deeply into the problem and jump out of the old circle of patches over patches.
A thread has also been created in the gpdb-dev mailing list: https://groups.google.com/a/greenplum.org/g/gpdb-dev/c/Y4ajINeKeUw
Historical background
I can only talk about open-source discussions in my personal blog.
- GPDB-DEV mailing list thread: Deadlock hazards and joins
- PR #7492: Fix motion hazard between outer and joinqual
- PR #11862: Prefetch NonJoinQual to avoid motion hazard.
- Read the source code: `cdbllize.c:motion_sanity_walker`
The above patches all focus on JOIN; in fact, though, the motion interconnect deadlock is not a problem specific to JOIN plans.
The highlight of Interconnect UDP
There is a good talk about this available online; watching it can provide some useful background on the interconnect.
Motion plan nodes cut the whole plan into pieces, and each piece is called a slice. A slice is a static part of the program and is executed by a group of processes; this group of processes is called a Gang. A Gang sends tuples to another Gang via the interconnect. IC-UDP is one implementation of the interconnect.
Greenplum inherits Postgres' execution model: one tuple at a time. Motion nodes are no exception: each call of `ExecMotion` returns a tuple until NULL, just like `ExecSeqScan`, and a Motion node never spills to disk (its memory usage is constant). A Motion node is not rescannable; placing a Material node over a Motion node is the trick to make it rescannable.
Under IC-UDP, each QE contains two threads:
- the main thread, which executes the plan
- the interconnect receive thread, which receives data from UDP, sends ACKs, and wakes up the main thread if necessary

The receive thread maintains a fixed-size buffer for each motion connection. If a buffer is full and the main thread does not consume tuples from that motion, that motion is blocked. Motion senders poll for ACKs from the target.
Read the code of the function `handleDataPacket` for more details, and pay attention to the return value of `wakeup_mainthread`. The function `SendChunkUDPIFC` is the routine that sends data; after sending it waits for ACKs (read the functions `pollAcks` and `handleAcks`).
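The blocking behavior described above can be modeled abstractly: a sender keeps transmitting until the receiver's fixed-size buffer fills, then blocks waiting for an ACK that only arrives once the receiving main thread consumes a tuple. A minimal Python sketch of this toy model (the class and names are illustrative, not gpdb's):

```python
from collections import deque

BUFFER_CAP = 2  # fixed-size receive buffer per motion connection

class MotionConnection:
    """Toy model of one IC-UDP motion connection."""
    def __init__(self):
        self.buffer = deque()

    def send(self, tup):
        """Sender side: returns True if the packet is accepted, False if
        the sender must block and poll for an ACK (buffer full, roughly
        like SendChunkUDPIFC stuck in pollAcks)."""
        if len(self.buffer) >= BUFFER_CAP:
            return False  # sender is now blocked
        self.buffer.append(tup)  # receive thread stores the packet
        return True

    def recv(self):
        """Main thread consumes one tuple; freeing a slot implicitly
        ACKs the sender so it can make progress again."""
        return self.buffer.popleft() if self.buffer else None

conn = MotionConnection()
sent = [conn.send(i) for i in range(4)]
print(sent)            # [True, True, False, False]: buffer fills after 2
conn.recv()            # main thread consumes -> slot freed, sender ACKed
print(conn.send(99))   # True
```

The key point is that the sender can only make progress when the receiving QE's main thread keeps consuming; if that main thread is itself blocked elsewhere, the sender stays stuck forever.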
SubPlan in Greenplum
Expressions in programming languages can be evaluated to values. A SubPlan is a kind of expression in a SQL plan: it is evaluated to a value by executing a SQL plan.
typedef struct SubPlan
{
    Expr        xpr;

    /* Fields copied from original SubLink: */
    SubLinkType subLinkType;    /* see above */
    ......
A SubPlan can appear anywhere an expression is expected: the target list, where conditions, join conditions…
An expression (SubPlan) is evaluated in a QE (on one segment), and in most cases it needs to fetch data from an entire distributed table. So, in an MPP database, Greenplum's implementation of SubPlan is to broadcast (or gather) and materialize the base distributed relations in the SubPlan. There is also a kind of SubPlan called HashSubPlan; it builds a hash table to store tuples for later hash operations, and the hash table here is a sort of materialization.
In short, it is easy for SubPlans to introduce motions in target lists, plan quals, and join quals in Greenplum.
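To make the HashSubPlan idea concrete, here is a hedged Python sketch of evaluating something like `a IN (SELECT b FROM inner)`: the (broadcast) inner rows are loaded into a hash table exactly once, and every outer row probes it, which is what makes the hash table behave like a materialized result (a toy model, not gpdb's executor):

```python
def hash_subplan_in(outer_rows, inner_rows):
    """Toy model of an IN-clause evaluated as a HashSubPlan: build the
    hash table once from the (already broadcast) inner rows, then
    probe it for every outer row."""
    hash_table = None
    results = []
    for a in outer_rows:
        if hash_table is None:       # built lazily, exactly once
            hash_table = set(inner_rows)
        results.append(a in hash_table)
    return results

print(hash_subplan_in([1, 2, 5], [2, 3, 5]))  # [False, True, True]
```

Note that building the hash table means draining the inner side completely on the first probe, the same "fetch everything from one child before touching the other" shape that the deadlock pattern below is about.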
Motion Deadlock Pattern
- `S` is a slice with gang size > 1
- `S` has multiple child slices, at least `C1` and `C2`
- `S`'s algorithm is:
  - fetch one tuple from `C1`
  - fetch multiple tuples (or one tuple?) from `C2`
The above pattern will lead to motion deadlock under IC-UDP. And let’s analyze the wait event chain based on the following plan.
Hash Join
\____ Redistribute Motion (Motion A)
\____ Scan
\____ Hash
\____ Redistribute Motion (Motion B)
\____ Scan
Before the analysis, let's define some symbols:
- `JS`: the join slice; `JS.x`: the QE of the join slice on segment `x`
- `OS`: the outer slice; `OS.x`: the QE of the outer slice on segment `x`
- `IS`: the inner slice; `IS.x`: the QE of the inner slice on segment `x`
The Algorithm of Hash Join:
- fetch one tuple from outer (Motion A)
- if there is nothing, then let’s skip building the Hash Table
- if there is something, then fetch all tuples from Motion B and build the Hash Table
The wait chain can be:
- `JS.0` is waiting for tuples from `OS`; suppose no data has been sent to it, so it is actually waiting for EOS from all of `OS`
- `OS.1`'s send buffer is full; it is waiting for an ACK from `JS.1`
- `JS.1` is building the Hash Table; it is waiting for tuples from `IS`
- `IS.0`'s send buffer is full; it is waiting for an ACK from `JS.0`
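The four wait events above close into a cycle in the wait-for graph, which is exactly what makes this a deadlock rather than ordinary blocking. A quick illustrative sketch (pure toy code, not gpdb's detection logic):

```python
def has_cycle(wait_for):
    """Detect a cycle in a wait-for graph given as {waiter: blocker}."""
    for start in wait_for:
        seen, node = set(), start
        while node in wait_for:       # follow the chain of waits
            if node in seen:
                return True           # we came back: deadlock
            seen.add(node)
            node = wait_for[node]
    return False

# the wait chain from the analysis above
wait_for = {
    "JS.0": "OS.1",  # JS.0 waits for tuples/EOS from OS, stuck on OS.1
    "OS.1": "JS.1",  # OS.1's send buffer is full, waits for ACK from JS.1
    "JS.1": "IS.0",  # JS.1 is building the hash table, waits on IS
    "IS.0": "JS.0",  # IS.0's send buffer is full, waits for ACK from JS.0
}
print(has_cycle(wait_for))  # True
```

Every process in the cycle is waiting for another member of the cycle, so no QE can ever make progress.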
Motion Deadlock Cases
Playing with the following cases needs:
- gpdb main branch, with top commit `df114f0d3d22`
- a 3-seg demo cluster
- apply the following two patches to remove the prefetch logic
Join’s outer & Join’s inner
create table t_motion_deadlock_1(a int, b int);
create table t_motion_deadlock_2(a int, b int);
insert into t_motion_deadlock_1 select i,i from generate_series(1, 30000)i;
delete from t_motion_deadlock_1 where gp_segment_id <> 1;
-- after the above steps, table t_motion_deadlock_1 only
-- contains plenty of data on seg1, which will redistribute to seg1 (since a=b)
insert into t_motion_deadlock_2 select i,i from generate_series(1, 30000)i;
delete from t_motion_deadlock_2 where gp_segment_id <> 0;
-- after the above steps, table t_motion_deadlock_2 only
-- contains plenty of data on seg0, which will redistribute to seg0 (since a=b)
insert into t_motion_deadlock_2
select y.a, x.b from t_motion_deadlock_1 x, t_motion_deadlock_2 y limit 10;
-- the final insert adds some data at the end of
-- t_motion_deadlock_2, and this part of the
-- data will redistribute to seg1
gpadmin=# explain (costs off, verbose)
select count(1)
from
t_motion_deadlock_1 x,
t_motion_deadlock_2 y
where x.b = y.b;
QUERY PLAN
------------------------------------------------------------------------------
Finalize Aggregate
Output: count(1)
-> Gather Motion 3:1 (slice1; segments: 3)
Output: (PARTIAL count(1))
-> Partial Aggregate
Output: PARTIAL count(1)
-> Hash Join
Hash Cond: (x.b = y.b)
-> Redistribute Motion 3:3 (slice2; segments: 3)
Output: x.b
Hash Key: x.b
-> Seq Scan on public.t_motion_deadlock_1 x
Output: x.b
-> Hash
Output: y.b
-> Redistribute Motion 3:3 (slice3; segments: 3)
Output: y.b
Hash Key: y.b
-> Seq Scan on public.t_motion_deadlock_2 y
Output: y.b
Join’s outer & join qual & plan qual
create table t_motion_deadlock_1(a int, b int);
create table t_motion_deadlock_2(a int, b int);
create table t_motion_deadlock_3(a int, b int);
insert into t_motion_deadlock_1 select i,i from generate_series(1, 30000)i;
delete from t_motion_deadlock_1 where gp_segment_id <> 1;
-- after the above steps, t_motion_deadlock_1 only contains plenty of data
-- on seg1
-- we don't care about t_motion_deadlock_2's content, just make sure
-- it contains data on each segment (no empty hash table anywhere)
insert into t_motion_deadlock_2 select i,i from generate_series(1, 30)i;
-- plenty of data in t_motion_deadlock_3
insert into t_motion_deadlock_3 select i,i from generate_series(1, 10000)i;
gpadmin=# explain (costs off, verbose)
select count(1)
from
t_motion_deadlock_1 x join t_motion_deadlock_2 y
on x.b = y.a and
x.b + y.a > (select count(1) from t_motion_deadlock_3 z where z.b < x.a + y.b);
QUERY PLAN
--------------------------------------------------------------------------------------------
Finalize Aggregate
Output: count(1)
-> Gather Motion 3:1 (slice1; segments: 3)
Output: (PARTIAL count(1))
-> Partial Aggregate
Output: PARTIAL count(1)
-> Hash Join
Hash Cond: (x.b = y.a)
Join Filter: ((x.b + y.a) > (SubPlan 1))
-> Redistribute Motion 3:3 (slice2; segments: 3)
Output: x.b, x.a
Hash Key: x.b
-> Seq Scan on public.t_motion_deadlock_1 x
Output: x.b, x.a
-> Hash
Output: y.a, y.b
-> Seq Scan on public.t_motion_deadlock_2 y
Output: y.a, y.b
SubPlan 1
-> Aggregate
Output: count(1)
-> Result
Filter: (z.b < (x.a + y.b))
-> Materialize
Output: z.b
-> Broadcast Motion 3:3 (slice3; segments: 3)
Output: z.b
-> Seq Scan on public.t_motion_deadlock_3 z
Output: z.b
gpadmin=# explain (costs off, verbose)
select *
from
t_motion_deadlock_1 x left join t_motion_deadlock_2 y
on x.b = y.a
where
x.a is null or exists (select random() from t_motion_deadlock_3 z where z.b < x.a + y.b)
limit random();
QUERY PLAN
--------------------------------------------------------------------------------
Limit
Output: x.a, x.b, y.a, y.b
-> Gather Motion 3:1 (slice1; segments: 3)
Output: x.a, x.b, y.a, y.b
-> Hash Left Join
Output: x.a, x.b, y.a, y.b
Hash Cond: (x.b = y.a)
Filter: ((x.a IS NULL) OR (SubPlan 1))
-> Redistribute Motion 3:3 (slice3; segments: 3)
Output: x.a, x.b
Hash Key: x.b
-> Seq Scan on public.t_motion_deadlock_1 x
Output: x.a, x.b
-> Hash
Output: y.a, y.b
-> Seq Scan on public.t_motion_deadlock_2 y
Output: y.a, y.b
SubPlan 1
-> Result
Filter: (z.b < (x.a + y.b))
-> Materialize
Output: z.b
-> Broadcast Motion 3:3 (slice2; segments: 3)
Output: z.b
-> Seq Scan on public.t_motion_deadlock_3 z
Output: z.b
Join’s outer & Target List subplan
create table t_motion_deadlock_1(a int, b int);
create table t_motion_deadlock_2(a int, b int);
create table t_motion_deadlock_3(a int, b int);
insert into t_motion_deadlock_1 select i,i from generate_series(1, 30000)i;
delete from t_motion_deadlock_1 where gp_segment_id <> 1;
insert into t_motion_deadlock_2 select i,i from generate_series(1, 30)i;
insert into t_motion_deadlock_3 select i,i from generate_series(1, 10000)i;
explain (costs off, verbose)
select
(select count(1) from t_motion_deadlock_3 z where z.a < x.a + y.b ) s
from t_motion_deadlock_1 x join t_motion_deadlock_2 y on x.b = y.a;
QUERY PLAN
--------------------------------------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
Output: ((SubPlan 1))
-> Hash Join
Output: (SubPlan 1)
Hash Cond: (x.b = y.a)
-> Redistribute Motion 3:3 (slice3; segments: 3)
Output: x.a, x.b
Hash Key: x.b
-> Seq Scan on public.t_motion_deadlock_1 x
Output: x.a, x.b
-> Hash
Output: y.b, y.a
-> Seq Scan on public.t_motion_deadlock_2 y
Output: y.b, y.a
SubPlan 1
-> Aggregate
Output: count(1)
-> Result
Filter: (z.a < (x.a + y.b))
-> Materialize
Output: z.a
-> Broadcast Motion 3:3 (slice2; segments: 3)
Output: z.a
-> Seq Scan on public.t_motion_deadlock_3 z
Output: z.a
Fix method
Currently, I am working on a fix for the problem. The idea is:
- always prefetch the inner side of a Join
- when there are SubPlans in a node, prefetch the SubPlans:
  - find all Materialize nodes above Motion nodes and execute them
  - if the subplan is a hash subplan, prefetch means building the hash table if it is null
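To see why prefetching helps, consider a toy wait-for model of the hash-join case (illustrative only; the real fix lives in the planner/executor). Without prefetch, the join QE on seg1 (`JS.1`) only starts draining the inner motion after receiving an outer tuple, so the four waits close into a cycle. With inner/subplan prefetch, every join QE drains the inner motion to completion first, so no join QE ever waits on the inner side while an outer sender waits on it, and the cycle is broken:

```python
def has_cycle(wait_for):
    """Detect a cycle in a wait-for graph given as {waiter: blocker}."""
    for start in wait_for:
        seen, node = set(), start
        while node in wait_for:
            if node in seen:
                return True
            seen.add(node)
            node = wait_for[node]
    return False

# Without prefetch: JS.1 waits on the inner slice while outer senders
# wait on the join QEs -> cycle.
no_prefetch = {
    "JS.0": "OS.1", "OS.1": "JS.1",
    "JS.1": "IS.0", "IS.0": "JS.0",
}
# With inner prefetch: the inner motion is fully drained up front, so
# the JS.1 -> IS.0 and IS.0 -> JS.0 waits never exist.
with_prefetch = {
    "JS.0": "OS.1", "OS.1": "JS.1",
}
print(has_cycle(no_prefetch))    # True
print(has_cycle(with_prefetch))  # False
```

The remaining waits form a simple chain that resolves as soon as the join QEs start pulling from the outer motion, so the plan always makes progress.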