private def codegenFullOuter()

in sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala [782:1019]


  /**
   * Generates the Java code that produces the full outer join of the two inputs
   * (`inputs[0]` as the left side, `inputs[1]` as the right side).
   *
   * Besides returning code, this method mutates `ctx`: it registers inlined class members
   * (input iterators, pending input rows, output rows, per-side row buffers and "matched" bit
   * sets) and adds two generated functions:
   *  - a consume function that increments the `numOutputRows` metric and emits the row built
   *    from the current left/right output rows;
   *  - a "find next join rows" function that either emits one unmatched row, or fills both
   *    buffers with the rows sharing the next common join key.
   *
   * The returned snippet consists of three loops: the first runs while both sides still have
   * rows and joins them group by group; the other two drain whichever side is left over,
   * emitting each remaining row with the opposite side's output row set to null.
   *
   * NOTE(review): a row is emitted as unmatched as soon as its key compares smaller than the
   * other side's key, so the generated code presumably relies on both inputs being sorted by
   * the join keys -- confirm against the operator's required child ordering.
   *
   * @param ctx codegen context that receives the mutable state and the generated functions
   * @return the Java code snippet that drives the join
   */
  private def codegenFullOuter(ctx: CodegenContext): String = {
    // Inline mutable state since not many join operations in a task.
    // Create class member for input iterator from both sides.
    val leftInput = ctx.addMutableState("scala.collection.Iterator", "leftInput",
      v => s"$v = inputs[0];", forceInline = true)
    val rightInput = ctx.addMutableState("scala.collection.Iterator", "rightInput",
      v => s"$v = inputs[1];", forceInline = true)

    // Create class member for next input row from both sides.
    // A non-null value means the row has been fetched from the iterator but not yet output or
    // buffered; the generated code resets it to null once the row is handled.
    val leftInputRow = ctx.addMutableState("InternalRow", "leftInputRow", forceInline = true)
    val rightInputRow = ctx.addMutableState("InternalRow", "rightInputRow", forceInline = true)

    // Create variables for join keys from both sides.
    val leftKeyVars = createJoinKey(ctx, leftInputRow, leftKeys, left.output)
    // The keys' `isNull` terms OR-ed together: true when any left join key is null.
    val leftAnyNull = leftKeyVars.map(_.isNull).mkString(" || ")
    val rightKeyVars = createJoinKey(ctx, rightInputRow, rightKeys, right.output)
    // Same as `leftAnyNull`, for the right join keys.
    val rightAnyNull = rightKeyVars.map(_.isNull).mkString(" || ")
    // Copies of the left keys. Their code is emitted once both sides compare equal, and the
    // buffering loops compare every subsequent row against them.
    val matchedKeyVars = copyKeys(ctx, leftKeyVars)
    // A second set of key variables over the same input rows, used only inside the buffering
    // loops of the find-next-rows function.
    val leftMatchedKeyVars = createJoinKey(ctx, leftInputRow, leftKeys, left.output)
    val rightMatchedKeyVars = createJoinKey(ctx, rightInputRow, rightKeys, right.output)

    // Create class member for next output row from both sides.
    // The result variables below are generated from these two rows; one of them is set to null
    // when a row is output without a match.
    val leftOutputRow = ctx.addMutableState("InternalRow", "leftOutputRow", forceInline = true)
    val rightOutputRow = ctx.addMutableState("InternalRow", "rightOutputRow", forceInline = true)

    // Create class member for buffers of rows with same join keys from both sides.
    val bufferClsName = "java.util.ArrayList<InternalRow>"
    val leftBuffer = ctx.addMutableState(bufferClsName, "leftBuffer",
      v => s"$v = new $bufferClsName();", forceInline = true)
    val rightBuffer = ctx.addMutableState(bufferClsName, "rightBuffer",
      v => s"$v = new $bufferClsName();", forceInline = true)
    // Bit sets recording which buffered rows have matched at least one row of the other side
    // (bit i corresponds to index i of the buffer). They start with a constructor argument of 1
    // and are replaced by a new instance whenever a buffer outgrows the current capacity.
    val matchedClsName = classOf[BitSet].getName
    val leftMatched = ctx.addMutableState(matchedClsName, "leftMatched",
      v => s"$v = new $matchedClsName(1);", forceInline = true)
    val rightMatched = ctx.addMutableState(matchedClsName, "rightMatched",
      v => s"$v = new $matchedClsName(1);", forceInline = true)
    // Names of the local loop counters used when scanning the buffers.
    val leftIndex = ctx.freshName("leftIndex")
    val rightIndex = ctx.freshName("rightIndex")

    // Generate code for join condition
    // NOTE(review): `setDefaultValue = true` is presumably required because either output row
    // can be null when a row is output without a match -- confirm in `genOneSideJoinVars`.
    val leftResultVars = genOneSideJoinVars(
      ctx, leftOutputRow, left, setDefaultValue = true)
    val rightResultVars = genOneSideJoinVars(
      ctx, rightOutputRow, right, setDefaultValue = true)
    val resultVars = leftResultVars ++ rightResultVars
    // Only the second element of the returned triple (the condition check code) is used here.
    val (_, conditionCheck, _) =
      getJoinCondition(ctx, leftResultVars, left, right, Some(rightOutputRow))

    // Generate code for result output in separate function, as we need to output result from
    // multiple places in join code.
    val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
    ctx.addNewFunction(consumeFullOuterJoinRow,
      s"""
         |private void $consumeFullOuterJoinRow() throws java.io.IOException {
         |  ${metricTerm(ctx, "numOutputRows")}.add(1);
         |  ${consume(ctx, resultVars)}
         |}
       """.stripMargin)

    // Handle the case when input row has no match.
    // The pending input row becomes the output row of its side, the other side's output row is
    // set to null, and the input row slot is cleared so that the next row gets fetched.
    val outputLeftNoMatch =
      s"""
         |$leftOutputRow = $leftInputRow;
         |$rightOutputRow = null;
         |$leftInputRow = null;
         |$consumeFullOuterJoinRow();
       """.stripMargin
    val outputRightNoMatch =
      s"""
         |$rightOutputRow = $rightInputRow;
         |$leftOutputRow = null;
         |$rightInputRow = null;
         |$consumeFullOuterJoinRow();
       """.stripMargin

    // Generate a function to scan both sides to find rows with matched join keys.
    // The matched rows from both sides are copied in buffers separately. This function assumes
    // that, for each side, either the iterator (`leftIter` / `rightIter`) is non-empty or the
    // pending input row (`leftInputRow` / `rightInputRow`) is non-null.
    //
    // Each call performs exactly one of the following steps and then returns:
    //  - Step 1: If the current `leftInputRow` or `rightInputRow` has a null join key, output
    //            that row without a match (`outputLeftNoMatch` or `outputRightNoMatch`).
    //
    //  - Step 2: Otherwise compare the join keys of `leftInputRow` and `rightInputRow`. If they
    //            differ, output the row with the smaller join keys without a match
    //            (`outputLeftNoMatch` or `outputRightNoMatch`).
    //
    //  - Step 3: Otherwise buffer rows with same join keys from both sides into `leftBuffer` and
    //            `rightBuffer`. Reset bit sets for both buffers accordingly (`leftMatched` and
    //            `rightMatched`).
    //
    // Both buffers are cleared on entry, so they are empty after Step 1 or Step 2. The row that
    // ends a buffering loop (its key differs from the matched key) is not buffered; it stays in
    // `leftInputRow` / `rightInputRow` and is picked up by the next call.
    val findNextJoinRowsFuncName = ctx.freshName("findNextJoinRows")
    ctx.addNewFunction(findNextJoinRowsFuncName,
      s"""
         |private void $findNextJoinRowsFuncName(
         |    scala.collection.Iterator leftIter,
         |    scala.collection.Iterator rightIter) throws java.io.IOException {
         |  int comp = 0;
         |  $leftBuffer.clear();
         |  $rightBuffer.clear();
         |
         |  if ($leftInputRow == null) {
         |    $leftInputRow = (InternalRow) leftIter.next();
         |  }
         |  if ($rightInputRow == null) {
         |    $rightInputRow = (InternalRow) rightIter.next();
         |  }
         |
         |  ${leftKeyVars.map(_.code).mkString("\n")}
         |  if ($leftAnyNull) {
         |    // The left row join key is null, join it with null row
         |    $outputLeftNoMatch
         |    return;
         |  }
         |
         |  ${rightKeyVars.map(_.code).mkString("\n")}
         |  if ($rightAnyNull) {
         |    // The right row join key is null, join it with null row
         |    $outputRightNoMatch
         |    return;
         |  }
         |
         |  ${genComparison(ctx, leftKeyVars, rightKeyVars)}
         |  if (comp < 0) {
         |    // The left row join key is smaller, join it with null row
         |    $outputLeftNoMatch
         |    return;
         |  } else if (comp > 0) {
         |    // The right row join key is smaller, join it with null row
         |    $outputRightNoMatch
         |    return;
         |  }
         |
         |  ${matchedKeyVars.map(_.code).mkString("\n")}
         |  $leftBuffer.add($leftInputRow.copy());
         |  $rightBuffer.add($rightInputRow.copy());
         |  $leftInputRow = null;
         |  $rightInputRow = null;
         |
         |  // Buffer rows from both sides with same join key
         |  while (leftIter.hasNext()) {
         |    $leftInputRow = (InternalRow) leftIter.next();
         |    ${leftMatchedKeyVars.map(_.code).mkString("\n")}
         |    ${genComparison(ctx, leftMatchedKeyVars, matchedKeyVars)}
         |    if (comp == 0) {
         |
         |      $leftBuffer.add($leftInputRow.copy());
         |      $leftInputRow = null;
         |    } else {
         |      break;
         |    }
         |  }
         |  while (rightIter.hasNext()) {
         |    $rightInputRow = (InternalRow) rightIter.next();
         |    ${rightMatchedKeyVars.map(_.code).mkString("\n")}
         |    ${genComparison(ctx, rightMatchedKeyVars, matchedKeyVars)}
         |    if (comp == 0) {
         |      $rightBuffer.add($rightInputRow.copy());
         |      $rightInputRow = null;
         |    } else {
         |      break;
         |    }
         |  }
         |
         |  // Reset bit sets of buffers accordingly
         |  if ($leftBuffer.size() <= $leftMatched.capacity()) {
         |    $leftMatched.clearUntil($leftBuffer.size());
         |  } else {
         |    $leftMatched = new $matchedClsName($leftBuffer.size());
         |  }
         |  if ($rightBuffer.size() <= $rightMatched.capacity()) {
         |    $rightMatched.clearUntil($rightBuffer.size());
         |  } else {
         |    $rightMatched = new $matchedClsName($rightBuffer.size());
         |  }
         |}
       """.stripMargin)

    // Scan the left and right buffers to find all matched rows.
    // Every (left, right) pair of buffered rows is tested with the join condition; a passing
    // pair is output and both rows are marked in their bit sets. A left row that matched no
    // right row is output with a null right row right after its inner loop. Once all left rows
    // are done, the right rows that were never marked are output with a null left row.
    val matchRowsInBuffer =
      s"""
         |int $leftIndex;
         |int $rightIndex;
         |
         |for ($leftIndex = 0; $leftIndex < $leftBuffer.size(); $leftIndex++) {
         |  $leftOutputRow = (InternalRow) $leftBuffer.get($leftIndex);
         |  for ($rightIndex = 0; $rightIndex < $rightBuffer.size(); $rightIndex++) {
         |    $rightOutputRow = (InternalRow) $rightBuffer.get($rightIndex);
         |    $conditionCheck {
         |      $consumeFullOuterJoinRow();
         |      $leftMatched.set($leftIndex);
         |      $rightMatched.set($rightIndex);
         |    }
         |  }
         |
         |  if (!$leftMatched.get($leftIndex)) {
         |
         |    $rightOutputRow = null;
         |    $consumeFullOuterJoinRow();
         |  }
         |}
         |
         |$leftOutputRow = null;
         |for ($rightIndex = 0; $rightIndex < $rightBuffer.size(); $rightIndex++) {
         |  if (!$rightMatched.get($rightIndex)) {
         |    // The right row has never matched any left row, join it with null row
         |    $rightOutputRow = (InternalRow) $rightBuffer.get($rightIndex);
         |    $consumeFullOuterJoinRow();
         |  }
         |}
       """.stripMargin

    // Driver code returned to the caller. The first loop runs while both sides still have a
    // pending row or more input; when the find-next-rows call only output an unmatched row, the
    // buffers are empty and the buffer scan is a no-op. The last two loops drain the side that
    // still has rows. Each loop returns early when `shouldStop()` is true; the pending rows live
    // in class members, so the loop conditions still see them if this code is entered again.
    s"""
       |while (($leftInputRow != null || $leftInput.hasNext()) &&
       |  ($rightInputRow != null || $rightInput.hasNext())) {
       |  $findNextJoinRowsFuncName($leftInput, $rightInput);
       |  $matchRowsInBuffer
       |  if (shouldStop()) return;
       |}
       |
       |// The right iterator has no more rows, join left row with null
       |while ($leftInputRow != null || $leftInput.hasNext()) {
       |  if ($leftInputRow == null) {
       |    $leftInputRow = (InternalRow) $leftInput.next();
       |  }
       |  $outputLeftNoMatch
       |  if (shouldStop()) return;
       |}
       |
       |// The left iterator has no more rows, join right row with null
       |while ($rightInputRow != null || $rightInput.hasNext()) {
       |  if ($rightInputRow == null) {
       |    $rightInputRow = (InternalRow) $rightInput.next();
       |  }
       |  $outputRightNoMatch
       |  if (shouldStop()) return;
       |}
     """.stripMargin
  }