in amoro-format-mixed/amoro-mixed-spark/v3.5/amoro-mixed-spark-3.5/src/main/scala/org/apache/amoro/spark/sql/execution/MergeRowsExec.scala [102:180]
/**
 * Turns the joined (target, source) rows of one partition into the rows emitted by this MERGE.
 *
 * Each input row is classified with the `isSourceRowPresent` / `isTargetRowPresent` predicates:
 *  - source row absent and `emitNotMatchedTargetRows` set: the row goes through `projectTargetCols`;
 *  - target row absent: the NOT MATCHED (condition, projection) pairs are applied;
 *  - otherwise: the MATCHED (condition, projection) pairs are applied.
 * Rows for which the selected projection yields `null` are dropped from the output.
 *
 * When `matchedRowCheck` and/or `unMatchedRowCheck` is enabled, each checked row's id (projected
 * from `rowIdAttrs`) is compared against the id of the immediately preceding checked row. Only
 * adjacent duplicates are detected, so this presumably relies on the child being clustered by
 * row id — TODO confirm against the planner.
 *
 * @param rowIterator rows of one partition of `child`, laid out as `child.output`
 * @return the projected rows, with `null` results filtered out
 * @throws SparkException with `matchedRowCheck`, if a target row is matched by several adjacent
 *                        source rows; with `unMatchedRowCheck`, if a not-matched source row has
 *                        the same row id as the previously checked row
 */
private def processPartition(rowIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
  val inputAttrs = child.output

  val isSourceRowPresentPred = createPredicate(isSourceRowPresent, inputAttrs)
  val isTargetRowPresentPred = createPredicate(isTargetRowPresent, inputAttrs)

  // An empty output list means "no projection" for that clause.
  val matchedPreds = matchedConditions.map(createPredicate(_, inputAttrs))
  val matchedProjs = matchedOutputs.map {
    case output if output.nonEmpty => Some(createProjection(output, inputAttrs))
    case _ => None
  }
  val matchedPairs = matchedPreds zip matchedProjs

  val notMatchedPreds = notMatchedConditions.map(createPredicate(_, inputAttrs))
  val notMatchedProjs = notMatchedOutputs.map {
    case output if output.nonEmpty => Some(createProjection(output, inputAttrs))
    case _ => None
  }
  val nonMatchedPairs = notMatchedPreds zip notMatchedProjs

  // NOTE(review): this projects an empty expression list, so target-only rows would presumably be
  // emitted without any columns. It is only used when `emitNotMatchedTargetRows` is set; confirm
  // that flag is never enabled for this node.
  val projectTargetCols = createProjection(Nil, inputAttrs)
  val rowIdProj = createProjection(rowIdAttrs, inputAttrs)

  // Fast path: no duplicate checks, predicates evaluated only when needed.
  def processRow(inputRow: InternalRow): InternalRow = {
    if (emitNotMatchedTargetRows && !isSourceRowPresentPred.eval(inputRow)) {
      projectTargetCols.apply(inputRow)
    } else if (!isTargetRowPresentPred.eval(inputRow)) {
      applyProjection(nonMatchedPairs, inputRow)
    } else {
      applyProjection(matchedPairs, inputRow)
    }
  }

  // Row id of the previous row that went through a duplicate check. It is reset to null by any
  // unchecked row, so a row is only ever compared with its immediate predecessor.
  var lastCheckedRowId: InternalRow = null

  def processRowWithMatchedOrUnMatchedRowCheck(inputRow: InternalRow): InternalRow = {
    val isSourceRowPresent = isSourceRowPresentPred.eval(inputRow)
    val isTargetRowPresent = isTargetRowPresentPred.eval(inputRow)

    // Each check runs only when its own flag is enabled. Previously the matched check ran whenever
    // this path was taken, even with `matchedRowCheck` disabled.
    if (matchedRowCheck && isSourceRowPresent && isTargetRowPresent) {
      val currentRowId = rowIdProj.apply(inputRow)
      if (currentRowId == lastCheckedRowId) {
        throw new SparkException(
          "The ON search condition of the MERGE statement matched a single row from " +
            "the target table with multiple rows of the source table. ")
      }
      // copy(): the projection presumably reuses its output row between calls.
      lastCheckedRowId = currentRowId.copy()
    } else if (unMatchedRowCheck && isSourceRowPresent && !isTargetRowPresent) {
      val currentRowId = rowIdProj.apply(inputRow)
      if (currentRowId == lastCheckedRowId) {
        throw new SparkException(
          "There are multiple duplicate primary key data in the inserted data, " +
            "which cannot guarantee the uniqueness of the primary key. ")
      }
      lastCheckedRowId = currentRowId.copy()
    } else {
      lastCheckedRowId = null
    }

    if (emitNotMatchedTargetRows && !isSourceRowPresent) {
      projectTargetCols.apply(inputRow)
    } else if (!isTargetRowPresent) {
      applyProjection(nonMatchedPairs, inputRow)
    } else {
      applyProjection(matchedPairs, inputRow)
    }
  }

  val processFunc: InternalRow => InternalRow =
    if (matchedRowCheck || unMatchedRowCheck) processRowWithMatchedOrUnMatchedRowCheck
    else processRow

  // A null result means no clause produced an output row for this input.
  rowIterator
    .map(processFunc)
    .filter(_ != null)
}