in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [555:572]
private void resetOffset(TopicPartition partition, MaxComputeSinkWriter writer) {
if (writer == null) {
StringWriter s = new StringWriter();
PrintWriter p = new PrintWriter(s, true);
p.flush();
LOGGER.error("Thread(" + Thread.currentThread().getId() + ") Reset offset but null " +
", topic: " + partition.topic() +
", partition: " + partition.partition() +
", stack: " + p.toString());
return;
}
LOGGER.info("Thread(" + Thread.currentThread().getId() + ") Reset offset to " +
writer.getMinOffset() + ", topic: " + partition.topic() +
", partition: " + partition.partition());
// Reset offset
context.offset(partition, writer.getMinOffset());
}