func()

in producer/producer.go [662:704]


// checkTransactionState receives from tp.producer.callbackCh and answers each
// transaction-check callback that arrives on it.
//
// For every *internal.CheckTransactionStateCallback the listener is asked for
// the local transaction state of the callback's message, and a
// ReqENDTransaction command carrying that answer is sent one-way to the
// address held by the callback. Values of any other type are logged and
// skipped. A failed send is only logged; the loop then moves on to the next
// value. The method returns once the range over the channel ends.
func (tp *transactionProducer) checkTransactionState() {
	for item := range tp.producer.callbackCh {
		cb, ok := item.(*internal.CheckTransactionStateCallback)
		if !ok {
			rlog.Error(fmt.Sprintf("unknown type %v", item), nil)
			continue
		}

		// The listener is consulted before anything else is derived from the
		// callback.
		state := tp.listener.CheckLocalTransaction(cb.Msg)

		// Message ID: the unique-client-message-ID property wins, falling back
		// to the message's own MsgId when the property is empty.
		msgID := cb.Msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
		if msgID == "" {
			msgID = cb.Msg.MsgId
		}

		// Transaction ID: message property, then the callback header, then the
		// message's TransactionId field — the first non-empty value is used.
		txID := cb.Msg.GetProperty(primitive.PropertyTransactionID)
		if txID == "" {
			if txID = cb.Header.TransactionId; txID == "" {
				txID = cb.Msg.TransactionId
			}
		}

		cmd := remote.NewRemotingCommand(internal.ReqENDTransaction, &internal.EndTransactionRequestHeader{
			CommitLogOffset:      cb.Header.CommitLogOffset,
			ProducerGroup:        tp.producer.group,
			TranStateTableOffset: cb.Header.TranStateTableOffset,
			FromTransactionCheck: true,
			MsgID:                msgID,
			TransactionId:        txID,
			CommitOrRollback:     tp.transactionState(state),
		}, nil)
		cmd.Remark = tp.errRemark(nil)

		brokerAddr := cb.Addr.String()
		if err := tp.producer.client.InvokeOneWay(
			context.Background(), brokerAddr, cmd, tp.producer.options.SendMsgTimeout,
		); err != nil {
			rlog.Error("send ReqENDTransaction to broker error", map[string]interface{}{
				"callback":               brokerAddr,
				"request":                cmd.String(),
				rlog.LogKeyUnderlayError: err,
			})
		}
	}
}