A Tour of Analyzing a Greenplum Bug: Locus, Partial Distribution, Motion and Interconnect

Overview

The bad plan and wrong result

When reviewing the code of the PR Improve Append plan when General children exist #9692, I came across this piece of code in the function set_append_path_locus:

/*
 * CDB: If all the scans are distributed alike, set
 * the result locus to match.  Otherwise, if all are partitioned,
 * set it to strewn.  A mixture of partitioned and non-partitioned
 * scans should not occur after above correction;
 *
 * CDB TODO: When the scans are not all partitioned alike, and the
 * result is joined with another rel, consider pushing the join
 * below the Append so that child tables that are properly
 * distributed can be joined in place.
 */
if (isfirst)
{
	targetlocus = projectedlocus;
	isfirst = false;
}
else if (cdbpathlocus_equal(targetlocus, projectedlocus))
{
	/* compatible */
}
else
{
	/*
	 * subpaths have different distributed policy, mark it as random
	 * distributed and set the numsegments to the maximum of all
	 * subpaths to not missing any tuples.
	 */
	CdbPathLocus_MakeStrewn(&targetlocus,
							Max(CdbPathLocus_NumSegments(targetlocus),
								CdbPathLocus_NumSegments(projectedlocus)));
	break;
}

While looking at how this code sets the numsegments of the final result, an interesting case came to mind: what if one of the subpaths comes from partial tables (and thus contains a motion path) while the other subpaths are fully distributed tables? I quickly tried the following case on the latest master branch (the top commit at the time was 1da52e5d):

gpadmin=# create table t1(a int, b int);
NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table.
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
CREATE TABLE
gpadmin=# create table t2(a int, b int);
NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table.
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
CREATE TABLE
gpadmin=# set allow_system_table_mods = on;
SET
gpadmin=# update gp_distribution_policy set numsegments = 2 where localoid = 't1'::regclass::oid or localoid = 't2'::regclass::oid;
UPDATE 2
gpadmin=# explain select * from t1 join t2 on t1.a = t2.b;
                                             QUERY PLAN
-----------------------------------------------------------------------------------------------------
 Gather Motion 2:1  (slice1; segments: 2)  (cost=2037.25..1153983.05 rows=7413210 width=16)
   ->  Hash Join  (cost=2037.25..1005718.85 rows=3706605 width=16)
         Hash Cond: (t2.b = t1.a)
         ->  Redistribute Motion 2:2  (slice2; segments: 2)  (cost=0.00..2683.00 rows=43050 width=8)
               Hash Key: t2.b
               ->  Seq Scan on t2  (cost=0.00..961.00 rows=43050 width=8)
         ->  Hash  (cost=961.00..961.00 rows=43050 width=8)
               ->  Seq Scan on t1  (cost=0.00..961.00 rows=43050 width=8)
 Optimizer: Postgres query optimizer
(9 rows)

gpadmin=# create table t3(a int, b int);
NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table.
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
CREATE TABLE
gpadmin=# select localoid::regclass::text, * from gp_distribution_policy ;
 localoid | localoid | policytype | numsegments | distkey | distclass
----------+----------+------------+-------------+---------+-----------
 t1       |    16385 | p          |           2 | 1       | 10027
 t2       |    16388 | p          |           2 | 1       | 10027
 t3       |    16391 | p          |           3 | 1       | 10027
(3 rows)

gpadmin=# explain select t1.a, t2.b from t1 join t2 on t1.a = t2.b union all select * from t3;
                                                QUERY PLAN
-----------------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)  (cost=2037.25..1230798.15 rows=7499310 width=8)
   ->  Append  (cost=2037.25..1080811.95 rows=2499770 width=8)
         ->  Hash Join  (cost=2037.25..1005718.85 rows=2471070 width=8)
               Hash Cond: (t2.b = t1.a)
               ->  Redistribute Motion 2:3  (slice2; segments: 2)  (cost=0.00..2683.00 rows=43050 width=4)
                     Hash Key: t2.b
                     ->  Seq Scan on t2  (cost=0.00..961.00 rows=43050 width=4)
               ->  Hash  (cost=961.00..961.00 rows=28700 width=4)
                     ->  Seq Scan on t1  (cost=0.00..961.00 rows=28700 width=4)
         ->  Seq Scan on t3  (cost=0.00..961.00 rows=28700 width=8)
 Optimizer: Postgres query optimizer
(11 rows)

The above test case involves two partial tables, t1 and t2, with numsegments = 2. t3 is a fully distributed table with numsegments = 3.

Looking at the final plan, the motion over t2 is very strange: it shows that the motion redistributes t2 from 2 segments to 3 segments to join with t1, even though t1 is a partial table with numsegments = 2. t1 is hash-distributed by its first column a, so to implement t1 join t2 on t2.b = t1.a we can redistribute t2 by b so that matching tuples of t1 and t2 are co-located (the join qual then never has to match tuples across segments). Therefore the target of this redistribute motion should be t1's policy numsegments. Otherwise we will lose data, because the co-location assumption no longer holds.

A word on Greenplum core concepts

Greenplum adds many powerful concepts on top of Postgres to build a powerful MPP database: motion, slice, gang, dispatch, and so on.

Refer to this slide deck for some introductory (101) knowledge.

A word on the history of partial tables

Partial tables were introduced in Greenplum 6 to enhance online expansion. Greenplum is an MPP database, and data is distributed across all segments. The most common distribution policy is hash distribution by some columns. A distribution policy well designed for your daily workload can make queries much faster.

A hash distribution policy is also determined by the cluster size, since locating the segment id of a tuple under a hash-distributed policy takes two steps:

  1. based on the types and values of the tuple's distribution keys, use a hash function to map the tuple to a hash value (uint64): tuple → uint64
  2. based on the hash value computed in the previous step, use consistent hashing to get the segment id: (hashvalue::uint64, numsegments::int) → segmentid::int. This step takes a very important argument: the table's numsegments.

So when the cluster is expanded, and before all tables' data has been fully redistributed, queries can still enjoy the benefit of co-location.
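
To make step 2 concrete, here is a minimal sketch of a jump-consistent-hash style mapping from a hash value and numsegments to a segment id, in the spirit of what Greenplum 6's cdbhash layer does for partial tables. This is the textbook Lamping/Veach algorithm written by hand, not a copy of Greenplum's code:

#include <stdint.h>

/*
 * Jump consistent hash (Lamping & Veach): map a 64-bit hash value to a
 * segment id in [0, num_segments).  Growing num_segments only relocates a
 * minimal fraction of keys, which is what makes partial tables usable
 * during online expansion.
 */
static int32_t
hash_to_segment(uint64_t hashvalue, int32_t num_segments)
{
	int64_t		b = -1;
	int64_t		j = 0;

	while (j < num_segments)
	{
		b = j;
		hashvalue = hashvalue * 2862933555777941757ULL + 1;
		j = (int64_t) ((b + 1) *
					   ((double) (1LL << 31) / (double) ((hashvalue >> 33) + 1)));
	}
	return (int32_t) b;
}

The important point for this bug is the second argument: the same tuple lands on different segments depending on which numsegments is passed in.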

A word on path locus

Greenplum has two optimizers:

  • ORCA (refer to the paper for details; ORCA now lives in gpdb's repo under src/backend/gporca)
  • Planner: implemented on top of the Postgres planner. A very important concept for producing a distributed plan is the locus. Refer to my talk for more information.

Before the cost model is applied, everything in the Planner is a path (a potential plan). To describe a path's distribution property, Greenplum introduces the concept of locus. Moreover, in an MPP database we have to move data at some point; this is a motion in Greenplum, and from the planner's perspective a motion is what changes a path's locus.
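
For orientation, here is an abridged, paraphrased sketch of the locus kinds and of the CdbPathLocus struct; the authoritative definitions live in src/include/cdb/cdbpathlocus.h and names may differ slightly between versions:

/* Abridged sketch of the locus kinds (see cdbpathlocus.h for the real list). */
typedef enum CdbLocusType
{
	CdbLocusType_Entry,				/* rows live on the QD (coordinator) */
	CdbLocusType_SingleQE,			/* rows live in a single QE backend */
	CdbLocusType_General,			/* rows can be produced on any process */
	CdbLocusType_SegmentGeneral,	/* replicated table: full copy on each segment */
	CdbLocusType_Replicated,		/* every QE has a full copy, e.g. after a Broadcast Motion */
	CdbLocusType_Hashed,			/* hash-distributed on known distribution keys */
	CdbLocusType_HashedOJ,			/* hashed, but keys may be nulled by outer joins */
	CdbLocusType_Strewn				/* distributed, but on no particular key */
} CdbLocusType;

/* A path's locus pairs the locus kind with the number of segments it spans. */
typedef struct CdbPathLocus
{
	CdbLocusType locustype;
	List	   *distkey;		/* distribution keys, for Hashed/HashedOJ */
	int			numsegments;	/* how many segments hold the data */
} CdbPathLocus;

The numsegments field is exactly what set_append_path_locus manipulates in the code quoted at the beginning of this post.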

How to fix the bug?

The root cause of this bug

Let's first locate the buggy code. A quick entry point is the explain code: we can look at the code that prints the 2:3 in the motion node, and from there we may find the commit or other clues.

Go to the function explain.c:ExplainNode and search for how it handles the Motion plan node. We arrive at this part of the code: https://github.com/greenplum-db/gpdb/blob/1da52e5d9dbfa960b7574742ccb64d4ff86a4aac/src/backend/commands/explain.c#L1418

case T_Motion:
{
	Motion		*pMotion = (Motion *) plan;

	Assert(plan->lefttree);

	motion_snd = list_length(es->currentSlice->segments);
	motion_recv = (parentSlice == NULL ? 1 : list_length(parentSlice->segments));

	/* scale the number of rows by the number of segments sending data */
	scaleFactor = motion_snd;

	switch (pMotion->motionType)
	{
		case MOTIONTYPE_GATHER:
			sname = "Gather Motion";
			scaleFactor = 1;
			motion_recv = 1;
			break;
		case MOTIONTYPE_GATHER_SINGLE:
			sname = "Explicit Gather Motion";
			scaleFactor = 1;
			motion_recv = 1;
			break;
		case MOTIONTYPE_HASH:
			sname = "Redistribute Motion";
			break;
		case MOTIONTYPE_BROADCAST:
			sname = "Broadcast Motion";
			break;
		case MOTIONTYPE_EXPLICIT:
			sname = "Explicit Redistribute Motion";
			break;
		default:
			sname = "???";
			break;
	}
//....
}

Now we understand that explain uses the parent slice's size as the motion's receiver count: that is why the plan above prints 2:3, since the sending slice (slice2) has 2 segments and its parent slice (slice1) has 3. This gives us a clue about the bug:

When setting the append path's locus, we encounter a partial subpath that contains a motion path, and because the final locus is Strewn we set its numsegments to the maximum across subpaths. This introduces a numsegments gap between the parent slice and the child slice, but such a gap should not change the behavior of motion paths that were created earlier.
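
Concretely, in our plan the Append's locus becomes Strewn with numsegments = Max(2, 3) = 3, so the receiving slice (slice1) is planned on 3 segments. The Redistribute Motion under the Hash Join, however, was created to co-locate t2 with t1, so it must hash t2.b over t1's 2 segments. If the executor instead hashes over the receiving slice's 3 segments, some t2 tuples land on the wrong segment (possibly seg2, where t1 has no data at all), and the corresponding join matches are silently lost.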

The idea is quite clear, and by debugging into the issue we find this code in ExecInitMotion (https://github.com/greenplum-db/gpdb/blob/3d11e871daefab7cde733d3f985be08d9a9fa8e7/src/backend/executor/nodeMotion.c#L764):

motionstate->numHashSegments = recvSlice->planNumSegments;
if (motionstate->mstype == MOTIONSTATE_SEND && node->motionType == MOTIONTYPE_HASH)
{
	int			nkeys;

	nkeys = list_length(node->hashExprs);

	if (nkeys > 0)
		motionstate->hashExprs = (List *) ExecInitExpr((Expr *) node->hashExprs,
													   (PlanState *) motionstate);

	/*
	 * Create hash API reference
	 */
	motionstate->cdbhash = makeCdbHash(motionstate->numHashSegments,
									   nkeys,
									   node->hashFuncs);
}
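
The CdbHash built here is what the sending side uses to pick a target segment for each tuple, so numHashSegments directly controls where tuples go. Below is a hypothetical per-tuple sketch, not the actual Greenplum code (the real logic is spread across nodeMotion.c and cdbhash.c), using the cdbhashinit/cdbhash/cdbhashreduce calls as I understand that API:

/*
 * Hypothetical sketch: how a MOTIONTYPE_HASH sender could use the CdbHash
 * state to choose a destination segment for one tuple.
 */
static int
pick_target_segment(CdbHash *h, int nkeys, Datum *keyvals, bool *keynulls)
{
	int			i;

	cdbhashinit(h);						/* reset the running hash for this tuple */
	for (i = 0; i < nkeys; i++)
		cdbhash(h, i + 1, keyvals[i], keynulls[i]);	/* fold in each hash expr value */

	/* reduce the hash to a segment id in [0, numHashSegments) */
	return cdbhashreduce(h);
}

With the buggy code, numHashSegments is 3 for our plan, so the reduce step can return seg2 even though t1's rows only ever live on seg0 and seg1.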

From the above code we know that the motion's target size is determined by the receiving slice's size. Running git blame on this code shows it comes from the commit https://github.com/greenplum-db/gpdb/commit/1f7c024fdeba64e5d0e0f92dfc6ade48de14971e:

Stop using Flows in executor, except during initialization.

Flow was used to get the number of receiving segments for the hash
calculation in in Motion and Result nodes. Use the number of segments
from the slice table instead. Seems more clear, and removes some ugly
differences in handling Postgres planner vs. ORCA plans.

In an upcoming commit, I will move the responsibility of generating the
slice table from the executor to the planner. Once that's done, there will
no more references to Flows in the executor at all. For now, though, we're
still using the flows to build the slice table at executor startup.

Reviewed-by: Taylor Vesely <tvesely@pivotal.io>

The assumption here (that the redistribute motion's target size equals the receiving slice's size) is broken once the parent slice's locus gets its numsegments from Max across the subpaths, which is exactly the numsegments gap described above.

Patch to fix this bug

The idea is to add an extra field to the Motion plan node to record the actual number of receivers of this motion. The following patch might fix it (note that the patch does not fix the explain output):

From 787184b5bdd1b582009579a27d0dd326b514bc9e Mon Sep 17 00:00:00 2001
From: Zhenghua Lyu <kainwen@gmail.com>
Date: Sun, 26 Apr 2020 14:42:36 +0800
Subject: [PATCH] xxxxx

---
 src/backend/executor/nodeMotion.c       | 2 +-
 src/backend/nodes/copyfuncs.c           | 1 +
 src/backend/nodes/outfast.c             | 1 +
 src/backend/nodes/outfuncs.c            | 2 ++
 src/backend/nodes/readfast.c            | 1 +
 src/backend/optimizer/plan/createplan.c | 2 ++
 src/include/nodes/plannodes.h           | 2 ++
 7 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/src/backend/executor/nodeMotion.c b/src/backend/executor/nodeMotion.c
index d7dfad96f7..842ee3c56c 100644
--- a/src/backend/executor/nodeMotion.c
+++ b/src/backend/executor/nodeMotion.c
@@ -761,7 +761,7 @@ ExecInitMotion(Motion *node, EState *estate, int eflags)
 	motionstate->ps.ps_ProjInfo = NULL;

 	/* Set up motion send data structures */
-	motionstate->numHashSegments = recvSlice->planNumSegments;
+	motionstate->numHashSegments = node->recv_numsegments;
 	if (motionstate->mstype == MOTIONSTATE_SEND && node->motionType == MOTIONTYPE_HASH)
 	{
 		int			nkeys;
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 10628efff5..ee0e13e5d0 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -1437,6 +1437,7 @@ _copyMotion(const Motion *from)
 	COPY_POINTER_FIELD(nullsFirst, from->numSortCols * sizeof(bool));

 	COPY_SCALAR_FIELD(segidColIdx);
+	COPY_SCALAR_FIELD(recv_numsegments);

 	if (from->senderSliceInfo)
 	{
diff --git a/src/backend/nodes/outfast.c b/src/backend/nodes/outfast.c
index 56a226037e..838d2cc43a 100644
--- a/src/backend/nodes/outfast.c
+++ b/src/backend/nodes/outfast.c
@@ -543,6 +543,7 @@ _outMotion(StringInfo str, Motion *node)
 	WRITE_BOOL_ARRAY(nullsFirst, node->numSortCols);

 	WRITE_INT_FIELD(segidColIdx);
+	WRITE_INT_FIELD(recv_numsegments);

 	_outPlanInfo(str, (Plan *) node);
 }
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 0f3b79a710..42a094c8a5 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -1328,6 +1328,8 @@ _outMotion(StringInfo str, const Motion *node)

 	WRITE_INT_FIELD(segidColIdx);

+	WRITE_INT_FIELD(recv_numsegments);
+
 	/* senderSliceInfo is intentionally omitted. It's only used during planning */

 	_outPlanInfo(str, (Plan *) node);
diff --git a/src/backend/nodes/readfast.c b/src/backend/nodes/readfast.c
index 3f042f25f3..e348a1fc80 100644
--- a/src/backend/nodes/readfast.c
+++ b/src/backend/nodes/readfast.c
@@ -1568,6 +1568,7 @@ _readMotion(void)
 	READ_BOOL_ARRAY(nullsFirst, local_node->numSortCols);

 	READ_INT_FIELD(segidColIdx);
+	READ_INT_FIELD(recv_numsegments);

 	ReadCommonPlan(&local_node->plan);

diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 38c2d4fdb3..4f04878545 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -2505,6 +2505,7 @@ create_motion_plan(PlannerInfo *root, CdbMotionPath *path)
 	int			before_numMotions;
 	PlanSlice  *save_curSlice = root->curSlice;
 	PlanSlice  *sendSlice;
+	int         motion_path_numsegments = ((Path*)path)->locus.numsegments;

 	/*
 	 * singleQE-->entry:  Elide the motion.  The subplan will run in the same
@@ -2641,6 +2642,7 @@ create_motion_plan(PlannerInfo *root, CdbMotionPath *path)
 	/* Add motion operator. */
 	motion = cdbpathtoplan_create_motion_plan(root, path, subplan);
 	motion->senderSliceInfo = sendSlice;
+	motion->recv_numsegments = motion_path_numsegments;

 	if (subpath->locus.locustype == CdbLocusType_Replicated)
 		motion->motionType = MOTIONTYPE_GATHER_SINGLE;
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 99bd6fc6a5..60f5e558ba 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -1348,6 +1348,8 @@ typedef struct Motion
 	Oid		   *collations;		/* OIDs of collations */
 	bool	   *nullsFirst;		/* NULLS FIRST/LAST directions */

+	int         recv_numsegments;
+
 	/* sender slice info */
 	PlanSlice  *senderSliceInfo;
 } Motion;
--
2.17.1

With the above patch, we get the correct result (explain will still print 2:3, since that number comes from the slice sizes rather than from the motion's hash target). But why?

Why can Greenplum handle the above case correctly after the fix?

The above patch fixes the cdbhash info set up during ExecInitMotion, which lets every tuple go to the correct segment. Even after the fix, I was still curious about how this plan executes: what does the process on the extra segment of the receiving slice do so that the query still finishes cleanly?

The PR Adjust GANG size according to numsegments #6096 introduced dynamic gangs when starting the distributed executor. So the receiving slice (the slice that executes the Append and the Hash Join) maps to a full gang, while the slice that scans t2 and sends tuples out via the motion maps to a gang that exists only on seg0 and seg1.

Let's look at the motion-receiving code of the Append slice to find the secret, namely the code that initializes the interconnect.

In either the function SetupUDPIFCInterconnect_Internal or the function SetupTCPInterconnect, we find that the connections set up are the Cartesian product of the processes in the receiving and sending slices:

        /* now we'll do some setup for each of our Receiving Motion Nodes. */
        foreach(cell, mySlice->children)
        {
                int                     totalNumProcs;
                int                     childId = lfirst_int(cell);

#ifdef AMS_VERBOSE_LOGGING
                elog(DEBUG5, "Setting up RECEIVING motion node %d", childId);
#endif

                aSlice = &interconnect_context->sliceTable->slices[childId];

                /*
                 * If we're using directed-dispatch we have dummy primary-process
                 * entries, so we count the entries.
                 */
                totalNumProcs = list_length(aSlice->primaryProcesses);
                for (i = 0; i < totalNumProcs; i++)
                {
                        CdbProcess *cdbProc;

                        cdbProc = list_nth(aSlice->primaryProcesses, i);
                        if (cdbProc)
                                expectedTotalIncoming++;
                }

                (void) createChunkTransportState(interconnect_context, aSlice, mySlice, totalNumProcs);
        }

This is key to correctness: every process in the sending slice broadcasts an EOS to every process in the receiving slice, so the whole distributed plan can finish.
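
For our plan this means the 2 sender processes of slice2 and the 3 receiver processes of slice1 form 2 × 3 = 6 routes, and the receiver on seg2 still gets an EOS from both senders even if no data tuple ever hashes to it, so it does not wait on the motion forever.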

For the plan in the first section of this blog, there is another reason it finishes: on seg2, t1 and t2 contain no data at all, so when the Append slice executes the hash join on seg2, the hash table built from t1 is empty and the hash join finishes immediately (https://github.com/greenplum-db/gpdb/blob/82dd4168c707956349082e40364976e5ce7e2136/src/backend/executor/nodeHashjoin.c#L236).
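
In upstream PostgreSQL the relevant early exit looks roughly like the snippet below; Greenplum's version at the linked line adds MPP-specific handling around motions, but the effect for our plan is the same: an empty inner hash table lets the join return at once.

/*
 * Paraphrased from PostgreSQL's ExecHashJoin build phase: if the inner
 * relation is completely empty and we are not filling outer tuples
 * (i.e. this is not a LEFT/FULL join), quit without scanning the outer side.
 */
if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
	return NULL;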

To see an example where the interconnect EOS broadcast is what finishes the plan, the idea is to change the join qual to get a plan with two redistribute motions (for example, joining on non-distribution columns of both sides will typically produce one). Please hack on it and debug with GDB to fully understand this yourself.
