in amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala [70:182]
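// Planner strategy for Amoro mixed-format tables: translates the Iceberg/Amoro
// extension logical plans (procedure calls, branch/tag and partition DDL,
// row-level MERGE/DELETE rewrites) into their physical exec nodes.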
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
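  // CALL <procedure>: evaluate the argument expressions into an InternalRow and run the procedure.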
  case c @ Call(procedure, args) =>
    val input = buildInternalRow(args)
    CallExec(c.output, procedure, input) :: Nil
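
  // Partition spec evolution: ALTER TABLE ... ADD PARTITION FIELD.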
  case AddPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform, name) =>
    AddPartitionFieldExec(catalog, ident, transform, name) :: Nil
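
  // Snapshot reference DDL: create/replace and drop branches and tags.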
  case CreateOrReplaceBranch(
      IcebergCatalogAndIdentifier(catalog, ident),
      branch,
      branchOptions,
      create,
      replace,
      ifNotExists) =>
    CreateOrReplaceBranchExec(
      catalog,
      ident,
      branch,
      branchOptions,
      create,
      replace,
      ifNotExists) :: Nil

  case CreateOrReplaceTag(
      IcebergCatalogAndIdentifier(catalog, ident),
      tag,
      tagOptions,
      create,
      replace,
      ifNotExists) =>
    CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, create, replace, ifNotExists) :: Nil

  case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, ifExists) =>
    DropBranchExec(catalog, ident, branch, ifExists) :: Nil

  case DropTag(IcebergCatalogAndIdentifier(catalog, ident), tag, ifExists) =>
    DropTagExec(catalog, ident, tag, ifExists) :: Nil
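
  // Partition spec evolution: drop or replace an existing partition field.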
  case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform) =>
    DropPartitionFieldExec(catalog, ident, transform) :: Nil

  case ReplacePartitionField(
      IcebergCatalogAndIdentifier(catalog, ident),
      transformFrom,
      transformTo,
      name) =>
    ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, name) :: Nil
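
  // Identifier field DDL: SET/DROP IDENTIFIER FIELDS on the table schema.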
  case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), fields) =>
    SetIdentifierFieldsExec(catalog, ident, fields) :: Nil

  case DropIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), fields) =>
    DropIdentifierFieldsExec(catalog, ident, fields) :: Nil
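
  // Persist the requested write distribution mode and sort order on the table.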
  case SetWriteDistributionAndOrdering(
      IcebergCatalogAndIdentifier(catalog, ident),
      distributionMode,
      ordering) =>
    SetWriteDistributionAndOrderingExec(catalog, ident, distributionMode, ordering) :: Nil
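
  // Row-level operations produced by the rewrite rules: ReplaceIcebergData rewrites
  // whole data files (copy-on-write), WriteDelta writes row-level deltas (merge-on-read).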
  case ReplaceIcebergData(_: DataSourceV2Relation, query, r: DataSourceV2Relation, Some(write)) =>
    // refresh the cache using the original relation
    ReplaceDataExec(planLater(query), refreshCache(r), write) :: Nil

  case WriteDelta(_: DataSourceV2Relation, query, r: DataSourceV2Relation, projs, Some(write)) =>
    // refresh the cache using the original relation
    WriteDeltaExec(planLater(query), refreshCache(r), projs, write) :: Nil
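
  // MERGE INTO: MergeRowsExec applies the matched / not-matched clause conditions
  // and projections row by row on top of the joined child plan.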
  case MergeRows(
      isSourceRowPresent,
      isTargetRowPresent,
      matchedConditions,
      matchedOutputs,
      notMatchedConditions,
      notMatchedOutputs,
      targetOutput,
      performCardinalityCheck,
      emitNotMatchedTargetRows,
      output,
      child) =>
    MergeRowsExec(
      isSourceRowPresent,
      isTargetRowPresent,
      matchedConditions,
      matchedOutputs,
      notMatchedConditions,
      notMatchedOutputs,
      targetOutput,
      performCardinalityCheck,
      emitNotMatchedTargetRows,
      output,
      planLater(child)) :: Nil

  case DeleteFromIcebergTable(DataSourceV2ScanRelation(r, _, output, _), condition, None) =>
    // the optimizer has already checked that this delete can be handled using a metadata operation
    val deleteCond = condition.getOrElse(Literal.TrueLiteral)
    val predicates = splitConjunctivePredicates(deleteCond)
    val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, output)
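    // Every predicate must translate to a data source filter; otherwise fail the query.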
    val filters = normalizedPredicates.flatMap { pred =>
      val filter = DataSourceStrategy.translateFilter(pred, supportNestedPredicatePushdown = true)
      if (filter.isEmpty) {
        throw QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(pred)
      }
      filter
    }.toArray
    DeleteFromTableExec(r.table.asDeletable, filters, refreshCache(r)) :: Nil
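
  // NoStatsUnaryNode is a logical-planning-only wrapper; strip it and plan the child.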
  case NoStatsUnaryNode(child) =>
    planLater(child) :: Nil
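
  // Anything else is not an extension node; defer to Spark's built-in strategies.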
  case _ => Nil
}