in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [427:454]
private boolean needToResetUploadSession(Long timestamp) {
if (partitionStartTimestamp != null) {
switch (partitionWindowType) {
case DAY:
needResetPartition = timestamp >= partitionStartTimestamp + 24 * 60 * 60;
break;
case HOUR:
needResetPartition = timestamp >= partitionStartTimestamp + 60 * 60;
break;
case MINUTE:
needResetPartition = timestamp >= partitionStartTimestamp + 60;
break;
default:
throw new RuntimeException("Unsupported partition window type");
}
} else {
needResetPartition = true;
}
if (session == null && !useStreamingTunnel) {
return true;
}
if (streamSession == null && useStreamingTunnel) {
return true;
}
return needResetPartition;
}