in src/main/java/org/apache/flink/connector/rocketmq/sink/writer/RocketMQWriter.java [84:106]
public void write(IN element, Context context) throws IOException {
try {
Message message =
serializationSchema.serialize(
element, rocketmqSinkContext, System.currentTimeMillis());
if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
producer.sendMessageInTransaction(message)
.whenComplete(
(sendResult, throwable) -> {
sendResultMap.put(sendResult.getTransactionId(), sendResult);
});
} else {
producer.send(message)
.whenComplete(
(sendResult, throwable) -> {
sendResultMap.put(sendResult.getTransactionId(), sendResult);
});
}
} catch (Exception e) {
LOG.error("Send message error", e);
throw new IOException(e);
}
}