private void resetNormalUploadSessionIfNeeded()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [374:395]


  /**
   * Replaces the current upload session with a fresh one when
   * {@code needToResetUploadSession(timestamp)} reports that a reset is due.
   *
   * <p>On a reset, the current session is first closed through
   * {@code closeCurrentSessionWithRetry(retryTimes)}. If {@code needResetPartition} is set,
   * the partition spec for {@code timestamp} is computed, the partition is created via
   * {@code createPartition}, and the partition fields are updated before the new session is
   * opened. Finally a new upload session, buffered writer and reusable record are created
   * against the (possibly updated) {@code partitionSpec} field.
   *
   * <p>NOTE(review): {@code needResetPartition} is presumably updated by
   * {@code needToResetUploadSession}; confirm against that method.
   *
   * @param timestamp timestamp passed to the reset check and used to derive the partition spec
   * @throws OdpsException declared by this method; the operations that may raise it are not
   *                       visible in this excerpt
   * @throws IOException declared by this method; the operations that may raise it are not
   *                     visible in this excerpt
   */
  private void resetNormalUploadSessionIfNeeded(Long timestamp) throws OdpsException, IOException {
    // Nothing to do while the current session is still usable.
    if (!needToResetUploadSession(timestamp)) {
      return;
    }

    closeCurrentSessionWithRetry(retryTimes);

    if (needResetPartition) {
      if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Reset upload session and odps partition, last timestamp: {}, current: {}",
                     partitionStartTimestamp, timestamp);
      }
      // Create the target partition before publishing it to the field used for the new session.
      PartitionSpec nextPartitionSpec = getPartitionSpec(timestamp);
      createPartition(odps, project, table, nextPartitionSpec);
      this.partitionSpec = nextPartitionSpec;
      // Clear the old start timestamp before recomputing it from the current timestamp.
      this.partitionStartTimestamp = null;
      resetPartitionStartTimestamp(timestamp);
    }

    // Open a new session on the current partition and rebuild the writer state that depends on it.
    session = tunnel.createUploadSession(project, table, this.partitionSpec);
    writer = session.openBufferedWriter(true);
    reusedRecord = session.newRecord();

    TunnelBufferedWriter bufferedWriter = (TunnelBufferedWriter) writer;
    bufferedWriter.setBufferSize(getActualBufferBytes());
  }