in sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala [782:1019]
/**
 * Generates the Java code for the full outer join case of this sort merge join.
 *
 * Besides returning the code of the main processing loop, this method registers on `ctx`:
 *  - mutable state for the two input iterators, the current input and output rows, the
 *    per-side row buffers and the per-side "matched" bit sets;
 *  - a helper function that increments the `numOutputRows` metric and consumes one joined row;
 *  - a helper function that advances both inputs to the next group of rows with equal join
 *    keys, emitting rows that cannot match on the way.
 *
 * NOTE(review): the `ctx` calls below are made for their side effects (they register state
 * and functions), so keep their relative order when editing this method.
 *
 * @param ctx the codegen context that receives the mutable state and helper functions
 * @return the Java source of the loop that drives the join over `inputs[0]` and `inputs[1]`
 */
private def codegenFullOuter(ctx: CodegenContext): String = {
// Inline mutable state since not many join operations in a task.
// Create class member for input iterator from both sides.
val leftInput = ctx.addMutableState("scala.collection.Iterator", "leftInput",
v => s"$v = inputs[0];", forceInline = true)
val rightInput = ctx.addMutableState("scala.collection.Iterator", "rightInput",
v => s"$v = inputs[1];", forceInline = true)
// Create class member for next input row from both sides.
// A null input row means "not fetched yet / already consumed"; the generated code pulls the
// next row from the iterator whenever it finds the member null.
val leftInputRow = ctx.addMutableState("InternalRow", "leftInputRow", forceInline = true)
val rightInputRow = ctx.addMutableState("InternalRow", "rightInputRow", forceInline = true)
// Create variables for join keys from both sides.
val leftKeyVars = createJoinKey(ctx, leftInputRow, leftKeys, left.output)
// Java boolean expression that is true when any left join key is null.
val leftAnyNull = leftKeyVars.map(_.isNull).mkString(" || ")
val rightKeyVars = createJoinKey(ctx, rightInputRow, rightKeys, right.output)
// Java boolean expression that is true when any right join key is null.
val rightAnyNull = rightKeyVars.map(_.isNull).mkString(" || ")
// `matchedKeyVars` is produced by `copyKeys` from the left keys; the generated code evaluates
// it once a key match is found and then compares every further row on both sides against it.
val matchedKeyVars = copyKeys(ctx, leftKeyVars)
// Separate key variables for the rows read while buffering the rest of a matched key group.
val leftMatchedKeyVars = createJoinKey(ctx, leftInputRow, leftKeys, left.output)
val rightMatchedKeyVars = createJoinKey(ctx, rightInputRow, rightKeys, right.output)
// Create class member for next output row from both sides.
val leftOutputRow = ctx.addMutableState("InternalRow", "leftOutputRow", forceInline = true)
val rightOutputRow = ctx.addMutableState("InternalRow", "rightOutputRow", forceInline = true)
// Create class member for buffers of rows with same join keys from both sides.
val bufferClsName = "java.util.ArrayList<InternalRow>"
val leftBuffer = ctx.addMutableState(bufferClsName, "leftBuffer",
v => s"$v = new $bufferClsName();", forceInline = true)
val rightBuffer = ctx.addMutableState(bufferClsName, "rightBuffer",
v => s"$v = new $bufferClsName();", forceInline = true)
// Bit sets recording which buffered rows have produced at least one joined row. They start
// out as `new BitSet(1)` and are replaced by a larger instance in `findNextJoinRows` when a
// buffer outgrows the current capacity.
val matchedClsName = classOf[BitSet].getName
val leftMatched = ctx.addMutableState(matchedClsName, "leftMatched",
v => s"$v = new $matchedClsName(1);", forceInline = true)
val rightMatched = ctx.addMutableState(matchedClsName, "rightMatched",
v => s"$v = new $matchedClsName(1);", forceInline = true)
// Names of the local loop counters declared by `matchRowsInBuffer`.
val leftIndex = ctx.freshName("leftIndex")
val rightIndex = ctx.freshName("rightIndex")
// Generate code for join condition
// NOTE(review): either output row can be null for an unmatched row (see `outputLeftNoMatch`
// and `outputRightNoMatch`); `setDefaultValue = true` presumably covers that case - confirm
// against `genOneSideJoinVars`.
val leftResultVars = genOneSideJoinVars(
ctx, leftOutputRow, left, setDefaultValue = true)
val rightResultVars = genOneSideJoinVars(
ctx, rightOutputRow, right, setDefaultValue = true)
val resultVars = leftResultVars ++ rightResultVars
// Only the condition-check snippet (second tuple element) is used; it is spliced directly in
// front of a `{ ... }` block in `matchRowsInBuffer`.
val (_, conditionCheck, _) =
getJoinCondition(ctx, leftResultVars, left, right, Some(rightOutputRow))
// Generate code for result output in separate function, as we need to output result from
// multiple places in join code.
val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
ctx.addNewFunction(consumeFullOuterJoinRow,
s"""
|private void $consumeFullOuterJoinRow() throws java.io.IOException {
| ${metricTerm(ctx, "numOutputRows")}.add(1);
| ${consume(ctx, resultVars)}
|}
""".stripMargin)
// Handle the case when input row has no match.
// The input row becomes the output row of its own side, the other side's output row is set to
// null, and the input row member is cleared so that the next row is fetched from the iterator.
val outputLeftNoMatch =
s"""
|$leftOutputRow = $leftInputRow;
|$rightOutputRow = null;
|$leftInputRow = null;
|$consumeFullOuterJoinRow();
""".stripMargin
// Mirror image of `outputLeftNoMatch` for a right row without a match.
val outputRightNoMatch =
s"""
|$rightOutputRow = $rightInputRow;
|$leftOutputRow = null;
|$rightInputRow = null;
|$consumeFullOuterJoinRow();
""".stripMargin
// Generate a function to scan both sides to find rows with matched join keys.
// The matched rows from both sides are copied in buffers separately. This function assumes
// either non-empty `leftIter` and `rightIter`, or non-null `leftInputRow` and `rightInputRow`.
//
// The function has the following steps:
// - Step 1: Find the next `leftInputRow` and `rightInputRow` with non-null join keys.
// Output row with null join keys (`outputLeftNoMatch` and `outputRightNoMatch`).
//
// - Step 2: Compare `leftInputRow` and `rightInputRow` to find the next pair with the same
// join keys.
// Output row with smaller join keys (`outputLeftNoMatch` and `outputRightNoMatch`).
//
// - Step 3: Buffer rows with same join keys from both sides into `leftBuffer` and
// `rightBuffer`. Reset bit sets for both buffers accordingly (`leftMatched` and
// `rightMatched`).
//
// Steps 1 and 2 emit at most one unmatched row per call and then return; since both buffers
// are cleared at the top of the function, the caller sees empty buffers in that case.
// In step 3, the first row of each side whose keys differ from the matched keys is left in
// `leftInputRow` / `rightInputRow` and is picked up by the next call.
val findNextJoinRowsFuncName = ctx.freshName("findNextJoinRows")
ctx.addNewFunction(findNextJoinRowsFuncName,
s"""
|private void $findNextJoinRowsFuncName(
| scala.collection.Iterator leftIter,
| scala.collection.Iterator rightIter) throws java.io.IOException {
| int comp = 0;
| $leftBuffer.clear();
| $rightBuffer.clear();
|
| if ($leftInputRow == null) {
| $leftInputRow = (InternalRow) leftIter.next();
| }
| if ($rightInputRow == null) {
| $rightInputRow = (InternalRow) rightIter.next();
| }
|
| ${leftKeyVars.map(_.code).mkString("\n")}
| if ($leftAnyNull) {
| // The left row join key is null, join it with null row
| $outputLeftNoMatch
| return;
| }
|
| ${rightKeyVars.map(_.code).mkString("\n")}
| if ($rightAnyNull) {
| // The right row join key is null, join it with null row
| $outputRightNoMatch
| return;
| }
|
| ${genComparison(ctx, leftKeyVars, rightKeyVars)}
| if (comp < 0) {
| // The left row join key is smaller, join it with null row
| $outputLeftNoMatch
| return;
| } else if (comp > 0) {
| // The right row join key is smaller, join it with null row
| $outputRightNoMatch
| return;
| }
|
| ${matchedKeyVars.map(_.code).mkString("\n")}
| $leftBuffer.add($leftInputRow.copy());
| $rightBuffer.add($rightInputRow.copy());
| $leftInputRow = null;
| $rightInputRow = null;
|
| // Buffer rows from both sides with same join key
| while (leftIter.hasNext()) {
| $leftInputRow = (InternalRow) leftIter.next();
| ${leftMatchedKeyVars.map(_.code).mkString("\n")}
| ${genComparison(ctx, leftMatchedKeyVars, matchedKeyVars)}
| if (comp == 0) {
|
| $leftBuffer.add($leftInputRow.copy());
| $leftInputRow = null;
| } else {
| break;
| }
| }
| while (rightIter.hasNext()) {
| $rightInputRow = (InternalRow) rightIter.next();
| ${rightMatchedKeyVars.map(_.code).mkString("\n")}
| ${genComparison(ctx, rightMatchedKeyVars, matchedKeyVars)}
| if (comp == 0) {
| $rightBuffer.add($rightInputRow.copy());
| $rightInputRow = null;
| } else {
| break;
| }
| }
|
| // Reset bit sets of buffers accordingly
| if ($leftBuffer.size() <= $leftMatched.capacity()) {
| $leftMatched.clearUntil($leftBuffer.size());
| } else {
| $leftMatched = new $matchedClsName($leftBuffer.size());
| }
| if ($rightBuffer.size() <= $rightMatched.capacity()) {
| $rightMatched.clearUntil($rightBuffer.size());
| } else {
| $rightMatched = new $matchedClsName($rightBuffer.size());
| }
|}
""".stripMargin)
// Scan the left and right buffers to find all matched rows.
// Every (left, right) pair that passes the join condition is emitted and marked in both bit
// sets. A left row that matched nothing is emitted with a null right row right after its
// inner loop; right rows that never matched are emitted in a final pass with a null left row.
val matchRowsInBuffer =
s"""
|int $leftIndex;
|int $rightIndex;
|
|for ($leftIndex = 0; $leftIndex < $leftBuffer.size(); $leftIndex++) {
| $leftOutputRow = (InternalRow) $leftBuffer.get($leftIndex);
| for ($rightIndex = 0; $rightIndex < $rightBuffer.size(); $rightIndex++) {
| $rightOutputRow = (InternalRow) $rightBuffer.get($rightIndex);
| $conditionCheck {
| $consumeFullOuterJoinRow();
| $leftMatched.set($leftIndex);
| $rightMatched.set($rightIndex);
| }
| }
|
| if (!$leftMatched.get($leftIndex)) {
|
| $rightOutputRow = null;
| $consumeFullOuterJoinRow();
| }
|}
|
|$leftOutputRow = null;
|for ($rightIndex = 0; $rightIndex < $rightBuffer.size(); $rightIndex++) {
| if (!$rightMatched.get($rightIndex)) {
| // The right row has never matched any left row, join it with null row
| $rightOutputRow = (InternalRow) $rightBuffer.get($rightIndex);
| $consumeFullOuterJoinRow();
| }
|}
""".stripMargin
// Main loop: while both sides still have a row (pending or in the iterator), find the next
// key group and join the buffered rows, checking `shouldStop()` once per iteration. When
// `findNextJoinRows` returned early, both buffers are empty and `matchRowsInBuffer` emits
// nothing. Afterwards, drain whichever side still has rows, emitting each remaining row
// with a null row for the other side.
s"""
|while (($leftInputRow != null || $leftInput.hasNext()) &&
| ($rightInputRow != null || $rightInput.hasNext())) {
| $findNextJoinRowsFuncName($leftInput, $rightInput);
| $matchRowsInBuffer
| if (shouldStop()) return;
|}
|
|// The right iterator has no more rows, join left row with null
|while ($leftInputRow != null || $leftInput.hasNext()) {
| if ($leftInputRow == null) {
| $leftInputRow = (InternalRow) $leftInput.next();
| }
| $outputLeftNoMatch
| if (shouldStop()) return;
|}
|
|// The left iterator has no more rows, join right row with null
|while ($rightInputRow != null || $rightInput.hasNext()) {
| if ($rightInputRow == null) {
| $rightInputRow = (InternalRow) $rightInput.next();
| }
| $outputRightNoMatch
| if (shouldStop()) return;
|}
""".stripMargin
}