in spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/extensions/IcebergExtendedDataSourceV2Strategy.java [68:231]
public Seq<SparkPlan> apply(LogicalPlan plan) {
  // Maps each supported logical command node to its matching *Exec physical node.
  // Every handled branch follows the same three steps:
  //   1. cast the plan to its concrete command type,
  //   2. resolve the catalog and identifier from the command's table() name parts,
  //   3. wrap the freshly built exec node in a single-element Seq via toSeq(...).
  // Any plan type not listed here is delegated to the parent strategy.
  //
  // NOTE(review): each branch calls .get() unconditionally on the result of
  // buildCatalogAndIdentifier(...).map(...); presumably this fails if nothing was
  // resolved -- confirm that this is the intended behavior.

  if (plan instanceof AddPartitionField) {
    AddPartitionField addField = (AddPartitionField) plan;
    return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
            spark, addField.table().toIndexedSeq())
        .map(
            target ->
                toSeq(
                    new AddPartitionFieldExec(
                        target.catalog,
                        target.identifier,
                        addField.transform(),
                        addField.name())))
        .get();
  }

  if (plan instanceof CreateOrReplaceBranch) {
    CreateOrReplaceBranch branchCommand = (CreateOrReplaceBranch) plan;
    return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
            spark, branchCommand.table().toIndexedSeq())
        .map(
            target ->
                toSeq(
                    new CreateOrReplaceBranchExec(
                        target.catalog,
                        target.identifier,
                        branchCommand.branch(),
                        branchCommand.branchOptions(),
                        branchCommand.create(),
                        branchCommand.replace(),
                        branchCommand.ifNotExists())))
        .get();
  }

  if (plan instanceof CreateOrReplaceTag) {
    CreateOrReplaceTag tagCommand = (CreateOrReplaceTag) plan;
    return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
            spark, tagCommand.table().toIndexedSeq())
        .map(
            target ->
                toSeq(
                    new CreateOrReplaceTagExec(
                        target.catalog,
                        target.identifier,
                        tagCommand.tag(),
                        tagCommand.tagOptions(),
                        tagCommand.create(),
                        tagCommand.replace(),
                        tagCommand.ifNotExists())))
        .get();
  }

  if (plan instanceof DropBranch) {
    DropBranch dropBranchCommand = (DropBranch) plan;
    return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
            spark, dropBranchCommand.table().toIndexedSeq())
        .map(
            target ->
                toSeq(
                    new DropBranchExec(
                        target.catalog,
                        target.identifier,
                        dropBranchCommand.branch(),
                        dropBranchCommand.ifExists())))
        .get();
  }

  if (plan instanceof DropTag) {
    DropTag dropTagCommand = (DropTag) plan;
    return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
            spark, dropTagCommand.table().toIndexedSeq())
        .map(
            target ->
                toSeq(
                    new DropTagExec(
                        target.catalog,
                        target.identifier,
                        dropTagCommand.tag(),
                        dropTagCommand.ifExists())))
        .get();
  }

  if (plan instanceof DropPartitionField) {
    DropPartitionField dropField = (DropPartitionField) plan;
    return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
            spark, dropField.table().toIndexedSeq())
        .map(
            target ->
                toSeq(
                    new DropPartitionFieldExec(
                        target.catalog, target.identifier, dropField.transform())))
        .get();
  }

  if (plan instanceof ReplacePartitionField) {
    ReplacePartitionField replaceField = (ReplacePartitionField) plan;
    return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
            spark, replaceField.table().toIndexedSeq())
        .map(
            target ->
                toSeq(
                    new ReplacePartitionFieldExec(
                        target.catalog,
                        target.identifier,
                        replaceField.transformFrom(),
                        replaceField.transformTo(),
                        replaceField.name())))
        .get();
  }

  if (plan instanceof SetIdentifierFields) {
    SetIdentifierFields setFields = (SetIdentifierFields) plan;
    return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
            spark, setFields.table().toIndexedSeq())
        .map(
            target ->
                toSeq(
                    new SetIdentifierFieldsExec(
                        target.catalog, target.identifier, setFields.fields())))
        .get();
  }

  if (plan instanceof DropIdentifierFields) {
    DropIdentifierFields dropFields = (DropIdentifierFields) plan;
    return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
            spark, dropFields.table().toIndexedSeq())
        .map(
            target ->
                toSeq(
                    new DropIdentifierFieldsExec(
                        target.catalog, target.identifier, dropFields.fields())))
        .get();
  }

  if (plan instanceof SetWriteDistributionAndOrdering) {
    SetWriteDistributionAndOrdering writeSettings = (SetWriteDistributionAndOrdering) plan;
    return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
            spark, writeSettings.table().toIndexedSeq())
        .map(
            target ->
                toSeq(
                    new SetWriteDistributionAndOrderingExec(
                        target.catalog,
                        target.identifier,
                        writeSettings.distributionMode(),
                        writeSettings.sortOrder())))
        .get();
  }

  // Not one of the commands handled above: let the parent strategy plan it.
  // A null result from the parent is passed through unchanged; otherwise the
  // parent's sequence is converted with toIndexedSeq() before being returned.
  scala.collection.Seq<SparkPlan> delegated = super.apply(plan);
  if (delegated == null) {
    return null;
  }
  return delegated.toIndexedSeq();
}