gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java [45:81]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@Slf4j
public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
  private final String topic;
  private final KafkaProducer<K, V> producer;
  private final Closer closer;

  /**
   * Creates a pusher for {@code topic}, backed by a producer built from a set of default producer
   * properties that the caller may override.
   *
   * <p>Defaults applied here: {@link StringSerializer} for keys, {@link ByteArraySerializer} for
   * values, {@link ProducerConfig#ACKS_CONFIG} set to {@code "all"},
   * {@link ProducerConfig#RETRIES_CONFIG} set to 3, and
   * {@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION} set to 1.
   *
   * @param brokers value stored under {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG}
   * @param topic topic name kept on this pusher
   * @param kafkaConfig optional Kafka-scoped config; when present its entries are copied on top of
   *     the defaults, replacing any default that uses the same key
   */
  public KafkaKeyValueProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
    this.topic = topic;
    // NOTE(review): createProducer is defined outside this view. The closer is initialised before
    // that call, exactly as before -- confirm whether the producer gets registered with it.
    this.closer = Closer.create();

    Properties producerProps = new Properties();

    // Connection and serialization defaults.
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    // Delivery defaults. Ordered delivery requires at most one in-flight request per connection.
    producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    producerProps.put(ProducerConfig.RETRIES_CONFIG, 3);
    producerProps.put(ProducerConfig.ACKS_CONFIG, "all");

    // Caller-supplied Kafka-scoped settings go in last so they win over any default set above.
    if (kafkaConfig.isPresent()) {
      Properties overrides = ConfigUtils.configToProperties(kafkaConfig.get());
      producerProps.putAll(overrides);
    }

    this.producer = createProducer(producerProps);
  }

  /**
   * Creates a pusher for {@code topic} without any extra Kafka-scoped configuration; delegates to
   * the three-argument constructor with an absent config.
   *
   * @param brokers broker list handed to the three-argument constructor
   * @param topic topic name handed to the three-argument constructor
   */
  public KafkaKeyValueProducerPusher(String brokers, String topic) {
    this(brokers, topic, Optional.<Config>absent());
  }

  /**
   * Push all keyed messages to the Kafka topic.
   * @param messages List of keyed messages to push to Kafka.
   */
  public void pushMessages(List<Pair<K, V>> messages) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java [43:80]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@Slf4j
public class KafkaKeyValueProducerPusher<K, V> implements Pusher<Pair<K, V>> {
  private final String topic;
  private final KafkaProducer<K, V> producer;
  private final Closer closer;

  /**
   * Creates a pusher for {@code topic}, backed by a producer built from a set of default producer
   * properties that the caller may override.
   *
   * <p>Defaults applied here: {@link StringSerializer} for keys, {@link ByteArraySerializer} for
   * values, {@link ProducerConfig#ACKS_CONFIG} set to {@code "all"},
   * {@link ProducerConfig#RETRIES_CONFIG} set to 3, and
   * {@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION} set to 1.
   *
   * @param brokers value stored under {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG}
   * @param topic topic name kept on this pusher
   * @param kafkaConfig optional Kafka-scoped config; when present its entries are copied on top of
   *     the defaults, replacing any default that uses the same key
   */
  public KafkaKeyValueProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
    this.topic = topic;
    // NOTE(review): createProducer is defined outside this view. The closer is initialised before
    // that call, exactly as before -- confirm whether the producer gets registered with it.
    this.closer = Closer.create();

    Properties producerProps = new Properties();

    // Connection and serialization defaults.
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    // Delivery defaults. Ordered delivery requires at most one in-flight request per connection.
    producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    producerProps.put(ProducerConfig.RETRIES_CONFIG, 3);
    producerProps.put(ProducerConfig.ACKS_CONFIG, "all");

    // Caller-supplied Kafka-scoped settings go in last so they win over any default set above.
    if (kafkaConfig.isPresent()) {
      Properties overrides = ConfigUtils.configToProperties(kafkaConfig.get());
      producerProps.putAll(overrides);
    }

    this.producer = createProducer(producerProps);
  }

  /**
   * Creates a pusher for {@code topic} without any extra Kafka-scoped configuration; delegates to
   * the three-argument constructor with an absent config.
   *
   * @param brokers broker list handed to the three-argument constructor
   * @param topic topic name handed to the three-argument constructor
   */
  public KafkaKeyValueProducerPusher(String brokers, String topic) {
    this(brokers, topic, Optional.<Config>absent());
  }

  /**
   * Push all keyed messages to the Kafka topic.
   *
   * @param messages List of keyed messages to push to Kafka.
   */
  public void pushMessages(List<Pair<K, V>> messages) {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



