static bool CheckTargetNode()

in src/backend/executor/nodeHashjoin.c [169:727]


/*
 * Forward declarations of file-local helpers.  Their definitions are not
 * part of this section of the file.
 *
 * NOTE(review): judging only by the names and signatures, these presumably
 * locate plan nodes that an attribute-based filter can be attached to and
 * build that filter -- confirm against the definitions.
 */
static bool CheckTargetNode(PlanState *node,
							AttrNumber attno,
							AttrNumber *lattno);
static List *FindTargetNodes(HashJoinState *hjstate,
							 AttrNumber attno,
							 AttrNumber *lattno);
static AttrFilter *CreateAttrFilter(PlanState *target,
									AttrNumber lattno,
									AttrNumber rattno,
									double plan_rows);
/*
 * Only declared here; the variable itself is defined elsewhere.
 * NOTE(review): presumably a test/debug switch -- confirm at its definition.
 */
extern bool Test_print_prefetch_joinqual;


/* ----------------------------------------------------------------
 *		ExecHashJoinImpl
 *
 *		This function implements the Hybrid Hashjoin algorithm.  It is marked
 *		with an always-inline attribute so that ExecHashJoin() and
 *		ExecParallelHashJoin() can inline it.  Compilers that respect the
 *		attribute should create versions specialized for parallel == true and
 *		parallel == false with unnecessary branches removed.
 *
 *		Note: the relation we build hash table on is the "inner"
 *			  the other one is "outer".
 *
 *		pstate:   the node's PlanState; it is cast to HashJoinState.
 *		parallel: requests the parallel-aware code paths.  It is downgraded
 *				  to false below when the Hash node has no parallel_state.
 *
 *		Returns the projected join tuple (the result of ExecProject), or
 *		NULL when the join has no (more) tuples to return.
 *
 *		The function is a state machine driven by node->hj_JoinState; each
 *		trip around the loop handles one state and either returns, moves to
 *		another state, or falls through into the next case.
 * ----------------------------------------------------------------
 */
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
	HashJoinState *node = castNode(HashJoinState, pstate);
	PlanState  *outerNode;
	HashState  *hashNode;
	ExprState  *joinqual;
	ExprState  *otherqual;
	ExprContext *econtext;
	HashJoinTable hashtable;
	TupleTableSlot *outerTupleSlot;
	uint32		hashvalue;
	int			batchno;
	ParallelHashJoinState *parallel_state;
	EState	   *estate;

	/*
	 * get information from HashJoin node
	 */
	estate = node->js.ps.state;
	joinqual = node->js.joinqual;
	otherqual = node->js.ps.qual;
	hashNode = (HashState *) innerPlanState(node);
	outerNode = outerPlanState(node);
	hashtable = node->hj_HashTable;
	econtext = node->js.ps.ps_ExprContext;
	parallel_state = hashNode->parallel_state;
	/*
	 * CBDB_PARALLEL_FIXME: parallel can be true while parallel_state is NULL.
	 * In that case we use the non-parallel code paths.
	 */
	parallel = parallel && (parallel_state != NULL);

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	ResetExprContext(econtext);

	/*
	 * The executor tries to squelch the nodes in a subtree after a node
	 * returns a NULL tuple.  If chgParam is not NULL, squelching is not safe:
	 * if the outer node returns an empty tuple, squelching the outer node at
	 * that point is too early.  To fix that, we should add delayEagerFree
	 * logic to the Limit node, so that ExecSquelchNode() is not called when
	 * the node might get rescanned later.  Here we set delayEagerFree on this
	 * node whenever the outer node's chgParam is set.
	 */
	if (outerNode->chgParam != NULL)
		node->delayEagerFree = true;
	/*
	 * run the hash join state machine
	 */
	for (;;)
	{
		/* We must never use an eagerly released hash table */
		Assert(hashtable == NULL || !hashtable->eagerlyReleased);
		/*
		 * It's possible to iterate this loop many times before returning a
		 * tuple, in some pathological cases such as needing to move much of
		 * the current batch to a later batch.  So let's check for interrupts
		 * each time through.
		 */
		CHECK_FOR_INTERRUPTS();

		switch (node->hj_JoinState)
		{
			case HJ_BUILD_HASHTABLE:

				/*
				 * First time through: build hash table for inner relation.
				 */
				Assert(hashtable == NULL);

				/*
				 * MPP-4165: My fix for MPP-3300 was correct in that we avoided
				 * the *deadlock* but had very unexpected (and painful)
				 * performance characteristics: we basically de-pipeline and
				 * de-parallelize execution of any query which has motion below
				 * us.
				 *
				 * So now prefetch_inner is set (see createplan.c) if we have *any* motion
				 * below us. If we don't have any motion, it doesn't matter.
				 *
				 * See motion_sanity_walker() for details on how a deadlock may occur.
				 */
				if (!node->prefetch_inner)
				{
					/*
					 * If the outer relation is completely empty, and it's not
					 * right/full join, we can quit without building the hash
					 * table.  However, for an inner join it is only a win to
					 * check this when the outer relation's startup cost is less
					 * than the projected cost of building the hash table.
					 * Otherwise it's best to build the hash table first and see
					 * if the inner relation is empty.  (When it's a left join, we
					 * should always make this check, since we aren't going to be
					 * able to skip the join on the strength of an empty inner
					 * relation anyway.)
					 *
					 * If we are rescanning the join, we make use of information
					 * gained on the previous scan: don't bother to try the
					 * prefetch if the previous scan found the outer relation
					 * nonempty. This is not 100% reliable since with new
					 * parameters the outer relation might yield different
					 * results, but it's a good heuristic.
					 *
					 * The only way to make the check is to try to fetch a tuple
					 * from the outer plan node.  If we succeed, we have to stash
					 * it away for later consumption by ExecHashJoinOuterGetTuple.
					 */
					if (HJ_FILL_INNER(node))
					{
						/* no chance to not build the hash table */
						node->hj_FirstOuterTupleSlot = NULL;
					}
					else if (parallel)
					{
						/*
						 * The empty-outer optimization is not implemented for
						 * shared hash tables, because no one participant can
						 * determine that there are no outer tuples, and it's not
						 * yet clear that it's worth the synchronization overhead
						 * of reaching consensus to figure that out.  So we have
						 * to build the hash table.
						 */
						node->hj_FirstOuterTupleSlot = NULL;
					}
					else if (HJ_FILL_OUTER(node) ||
						 (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
						  !node->hj_OuterNotEmpty))
					{
						node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
						if (TupIsNull(node->hj_FirstOuterTupleSlot))
						{
							/* outer side is empty: done without building the table */
							node->hj_OuterNotEmpty = false;
							return NULL;
						}
						else
							node->hj_OuterNotEmpty = true;
					}
					else
						node->hj_FirstOuterTupleSlot = NULL;
				}
				else
					node->hj_FirstOuterTupleSlot = NULL;	/* inner side goes first; no outer peek */

				/*
				 * Create the hash table.  If using Parallel Hash, then
				 * whoever gets here first will create the hash table and any
				 * later arrivals will merely attach to it.
				 */
				hashtable = ExecHashTableCreate(hashNode,
												node,
												node->hj_HashOperators,
												node->hj_Collations,
				/*
				 * hashNode->hs_keepnull is required to support using
				 * IS NOT DISTINCT FROM as a hash condition.  For example, in
				 * ORCA: `explain SELECT t2.a FROM t2 INTERSECT (SELECT t1.a FROM t1);`
				 */
												HJ_FILL_INNER(node) || hashNode->hs_keepnull,
												PlanStateOperatorMemKB((PlanState *) hashNode));
				node->hj_HashTable = hashtable;

				/*
				 * CDB: Offer extra info for EXPLAIN ANALYZE.
				 */
				if ((estate->es_instrument & INSTRUMENT_CDB))
					ExecHashTableExplainInit(hashNode, node, hashtable);

				/*
				 * Only if doing a LASJ_NOTIN join, we want to quit as soon as we find
				 * a NULL key on the inner side
				 */
				hashNode->hs_quit_if_hashkeys_null = (node->js.jointype == JOIN_LASJ_NOTIN);

				/*
				 * Execute the Hash node, to build the hash table.  If using
				 * Parallel Hash, then we'll try to help hashing unless we
				 * arrived too late.
				 */
				hashNode->hashtable = hashtable;
				(void) MultiExecProcNode((PlanState *) hashNode);

#ifdef HJDEBUG
				elog(gp_workfile_caching_loglevel, "HashJoin built table with %.1f tuples by executing subplan for batch 0", hashtable->totalTuples);
#endif

				/*
				 * If LASJ_NOTIN and a null was found on the inner side, then
				 * clean out: the join returns no tuples.
				 */
				if (node->js.jointype == JOIN_LASJ_NOTIN && hashNode->hs_hashkeys_null)
					return NULL;

				/*
				 * If the inner relation is completely empty, and we're not
				 * doing a left outer join, we can quit without scanning the
				 * outer relation.
				 */
				if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
					return NULL;

				/*
				 * Prefetch JoinQual or NonJoinQual to prevent motion hazard.
				 * Each flag is cleared after use so the prefetch is not
				 * repeated while the flag stays cleared.
				 *
				 * See ExecPrefetchQual() for details.
				 */
				if (node->prefetch_joinqual)
				{
					ExecPrefetchQual(&node->js, true);
					node->prefetch_joinqual = false;
				}

				if (node->prefetch_qual)
				{
					ExecPrefetchQual(&node->js, false);
					node->prefetch_qual = false;
				}

				/*
				 * We just scanned the entire inner side and built the hashtable
				 * (and its overflow batches). Check here and remember if the inner
				 * side is empty.
				 */
				node->hj_InnerEmpty = (hashtable->totalTuples == 0);

				/*
				 * need to remember whether nbatch has increased since we
				 * began scanning the outer relation
				 */
				hashtable->nbatch_outstart = hashtable->nbatch;

				/*
				 * Reset OuterNotEmpty for scan.  (It's OK if we fetched a
				 * tuple above, because ExecHashJoinOuterGetTuple will
				 * immediately set it again.)
				 */
				node->hj_OuterNotEmpty = false;

				if (parallel)
				{
					Barrier    *build_barrier;
					Barrier    *outer_motion_barrier = &parallel_state->outer_motion_barrier;

					build_barrier = &parallel_state->build_barrier;
					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
						   BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
					if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
					{
						/*
						 * If multi-batch, we need to hash the outer relation
						 * up front.
						 */
						if (hashtable->nbatch > 1)
							ExecParallelHashJoinPartitionOuter(node);
						/*
						 * CBDB_PARALLEL
						 * If outer side has motion behind, we need to wait for all siblings
						 * before next phase.
						 */
						if (((HashJoin *)node->js.ps.plan)->outer_motionhazard)
							BarrierArriveAndWait(outer_motion_barrier, WAIT_EVENT_PARALLEL_FINISH);

						BarrierArriveAndWait(build_barrier,
											 WAIT_EVENT_HASH_BUILD_HASH_OUTER);
					}
					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);

					/* Each backend should now select a batch to work on. */
					hashtable->curbatch = -1;
					node->hj_JoinState = HJ_NEED_NEW_BATCH;

					/* restart the loop in HJ_NEED_NEW_BATCH; do not fall through */
					continue;
				}
				else
					node->hj_JoinState = HJ_NEED_NEW_OUTER;

				/* FALL THRU */

			case HJ_NEED_NEW_OUTER:

				/* For a rescannable hash table we might need to reload batch 0 during rescan */
				if (hashtable->curbatch == -1 && !hashtable->first_pass)
				{
					hashtable->curbatch = 0;
					if (!ExecHashJoinReloadHashTable(node))
						return NULL;
				}

				/*
				 * We don't have an outer tuple, try to get the next one
				 */
				if (parallel)
					outerTupleSlot =
						ExecParallelHashJoinOuterGetTuple(outerNode, node,
														  &hashvalue);
				else
					outerTupleSlot =
						ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);

				if (TupIsNull(outerTupleSlot))
				{
					/* end of batch, or maybe whole join */
					if (HJ_FILL_INNER(node))
					{
						/* set up to scan for unmatched inner tuples */
						ExecPrepHashTableForUnmatched(node);
						node->hj_JoinState = HJ_FILL_INNER_TUPLES;
					}
					else
						node->hj_JoinState = HJ_NEED_NEW_BATCH;
					continue;
				}

				/* New outer tuple: make it current and clear its match flag */
				econtext->ecxt_outertuple = outerTupleSlot;
				node->hj_MatchedOuter = false;

				/*
				 * Find the corresponding bucket for this tuple in the main
				 * hash table or skew hash table.
				 */
				node->hj_CurHashValue = hashvalue;
				ExecHashGetBucketAndBatch(hashtable, hashvalue,
										  &node->hj_CurBucketNo, &batchno);
				node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
																 hashvalue);
				node->hj_CurTuple = NULL;

				/*
				 * The tuple might not belong to the current batch (where
				 * "current batch" includes the skew buckets if any).
				 */
				if (batchno != hashtable->curbatch &&
					node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
				{
					bool		shouldFree;
					MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
																	  &shouldFree);

					/*
					 * Need to postpone this outer tuple to a later batch.
					 * Save it in the corresponding outer-batch file.
					 */
					Assert(parallel_state == NULL);
					Assert(batchno > hashtable->curbatch);
					ExecHashJoinSaveTuple(&node->js.ps, mintuple,
										  hashvalue,
										  hashtable,
										  &hashtable->outerBatchFile[batchno],
										  hashtable->bfCxt);

					/* release the tuple copy if the fetch reported we own it */
					if (shouldFree)
						heap_free_minimal_tuple(mintuple);

					/* Loop around, staying in HJ_NEED_NEW_OUTER state */
					continue;
				}

				/* OK, let's scan the bucket for matches */
				node->hj_JoinState = HJ_SCAN_BUCKET;

				/* FALL THRU */

			case HJ_SCAN_BUCKET:

				/*
				 * OPT-3325: Handle NULLs in the outer side of LASJ_NOTIN
				 *  - if tuple is NULL and inner is not empty, drop outer tuple
				 *  - if tuple is NULL and inner is empty, keep going as we'll
				 *    find no match for this tuple in the inner side
				 */
				if (node->js.jointype == JOIN_LASJ_NOTIN &&
					!node->hj_InnerEmpty &&
					isJoinExprNull(node->hj_OuterHashKeys,econtext))
				{
					/* drop this outer tuple and go fetch the next one */
					node->hj_MatchedOuter = true;
					node->hj_JoinState = HJ_NEED_NEW_OUTER;
					continue;
				}

				/*
				 * Scan the selected hash bucket for matches to current outer
				 */
				if (parallel)
				{
					if (!ExecParallelScanHashBucket(hashNode, node, econtext))
					{
						/* out of matches; check for possible outer-join fill */
						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
						continue;
					}
				}
				else
				{
					if (!ExecScanHashBucket(hashNode, node, econtext))
					{
						/* out of matches; check for possible outer-join fill */
						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
						continue;
					}
				}

				/*
				 * We've got a match, but still need to test non-hashed quals.
				 * ExecScanHashBucket already set up all the state needed to
				 * call ExecQual.
				 *
				 * If we pass the qual, then save state for next call and have
				 * ExecProject form the projection, store it in the tuple
				 * table, and return the slot.
				 *
				 * Only the joinquals determine tuple match status, but all
				 * quals must pass to actually return the tuple.
				 */
				if (joinqual == NULL || ExecQual(joinqual, econtext))
				{
					node->hj_MatchedOuter = true;

					if (parallel)
					{
						/*
						 * Full/right outer joins are currently not supported
						 * for parallel joins, so we don't need to set the
						 * match bit.  Experiments show that it's worth
						 * avoiding the shared memory traffic on large
						 * systems.
						 */
						Assert(!HJ_FILL_INNER(node));
					}
					else
					{
						/*
						 * This is really only needed if HJ_FILL_INNER(node),
						 * but we'll avoid the branch and just set it always.
						 */
						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
					}

					/* In an antijoin, we never return a matched tuple */
					if (node->js.jointype == JOIN_ANTI ||
						node->js.jointype == JOIN_LASJ_NOTIN)
					{
						node->hj_JoinState = HJ_NEED_NEW_OUTER;
						continue;
					}

					/*
					 * If we only need to join to the first matching inner
					 * tuple, then consider returning this one, but after that
					 * continue with next outer tuple.
					 */
					if (node->js.single_match)
						node->hj_JoinState = HJ_NEED_NEW_OUTER;

					if (otherqual == NULL || ExecQual(otherqual, econtext))
						return ExecProject(node->js.ps.ps_ProjInfo);
					else
						InstrCountFiltered2(node, 1);
				}
				else
					InstrCountFiltered1(node, 1);
				/*
				 * A qual rejected this pair; loop again in the current state
				 * (still HJ_SCAN_BUCKET unless single_match changed it above).
				 */
				break;

			case HJ_FILL_OUTER_TUPLE:

				/*
				 * The current outer tuple has run out of matches, so check
				 * whether to emit a dummy outer-join tuple.  Whether we emit
				 * one or not, the next state is NEED_NEW_OUTER.
				 */
				node->hj_JoinState = HJ_NEED_NEW_OUTER;

				if (!node->hj_MatchedOuter &&
					HJ_FILL_OUTER(node))
				{
					/*
					 * Generate a fake join tuple with nulls for the inner
					 * tuple, and return it if it passes the non-join quals.
					 */
					econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;

					if (otherqual == NULL || ExecQual(otherqual, econtext))
						return ExecProject(node->js.ps.ps_ProjInfo);
					else
						InstrCountFiltered2(node, 1);
				}
				break;

			case HJ_FILL_INNER_TUPLES:

				/*
				 * We have finished a batch, but we are doing right/full join,
				 * so any unmatched inner tuples in the hashtable have to be
				 * emitted before we continue to the next batch.
				 */
				if (!ExecScanHashTableForUnmatched(node, econtext))
				{
					/* no more unmatched tuples */
					node->hj_JoinState = HJ_NEED_NEW_BATCH;
					continue;
				}

				/*
				 * Generate a fake join tuple with nulls for the outer tuple,
				 * and return it if it passes the non-join quals.
				 */
				econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;

				if (otherqual == NULL || ExecQual(otherqual, econtext))
					return ExecProject(node->js.ps.ps_ProjInfo);
				else
					InstrCountFiltered2(node, 1);
				/* filtered out; stay in HJ_FILL_INNER_TUPLES for the next one */
				break;

			case HJ_NEED_NEW_BATCH:

				/*
				 * Try to advance to next batch.  Done if there are no more.
				 */
				if (parallel)
				{
					if (!ExecParallelHashJoinNewBatch(node))
						return NULL;	/* end of parallel-aware join */
				}
				else
				{
					if (!ExecHashJoinNewBatch(node))
						return NULL;	/* end of parallel-oblivious join */
				}
				node->hj_JoinState = HJ_NEED_NEW_OUTER;
				break;

			default:
				/* hj_JoinState holds a value that no case above handles */
				elog(ERROR, "unrecognized hashjoin state: %d",
					 (int) node->hj_JoinState);
		}
	}
}