in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java [491:510]
public Map<TopicPartition, OffsetAndMetadata> preCommit(
        Map<TopicPartition, OffsetAndMetadata> offsets) {
    // Builds the offsets handed back to the framework: for every partition in
    // `assignment`, one past the offset its writer last recorded as committed.
    // `offsets` (the framework's own view) is only read for the debug log below.
    // Throws ConnectException when an assigned partition has no writer registered.
    Map<TopicPartition, OffsetAndMetadata> committable = new HashMap<>();

    for (TopicPartition partition : assignment) {
        // An assigned partition without a writer is treated as a configuration error.
        if (writers.get(partition) == null) {
            throw new ConnectException("Topic Partition not configured properly. " +
                    "verify your `topics` and `kusto.tables.topics.mapping` configurations");
        }

        Long lastCommitted = writers.get(partition).lastCommittedOffset;
        if (lastCommitted == null) {
            // No committed offset recorded for this partition yet: leave it out of the result.
            continue;
        }

        // NOTE(review): the +1 presumably follows the Kafka convention that a committed
        // offset names the next record to consume - confirm against the writer's semantics.
        long nextOffset = lastCommitted + 1L;
        log.debug("Forwarding to framework request to commit offset: {} for {} while the offset is {}", nextOffset,
                partition, offsets.get(partition));
        committable.put(partition, new OffsetAndMetadata(nextOffset));
    }

    return committable;
}