in src/main/java/org/apache/flink/connector/rocketmq/sink/InnerProducerImpl.java [179:225]
/**
 * Sends {@code message} as a transaction-prepared message and reports the outcome through a
 * {@link CompletableFuture}.
 *
 * <p>The send is submitted via {@code MoreExecutors.directExecutor()}, so the blocking
 * {@code producer.send(...)} call is performed as part of this invocation rather than on a
 * separate thread pool.
 *
 * <p>Before sending, the passed-in message is modified in place:
 *
 * <ul>
 *   <li>its topic is wrapped with the producer's namespace;
 *   <li>a non-zero delay time level is cleared;
 *   <li>the check-immunity time is set from {@code RocketMQSinkOptions.TRANSACTION_TIMEOUT};
 *   <li>the transaction-prepared flag and the producer group are set.
 * </ul>
 *
 * @param message the message to send; it is mutated as described above
 * @return a future holding the {@link SendResult}. A result whose status is not
 *     {@code SEND_OK} is logged at warn level but still returned as a normal value. If
 *     sending throws, the failure is logged and rethrown wrapped in a
 *     {@link RuntimeException}, which completes the future exceptionally.
 */
public CompletableFuture<SendResult> sendMessageInTransaction(Message message) {
    return CompletableFuture.supplyAsync(
            () -> {
                try {
                    message.setTopic(
                            NamespaceUtil.wrapNamespace(
                                    producer.getNamespace(), message.getTopic()));

                    // Ignore DelayTimeLevel parameter
                    if (message.getDelayTimeLevel() != 0) {
                        MessageAccessor.clearProperty(
                                message, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                    }

                    // NOTE(review): the configured value is written unchanged into a property
                    // whose name says "seconds" -- confirm TRANSACTION_TIMEOUT uses that unit.
                    long transactionTimeout =
                            configuration.get(RocketMQSinkOptions.TRANSACTION_TIMEOUT);
                    message.putUserProperty(
                            MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS,
                            String.valueOf(transactionTimeout));

                    // Boolean.TRUE.toString() is already the lowercase literal "true"; no
                    // locale-dependent case conversion is needed.
                    MessageAccessor.putProperty(
                            message,
                            MessageConst.PROPERTY_TRANSACTION_PREPARED,
                            Boolean.TRUE.toString());
                    MessageAccessor.putProperty(
                            message, MessageConst.PROPERTY_PRODUCER_GROUP, this.groupId);

                    // In general, message id and transaction id should be the same
                    SendResult sendResult = producer.send(message);
                    if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                        LOG.info(
                                "Send transaction message successfully, topic={}, transId={}",
                                message.getTopic(),
                                sendResult.getTransactionId());
                    } else {
                        LOG.warn(
                                "Failed to send message, topic={}, message={}",
                                message.getTopic(),
                                message);
                    }
                    return sendResult;
                } catch (Exception e) {
                    LOG.error("Failed to send message, topic={}", message.getTopic(), e);
                    if (e instanceof InterruptedException) {
                        // The interrupt flag was cleared when the exception was raised.
                        // Restore it so the thread that ran the send can still observe the
                        // cancellation request after this wrapper exception propagates.
                        Thread.currentThread().interrupt();
                    }
                    throw new RuntimeException(e);
                }
            },
            MoreExecutors.directExecutor());
}