gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java [125:137]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void flush(long numRecordsToFlush) {
    log.debug("Flushing records from producer buffer");
    Future future;
    long numRecordsFlushed = 0L;
    while (((future = futures.poll()) != null) && (numRecordsFlushed++ < numRecordsToFlush)) {
      try {
        future.get();
      } catch (Exception e) {
        log.error("Exception encountered when flushing record", e);
      }
    }
    log.debug("Flushed {} records from producer buffer", numRecordsFlushed);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java [122:134]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private void flush(long numRecordsToFlush) {
    log.debug("Flushing records from producer buffer");
    Future future;
    long numRecordsFlushed = 0L;
    while (((future = futures.poll()) != null) && (numRecordsFlushed++ < numRecordsToFlush)) {
      try {
        future.get();
      } catch (Exception e) {
        log.error("Exception encountered when flushing record", e);
      }
    }
    log.debug("Flushed {} records from producer buffer", numRecordsFlushed);
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



