public CompletableFuture sendMessageInTransaction()

in src/main/java/org/apache/flink/connector/rocketmq/sink/InnerProducerImpl.java [179:225]


    /**
     * Sends {@code message} marked as a transaction-prepared message and returns the outcome as a
     * future.
     *
     * <p>The supplier is submitted with {@code MoreExecutors.directExecutor()}, so the send is
     * performed on the calling thread and the returned future is already completed (normally or
     * exceptionally) when this method returns.
     *
     * <p>The passed message is modified in place: its topic is wrapped with the producer
     * namespace, a non-zero delay level is cleared, and the check-immunity-time,
     * transaction-prepared and producer-group properties are set.
     *
     * @param message the message to send; mutated as described above
     * @return a future holding the {@link SendResult}; the result is returned even when its status
     *     is not {@code SEND_OK} (a warning is logged in that case). If preparing or sending the
     *     message throws, the future completes exceptionally with a {@link RuntimeException}
     *     wrapping the original exception.
     */
    public CompletableFuture<SendResult> sendMessageInTransaction(Message message) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        message.setTopic(
                                NamespaceUtil.wrapNamespace(
                                        producer.getNamespace(), message.getTopic()));

                        // Ignore DelayTimeLevel parameter
                        if (message.getDelayTimeLevel() != 0) {
                            MessageAccessor.clearProperty(
                                    message, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                        }

                        // NOTE(review): the property name implies seconds; confirm that
                        // TRANSACTION_TIMEOUT is configured in the same unit.
                        long transactionTimeout =
                                configuration.get(RocketMQSinkOptions.TRANSACTION_TIMEOUT);
                        message.putUserProperty(
                                MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS,
                                String.valueOf(transactionTimeout));
                        // Boolean.toString() already yields lower-case "true"; no locale-dependent
                        // case conversion is needed.
                        MessageAccessor.putProperty(
                                message,
                                MessageConst.PROPERTY_TRANSACTION_PREPARED,
                                Boolean.TRUE.toString());
                        MessageAccessor.putProperty(
                                message, MessageConst.PROPERTY_PRODUCER_GROUP, this.groupId);

                        SendResult sendResult = producer.send(message);
                        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                            // In general, message id and transaction id should be the same
                            LOG.info(
                                    "Send transaction message successfully, topic={}, transId={}",
                                    message.getTopic(),
                                    sendResult.getTransactionId());
                        } else {
                            LOG.warn(
                                    "Failed to send message, topic={}, message={}",
                                    message.getTopic(),
                                    message);
                        }
                        return sendResult;
                    } catch (Exception e) {
                        // The send runs on the caller's thread; if it was interrupted, restore the
                        // interrupt status so the caller can still observe it after the exception
                        // has been wrapped.
                        if (e instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        LOG.error("Failed to send message, topic={}", message.getTopic(), e);
                        throw new RuntimeException(e);
                    }
                },
                MoreExecutors.directExecutor());
    }