public Seq&lt;SparkPlan&gt; apply(LogicalPlan plan)

in spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/extensions/IcebergExtendedDataSourceV2Strategy.java [68:231]


  public Seq<SparkPlan> apply(LogicalPlan plan) {
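    // Every Iceberg DDL logical plan handled below follows the same pattern:
    // resolve the referenced table into a catalog and identifier through the
    // IcebergCatalogAndIdentifier helper, build the matching Iceberg physical
    // exec node, and return it wrapped in a single-element Seq. The result of
    // buildCatalogAndIdentifier is unwrapped with get(), so an unresolvable
    // table surfaces as an exception rather than an empty plan list.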
    if (plan instanceof AddPartitionField) {
      AddPartitionField addPartitionField = (AddPartitionField) plan;
      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
              spark, addPartitionField.table().toIndexedSeq())
          .map(
              catalogAndIdentifier -> {
                AddPartitionFieldExec addPartitionFieldExec =
                    new AddPartitionFieldExec(
                        catalogAndIdentifier.catalog,
                        catalogAndIdentifier.identifier,
                        addPartitionField.transform(),
                        addPartitionField.name());
                return toSeq(addPartitionFieldExec);
              })
          .get();
    } else if (plan instanceof CreateOrReplaceBranch) {
      CreateOrReplaceBranch createOrReplaceBranch = (CreateOrReplaceBranch) plan;
      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
              spark, createOrReplaceBranch.table().toIndexedSeq())
          .map(
              catalogAndIdentifier -> {
                CreateOrReplaceBranchExec createOrReplaceBranchExec =
                    new CreateOrReplaceBranchExec(
                        catalogAndIdentifier.catalog,
                        catalogAndIdentifier.identifier,
                        createOrReplaceBranch.branch(),
                        createOrReplaceBranch.branchOptions(),
                        createOrReplaceBranch.create(),
                        createOrReplaceBranch.replace(),
                        createOrReplaceBranch.ifNotExists());
                return toSeq(createOrReplaceBranchExec);
              })
          .get();
    } else if (plan instanceof CreateOrReplaceTag) {
      CreateOrReplaceTag createOrReplaceTag = (CreateOrReplaceTag) plan;
      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
              spark, createOrReplaceTag.table().toIndexedSeq())
          .map(
              catalogAndIdentifier -> {
                CreateOrReplaceTagExec createOrReplaceTagExec =
                    new CreateOrReplaceTagExec(
                        catalogAndIdentifier.catalog,
                        catalogAndIdentifier.identifier,
                        createOrReplaceTag.tag(),
                        createOrReplaceTag.tagOptions(),
                        createOrReplaceTag.create(),
                        createOrReplaceTag.replace(),
                        createOrReplaceTag.ifNotExists());
                return toSeq(createOrReplaceTagExec);
              })
          .get();
    } else if (plan instanceof DropBranch) {
      DropBranch dropBranch = (DropBranch) plan;
      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
              spark, dropBranch.table().toIndexedSeq())
          .map(
              catalogAndIdentifier -> {
                DropBranchExec dropBranchExec =
                    new DropBranchExec(
                        catalogAndIdentifier.catalog,
                        catalogAndIdentifier.identifier,
                        dropBranch.branch(),
                        dropBranch.ifExists());
                return toSeq(dropBranchExec);
              })
          .get();
    } else if (plan instanceof DropTag) {
      DropTag dropTag = (DropTag) plan;
      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
              spark, dropTag.table().toIndexedSeq())
          .map(
              catalogAndIdentifier -> {
                DropTagExec dropTagExec =
                    new DropTagExec(
                        catalogAndIdentifier.catalog,
                        catalogAndIdentifier.identifier,
                        dropTag.tag(),
                        dropTag.ifExists());
                return toSeq(dropTagExec);
              })
          .get();
    } else if (plan instanceof DropPartitionField) {
      DropPartitionField dropPartitionField = (DropPartitionField) plan;
      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
              spark, dropPartitionField.table().toIndexedSeq())
          .map(
              catalogAndIdentifier -> {
                DropPartitionFieldExec dropPartitionFieldExec =
                    new DropPartitionFieldExec(
                        catalogAndIdentifier.catalog,
                        catalogAndIdentifier.identifier,
                        dropPartitionField.transform());
                return toSeq(dropPartitionFieldExec);
              })
          .get();
    } else if (plan instanceof ReplacePartitionField) {
      ReplacePartitionField replacePartitionField = (ReplacePartitionField) plan;
      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
              spark, replacePartitionField.table().toIndexedSeq())
          .map(
              catalogAndIdentifier -> {
                ReplacePartitionFieldExec replacePartitionFieldExec =
                    new ReplacePartitionFieldExec(
                        catalogAndIdentifier.catalog,
                        catalogAndIdentifier.identifier,
                        replacePartitionField.transformFrom(),
                        replacePartitionField.transformTo(),
                        replacePartitionField.name());
                return toSeq(replacePartitionFieldExec);
              })
          .get();
    } else if (plan instanceof SetIdentifierFields) {
      SetIdentifierFields setIdentifierFields = (SetIdentifierFields) plan;
      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
              spark, setIdentifierFields.table().toIndexedSeq())
          .map(
              catalogAndIdentifier -> {
                SetIdentifierFieldsExec setIdentifierFieldsExec =
                    new SetIdentifierFieldsExec(
                        catalogAndIdentifier.catalog,
                        catalogAndIdentifier.identifier,
                        setIdentifierFields.fields());
                return toSeq(setIdentifierFieldsExec);
              })
          .get();
    } else if (plan instanceof DropIdentifierFields) {
      DropIdentifierFields dropIdentifierFields = (DropIdentifierFields) plan;
      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
              spark, dropIdentifierFields.table().toIndexedSeq())
          .map(
              catalogAndIdentifier -> {
                DropIdentifierFieldsExec dropIdentifierFieldsExec =
                    new DropIdentifierFieldsExec(
                        catalogAndIdentifier.catalog,
                        catalogAndIdentifier.identifier,
                        dropIdentifierFields.fields());
                return toSeq(dropIdentifierFieldsExec);
              })
          .get();
    } else if (plan instanceof SetWriteDistributionAndOrdering) {
      SetWriteDistributionAndOrdering setWriteDistributionAndOrdering =
          (SetWriteDistributionAndOrdering) plan;
      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
              spark, setWriteDistributionAndOrdering.table().toIndexedSeq())
          .map(
              catalogAndIdentifier -> {
                SetWriteDistributionAndOrderingExec setWriteDistributionAndOrderingExec =
                    new SetWriteDistributionAndOrderingExec(
                        catalogAndIdentifier.catalog,
                        catalogAndIdentifier.identifier,
                        setWriteDistributionAndOrdering.distributionMode(),
                        setWriteDistributionAndOrdering.sortOrder());
                return toSeq(setWriteDistributionAndOrderingExec);
              })
          .get();
    } else {
      // Delegate any plan not handled above to the parent strategy and
      // convert a non-null result to an IndexedSeq; a null result is
      // propagated unchanged.
      scala.collection.Seq<SparkPlan> sparkPlans = super.apply(plan);
      if (sparkPlans != null) {
        return sparkPlans.toIndexedSeq();
      }
      return null;
    }
  }
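
The toSeq helper called in every branch above is not part of this excerpt. The sketch below shows one plausible shape for it; the method name matches the call sites, but the body is an assumption, since all it has to do is wrap a single physical node into the scala.collection.Seq<SparkPlan> that a planner strategy returns.

  // Required imports for this sketch:
  // import java.util.Collections;
  // import scala.collection.JavaConverters;
  // import scala.collection.Seq;
  // import org.apache.spark.sql.execution.SparkPlan;

  // Hypothetical helper (not shown in the excerpt above): wraps one physical
  // node into an immutable Scala IndexedSeq, which conforms to Seq<SparkPlan>.
  private Seq<SparkPlan> toSeq(SparkPlan plan) {
    return JavaConverters.asScalaBuffer(Collections.singletonList(plan)).toIndexedSeq();
  }

For this strategy to take effect it presumably has to be registered with Spark, for example via SparkSessionExtensions.injectPlannerStrategy, so that these Iceberg DDL plans are planned by this class rather than falling through to the default planner.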