in producer/producer.go [662:704]
// checkTransactionState handles every callback obtained by ranging over
// tp.producer.callbackCh. For each *internal.CheckTransactionStateCallback it
// asks the listener for the local transaction state of the callback's message
// and sends a ReqENDTransaction request, carrying the result of
// tp.transactionState for that state, to callback.Addr through InvokeOneWay.
// A failed send is only logged here; a callback of any other type is logged
// and skipped.
func (tp *transactionProducer) checkTransactionState() {
	for item := range tp.producer.callbackCh {
		cb, ok := item.(*internal.CheckTransactionStateCallback)
		if !ok {
			rlog.Error(fmt.Sprintf("unknown type %v", item), nil)
			continue
		}

		// The listener is consulted first, before any property is read from the message.
		state := tp.listener.CheckLocalTransaction(cb.Msg)

		// Message ID: the unique-client-message-id property, falling back to Msg.MsgId.
		msgID := cb.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
		if msgID == "" {
			msgID = cb.Msg.MsgId
		}

		// Transaction ID: the first non-empty value among the message property,
		// the callback header, and Msg.TransactionId, tried in that order.
		txID := cb.Msg.GetProperty(primitive.PropertyTransactionID)
		if txID == "" {
			txID = cb.Header.TransactionId
			if txID == "" {
				txID = cb.Msg.TransactionId
			}
		}

		endHeader := &internal.EndTransactionRequestHeader{
			CommitLogOffset:      cb.Header.CommitLogOffset,
			ProducerGroup:        tp.producer.group,
			TranStateTableOffset: cb.Header.TranStateTableOffset,
			FromTransactionCheck: true,
			MsgID:                msgID,
			TransactionId:        txID,
			CommitOrRollback:     tp.transactionState(state),
		}

		cmd := remote.NewRemotingCommand(internal.ReqENDTransaction, endHeader, nil)
		cmd.Remark = tp.errRemark(nil)

		brokerAddr := cb.Addr.String()
		if err := tp.producer.client.InvokeOneWay(context.Background(), brokerAddr, cmd,
			tp.producer.options.SendMsgTimeout); err != nil {
			// The error is logged and the loop moves on to the next callback.
			rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
				"callback":               brokerAddr,
				"request":                cmd.String(),
				rlog.LogKeyUnderlayError: err,
			})
		}
	}
}