Interconnect Deadlock Problem in Greenplum

Recently I see a GitHub Issue motion deadlock, previously we have a bunch of PRs to fix related issues, and now it comes again. I feel it is time to look deeply into the problem and jump out of the patches over patches old circles.

A thread is als created in gpdb-dev mailing list: https://groups.google.com/a/greenplum.org/g/gpdb-dev/c/Y4ajINeKeUw

History background

I can only talk about open-source discussions in my personal blog.

The above patches all focus on JOIN, actually, the motion interconnect deadlock is not a problem of JOIN plans.

The highlight of Interconnect UDP

There is a good talk on the network, watching it can provide some useful background of interconnect.

Motion plan nodes cut the whole plan into pieces and each piece is called a slice. A slice is a static part of a program and is executed by a group of processes. This group of processes is called Gang. A Gang will send tuples to another Gang via interconnect. IC-UDP is one implementation of interconnect.

Greenplum inherits Postgres’ execution model: one tuple a time. For motion node, it is also the case. Each call of ExecMotion will return a tuple util NULL, just like ExecSeqScan and Motion node will never spill to disk (the memory complexity is constant level). Motion node is not rescannable. A material node over a motion node is the skill to turn a motion node rescannable.

Under IC-UDP, each QE will contains two threads:

  • the main thread executes the Plan
  • the other thread is called interconnect receive thread: receive data from UDP, send ACKs and wake up main thread if necassary

Motion receive process (thread) will maintain a buffer for each motion connection, the buffer is fixed size, if it is full and the main thread does not consume tuples of that motion, that motion will be blocked. Motion senders will poll ack from the target.

Read the code of the function handleDataPacket for more details and pay attention to the return value of wakeup_mainthread. Function SendChunkUDPIFC is the routine to send data, after sending out it will wait for ACK (read function `pollAcks` and handleAcks).

SubPlan in Greenplum

Expressions in programming languages can be evaluated to values. SubPlan is a kind of expression in SQL plan. It can be evaluated to values by executing a SQL plan.

typedef struct SubPlan
{
	Expr		xpr;
	/* Fields copied from original SubLink: */
	SubLinkType subLinkType;	/* see above */
    ......

SubPlan can appear at any places when an expression is expected: targetlist, where condition, join condition…

An expression (SubPlan) is evaluated in a QE (on one segment) and most of the cases it need to fetch all of a distributed table’s information. So in MPP database, Greenplum’s implementation of SubPlan is to broadcast (or gather) and materialize the base distributed relations in a SubPlan. There is also a kind of SubPlan called HashSubPlan, it will build a hash table to story tuples for later hash operations, the hash table here is sort of like materialization.

In short, it is easy to introduce motion (by SubPlan) in targetlist, plan qual and joinqual in Greenplum.

Motion Deadlock Pattern

  • S is a slice with gangsize > 1
  • S has multiple child Slices, at least C1, C2
  • S‘s algorithm is:
fetch one tuple from C1
fetch multiple tuples (or one tuple?) from C2

The above pattern will lead to motion deadlock under IC-UDP. And let’s analyze the wait event chain based on the following plan.

Hash Join
    \____ Redistribute Motion (Motion A)
            \____ Scan
    \____ Hash
            \____ Redistribute Motion (Motion B)
                     \____ Scan

Before the analysis, lets define some symbols:

  • JS: the join slice
  • JS.x the QE of join slice on segment x
  • OS: the outer slice
  • OS.x: the QE of the outer slice on segment x
  • IS: the inner slice
  • IS.x: the QE of the inner slice on segment x

The Algorithm of Hash Join:

  1. fetch one tuple from outer (Motion A)
  2. if there is nothing, then let’s skip building the Hash Table
  3. if there is something, then fetch all tuples from Motion B and build the Hash Table

The wait chain can be:

  1. JS.0 is waiting for tuples from OS, suppose there is no data sent to it, so it is actually waiting for all EOS of OS
  2. OS.1‘s sender buffer is full, is waiting for ACK from JS.1
  3. JS.1 is building the Hash Table, it is waiting for tuples from IS
  4. IS.0 sender buffer is full, is waiting for ACK from JS.0

Motion Deadlock Cases

Playing with the following cases need:

Join’s outer & Join’s inner

create table t_motion_deadlock_1(a int, b int);
create table t_motion_deadlock_2(a int, b int);

insert into t_motion_deadlock_1 select i,i from generate_series(1, 30000)i;
delete from t_motion_deadlock_1 where gp_segment_id <> 1;
-- after the above steps, table t_motion_deadlock_1 only
-- contain plenty of data on seg1, and redistribute t seg1 (since a=b)

insert into t_motion_deadlock_2 select i,i from generate_series(1, 30000)i;
delete from t_motion_deadlock_2 where gp_segment_id <> 0;
-- after the above steps, table t_motion_deadlock_2 only
-- contain plenty of data on seg0, and will redistribute to seg0 (since a=b)
insert into t_motion_deadlock_2
select y.a, x.b from t_motion_deadlock_1 x, t_motion_deadlock_2 y limit 10;
-- the final insert into to insert some data in
-- t_motion_deadlock_2 (at the end), and these part of
-- data will redistribute to seg1
gpadmin=# explain (costs off, verbose)
select count(1)
from
  t_motion_deadlock_1 x,
  t_motion_deadlock_2 y
where x.b = y.b;
                                  QUERY PLAN
------------------------------------------------------------------------------
 Finalize Aggregate
   Output: count(1)
   ->  Gather Motion 3:1  (slice1; segments: 3)
         Output: (PARTIAL count(1))
         ->  Partial Aggregate
               Output: PARTIAL count(1)
               ->  Hash Join
                     Hash Cond: (x.b = y.b)
                     ->  Redistribute Motion 3:3  (slice2; segments: 3)
                           Output: x.b
                           Hash Key: x.b
                           ->  Seq Scan on public.t_motion_deadlock_1 x
                                 Output: x.b
                     ->  Hash
                           Output: y.b
                           ->  Redistribute Motion 3:3  (slice3; segments: 3)
                                 Output: y.b
                                 Hash Key: y.b
                                 ->  Seq Scan on public.t_motion_deadlock_2 y
                                       Output: y.b

Join’s outer & join qual & plan qual

create table t_motion_deadlock_1(a int, b int);
create table t_motion_deadlock_2(a int, b int);
create table t_motion_deadlock_3(a int, b int);

insert into t_motion_deadlock_1 select i,i from generate_series(1, 30000)i;
delete from t_motion_deadlock_1 where gp_segment_id <> 1;
-- after the above steps, t_motion_deadlock_1 only contain plenty of data
-- on seg1

-- we don't care t_motion_deadlock_2, just make sure
-- it contain data on each segment (no empty Hash every where)
insert into t_motion_deadlock_2 select i,i from generate_series(1, 30)i;

-- plenty of data in t_motion_deadlock_3
insert into t_motion_deadlock_3 select i,i from generate_series(1, 10000)i;
gpadmin=# explain (costs off, verbose)
select count(1)
from
  t_motion_deadlock_1 x join t_motion_deadlock_2 y
  on x.b = y.a and
     x.b + y.a > (select count(1) from t_motion_deadlock_3 z where z.b < x.a + y.b);
                                         QUERY PLAN
--------------------------------------------------------------------------------------------
 Finalize Aggregate
   Output: count(1)
   ->  Gather Motion 3:1  (slice1; segments: 3)
         Output: (PARTIAL count(1))
         ->  Partial Aggregate
               Output: PARTIAL count(1)
               ->  Hash Join
                     Hash Cond: (x.b = y.a)
                     Join Filter: ((x.b + y.a) > (SubPlan 1))
                     ->  Redistribute Motion 3:3  (slice2; segments: 3)
                           Output: x.b, x.a
                           Hash Key: x.b
                           ->  Seq Scan on public.t_motion_deadlock_1 x
                                 Output: x.b, x.a
                     ->  Hash
                           Output: y.a, y.b
                           ->  Seq Scan on public.t_motion_deadlock_2 y
                                 Output: y.a, y.b
                     SubPlan 1
                       ->  Aggregate
                             Output: count(1)
                             ->  Result
                                   Filter: (z.b < (x.a + y.b))
                                   ->  Materialize
                                         Output: z.b
                                         ->  Broadcast Motion 3:3  (slice3; segments: 3)
                                               Output: z.b
                                               ->  Seq Scan on public.t_motion_deadlock_3 z
                                                     Output: z.b


gpadmin=# explain (costs off, verbose)
select *
from
  t_motion_deadlock_1 x left join t_motion_deadlock_2 y
  on x.b = y.a
where
   x.a is null or exists (select random() from t_motion_deadlock_3 z where z.b < x.a + y.b)
limit random();
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Limit
   Output: x.a, x.b, y.a, y.b
   ->  Gather Motion 3:1  (slice1; segments: 3)
         Output: x.a, x.b, y.a, y.b
         ->  Hash Left Join
               Output: x.a, x.b, y.a, y.b
               Hash Cond: (x.b = y.a)
               Filter: ((x.a IS NULL) OR (SubPlan 1))
               ->  Redistribute Motion 3:3  (slice3; segments: 3)
                     Output: x.a, x.b
                     Hash Key: x.b
                     ->  Seq Scan on public.t_motion_deadlock_1 x
                           Output: x.a, x.b
               ->  Hash
                     Output: y.a, y.b
                     ->  Seq Scan on public.t_motion_deadlock_2 y
                           Output: y.a, y.b
               SubPlan 1
                 ->  Result
                       Filter: (z.b < (x.a + y.b))
                       ->  Materialize
                             Output: z.b
                             ->  Broadcast Motion 3:3  (slice2; segments: 3)
                                   Output: z.b
                                   ->  Seq Scan on public.t_motion_deadlock_3 z
                                         Output: z.b

Join’s outer & Target List subplan

create table t_motion_deadlock_1(a int, b int);
create table t_motion_deadlock_2(a int, b int);
create table t_motion_deadlock_3(a int, b int);

insert into t_motion_deadlock_1 select i,i from generate_series(1, 30000)i;
delete from t_motion_deadlock_1 where gp_segment_id <> 1;
insert into t_motion_deadlock_2 select i,i from generate_series(1, 30)i;
insert into t_motion_deadlock_3 select i,i from generate_series(1, 10000)i;
explain (costs off, verbose)
select
  (select count(1) from t_motion_deadlock_3 z where z.a < x.a + y.b ) s
from t_motion_deadlock_1 x join t_motion_deadlock_2 y on x.b = y.a;
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   Output: ((SubPlan 1))
   ->  Hash Join
         Output: (SubPlan 1)
         Hash Cond: (x.b = y.a)
         ->  Redistribute Motion 3:3  (slice3; segments: 3)
               Output: x.a, x.b
               Hash Key: x.b
               ->  Seq Scan on public.t_motion_deadlock_1 x
                     Output: x.a, x.b
         ->  Hash
               Output: y.b, y.a
               ->  Seq Scan on public.t_motion_deadlock_2 y
                     Output: y.b, y.a
         SubPlan 1
           ->  Aggregate
                 Output: count(1)
                 ->  Result
                       Filter: (z.a < (x.a + y.b))
                       ->  Materialize
                             Output: z.a
                             ->  Broadcast Motion 3:3  (slice2; segments: 3)
                                   Output: z.a
                                   ->  Seq Scan on public.t_motion_deadlock_3 z
                                         Output: z.a

Fix method

Currently, I am working on a fix for the problem. The idea is:

  • always prefetch inner of Join
  • when there are SubPlans in a node, prefetch the SubPlans
    • find out all materialize above motion nodes, execute them
    • if the subplan is hash subplan, prefetch means building the hash table if it is null

1 thought on “Interconnect Deadlock Problem in Greenplum

  1. Pingback: Motion deadlock and all related things in Greenplum | Thoughts on Computing

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.