private def processPartition(rowIterator: Iterator[InternalRow]): Iterator[InternalRow]

in amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/execution/MergeRowsExec.scala [102:180]


  private def processPartition(rowIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
    val inputAttrs = child.output

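    // Predicates telling whether the source / target side of the joined row is present.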
    val isSourceRowPresentPred = createPredicate(isSourceRowPresent, inputAttrs)
    val isTargetRowPresentPred = createPredicate(isTargetRowPresent, inputAttrs)

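    // One predicate per WHEN MATCHED clause, paired with an optional output projection
    // (clauses with an empty output, such as DELETE, carry no projection).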
    val matchedPreds = matchedConditions.map(createPredicate(_, inputAttrs))
    val matchedProjs = matchedOutputs.map {
      case output if output.nonEmpty =>
        Some(createProjection(output, inputAttrs))
      case _ => None
    }
    val matchedPairs = matchedPreds zip matchedProjs

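    // The same pairing for the WHEN NOT MATCHED clauses.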
    val notMatchedPreds = notMatchedConditions.map(createPredicate(_, inputAttrs))
    val notMatchedProjs = notMatchedOutputs.map {
      case output if output.nonEmpty => Some(createProjection(output, inputAttrs))
      case _ => None
    }
    val nonMatchedPairs = notMatchedPreds zip notMatchedProjs

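    // Projection used to emit not-matched target rows unchanged, plus a projection of the
    // row-id attributes used by the duplicate checks below.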
    val projectTargetCols = createProjection(Nil, inputAttrs)
    val rowIdProj = createProjection(rowIdAttrs, inputAttrs)

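    // Fast path without cardinality checks: emit unmatched target rows as-is (when requested),
    // apply NOT MATCHED clauses to source-only rows and MATCHED clauses to matched rows.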
    def processRow(inputRow: InternalRow): InternalRow = {
      if (emitNotMatchedTargetRows && !isSourceRowPresentPred.eval(inputRow)) {
        projectTargetCols.apply(inputRow)
      } else if (!isTargetRowPresentPred.eval(inputRow)) {
        applyProjection(nonMatchedPairs, inputRow)
      } else {
        applyProjection(matchedPairs, inputRow)
      }
    }

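    // Row-id of the previous row that was checked; a consecutive repeat of the same row-id
    // means the same key appeared twice and is reported as an error.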
    var lastMatchedRowId: InternalRow = null

    def processRowWithMatchedOrUnMatchedRowCheck(inputRow: InternalRow): InternalRow = {
      val isSourceRowPresent = isSourceRowPresentPred.eval(inputRow)
      val isTargetRowPresent = isTargetRowPresentPred.eval(inputRow)

      if (isSourceRowPresent && isTargetRowPresent) {
        val currentRowId = rowIdProj.apply(inputRow)
        if (currentRowId == lastMatchedRowId) {
          throw new SparkException(
            "The ON search condition of the MERGE statement matched a single row from " +
              "the target table with multiple rows of the source table. ")
        }
        lastMatchedRowId = currentRowId.copy()
      } else if (isSourceRowPresent && !isTargetRowPresent && unMatchedRowCheck) {
        val currentRowId = rowIdProj.apply(inputRow)
        if (currentRowId == lastMatchedRowId) {
          throw new SparkException(
            "There are multiple duplicate primary key data in the inserted data, " +
              "which cannot guarantee the uniqueness of the primary key. ")
        }
        lastMatchedRowId = currentRowId.copy()
      } else {
        lastMatchedRowId = null
      }

      if (emitNotMatchedTargetRows && !isSourceRowPresent) {
        projectTargetCols.apply(inputRow)
      } else if (!isTargetRowPresent) {
        applyProjection(nonMatchedPairs, inputRow)
      } else {
        applyProjection(matchedPairs, inputRow)
      }
    }

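    // Use the checking variant only when a matched or unmatched duplicate check is requested.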
    val processFunc: InternalRow => InternalRow = if (matchedRowCheck || unMatchedRowCheck) {
      processRowWithMatchedOrUnMatchedRowCheck
    } else {
      processRow
    }

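    // Rows for which no output was produced (null) are dropped from the result.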
    rowIterator
      .map(processFunc)
      .filter(row => row != null)
  }
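
The helpers referenced above (createPredicate, createProjection, applyProjection) are defined elsewhere in MergeRowsExec and are not shown in this excerpt. For orientation, here is a minimal sketch of what applyProjection plausibly looks like, assuming the semantics implied by its call sites: the first (predicate, projection) pair whose predicate matches wins, and a missing projection, or no matching pair, yields null so the row is removed by the final filter. This is an illustrative assumption, not the actual implementation.

  // Hypothetical sketch of applyProjection; the real helper lives elsewhere in MergeRowsExec.
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{BasePredicate, UnsafeProjection}

  private def applyProjection(
      pairs: Seq[(BasePredicate, Option[UnsafeProjection])],
      inputRow: InternalRow): InternalRow = {
    // Evaluate the clause conditions in order and take the first one that matches.
    pairs.find { case (predicate, _) => predicate.eval(inputRow) } match {
      // A clause with an output projection (e.g. UPDATE / INSERT) produces a new row.
      case Some((_, Some(projection))) => projection.apply(inputRow)
      // A clause without a projection (e.g. DELETE), or no matching clause, emits nothing;
      // the null is discarded by the final `.filter(row => row != null)`.
      case _ => null
    }
  }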