in src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java [129:152]
public Map<TopicPartition, OffsetAndMetadata> preCommit(
Map<TopicPartition, OffsetAndMetadata> offsets) throws RetriableException {
// return an empty map means that offset commitment is not desired
if (sink == null || sink.getPartitionCount() == 0) {
return new HashMap<>();
}
sink.commit(offsets);
Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
// it's ok to just log the error since commit can retry
try {
offsets.forEach(
(topicPartition, offsetAndMetadata) -> {
long offSet = sink.getOffset(topicPartition);
if (offSet != 0) {
committedOffsets.put(topicPartition, new OffsetAndMetadata(offSet));
}
});
} catch (Exception e) {
return new HashMap<>();
}
LOG.info("Returning committed offsets {}", committedOffsets);
return committedOffsets;
}