in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java [461:484]
public void put(Collection<SinkRecord> records) {
SinkRecord lastRecord = null;
for (SinkRecord sinkRecord : records) {
lastRecord = sinkRecord;
TopicPartition tp = new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition());
TopicPartitionWriter writer = writers.get(tp);
if (writer == null) {
NotFoundException e = new NotFoundException(String.format("Received a record without " +
"a mapped writer for topic:partition(%s:%d), dropping record.", tp.topic(), tp.partition()));
log.error("Error putting records: ", e);
throw e;
}
if (sinkRecord.value() == null) {
log.warn("Filtering null value (tombstone) records at offset {}, key {} and partition {} ",
sinkRecord.kafkaOffset(), sinkRecord.key(), sinkRecord.kafkaPartition());
} else {
writer.writeRecord(sinkRecord);
}
}
if (lastRecord != null) {
log.debug("Last record offset: {}", lastRecord.kafkaOffset());
}
}