gobblin-modules/gobblin-kafka-1/src/main/java/org/apache/gobblin/kafka/writer/Kafka1DataWriter.java [120:136]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Sends the given key/value pair to {@code topic} through the underlying producer.
   *
   * <p>When the producer reports completion, {@code callback} is notified: {@code onFailure}
   * receives the exception if one was reported, otherwise {@code onSuccess} receives the
   * record metadata wrapped by {@code WRITE_RESPONSE_WRAPPER}.
   *
   * @param keyValuePair the key and value used to build the outgoing producer record
   * @param callback the callback notified once the send completes
   * @return a future over the producer's send result, exposed as a {@link WriteResponse}
   * @throws RuntimeException if any exception is thrown while the request is being issued;
   *     the original exception is preserved as the cause
   */
  public Future<WriteResponse> write(Pair<K, V> keyValuePair, final WriteCallback callback) {
    try {
      // Bridges the producer's completion notification onto the caller-supplied callback.
      final Callback completionHandler = new Callback() {
        @Override
        public void onCompletion(final RecordMetadata recordMetadata, Exception sendError) {
          if (sendError == null) {
            callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(recordMetadata));
          } else {
            callback.onFailure(sendError);
          }
        }
      };
      final Future<RecordMetadata> pendingSend = this.producer.send(
          new ProducerRecord<>(topic, keyValuePair.getKey(), keyValuePair.getValue()),
          completionHandler);
      return new WriteResponseFuture<>(pendingSend, WRITE_RESPONSE_WRAPPER);
    } catch (Exception e) {
      throw new RuntimeException("Failed to create a Kafka write request", e);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java [130:146]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Sends the given key/value pair to {@code topic} through the underlying producer.
   *
   * <p>When the producer reports completion, {@code callback} is notified: {@code onFailure}
   * receives the exception if one was reported, otherwise {@code onSuccess} receives the
   * record metadata wrapped by {@code WRITE_RESPONSE_WRAPPER}.
   *
   * @param keyValuePair the key and value used to build the outgoing producer record
   * @param callback the callback notified once the send completes
   * @return a future over the producer's send result, exposed as a {@link WriteResponse}
   * @throws RuntimeException if any exception is thrown while the request is being issued;
   *     the original exception is preserved as the cause
   */
  public Future<WriteResponse> write(Pair<K, V> keyValuePair, final WriteCallback callback) {
    try {
      // Bridges the producer's completion notification onto the caller-supplied callback.
      final Callback completionHandler = new Callback() {
        @Override
        public void onCompletion(final RecordMetadata recordMetadata, Exception sendError) {
          if (sendError == null) {
            callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(recordMetadata));
          } else {
            callback.onFailure(sendError);
          }
        }
      };
      final Future<RecordMetadata> pendingSend = this.producer.send(
          new ProducerRecord<>(topic, keyValuePair.getKey(), keyValuePair.getValue()),
          completionHandler);
      return new WriteResponseFuture<>(pendingSend, WRITE_RESPONSE_WRAPPER);
    } catch (Exception e) {
      throw new RuntimeException("Failed to create a Kafka write request", e);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/Kafka08DataWriter.java [127:143]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Sends the given key/value pair to {@code topic} through the underlying producer.
   *
   * <p>When the producer reports completion, {@code callback} is notified: {@code onFailure}
   * receives the exception if one was reported, otherwise {@code onSuccess} receives the
   * record metadata wrapped by {@code WRITE_RESPONSE_WRAPPER}.
   *
   * @param keyValuePair the key and value used to build the outgoing producer record
   * @param callback the callback notified once the send completes
   * @return a future over the producer's send result, exposed as a {@link WriteResponse}
   * @throws RuntimeException if any exception is thrown while the request is being issued;
   *     the original exception is preserved as the cause
   */
  public Future<WriteResponse> write(Pair<K, V> keyValuePair, final WriteCallback callback) {
    try {
      // Bridges the producer's completion notification onto the caller-supplied callback.
      final Callback completionHandler = new Callback() {
        @Override
        public void onCompletion(final RecordMetadata recordMetadata, Exception sendError) {
          if (sendError == null) {
            callback.onSuccess(WRITE_RESPONSE_WRAPPER.wrap(recordMetadata));
          } else {
            callback.onFailure(sendError);
          }
        }
      };
      final Future<RecordMetadata> pendingSend = this.producer.send(
          new ProducerRecord<>(topic, keyValuePair.getKey(), keyValuePair.getValue()),
          completionHandler);
      return new WriteResponseFuture<>(pendingSend, WRITE_RESPONSE_WRAPPER);
    } catch (Exception e) {
      throw new RuntimeException("Failed to create a Kafka write request", e);
    }
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



