hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/Utils.java [93:139]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private static InternalSchema applyTableChange(InternalSchema oldSchema, TableChange change, Function<LogicalType, Type> convertFunc) {
    InternalSchemaChangeApplier changeApplier = new InternalSchemaChangeApplier(oldSchema);
    if (change instanceof TableChange.AddColumn) {
      if (((TableChange.AddColumn) change).getColumn().isPhysical()) {
        TableChange.AddColumn add = (TableChange.AddColumn) change;
        Column column = add.getColumn();
        String colName = column.getName();
        Type colType = convertFunc.apply(column.getDataType().getLogicalType());
        String comment = column.getComment().orElse(null);
        Pair<org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType, String> colPositionPair =
            parseColumnPosition(add.getPosition());
        return changeApplier.applyAddChange(
            colName, colType, comment, colPositionPair.getRight(), colPositionPair.getLeft());
      } else {
        throw new HoodieNotSupportedException("Add non-physical column is not supported yet.");
      }
    } else if (change instanceof TableChange.DropColumn) {
      TableChange.DropColumn drop = (TableChange.DropColumn) change;
      return changeApplier.applyDeleteChange(drop.getColumnName());
    } else if (change instanceof TableChange.ModifyColumnName) {
      TableChange.ModifyColumnName modify = (TableChange.ModifyColumnName) change;
      String oldColName = modify.getOldColumnName();
      String newColName = modify.getNewColumnName();
      return changeApplier.applyRenameChange(oldColName, newColName);
    } else if (change instanceof TableChange.ModifyPhysicalColumnType) {
      TableChange.ModifyPhysicalColumnType modify = (TableChange.ModifyPhysicalColumnType) change;
      String colName = modify.getOldColumn().getName();
      Type newColType = convertFunc.apply(modify.getNewType().getLogicalType());
      return changeApplier.applyColumnTypeChange(colName, newColType);
    } else if (change instanceof TableChange.ModifyColumnPosition) {
      TableChange.ModifyColumnPosition modify = (TableChange.ModifyColumnPosition) change;
      String colName = modify.getOldColumn().getName();
      Pair<org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType, String> colPositionPair =
          parseColumnPosition(modify.getNewPosition());
      return changeApplier.applyReOrderColPositionChange(
          colName, colPositionPair.getRight(), colPositionPair.getLeft());
    } else if (change instanceof TableChange.ModifyColumnComment) {
      TableChange.ModifyColumnComment modify  = (TableChange.ModifyColumnComment) change;
      String colName = modify.getOldColumn().getName();
      String comment = modify.getNewComment();
      return changeApplier.applyColumnCommentChange(colName, comment);
    } else if (change instanceof TableChange.ResetOption || change instanceof TableChange.SetOption) {
      return oldSchema;
    } else {
      throw new HoodieNotSupportedException(change.getClass().getSimpleName() + " is not supported.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/Utils.java [93:139]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private static InternalSchema applyTableChange(InternalSchema oldSchema, TableChange change, Function<LogicalType, Type> convertFunc) {
    InternalSchemaChangeApplier changeApplier = new InternalSchemaChangeApplier(oldSchema);
    if (change instanceof TableChange.AddColumn) {
      if (((TableChange.AddColumn) change).getColumn().isPhysical()) {
        TableChange.AddColumn add = (TableChange.AddColumn) change;
        Column column = add.getColumn();
        String colName = column.getName();
        Type colType = convertFunc.apply(column.getDataType().getLogicalType());
        String comment = column.getComment().orElse(null);
        Pair<org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType, String> colPositionPair =
            parseColumnPosition(add.getPosition());
        return changeApplier.applyAddChange(
            colName, colType, comment, colPositionPair.getRight(), colPositionPair.getLeft());
      } else {
        throw new HoodieNotSupportedException("Add non-physical column is not supported yet.");
      }
    } else if (change instanceof TableChange.DropColumn) {
      TableChange.DropColumn drop = (TableChange.DropColumn) change;
      return changeApplier.applyDeleteChange(drop.getColumnName());
    } else if (change instanceof TableChange.ModifyColumnName) {
      TableChange.ModifyColumnName modify = (TableChange.ModifyColumnName) change;
      String oldColName = modify.getOldColumnName();
      String newColName = modify.getNewColumnName();
      return changeApplier.applyRenameChange(oldColName, newColName);
    } else if (change instanceof TableChange.ModifyPhysicalColumnType) {
      TableChange.ModifyPhysicalColumnType modify = (TableChange.ModifyPhysicalColumnType) change;
      String colName = modify.getOldColumn().getName();
      Type newColType = convertFunc.apply(modify.getNewType().getLogicalType());
      return changeApplier.applyColumnTypeChange(colName, newColType);
    } else if (change instanceof TableChange.ModifyColumnPosition) {
      TableChange.ModifyColumnPosition modify = (TableChange.ModifyColumnPosition) change;
      String colName = modify.getOldColumn().getName();
      Pair<org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType, String> colPositionPair =
          parseColumnPosition(modify.getNewPosition());
      return changeApplier.applyReOrderColPositionChange(
          colName, colPositionPair.getRight(), colPositionPair.getLeft());
    } else if (change instanceof TableChange.ModifyColumnComment) {
      TableChange.ModifyColumnComment modify  = (TableChange.ModifyColumnComment) change;
      String colName = modify.getOldColumn().getName();
      String comment = modify.getNewComment();
      return changeApplier.applyColumnCommentChange(colName, comment);
    } else if (change instanceof TableChange.ResetOption || change instanceof TableChange.SetOption) {
      return oldSchema;
    } else {
      throw new HoodieNotSupportedException(change.getClass().getSimpleName() + " is not supported.");
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



