in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [352:372]
/**
 * Replaces the current stream upload session, record pack and reusable record when
 * {@code needToResetUploadSession(timestamp)} reports that a reset is required; otherwise
 * does nothing.
 *
 * <p>The current pack is flushed first (using {@code retryTimes}). The new session is then
 * built for the partition spec derived from {@code timestamp}, with partition creation
 * enabled. The partition bookkeeping ({@code partitionSpec}, {@code partitionStartTimestamp})
 * is only updated after the session has been built successfully, so a failed build leaves
 * the previous partition spec, start timestamp and session in place instead of a mix of
 * new partition state and an old session.
 *
 * @param timestamp value handed to the reset check, to {@code getPartitionSpec} and to
 *                  {@code resetPartitionStartTimestamp}
 * @throws OdpsException if one of the flush / session / pack operations fails with it
 * @throws IOException   if one of the flush / session / pack operations fails with it
 */
private void resetStreamUploadSessionIfNeeded(Long timestamp) throws OdpsException, IOException {
  if (!needToResetUploadSession(timestamp)) {
    return;
  }

  LOGGER.info("Reset stream upload session, last timestamp: {}, current: {}",
              partitionStartTimestamp, timestamp);
  // try flushing the pack
  flushStreamPackWithRetry(retryTimes);

  PartitionSpec newPartitionSpec = getPartitionSpec(timestamp);

  // Build the session BEFORE touching the partition bookkeeping: the field assignment only
  // happens if build() returns normally, so a failure here leaves every field as it was.
  // NOTE(review): presumably the reset check is driven by partitionStartTimestamp, in which
  // case the next call will attempt the reset again after a failure — confirm.
  streamSession =
      tunnel.buildStreamUploadSession(project, table)
          .setPartitionSpec(newPartitionSpec)
          .setCreatePartition(true)
          .build();

  // The session exists; now commit the partition state that belongs to it.
  this.partitionSpec = newPartitionSpec;
  // Cleared before the helper call, as in the original sequence.
  // NOTE(review): presumably the helper only recomputes when the field is null — confirm.
  this.partitionStartTimestamp = null;
  resetPartitionStartTimestamp(timestamp);

  // Pack and reusable record are recreated after the session swap, same order as before.
  streamPack = recreateRecordPack();
  reusedRecord = streamSession.newRecord();
}