func()

in transaction_manager.go [619:691]


// endTxn sends an EndTxnRequest for the current transactional ID to its
// transaction coordinator, forwarding commit as the request's
// TransactionResult.
//
// A failed attempt that is considered retriable is repeated after a backoff
// until the attempt budget taken from Producer.Transaction.Retry.Max is used
// up, at which point the error of the last attempt is returned. On a
// successful response the result of completeTransaction is returned. Error
// codes that are not retriable are handed to abortableErrorIfPossible or
// transitionTo and that result is returned without retrying.
func (t *transactionManager) endTxn(commit bool) error {
	// attempt performs a single EndTxn round trip. Its first result tells the
	// caller whether another attempt is worthwhile.
	attempt := func() (bool, error) {
		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}

		request := &EndTxnRequest{
			TransactionalID:   t.transactionalID,
			ProducerEpoch:     t.producerEpoch,
			ProducerID:        t.producerID,
			TransactionResult: commit,
		}
		switch {
		case t.client.Config().Version.IsAtLeast(V2_7_0_0):
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			request.Version = 2
		case t.client.Config().Version.IsAtLeast(V2_0_0_0):
			// Version 1 is the same as version 0.
			request.Version = 1
		}

		response, err := coordinator.EndTxn(request)
		if err != nil {
			// Always retry on network error, but drop the connection and
			// refresh the coordinator first. Both calls are best-effort:
			// their errors are intentionally ignored.
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}
		if response == nil {
			return true, ErrTxnUnableToParseResponse
		}

		switch response.Err {
		case ErrNoError:
			DebugLogger.Printf("txnmgr/endtxn [%s] successful to end txn %+v\n",
				t.transactionalID, response)
			return false, t.completeTransaction()
		case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
			// Need to refresh coordinator before retrying.
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, response.Err
		case ErrOffsetsLoadInProgress, ErrConcurrentTransactions:
			// Just retry.
			return true, response.Err
		case ErrUnknownProducerID, ErrInvalidProducerIDMapping:
			return false, t.abortableErrorIfPossible(response.Err)
		default:
			// Fatal errors.
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
	}

	// lastErr stays nil if the configured budget is negative and no attempt
	// is ever made.
	var lastErr error
	for remaining := t.client.Config().Producer.Transaction.Retry.Max; remaining >= 0; remaining-- {
		var again bool
		again, lastErr = attempt()
		if !again {
			return lastErr
		}
		wait := t.computeBackoff(remaining)
		Logger.Printf("txnmgr/endtxn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
			t.transactionalID, wait/time.Millisecond, remaining, lastErr)
		time.Sleep(wait)
	}
	return lastErr
}