public void refresh(Odps odps)

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [196:233]


  /**
   * Rebinds this writer to the supplied {@link Odps} client.
   *
   * <p>Steps, in order:
   * <ol>
   *   <li>Stores {@code odps} and creates a new {@code TableTunnel} from it, applying
   *       {@code tunnelEndpoint} unless it equals the empty string.</li>
   *   <li>If an upload session is present, looks it up again through the new tunnel using the
   *       existing session id.</li>
   *   <li>If a stream session is present, builds a new stream upload session, calls
   *       {@code flushStreamPackWithRetry(retryTimes)}, and then replaces {@code streamPack}
   *       with the result of {@code recreateRecordPack()}.</li>
   * </ol>
   *
   * <p>NOTE(review): the log messages suggest this is invoked when an STS token is refreshed;
   * confirm against the callers.
   *
   * @param odps the client to use from now on
   * @throws RuntimeException wrapping any exception raised while re-fetching the upload session,
   *     or a {@code TunnelException}/{@code IOException} raised while rebuilding the stream
   *     session and its pack
   */
  public void refresh(Odps odps) {
    LOGGER.info("Enter refresh.");
    this.odps = odps;

    // Publish the new tunnel to the field first, then configure it through the local handle.
    TableTunnel freshTunnel = new TableTunnel(odps);
    this.tunnel = freshTunnel;
    // Only the empty string suppresses the explicit endpoint override.
    if (!"".equals(this.tunnelEndpoint)) {
      freshTunnel.setEndpoint(this.tunnelEndpoint);
    }

    // The upload session is optional; when there is one, fetch it again by id via the new tunnel.
    if (session != null) {
      String existingSessionId = session.getId();
      try {
        this.session = tunnel.getUploadSession(project, table, partitionSpec, existingSessionId);
      } catch (Exception lookupFailure) {
        LOGGER.error("Set session failed!!!", lookupFailure);
        throw new RuntimeException(lookupFailure);
      }
    }

    // Nothing more to do when no stream session is in use.
    if (streamSession == null) {
      return;
    }

    try {
      // Uses the builder API; this replaces the older
      // createStreamUploadSession(project, table, partitionSpec, true) call.
      this.streamSession = tunnel.buildStreamUploadSession(project, table)
          .setPartitionSpec(partitionSpec)
          .setCreatePartition(true)
          .build();
      // Order matters: the flush runs after the new session exists and before the pack is
      // recreated.
      flushStreamPackWithRetry(retryTimes);
      this.streamPack = recreateRecordPack();
    } catch (TunnelException sessionFailure) {
      LOGGER.error("Refresh sts token failed: cannot recreate stream session", sessionFailure);
      throw new RuntimeException(sessionFailure);
    } catch (IOException packFailure) {
      LOGGER.error("Refresh sts token failed: cannot recreate stream pack", packFailure);
      throw new RuntimeException(packFailure);
    }
  }