in transaction_manager.go [619:691]
// endTxn sends an EndTxnRequest for the current transactional ID to the
// transaction coordinator. The commit flag is forwarded verbatim as the
// request's TransactionResult.
//
// Retryable failures are retried with a pause of t.computeBackoff between
// attempts. The attempt budget starts at Producer.Transaction.Retry.Max and
// counts down to zero inclusive. The error of the last attempt is returned
// once the budget is exhausted. On a successful response the result of
// t.completeTransaction() is returned.
func (t *transactionManager) endTxn(commit bool) error {
	// sendOnce performs one EndTxn round trip. The boolean result tells the
	// caller whether the attempt may be retried.
	sendOnce := func() (bool, error) {
		broker, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}

		req := &EndTxnRequest{
			TransactionalID:   t.transactionalID,
			ProducerEpoch:     t.producerEpoch,
			ProducerID:        t.producerID,
			TransactionResult: commit,
		}
		switch {
		case t.client.Config().Version.IsAtLeast(V2_7_0_0):
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			req.Version = 2
		case t.client.Config().Version.IsAtLeast(V2_0_0_0):
			// Version 1 is the same as version 0.
			req.Version = 1
		}

		resp, err := broker.EndTxn(req)
		if err != nil {
			// Always retry on network error. The Close/Refresh results are
			// discarded on purpose: the next attempt looks the coordinator
			// up again anyway.
			_ = broker.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}
		if resp == nil {
			return true, ErrTxnUnableToParseResponse
		}

		switch resp.Err {
		case ErrNoError:
			DebugLogger.Printf("txnmgr/endtxn [%s] successful to end txn %+v\n",
				t.transactionalID, resp)
			return false, t.completeTransaction()

		case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
			// Need to refresh coordinator before retrying.
			_ = broker.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, resp.Err

		case ErrOffsetsLoadInProgress, ErrConcurrentTransactions:
			// Just retry.
			return true, resp.Err

		case ErrUnknownProducerID, ErrInvalidProducerIDMapping:
			return false, t.abortableErrorIfPossible(resp.Err)

		default:
			// Fatal errors.
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, resp.Err)
		}
	}

	// lastErr stays nil if the configured budget is negative and no attempt
	// is ever made.
	var lastErr error
	for remaining := t.client.Config().Producer.Transaction.Retry.Max; remaining >= 0; remaining-- {
		var again bool
		again, lastErr = sendOnce()
		if !again {
			return lastErr
		}

		// Note: the pause also happens after the final retryable failure.
		wait := t.computeBackoff(remaining)
		Logger.Printf("txnmgr/endtxn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
			t.transactionalID, wait/time.Millisecond, remaining, lastErr)
		time.Sleep(wait)
	}
	return lastErr
}