gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java [109:160]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    this.producer = producer;
    this.commonConfig = new KafkaWriterCommonConfig(config);
  }

  /**
   * Closes this writer by closing the underlying Kafka producer.
   *
   * <p>A debug message is logged before the producer is closed.
   *
   * @throws IOException declared by the overridden signature; this body does not throw it directly
   */
  @Override
  public void close() throws IOException {
    log.debug("Close called");
    producer.close();
  }

  /**
   * Derives the key/value pair for {@code record} using the common writer configuration and
   * forwards it to {@link #write(Pair, WriteCallback)}.
   *
   * @param record the value to write; its key/value pair is computed by {@code KafkaWriterHelper}
   * @param callback notified on completion of the send by the pair-based overload
   * @return the future returned by the pair-based overload
   * @throws RuntimeException if the key/value pair cannot be derived, or if the pair-based overload
   *     fails to issue the send
   */
  @Override
  public Future<WriteResponse> write(final V record, final WriteCallback callback) {
    final Pair<K, V> keyValuePair;
    try {
      keyValuePair = KafkaWriterHelper.getKeyValuePair(record, this.commonConfig);
    } catch (Exception e) {
      throw new RuntimeException("Failed to create a Kafka write request", e);
    }
    // Kept outside the try on purpose: the pair-based overload already wraps its own failures in a
    // RuntimeException carrying this same message, so catching here would wrap them a second time.
    return write(keyValuePair, callback);
  }

  /**
   * Sends the given key/value pair to this writer's topic through the producer.
   *
   * <p>When the producer reports completion, {@code callback.onFailure} is invoked if an exception
   * was supplied; otherwise {@code callback.onSuccess} is invoked with the record metadata wrapped
   * by {@code WRITE_RESPONSE_WRAPPER}.
   *
   * @param keyValuePair the key and value of the record to send
   * @param callback receives the outcome of the send
   * @return a {@code WriteResponseFuture} built from the producer's future and
   *     {@code WRITE_RESPONSE_WRAPPER}
   * @throws RuntimeException wrapping any exception thrown while issuing the send
   */
  public Future<WriteResponse> write(Pair<K, V> keyValuePair, final WriteCallback callback) {
    try {
      // Bridges the producer's completion notification onto the caller-supplied callback.
      final Callback completionHandler = new Callback() {
        @Override
        public void onCompletion(final RecordMetadata metadata, Exception exception) {
          if (exception == null) {
            callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
            return;
          }
          callback.onFailure(exception);
        }
      };
      final Future<RecordMetadata> pendingSend = this.producer.send(
          new ProducerRecord<>(topic, keyValuePair.getKey(), keyValuePair.getValue()),
          completionHandler);
      return new WriteResponseFuture<>(pendingSend, WRITE_RESPONSE_WRAPPER);
    } catch (Exception e) {
      throw new RuntimeException("Failed to create a Kafka write request", e);
    }
  }

  /**
   * Flushes this writer by delegating to the underlying producer's {@code flush()}.
   *
   * @throws IOException declared by the overridden signature; this body does not throw it directly
   */
  @Override
  public void flush() throws IOException {
    producer.flush();
  }

  private void provisionTopic(String topicName, Config config) {
    String zooKeeperPropKey = KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER;
    if (!config.hasPath(zooKeeperPropKey)) {
      log.debug("Topic " + topicName + " is configured without the partition and replication");
      return;
    }
    String zookeeperConnect = config.getString(zooKeeperPropKey);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1DataWriter.java [99:150]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    this.producer = producer;
    this.commonConfig = new KafkaWriterCommonConfig(config);
  }

  /**
   * Closes this writer by closing the underlying Kafka producer.
   *
   * <p>A debug message is logged before the producer is closed.
   *
   * @throws IOException declared by the overridden signature; this body does not throw it directly
   */
  @Override
  public void close() throws IOException {
    log.debug("Close called");
    producer.close();
  }

  /**
   * Derives the key/value pair for {@code record} using the common writer configuration and
   * forwards it to {@link #write(Pair, WriteCallback)}.
   *
   * @param record the value to write; its key/value pair is computed by {@code KafkaWriterHelper}
   * @param callback notified on completion of the send by the pair-based overload
   * @return the future returned by the pair-based overload
   * @throws RuntimeException if the key/value pair cannot be derived, or if the pair-based overload
   *     fails to issue the send
   */
  @Override
  public Future<WriteResponse> write(final V record, final WriteCallback callback) {
    final Pair<K, V> keyValuePair;
    try {
      keyValuePair = KafkaWriterHelper.getKeyValuePair(record, this.commonConfig);
    } catch (Exception e) {
      throw new RuntimeException("Failed to create a Kafka write request", e);
    }
    // Kept outside the try on purpose: the pair-based overload already wraps its own failures in a
    // RuntimeException carrying this same message, so catching here would wrap them a second time.
    return write(keyValuePair, callback);
  }

  /**
   * Sends the given key/value pair to this writer's topic through the producer.
   *
   * <p>When the producer reports completion, {@code callback.onFailure} is invoked if an exception
   * was supplied; otherwise {@code callback.onSuccess} is invoked with the record metadata wrapped
   * by {@code WRITE_RESPONSE_WRAPPER}.
   *
   * @param keyValuePair the key and value of the record to send
   * @param callback receives the outcome of the send
   * @return a {@code WriteResponseFuture} built from the producer's future and
   *     {@code WRITE_RESPONSE_WRAPPER}
   * @throws RuntimeException wrapping any exception thrown while issuing the send
   */
  public Future<WriteResponse> write(Pair<K, V> keyValuePair, final WriteCallback callback) {
    try {
      // Bridges the producer's completion notification onto the caller-supplied callback.
      final Callback completionHandler = new Callback() {
        @Override
        public void onCompletion(final RecordMetadata metadata, Exception exception) {
          if (exception == null) {
            callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(metadata));
            return;
          }
          callback.onFailure(exception);
        }
      };
      final Future<RecordMetadata> pendingSend = this.producer.send(
          new ProducerRecord<>(topic, keyValuePair.getKey(), keyValuePair.getValue()),
          completionHandler);
      return new WriteResponseFuture<>(pendingSend, WRITE_RESPONSE_WRAPPER);
    } catch (Exception e) {
      throw new RuntimeException("Failed to create a Kafka write request", e);
    }
  }

  /**
   * Flushes this writer by delegating to the underlying producer's {@code flush()}.
   *
   * @throws IOException declared by the overridden signature; this body does not throw it directly
   */
  @Override
  public void flush() throws IOException {
    producer.flush();
  }

  private void provisionTopic(String topicName, Config config) {
    String zooKeeperPropKey = KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER;
    if (!config.hasPath(zooKeeperPropKey)) {
      log.debug("Topic " + topicName + " is configured without the partition and replication");
      return;
    }
    String zookeeperConnect = config.getString(zooKeeperPropKey);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



