private void resetStreamUploadSessionIfNeeded()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [352:372]


  /**
   * Replaces the current stream upload session with one for the partition that
   * {@code getPartitionSpec(timestamp)} returns, but only when
   * {@code needToResetUploadSession(timestamp)} says a reset is required.
   *
   * <p>Steps, in order: flush the current pack, compute the new partition spec, build the new
   * session (asking the tunnel to create the partition), update the partition bookkeeping, and
   * finally recreate the record pack and the reusable record.
   *
   * <p>The new session is built <em>before</em> {@code partitionSpec} and
   * {@code partitionStartTimestamp} are modified. If the build throws, those fields and
   * {@code streamSession} all keep their previous values, so the writer is not left with
   * bookkeeping for a partition it has no session for.
   *
   * @param timestamp timestamp passed to the reset check, the partition-spec computation and
   *                  the start-timestamp reset
   * @throws OdpsException propagated from the calls made here (flush, session build, helpers)
   * @throws IOException   propagated from the calls made here (flush, session build, helpers)
   */
  private void resetStreamUploadSessionIfNeeded(Long timestamp) throws OdpsException, IOException {
    if (!needToResetUploadSession(timestamp)) {
      return;
    }

    LOGGER.info("Reset stream upload session, last timestamp: {}, current: {}",
                partitionStartTimestamp, timestamp);
    // try flushing the pack
    flushStreamPackWithRetry(retryTimes);

    PartitionSpec newPartitionSpec = getPartitionSpec(timestamp);

    // Build the session first: the assignment only happens if build() returns normally, so a
    // failure here leaves streamSession, partitionSpec and partitionStartTimestamp untouched.
    // NOTE(review): presumably a later call with the same timestamp then re-enters this branch
    // and retries the reset - confirm against needToResetUploadSession.
    streamSession =
        tunnel.buildStreamUploadSession(project, table)
            .setPartitionSpec(newPartitionSpec)
            .setCreatePartition(true)
            .build();

    // Commit the partition bookkeeping only once the new session exists.
    this.partitionSpec = newPartitionSpec;
    // Cleared before the helper runs, as in the original; presumably the helper only
    // initialises the field when it is unset - TODO confirm.
    this.partitionStartTimestamp = null;
    resetPartitionStartTimestamp(timestamp);

    streamPack = recreateRecordPack();
    reusedRecord = streamSession.newRecord();
  }