in src/main/java/org/apache/flink/connector/rocketmq/sink/InnerProducerImpl.java [227:271]
/**
 * Sends an end-transaction request (commit, rollback or unknown) for a previously sent
 * message to the broker that owns the message's queue.
 *
 * <p>The broker name and publish address are resolved through the client instance, the
 * request header is populated from the committable, and the request is sent only when a
 * broker address is available and the committable carries a valid message offset (any value
 * other than {@code -1}). This method never throws: every failure is logged and swallowed.
 *
 * @param sendCommittable the committable describing the message whose transaction is ended;
 *     supplies the message queue, transaction id, message offset, queue offset and message id
 * @param transactionResult the outcome to report; {@code COMMIT}, {@code ROLLBACK} and
 *     {@code UNKNOWN} are mapped to the matching {@code MessageSysFlag} transaction type, any
 *     other value leaves the commit/rollback flag of the request unset
 */
public void endTransaction(
        final SendCommittable sendCommittable, final TransactionResult transactionResult) {
    try {
        final String brokerName =
                this.mqClientInstance.getBrokerNameFromMessageQueue(
                        producer.queueWithNamespace(sendCommittable.getMessageQueue()));
        final String brokerAddress =
                this.mqClientInstance.findBrokerAddressInPublish(brokerName);

        // The address lookup is assumed to yield null when the broker is not (or no longer)
        // present in the client's route table. Handing a null address to the remoting layer
        // would not reach the intended broker, so report the problem explicitly and skip.
        if (brokerAddress == null) {
            LOG.error(
                    "Broker address not found, skip end transaction, brokerName={}, msgId={}",
                    brokerName,
                    sendCommittable.getMsgId());
            return;
        }

        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(sendCommittable.getTransactionId());
        requestHeader.setCommitLogOffset(sendCommittable.getMessageOffset());
        requestHeader.setBname(brokerName);
        requestHeader.setProducerGroup(this.groupId);
        requestHeader.setTranStateTableOffset(sendCommittable.getQueueOffset());
        // NOTE(review): the request is always flagged as originating from a transaction
        // check; confirm this is intended for a regular commit/rollback.
        requestHeader.setFromTransactionCheck(true);
        requestHeader.setMsgId(sendCommittable.getMsgId());

        // Translate the connector-level result into the protocol-level transaction type.
        switch (transactionResult) {
            case COMMIT:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOWN:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        // A message offset of -1 marks a committable whose offset is not usable; in that
        // case nothing is sent and the message id is logged instead.
        if (sendCommittable.getMessageOffset() != -1L) {
            this.endTransaction(
                    brokerAddress, requestHeader, "", this.producer.getSendMsgTimeout());
        } else {
            LOG.error(
                    "Convert message physical offset error, msgId={}",
                    sendCommittable.getMsgId());
        }
    } catch (Exception e) {
        // Best effort: failures while ending the transaction are logged, not propagated.
        LOG.error("Try end transaction error", e);
    }
}