odps-examples/tunnel-examples/src/main/java/SchemaEvolution/StreamUploadIfSchemaEvolutionExpectedSample.java [121:213]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      // Any cleanup code can be placed here
      // In this example, there's no specific resource to close, but it's good practice
    }
  }

  private static TableTunnel.StreamUploadSession rebuildSessionUtilSchemaEvolution(Odps odps,
                                                                                   String latestSchemaVersion)
      throws Exception {
    if (!StringUtils.isNullOrEmpty(latestSchemaVersion)) {
      return odps.tableTunnel()
          .buildStreamUploadSession(project, table)
          .setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
          .allowSchemaMismatch(false)
          .setSchemaVersion(latestSchemaVersion)
          .build();
    } else {
      /**
       * If the schema has changed, but no new schema version,
       * (This may occur when the server is not updated and usually does not go to this branch.)
       * the user should re-create the session and check if the session has noticed the new schema.
       */
      TableTunnel.StreamUploadSession session;
      do {
        session =
            odps.tableTunnel()
                .buildStreamUploadSession(project, table)
                .setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
                .allowSchemaMismatch(false)
                .build();
        System.out.println("Session Schema: " + debugString(session.getSchema()));
        System.out.println("Table Schema: " + debugString(odps.tables().get(project, table).getSchema()));
        TimeUnit.SECONDS.sleep(5);
      } while (!basicallyEquals(odps.tables().get(project, table).getSchema()
          , session.getSchema()));
      return session;
    }
  }

  /**
   * Create a test table, with two columns, c1 bigint and c2 bigint
   */
  private static void createTestTable() throws OdpsException {
    getOdps().tables().delete(project, table, true);
    getOdps()
        .tables()
        .newTableCreator(project, table, TableSchema.builder()
            .withBigintColumn("c1")
            .withBigintColumn("c2")
            .build())
        .ifNotExists()
        .debug()
        .create();
  }

  /**
   * Trigger schema evolution on the table. Here we run a SQL statement to add a column.
   */
  private static void triggerSchemaEvolution() throws OdpsException {
    Instance
        instance =
        SQLTask.run(getOdps(), "alter table " + project + "." + table + " add column c3 bigint;");
    // print logview to check the progress of schema evolution
    System.out.println(getOdps().logview().generateLogView(instance, 24));
    instance.waitForSuccess();
  }

  private static String debugString(TableSchema schema) {
    return schema.getAllColumns().stream()
        .map(column -> column.getName() + "(" + column.getTypeInfo().getTypeName() + ")")
        .collect(Collectors.joining(", "));
  }

  /**
   * Check if two schemas are basically equal
   */
  private static boolean basicallyEquals(TableSchema a, TableSchema b) {
    List<Column> columnsA = a.getAllColumns();
    List<Column> columnsB = b.getAllColumns();
    if (columnsA.size() != columnsB.size()) {
      return false;
    }
    for (int i = 0; i < columnsA.size(); i++) {
      Column columnA = columnsA.get(i);
      Column columnB = columnsB.get(i);
      if (!columnA.getName().equals(columnB.getName()) || !columnA.getTypeInfo().getTypeName()
          .equals(columnB.getTypeInfo().getTypeName())) {
        return false;
      }
    }
    return true;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



odps-examples/tunnel-examples/src/main/java/SchemaEvolution/StreamUploadIfSchemaEvolutionUnexpectedSample.java [132:224]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      // Any cleanup code can be placed here
      // In this example, there's no specific resource to close, but it's good practice
    }
  }

  private static TableTunnel.StreamUploadSession rebuildSessionUtilSchemaEvolution(Odps odps,
                                                                                   String latestSchemaVersion)
      throws Exception {
    if (!StringUtils.isNullOrEmpty(latestSchemaVersion)) {
      return odps.tableTunnel()
          .buildStreamUploadSession(project, table)
          .setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
          .allowSchemaMismatch(false)
          .setSchemaVersion(latestSchemaVersion)
          .build();
    } else {
      /**
       * If the schema has changed, but no new schema version,
       * (This may occur when the server is not updated and usually does not go to this branch.)
       * the user should re-create the session and check if the session has noticed the new schema.
       */
      TableTunnel.StreamUploadSession session;
      do {
        session =
            odps.tableTunnel()
                .buildStreamUploadSession(project, table)
                .setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
                .allowSchemaMismatch(false)
                .build();
        System.out.println("Session Schema: " + debugString(session.getSchema()));
        System.out.println("Table Schema: " + debugString(odps.tables().get(project, table).getSchema()));
        TimeUnit.SECONDS.sleep(5);
      } while (!basicallyEquals(odps.tables().get(project, table).getSchema()
          , session.getSchema()));
      return session;
    }
  }

  /**
   * Create a test table, with two columns, c1 bigint and c2 bigint
   */
  private static void createTestTable() throws OdpsException {
    getOdps().tables().delete(project, table, true);
    getOdps()
        .tables()
        .newTableCreator(project, table, TableSchema.builder()
            .withBigintColumn("c1")
            .withBigintColumn("c2")
            .build())
        .ifNotExists()
        .debug()
        .create();
  }

  /**
   * Trigger schema evolution on the table. Here we run a SQL statement to add a column.
   */
  private static void triggerSchemaEvolution() throws OdpsException {
    Instance
        instance =
        SQLTask.run(getOdps(), "alter table " + project + "." + table + " add column c3 bigint;");
    // print logview to check the progress of schema evolution
    System.out.println(getOdps().logview().generateLogView(instance, 24));
    instance.waitForSuccess();
  }

  private static String debugString(TableSchema schema) {
    return schema.getAllColumns().stream()
        .map(column -> column.getName() + "(" + column.getTypeInfo().getTypeName() + ")")
        .collect(Collectors.joining(", "));
  }

  /**
   * Check if two schemas are basically equal
   */
  private static boolean basicallyEquals(TableSchema a, TableSchema b) {
    List<Column> columnsA = a.getAllColumns();
    List<Column> columnsB = b.getAllColumns();
    if (columnsA.size() != columnsB.size()) {
      return false;
    }
    for (int i = 0; i < columnsA.size(); i++) {
      Column columnA = columnsA.get(i);
      Column columnB = columnsB.get(i);
      if (!columnA.getName().equals(columnB.getName()) || !columnA.getTypeInfo().getTypeName()
          .equals(columnB.getTypeInfo().getTypeName())) {
        return false;
      }
    }
    return true;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



