override def apply()

in amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MixedFormatExtendedDataSourceV2Strategy.scala [70:182]


  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case c @ Call(procedure, args) =>
      val input = buildInternalRow(args)
      CallExec(c.output, procedure, input) :: Nil

    case AddPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform, name) =>
      AddPartitionFieldExec(catalog, ident, transform, name) :: Nil

    case CreateOrReplaceBranch(
          IcebergCatalogAndIdentifier(catalog, ident),
          branch,
          branchOptions,
          create,
          replace,
          ifNotExists) =>
      CreateOrReplaceBranchExec(
        catalog,
        ident,
        branch,
        branchOptions,
        create,
        replace,
        ifNotExists) :: Nil

    case CreateOrReplaceTag(
          IcebergCatalogAndIdentifier(catalog, ident),
          tag,
          tagOptions,
          create,
          replace,
          ifNotExists) =>
      CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, create, replace, ifNotExists) :: Nil

    case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, ifExists) =>
      DropBranchExec(catalog, ident, branch, ifExists) :: Nil

    case DropTag(IcebergCatalogAndIdentifier(catalog, ident), tag, ifExists) =>
      DropTagExec(catalog, ident, tag, ifExists) :: Nil

    case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform) =>
      DropPartitionFieldExec(catalog, ident, transform) :: Nil

    case ReplacePartitionField(
          IcebergCatalogAndIdentifier(catalog, ident),
          transformFrom,
          transformTo,
          name) =>
      ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, name) :: Nil

    case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), fields) =>
      SetIdentifierFieldsExec(catalog, ident, fields) :: Nil

    case DropIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), fields) =>
      DropIdentifierFieldsExec(catalog, ident, fields) :: Nil

    case SetWriteDistributionAndOrdering(
          IcebergCatalogAndIdentifier(catalog, ident),
          distributionMode,
          ordering) =>
      SetWriteDistributionAndOrderingExec(catalog, ident, distributionMode, ordering) :: Nil

    case ReplaceIcebergData(_: DataSourceV2Relation, query, r: DataSourceV2Relation, Some(write)) =>
      // refresh the cache using the original relation
      ReplaceDataExec(planLater(query), refreshCache(r), write) :: Nil

    case WriteDelta(_: DataSourceV2Relation, query, r: DataSourceV2Relation, projs, Some(write)) =>
      // refresh the cache using the original relation
      WriteDeltaExec(planLater(query), refreshCache(r), projs, write) :: Nil

    case MergeRows(
          isSourceRowPresent,
          isTargetRowPresent,
          matchedConditions,
          matchedOutputs,
          notMatchedConditions,
          notMatchedOutputs,
          targetOutput,
          performCardinalityCheck,
          emitNotMatchedTargetRows,
          output,
          child) =>
      MergeRowsExec(
        isSourceRowPresent,
        isTargetRowPresent,
        matchedConditions,
        matchedOutputs,
        notMatchedConditions,
        notMatchedOutputs,
        targetOutput,
        performCardinalityCheck,
        emitNotMatchedTargetRows,
        output,
        planLater(child)) :: Nil

    case DeleteFromIcebergTable(DataSourceV2ScanRelation(r, _, output, _), condition, None) =>
      // the optimizer has already checked that this delete can be handled using a metadata operation
      val deleteCond = condition.getOrElse(Literal.TrueLiteral)
      val predicates = splitConjunctivePredicates(deleteCond)
      val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, output)
      val filters = normalizedPredicates.flatMap { pred =>
        val filter = DataSourceStrategy.translateFilter(pred, supportNestedPredicatePushdown = true)
        if (filter.isEmpty) {
          throw QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(pred)
        }
        filter
      }.toArray
      DeleteFromTableExec(r.table.asDeletable, filters, refreshCache(r)) :: Nil

    case NoStatsUnaryNode(child) =>
      planLater(child) :: Nil

    case _ => Nil
  }