in src/main/java/com/aliyun/odps/kafka/KafkaWriter.java [43:61]
/**
 * Forwards a sink record to {@code topic} through {@code producer}.
 *
 * <p>The record's key and value are cast to {@code String} and the partition is left
 * {@code null}. The record's timestamp is passed along only when it is present and is
 * not {@code RecordBatch.NO_TIMESTAMP}; otherwise the record is built without one.
 *
 * <p>The outcome is logged from the send callback: a failure is logged at error level,
 * a success at info level together with the returned metadata.
 *
 * @param message the record to forward; its key and value are cast to {@code String},
 *                so a non-String key or value fails with {@link ClassCastException}
 */
public void write(SinkRecord message) {
  // Read the timestamp once so the check and the constructor see the same value.
  Long timestamp = message.timestamp();
  String key = (String) message.key();
  String value = (String) message.value();

  ProducerRecord<String, String> producerRecord;
  if (timestamp == null || timestamp == RecordBatch.NO_TIMESTAMP) {
    producerRecord = new ProducerRecord<>(topic, null, key, value);
  } else {
    producerRecord = new ProducerRecord<>(topic, null, timestamp, key, value);
  }

  this.producer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
      LOGGER.error("Could not produce message to runtime error queue. topic=" + topic, exception);
    } else {
      // Only report success (and touch metadata) when the send did not fail;
      // metadata may be null or carry placeholder values on failure.
      LOGGER.info("Write message to runtime error queue ok: " + metadata.toString());
    }
  });
}