in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [374:395]
/**
 * Replaces the current upload session with a new one when
 * {@code needToResetUploadSession(timestamp)} reports that a reset is required;
 * otherwise does nothing.
 *
 * <p>On reset, the current session is first closed through
 * {@code closeCurrentSessionWithRetry(retryTimes)}. If {@code needResetPartition} is set,
 * a partition spec is derived from {@code timestamp}, the partition is created, and the
 * partition fields of this writer are updated. Finally a new session, buffered writer and
 * reusable record are created.
 *
 * @param timestamp value passed to the reset check, the partition-spec lookup and the
 *                  partition start timestamp reset (presumably the timestamp of the record
 *                  being written; confirm against callers)
 * @throws OdpsException declared by this method; presumably raised by the ODPS/tunnel calls
 * @throws IOException   declared by this method; presumably raised by the session or writer
 *                       calls
 */
private void resetNormalUploadSessionIfNeeded(Long timestamp) throws OdpsException, IOException {
  if (!needToResetUploadSession(timestamp)) {
    return;
  }

  // The old session is closed before anything new is created.
  closeCurrentSessionWithRetry(retryTimes);

  if (needResetPartition) {
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Reset upload session and odps partition, last timestamp: {}, current: {}",
                   partitionStartTimestamp, timestamp);
    }
    PartitionSpec nextPartition = getPartitionSpec(timestamp);
    createPartition(odps, project, table, nextPartition);
    // The field is only updated once createPartition has returned.
    this.partitionSpec = nextPartition;
    // Cleared right before the reset call; presumably resetPartitionStartTimestamp only
    // assigns when the field is null (TODO confirm against that helper).
    this.partitionStartTimestamp = null;
    resetPartitionStartTimestamp(timestamp);
  }

  // Uses the partitionSpec field, which may have just been replaced above.
  session = tunnel.createUploadSession(project, table, partitionSpec);
  writer = session.openBufferedWriter(true);
  reusedRecord = session.newRecord();
  TunnelBufferedWriter bufferedWriter = (TunnelBufferedWriter) writer;
  bufferedWriter.setBufferSize(getActualBufferBytes());
}