public void write()

in src/main/java/com/aliyun/odps/kafka/KafkaWriter.java [43:61]


  /**
   * Builds a {@code ProducerRecord} from the given sink record and hands it to the
   * producer together with a completion callback that logs the outcome.
   *
   * <p>The record's key and value are cast to {@code String}; a
   * {@link ClassCastException} is thrown if either is of another type. The record's
   * timestamp is forwarded only when it is present and not
   * {@code RecordBatch.NO_TIMESTAMP}; otherwise the record is created without an
   * explicit timestamp. The partition is always left unspecified ({@code null}).
   *
   * <p>Failures reported to the callback are logged and not rethrown.
   *
   * @param message the sink record whose key, value and timestamp are forwarded
   */
  public void write(SinkRecord message) {
    final Long timestamp = message.timestamp();
    final String key = (String) message.key();
    final String value = (String) message.value();

    final ProducerRecord<String, String> producerRecord;
    if (timestamp == null || timestamp == RecordBatch.NO_TIMESTAMP) {
      producerRecord = new ProducerRecord<>(topic, null, key, value);
    } else {
      producerRecord = new ProducerRecord<>(topic, null, timestamp, key, value);
    }

    this.producer.send(producerRecord, (metadata, exception) -> {
      if (exception != null) {
        LOGGER.error("Could not produce message to runtime error queue. topic=" + topic, exception);
      } else {
        // Only report success when the send actually succeeded. On failure the
        // metadata may be null (older clients) or a placeholder, so it must not
        // be dereferenced or reported as a successful write.
        LOGGER.info("Write message to runtime error queue ok: " + metadata);
      }
    });

  }