in src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java [100:111]
/**
 * Feeds every sink record into the per-region batches and then executes each batch.
 *
 * <p>Each record is handed, together with {@code batchRecordsMap}, to
 * {@code updateBatchForRegionByTopic}. After all records have been visited, every entry of
 * {@code batchRecordsMap} has {@code executeOperations} invoked with the region obtained by
 * looking the entry's key up in {@code regionNameToRegion}.
 *
 * @param records the sink records to process, visited in the collection's iteration order
 * @param batchRecordsMap batches whose keys are used to look up regions in
 *        {@code regionNameToRegion}
 */
void put(Collection<SinkRecord> records, Map<String, BatchRecords> batchRecordsMap) {
  // spin off a new thread to handle this operation? Downside is ordering and retries...
  for (SinkRecord record : records) {
    // Only assemble the coordinates message when it will actually be emitted; this loop runs
    // once per record, so unconditional concatenation is wasted work with debug disabled.
    if (logger.isDebugEnabled()) {
      logger.debug("kafka coordinates:(Topic:"
          + record.topic()
          + " Partition:" + record.kafkaPartition() + " Offset:" + record.kafkaOffset()
          + ")");
    }
    updateBatchForRegionByTopic(record, batchRecordsMap);
  }
  // NOTE(review): the region lookup is passed on unchecked, so a key missing from
  // regionNameToRegion would hand null to executeOperations - confirm that is tolerated.
  batchRecordsMap.forEach(
      (region, batchRecords) -> batchRecords.executeOperations(regionNameToRegion.get(region)));
}