in src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java [121:138]
/**
 * Adds one sink record to the pending batch of operations for the given region.
 *
 * <p>The batch for {@code region} is looked up in {@code batchRecordsMap} and, when no batch is
 * mapped yet, a new one is created and registered under that region name. This happens before
 * the record's key is inspected, so a region entry exists afterwards even if the record itself
 * ends up being skipped.
 *
 * <p>Dispatch rules:
 * <ul>
 * <li>record key is {@code null}: nothing is added to the batch; a warning is logged.</li>
 * <li>record value is {@code null} and {@code nullValuesMeansRemove} is set: the record is
 * handed to {@code BatchRecords.addRemoveOperation}.</li>
 * <li>otherwise: the record is handed to {@code BatchRecords.addUpdateOperation} together with
 * the {@code nullValuesMeansRemove} flag.</li>
 * </ul>
 *
 * @param record the sink record to route
 * @param batchRecordsMap per-region batches, keyed by region name; may be modified by this call
 * @param region name of the region whose batch receives the record
 */
private void updateBatchRecordsForRecord(SinkRecord record,
    Map<String, BatchRecords> batchRecordsMap, String region) {
  // Fetch the region's batch, creating and registering it on first use.
  final BatchRecords regionBatch =
      batchRecordsMap.computeIfAbsent(region, unused -> new BatchRecords());

  // A record without a key cannot be turned into an operation; report and bail out.
  if (record.key() == null) {
    // Invest in configurable auto key generator?
    logger.warn("Unable to push to Geode, missing key in record : " + record.value());
    return;
  }

  final boolean treatAsRemoval = record.value() == null && nullValuesMeansRemove;
  if (treatAsRemoval) {
    regionBatch.addRemoveOperation(record);
    return;
  }
  regionBatch.addUpdateOperation(record, nullValuesMeansRemove);
}