in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [456:482]
/**
 * Initializes {@code partitionStartTimestamp} from the given timestamp, if it is not set yet.
 *
 * <p>The timestamp is interpreted as epoch seconds and converted to a date-time in the zone of
 * {@code tz}. That date-time is then truncated according to {@code partitionWindowType}
 * (start of day, start of hour, or start of minute) and the result is stored back as epoch
 * seconds. When {@code partitionStartTimestamp} already holds a value, this method does nothing.
 *
 * @param timestamp epoch seconds used to derive the start of the partition window
 * @throws RuntimeException if {@code partitionWindowType} is not DAY, HOUR or MINUTE
 */
private void resetPartitionStartTimestamp(Long timestamp) {
  // Only the first call after the field has been cleared computes a new window start.
  if (partitionStartTimestamp != null) {
    return;
  }

  ZonedDateTime eventTime = Instant.ofEpochSecond(timestamp).atZone(tz.toZoneId());

  // The window type decides which time-of-day fields survive; everything finer is zeroed.
  int startHour;
  int startMinute;
  switch (partitionWindowType) {
    case DAY:
      startHour = 0;
      startMinute = 0;
      break;
    case HOUR:
      startHour = eventTime.getHour();
      startMinute = 0;
      break;
    case MINUTE:
      startHour = eventTime.getHour();
      startMinute = eventTime.getMinute();
      break;
    default:
      throw new RuntimeException("Unsupported partition window type");
  }

  // Rebuild from local fields in the same zone, with seconds and nanoseconds cleared.
  ZonedDateTime windowStart =
      ZonedDateTime.of(
          eventTime.getYear(), eventTime.getMonthValue(), eventTime.getDayOfMonth(),
          startHour, startMinute, 0, 0, eventTime.getZone());
  partitionStartTimestamp = windowStart.toEpochSecond();
}