private void updateBatchRecordsForRecord()

in src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java [121:138]


  /**
   * Queues a single sink record onto the batch that belongs to the given region.
   *
   * <p>The {@link BatchRecords} for {@code region} is looked up in {@code batchRecordsMap} and,
   * if none is registered yet, a new one is created and stored there. This registration happens
   * before the key check, so it takes place even when the record itself ends up being skipped.
   *
   * <p>Handling of the record:
   * <ul>
   * <li>no key: a warning containing the record value is logged and nothing is queued;</li>
   * <li>key present, value {@code null} and {@code nullValuesMeansRemove} set: the record is
   * queued through {@code addRemoveOperation};</li>
   * <li>otherwise: the record is queued through {@code addUpdateOperation}, which also receives
   * the {@code nullValuesMeansRemove} flag.</li>
   * </ul>
   *
   * @param record the sink record to queue
   * @param batchRecordsMap per-region batches, keyed by region name; may gain a new entry
   * @param region the region whose batch should receive the record
   */
  private void updateBatchRecordsForRecord(SinkRecord record,
      Map<String, BatchRecords> batchRecordsMap, String region) {
    final BatchRecords regionBatch =
        batchRecordsMap.computeIfAbsent(region, unusedRegion -> new BatchRecords());

    if (record.key() == null) {
      // Open question carried over from the original: offer a configurable auto key generator?
      logger.warn("Unable to push to Geode, missing key in record : " + record.value());
      return;
    }

    final boolean queueAsRemove = record.value() == null && nullValuesMeansRemove;
    if (queueAsRemove) {
      regionBatch.addRemoveOperation(record);
    } else {
      regionBatch.addUpdateOperation(record, nullValuesMeansRemove);
    }
  }