gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java [102:142]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        if (e != null) {
          log.error("Failed to send message to topic {} due to exception: ", topic, e);
        }
      }));
    }

    //Once the low watermark of numFuturesToBuffer is hit, start flushing messages from the futures
    // buffer. In order to avoid blocking on newest messages added to futures queue, we only invoke future.get() on
    // the oldest messages in the futures buffer. The number of messages to flush is same as the number of messages added
    // in the current call. Note this does not completely avoid calling future.get() on the newer messages e.g. when
    // multiple threads enter the if{} block concurrently, and invoke flush().
    if (this.futures.size() >= this.numFuturesToBuffer) {
      flush(messages.size());
    }
  }

  /**
   * Waits for the oldest pending sends in the futures buffer to complete, up to a maximum of
   * <code>numRecordsToFlush</code> records.
   * This method is needed since Kafka 0.8 producer does not have a flush() API. In the absence of the flush()
   * implementation, records which are present in the buffer but not in-flight may not be delivered at all when close()
   * is called, leading to data loss.
   *
   * <p>Failures of individual sends are logged and do not stop the flush. If the calling thread is interrupted
   * while waiting, the remaining records are still processed and the thread's interrupt status is restored
   * before returning.
   *
   * @param numRecordsToFlush maximum number of futures to remove from the buffer and wait on; values less than
   *                          or equal to zero flush nothing
   */
  private void flush(long numRecordsToFlush) {
    log.debug("Flushing records from producer buffer");
    long numRecordsFlushed = 0L;
    boolean interrupted = false;
    // The budget is checked BEFORE polling: polling first would remove one extra future from the buffer once the
    // budget is exhausted and drop it without ever waiting on it (and over-count the flushed records by one).
    while (numRecordsFlushed < numRecordsToFlush) {
      Future<?> future = futures.poll();
      if (future == null) {
        // Buffer drained before the budget was used up.
        break;
      }
      numRecordsFlushed++;
      try {
        future.get();
      } catch (InterruptedException e) {
        // Keep flushing on a best-effort basis, but remember the interrupt so it is not silently swallowed.
        interrupted = true;
        log.error("Exception encountered when flushing record", e);
      } catch (Exception e) {
        log.error("Exception encountered when flushing record", e);
      }
    }
    log.debug("Flushed {} records from producer buffer", numRecordsFlushed);
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  @Override
  public void close()
      throws IOException {
    log.info("Flushing records before close");
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaProducerPusher.java [99:139]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        if (e != null) {
          log.error("Failed to send message to topic {} due to exception: ", topic, e);
        }
      }));
    }

    //Once the low watermark of numFuturesToBuffer is hit, start flushing messages from the futures
    // buffer. In order to avoid blocking on newest messages added to futures queue, we only invoke future.get() on
    // the oldest messages in the futures buffer. The number of messages to flush is same as the number of messages added
    // in the current call. Note this does not completely avoid calling future.get() on the newer messages e.g. when
    // multiple threads enter the if{} block concurrently, and invoke flush().
    if (this.futures.size() >= this.numFuturesToBuffer) {
      flush(messages.size());
    }
  }

  /**
   * Waits for the oldest pending sends in the futures buffer to complete, up to a maximum of
   * <code>numRecordsToFlush</code> records.
   * This method is needed since Kafka 0.8 producer does not have a flush() API. In the absence of the flush()
   * implementation, records which are present in the buffer but not in-flight may not be delivered at all when close()
   * is called, leading to data loss.
   *
   * <p>Failures of individual sends are logged and do not stop the flush. If the calling thread is interrupted
   * while waiting, the remaining records are still processed and the thread's interrupt status is restored
   * before returning.
   *
   * @param numRecordsToFlush maximum number of futures to remove from the buffer and wait on; values less than
   *                          or equal to zero flush nothing
   */
  private void flush(long numRecordsToFlush) {
    log.debug("Flushing records from producer buffer");
    long numRecordsFlushed = 0L;
    boolean interrupted = false;
    // The budget is checked BEFORE polling: polling first would remove one extra future from the buffer once the
    // budget is exhausted and drop it without ever waiting on it (and over-count the flushed records by one).
    while (numRecordsFlushed < numRecordsToFlush) {
      Future<?> future = futures.poll();
      if (future == null) {
        // Buffer drained before the budget was used up.
        break;
      }
      numRecordsFlushed++;
      try {
        future.get();
      } catch (InterruptedException e) {
        // Keep flushing on a best-effort basis, but remember the interrupt so it is not silently swallowed.
        interrupted = true;
        log.error("Exception encountered when flushing record", e);
      } catch (Exception e) {
        log.error("Exception encountered when flushing record", e);
      }
    }
    log.debug("Flushed {} records from producer buffer", numRecordsFlushed);
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  @Override
  public void close()
      throws IOException {
    log.info("Flushing records before close");
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



