public void endTransaction()

in src/main/java/org/apache/flink/connector/rocketmq/sink/InnerProducerImpl.java [227:271]


    public void endTransaction(
            final SendCommittable sendCommittable, final TransactionResult transactionResult) {

        try {
            final String brokerName =
                    this.mqClientInstance.getBrokerNameFromMessageQueue(
                            producer.queueWithNamespace(sendCommittable.getMessageQueue()));
            final String brokerAddress =
                    this.mqClientInstance.findBrokerAddressInPublish(brokerName);

            EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
            requestHeader.setTransactionId(sendCommittable.getTransactionId());
            requestHeader.setCommitLogOffset(sendCommittable.getMessageOffset());
            requestHeader.setBname(brokerName);
            requestHeader.setProducerGroup(this.groupId);
            requestHeader.setTranStateTableOffset(sendCommittable.getQueueOffset());
            requestHeader.setFromTransactionCheck(true);
            requestHeader.setMsgId(sendCommittable.getMsgId());

            switch (transactionResult) {
                case COMMIT:
                    requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                    break;
                case ROLLBACK:
                    requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                    break;
                case UNKNOWN:
                    requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                    break;
                default:
                    break;
            }

            if (sendCommittable.getMessageOffset() != -1L) {
                this.endTransaction(
                        brokerAddress, requestHeader, "", this.producer.getSendMsgTimeout());
            } else {
                LOG.error(
                        "Convert message physical offset error, msgId={}",
                        sendCommittable.getMsgId());
            }
        } catch (Exception e) {
            LOG.error("Try end transaction error", e);
        }
    }