func()

in transaction_manager.go [477:584]


// initProducerId sends an InitProducerId request and returns the producer ID
// and epoch carried by the response.
//
// Transactional producers send the request to the transaction coordinator of
// t.transactionalID; all other producers use the least loaded broker. When the
// configured Kafka version is at least 2.5.0.0 the current producer ID and
// epoch are included in the request, and if both are already set the call is
// treated as an epoch bump: the manager first transitions to
// ProducerTxnFlagInitializing and, on success, the sequence numbers are reset.
//
// Attempts flagged as retriable are repeated after a backoff, for at most
// Producer.Transaction.Retry.Max retries on top of the initial attempt. On
// failure the returned ID and epoch are both -1. Response errors that are not
// in the retriable set are handed to transitionTo together with the
// ProducerTxnFlagInError|ProducerTxnFlagFatalError flags, and the result of
// that call is returned without retrying.
func (t *transactionManager) initProducerId() (int64, int16, error) {
	isEpochBump := false

	req := &InitProducerIDRequest{}
	if t.isTransactional() {
		req.TransactionalID = &t.transactionalID
		req.TransactionTimeout = t.transactionTimeout
	}

	// Pick the highest request version the configured Kafka version allows.
	// Below 2.0.0.0 the request keeps its zero-value version.
	version := t.client.Config().Version
	switch {
	case version.IsAtLeast(V2_5_0_0):
		if version.IsAtLeast(V2_7_0_0) {
			// Version 4 adds the support for new error code PRODUCER_FENCED.
			req.Version = 4
		} else {
			// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try
			// to resume after an INVALID_PRODUCER_EPOCH error
			req.Version = 3
		}
		// Only a producer that already owns an ID and an epoch can bump its epoch.
		isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch
		t.coordinatorSupportsBumpingEpoch = true
		req.ProducerID = t.producerID
		req.ProducerEpoch = t.producerEpoch
	case version.IsAtLeast(V2_4_0_0):
		// Version 2 is the first flexible version.
		req.Version = 2
	case version.IsAtLeast(V2_0_0_0):
		// Version 1 is the same as version 0.
		req.Version = 1
	}

	if isEpochBump {
		if err := t.transitionTo(ProducerTxnFlagInitializing, nil); err != nil {
			return -1, -1, err
		}
		DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId with current producer ID %d and epoch %d in order to bump the epoch\n",
			t.transactionalID, t.producerID, t.producerEpoch)
	} else {
		DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId for the first time in order to acquire a producer ID\n",
			t.transactionalID)
	}

	// attempt performs a single InitProducerId round trip. The boolean result
	// reports whether the caller should retry after a backoff.
	attempt := func() (int64, int16, bool, error) {
		var err error
		var coordinator *Broker
		if t.isTransactional() {
			coordinator, err = t.client.TransactionCoordinator(t.transactionalID)
		} else {
			// NOTE(review): no nil check is performed on the broker returned
			// here; confirm LeastLoadedBroker cannot return nil, or guard it.
			coordinator = t.client.LeastLoadedBroker()
		}
		if err != nil {
			return -1, -1, true, err
		}

		response, err := coordinator.InitProducerID(req)
		if err != nil {
			if t.isTransactional() {
				// Best effort: drop the connection and look the coordinator up
				// again before the retry. Failures here are deliberately
				// ignored because the next attempt reports its own error.
				_ = coordinator.Close()
				_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			}
			return -1, -1, true, err
		}
		if response == nil {
			return -1, -1, true, ErrTxnUnableToParseResponse
		}

		switch response.Err {
		case ErrNoError:
			if isEpochBump {
				// An epoch bump starts over with fresh sequence numbers.
				t.sequenceNumbers = make(map[string]int32)
			}
			if err := t.transitionTo(ProducerTxnFlagReady, nil); err != nil {
				return -1, -1, true, err
			}
			DebugLogger.Printf("txnmgr/init-producer-id [%s] successful init producer id %+v\n",
				t.transactionalID, response)
			return response.ProducerID, response.ProducerEpoch, false, nil
		// Retriable errors
		case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer, ErrOffsetsLoadInProgress:
			if t.isTransactional() {
				// Best effort, same as for request errors above.
				_ = coordinator.Close()
				_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			}
			return -1, -1, true, response.Err
		// Fatal errors
		default:
			return -1, -1, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
	}

	// lastErr stays nil if the loop body never runs (negative Retry.Max).
	var lastErr error
	for attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max; attemptsRemaining >= 0; attemptsRemaining-- {
		pid, pepoch, retry, err := attempt()
		if !retry {
			return pid, pepoch, err
		}
		lastErr = err
		// The backoff is applied after every retriable failure, including the
		// last one before the loop gives up.
		backoff := t.computeBackoff(attemptsRemaining)
		Logger.Printf("txnmgr/init-producer-id [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
			t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
		time.Sleep(backoff)
	}
	return -1, -1, lastErr
}