in src/backend/executor/nodeHashjoin.c [169:727]
static bool CheckTargetNode(PlanState *node,
AttrNumber attno,
AttrNumber *lattno);
static List *FindTargetNodes(HashJoinState *hjstate,
AttrNumber attno,
AttrNumber *lattno);
static AttrFilter *CreateAttrFilter(PlanState *target,
AttrNumber lattno,
AttrNumber rattno,
double plan_rows);
extern bool Test_print_prefetch_joinqual;
/* ----------------------------------------------------------------
* ExecHashJoinImpl
*
* This function implements the Hybrid Hashjoin algorithm. It is marked
* with an always-inline attribute so that ExecHashJoin() and
* ExecParallelHashJoin() can inline it. Compilers that respect the
* attribute should create versions specialized for parallel == true and
* parallel == false with unnecessary branches removed.
*
* Note: the relation we build hash table on is the "inner"
* the other one is "outer".
* ----------------------------------------------------------------
*/
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
HashJoinState *node = castNode(HashJoinState, pstate);
PlanState *outerNode;
HashState *hashNode;
ExprState *joinqual;
ExprState *otherqual;
ExprContext *econtext;
HashJoinTable hashtable;
TupleTableSlot *outerTupleSlot;
uint32 hashvalue;
int batchno;
ParallelHashJoinState *parallel_state;
EState *estate;
/*
* get information from HashJoin node
*/
estate = node->js.ps.state;
joinqual = node->js.joinqual;
otherqual = node->js.ps.qual;
hashNode = (HashState *) innerPlanState(node);
outerNode = outerPlanState(node);
hashtable = node->hj_HashTable;
econtext = node->js.ps.ps_ExprContext;
parallel_state = hashNode->parallel_state;
/* CBDB_PARALLEL_FIXME: When parallel is true and parallel_state is NULL */
parallel = parallel && (parallel_state != NULL);
/*
* Reset per-tuple memory context to free any expression evaluation
* storage allocated in the previous tuple cycle.
*/
ResetExprContext(econtext);
/*
* Executor try to squelch nodes in it‘s subtree after a node returning a NULL tuple.
* If the chgParam is not null, squelching is not safe.
* If outer node gets empty tuple, squelching the outer node is too early.
* To fix that, we should add delayEagerFree logic to Limit node,
* to not call ExecSquelchNode() when the node might get rescanned later.
*/
if (outerNode->chgParam != NULL)
node->delayEagerFree = true;
/*
* run the hash join state machine
*/
for (;;)
{
/* We must never use an eagerly released hash table */
Assert(hashtable == NULL || !hashtable->eagerlyReleased);
/*
* It's possible to iterate this loop many times before returning a
* tuple, in some pathological cases such as needing to move much of
* the current batch to a later batch. So let's check for interrupts
* each time through.
*/
CHECK_FOR_INTERRUPTS();
switch (node->hj_JoinState)
{
case HJ_BUILD_HASHTABLE:
/*
* First time through: build hash table for inner relation.
*/
Assert(hashtable == NULL);
/*
* MPP-4165: My fix for MPP-3300 was correct in that we avoided
* the *deadlock* but had very unexpected (and painful)
* performance characteristics: we basically de-pipeline and
* de-parallelize execution of any query which has motion below
* us.
*
* So now prefetch_inner is set (see createplan.c) if we have *any* motion
* below us. If we don't have any motion, it doesn't matter.
*
* See motion_sanity_walker() for details on how a deadlock may occur.
*/
if (!node->prefetch_inner)
{
/*
* If the outer relation is completely empty, and it's not
* right/full join, we can quit without building the hash
* table. However, for an inner join it is only a win to
* check this when the outer relation's startup cost is less
* than the projected cost of building the hash table.
* Otherwise it's best to build the hash table first and see
* if the inner relation is empty. (When it's a left join, we
* should always make this check, since we aren't going to be
* able to skip the join on the strength of an empty inner
* relation anyway.)
*
* If we are rescanning the join, we make use of information
* gained on the previous scan: don't bother to try the
* prefetch if the previous scan found the outer relation
* nonempty. This is not 100% reliable since with new
* parameters the outer relation might yield different
* results, but it's a good heuristic.
*
* The only way to make the check is to try to fetch a tuple
* from the outer plan node. If we succeed, we have to stash
* it away for later consumption by ExecHashJoinOuterGetTuple.
*/
if (HJ_FILL_INNER(node))
{
/* no chance to not build the hash table */
node->hj_FirstOuterTupleSlot = NULL;
}
else if (parallel)
{
/*
* The empty-outer optimization is not implemented for
* shared hash tables, because no one participant can
* determine that there are no outer tuples, and it's not
* yet clear that it's worth the synchronization overhead
* of reaching consensus to figure that out. So we have
* to build the hash table.
*/
node->hj_FirstOuterTupleSlot = NULL;
}
else if (HJ_FILL_OUTER(node) ||
(outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
!node->hj_OuterNotEmpty))
{
node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
if (TupIsNull(node->hj_FirstOuterTupleSlot))
{
node->hj_OuterNotEmpty = false;
return NULL;
}
else
node->hj_OuterNotEmpty = true;
}
else
node->hj_FirstOuterTupleSlot = NULL;
}
else
node->hj_FirstOuterTupleSlot = NULL;
/*
* Create the hash table. If using Parallel Hash, then
* whoever gets here first will create the hash table and any
* later arrivals will merely attach to it.
*/
hashtable = ExecHashTableCreate(hashNode,
node,
node->hj_HashOperators,
node->hj_Collations,
/*
* hashNode->hs_keepnull is required to support using IS NOT DISTINCT FROM as hash condition
* For example, in ORCA, `explain SELECT t2.a FROM t2 INTERSECT (SELECT t1.a FROM t1);`
*/
HJ_FILL_INNER(node) || hashNode->hs_keepnull,
PlanStateOperatorMemKB((PlanState *) hashNode));
node->hj_HashTable = hashtable;
/*
* CDB: Offer extra info for EXPLAIN ANALYZE.
*/
if ((estate->es_instrument & INSTRUMENT_CDB))
ExecHashTableExplainInit(hashNode, node, hashtable);
/*
* Only if doing a LASJ_NOTIN join, we want to quit as soon as we find
* a NULL key on the inner side
*/
hashNode->hs_quit_if_hashkeys_null = (node->js.jointype == JOIN_LASJ_NOTIN);
/*
* Execute the Hash node, to build the hash table. If using
* Parallel Hash, then we'll try to help hashing unless we
* arrived too late.
*/
hashNode->hashtable = hashtable;
(void) MultiExecProcNode((PlanState *) hashNode);
#ifdef HJDEBUG
elog(gp_workfile_caching_loglevel, "HashJoin built table with %.1f tuples by executing subplan for batch 0", hashtable->totalTuples);
#endif
/**
* If LASJ_NOTIN and a null was found on the inner side, then clean out.
*/
if (node->js.jointype == JOIN_LASJ_NOTIN && hashNode->hs_hashkeys_null)
return NULL;
/*
* If the inner relation is completely empty, and we're not
* doing a left outer join, we can quit without scanning the
* outer relation.
*/
if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
return NULL;
/*
* Prefetch JoinQual or NonJoinQual to prevent motion hazard.
*
* See ExecPrefetchQual() for details.
*/
if (node->prefetch_joinqual)
{
ExecPrefetchQual(&node->js, true);
node->prefetch_joinqual = false;
}
if (node->prefetch_qual)
{
ExecPrefetchQual(&node->js, false);
node->prefetch_qual = false;
}
/*
* We just scanned the entire inner side and built the hashtable
* (and its overflow batches). Check here and remember if the inner
* side is empty.
*/
node->hj_InnerEmpty = (hashtable->totalTuples == 0);
/*
* need to remember whether nbatch has increased since we
* began scanning the outer relation
*/
hashtable->nbatch_outstart = hashtable->nbatch;
/*
* Reset OuterNotEmpty for scan. (It's OK if we fetched a
* tuple above, because ExecHashJoinOuterGetTuple will
* immediately set it again.)
*/
node->hj_OuterNotEmpty = false;
if (parallel)
{
Barrier *build_barrier;
Barrier *outer_motion_barrier = ¶llel_state->outer_motion_barrier;
build_barrier = ¶llel_state->build_barrier;
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
{
/*
* If multi-batch, we need to hash the outer relation
* up front.
*/
if (hashtable->nbatch > 1)
ExecParallelHashJoinPartitionOuter(node);
/*
* CBDB_PARALLEL
* If outer side has motion behind, we need to wait for all siblings
* before next phase.
*/
if (((HashJoin *)node->js.ps.plan)->outer_motionhazard)
BarrierArriveAndWait(outer_motion_barrier, WAIT_EVENT_PARALLEL_FINISH);
BarrierArriveAndWait(build_barrier,
WAIT_EVENT_HASH_BUILD_HASH_OUTER);
}
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
/* Each backend should now select a batch to work on. */
hashtable->curbatch = -1;
node->hj_JoinState = HJ_NEED_NEW_BATCH;
continue;
}
else
node->hj_JoinState = HJ_NEED_NEW_OUTER;
/* FALL THRU */
case HJ_NEED_NEW_OUTER:
/* For a rescannable hash table we might need to reload batch 0 during rescan */
if (hashtable->curbatch == -1 && !hashtable->first_pass)
{
hashtable->curbatch = 0;
if (!ExecHashJoinReloadHashTable(node))
return NULL;
}
/*
* We don't have an outer tuple, try to get the next one
*/
if (parallel)
outerTupleSlot =
ExecParallelHashJoinOuterGetTuple(outerNode, node,
&hashvalue);
else
outerTupleSlot =
ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
if (TupIsNull(outerTupleSlot))
{
/* end of batch, or maybe whole join */
if (HJ_FILL_INNER(node))
{
/* set up to scan for unmatched inner tuples */
ExecPrepHashTableForUnmatched(node);
node->hj_JoinState = HJ_FILL_INNER_TUPLES;
}
else
node->hj_JoinState = HJ_NEED_NEW_BATCH;
continue;
}
econtext->ecxt_outertuple = outerTupleSlot;
node->hj_MatchedOuter = false;
/*
* Find the corresponding bucket for this tuple in the main
* hash table or skew hash table.
*/
node->hj_CurHashValue = hashvalue;
ExecHashGetBucketAndBatch(hashtable, hashvalue,
&node->hj_CurBucketNo, &batchno);
node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
hashvalue);
node->hj_CurTuple = NULL;
/*
* The tuple might not belong to the current batch (where
* "current batch" includes the skew buckets if any).
*/
if (batchno != hashtable->curbatch &&
node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
{
bool shouldFree;
MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
&shouldFree);
/*
* Need to postpone this outer tuple to a later batch.
* Save it in the corresponding outer-batch file.
*/
Assert(parallel_state == NULL);
Assert(batchno > hashtable->curbatch);
ExecHashJoinSaveTuple(&node->js.ps, mintuple,
hashvalue,
hashtable,
&hashtable->outerBatchFile[batchno],
hashtable->bfCxt);
if (shouldFree)
heap_free_minimal_tuple(mintuple);
/* Loop around, staying in HJ_NEED_NEW_OUTER state */
continue;
}
/* OK, let's scan the bucket for matches */
node->hj_JoinState = HJ_SCAN_BUCKET;
/* FALL THRU */
case HJ_SCAN_BUCKET:
/*
* OPT-3325: Handle NULLs in the outer side of LASJ_NOTIN
* - if tuple is NULL and inner is not empty, drop outer tuple
* - if tuple is NULL and inner is empty, keep going as we'll
* find no match for this tuple in the inner side
*/
if (node->js.jointype == JOIN_LASJ_NOTIN &&
!node->hj_InnerEmpty &&
isJoinExprNull(node->hj_OuterHashKeys,econtext))
{
node->hj_MatchedOuter = true;
node->hj_JoinState = HJ_NEED_NEW_OUTER;
continue;
}
/*
* Scan the selected hash bucket for matches to current outer
*/
if (parallel)
{
if (!ExecParallelScanHashBucket(hashNode, node, econtext))
{
/* out of matches; check for possible outer-join fill */
node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
continue;
}
}
else
{
if (!ExecScanHashBucket(hashNode, node, econtext))
{
/* out of matches; check for possible outer-join fill */
node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
continue;
}
}
/*
* We've got a match, but still need to test non-hashed quals.
* ExecScanHashBucket already set up all the state needed to
* call ExecQual.
*
* If we pass the qual, then save state for next call and have
* ExecProject form the projection, store it in the tuple
* table, and return the slot.
*
* Only the joinquals determine tuple match status, but all
* quals must pass to actually return the tuple.
*/
if (joinqual == NULL || ExecQual(joinqual, econtext))
{
node->hj_MatchedOuter = true;
if (parallel)
{
/*
* Full/right outer joins are currently not supported
* for parallel joins, so we don't need to set the
* match bit. Experiments show that it's worth
* avoiding the shared memory traffic on large
* systems.
*/
Assert(!HJ_FILL_INNER(node));
}
else
{
/*
* This is really only needed if HJ_FILL_INNER(node),
* but we'll avoid the branch and just set it always.
*/
HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
}
/* In an antijoin, we never return a matched tuple */
if (node->js.jointype == JOIN_ANTI ||
node->js.jointype == JOIN_LASJ_NOTIN)
{
node->hj_JoinState = HJ_NEED_NEW_OUTER;
continue;
}
/*
* If we only need to join to the first matching inner
* tuple, then consider returning this one, but after that
* continue with next outer tuple.
*/
if (node->js.single_match)
node->hj_JoinState = HJ_NEED_NEW_OUTER;
if (otherqual == NULL || ExecQual(otherqual, econtext))
return ExecProject(node->js.ps.ps_ProjInfo);
else
InstrCountFiltered2(node, 1);
}
else
InstrCountFiltered1(node, 1);
break;
case HJ_FILL_OUTER_TUPLE:
/*
* The current outer tuple has run out of matches, so check
* whether to emit a dummy outer-join tuple. Whether we emit
* one or not, the next state is NEED_NEW_OUTER.
*/
node->hj_JoinState = HJ_NEED_NEW_OUTER;
if (!node->hj_MatchedOuter &&
HJ_FILL_OUTER(node))
{
/*
* Generate a fake join tuple with nulls for the inner
* tuple, and return it if it passes the non-join quals.
*/
econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
if (otherqual == NULL || ExecQual(otherqual, econtext))
return ExecProject(node->js.ps.ps_ProjInfo);
else
InstrCountFiltered2(node, 1);
}
break;
case HJ_FILL_INNER_TUPLES:
/*
* We have finished a batch, but we are doing right/full join,
* so any unmatched inner tuples in the hashtable have to be
* emitted before we continue to the next batch.
*/
if (!ExecScanHashTableForUnmatched(node, econtext))
{
/* no more unmatched tuples */
node->hj_JoinState = HJ_NEED_NEW_BATCH;
continue;
}
/*
* Generate a fake join tuple with nulls for the outer tuple,
* and return it if it passes the non-join quals.
*/
econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
if (otherqual == NULL || ExecQual(otherqual, econtext))
return ExecProject(node->js.ps.ps_ProjInfo);
else
InstrCountFiltered2(node, 1);
break;
case HJ_NEED_NEW_BATCH:
/*
* Try to advance to next batch. Done if there are no more.
*/
if (parallel)
{
if (!ExecParallelHashJoinNewBatch(node))
return NULL; /* end of parallel-aware join */
}
else
{
if (!ExecHashJoinNewBatch(node))
return NULL; /* end of parallel-oblivious join */
}
node->hj_JoinState = HJ_NEED_NEW_OUTER;
break;
default:
elog(ERROR, "unrecognized hashjoin state: %d",
(int) node->hj_JoinState);
}
}
}