in transaction_manager.go [477:584]
// initProducerId sends an InitProducerId request and returns the producer ID
// and producer epoch carried by the broker's response.
//
// Transactional producers send the request to their transaction coordinator;
// all other producers send it to the least loaded broker. The request version
// is derived from the configured Kafka version. From V2_5_0_0 onwards the
// request also carries the manager's current producer ID and epoch, and the
// call is treated as an epoch bump when both are already set.
//
// Retriable failures are retried after a backoff, for at most
// Producer.Transaction.Retry.Max+1 attempts in total. On failure the returned
// ID and epoch are both -1. For a non-retriable broker error the manager is
// asked to transition to the fatal-error state and the result of that
// transition is returned as the error.
func (t *transactionManager) initProducerId() (int64, int16, error) {
	cfg := t.client.Config()
	version := cfg.Version

	req := &InitProducerIDRequest{}
	if t.isTransactional() {
		req.TransactionalID = &t.transactionalID
		req.TransactionTimeout = t.transactionTimeout
	}

	isEpochBump := false
	switch {
	case version.IsAtLeast(V2_5_0_0):
		// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try
		// to resume after an INVALID_PRODUCER_EPOCH error.
		req.Version = 3
		if version.IsAtLeast(V2_7_0_0) {
			// Version 4 adds the support for new error code PRODUCER_FENCED.
			req.Version = 4
		}
		// Only a manager that already owns a producer ID and epoch can bump.
		isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch
		t.coordinatorSupportsBumpingEpoch = true
		req.ProducerID = t.producerID
		req.ProducerEpoch = t.producerEpoch
	case version.IsAtLeast(V2_4_0_0):
		// Version 2 is the first flexible version.
		req.Version = 2
	case version.IsAtLeast(V2_0_0_0):
		// Version 1 is the same as version 0.
		req.Version = 1
	}

	if isEpochBump {
		// NOTE(review): the Initializing transition is requested only on the
		// epoch-bump path; confirm against the allowed-transition table
		// before moving it.
		if err := t.transitionTo(ProducerTxnFlagInitializing, nil); err != nil {
			return -1, -1, err
		}
		DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId with current producer ID %d and epoch %d in order to bump the epoch\n",
			t.transactionalID, t.producerID, t.producerEpoch)
	} else {
		DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId for the first time in order to acquire a producer ID\n",
			t.transactionalID)
	}

	// attempt performs a single InitProducerId round trip. The third result
	// reports whether the caller should back off and try again.
	attempt := func() (int64, int16, bool, error) {
		var err error
		var coordinator *Broker
		if t.isTransactional() {
			coordinator, err = t.client.TransactionCoordinator(t.transactionalID)
		} else {
			// NOTE(review): the broker is used below without a nil check;
			// confirm LeastLoadedBroker cannot return nil here.
			coordinator = t.client.LeastLoadedBroker()
		}
		if err != nil {
			return -1, -1, true, err
		}

		response, err := coordinator.InitProducerID(req)
		if err != nil {
			if t.isTransactional() {
				// Best effort: drop the connection and look the coordinator
				// up again before the next attempt.
				_ = coordinator.Close()
				_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			}
			return -1, -1, true, err
		}
		if response == nil {
			return -1, -1, true, ErrTxnUnableToParseResponse
		}

		if response.Err == ErrNoError {
			if isEpochBump {
				// Sequence numbers are reset after an epoch bump.
				t.sequenceNumbers = make(map[string]int32)
			}
			if err := t.transitionTo(ProducerTxnFlagReady, nil); err != nil {
				return -1, -1, true, err
			}
			DebugLogger.Printf("txnmgr/init-producer-id [%s] successful init producer id %+v\n",
				t.transactionalID, response)
			return response.ProducerID, response.ProducerEpoch, false, nil
		}

		switch response.Err {
		// Retriable errors
		case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer, ErrOffsetsLoadInProgress:
			if t.isTransactional() {
				_ = coordinator.Close()
				_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			}
			return -1, -1, true, response.Err
		// Fatal errors
		default:
			return -1, -1, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
	}

	// lastErr stays nil if Retry.Max is negative and no attempt is made.
	var lastErr error
	for attemptsRemaining := cfg.Producer.Transaction.Retry.Max; attemptsRemaining >= 0; attemptsRemaining-- {
		pid, epoch, retry, err := attempt()
		if !retry {
			return pid, epoch, err
		}
		lastErr = err
		backoff := t.computeBackoff(attemptsRemaining)
		Logger.Printf("txnmgr/init-producer-id [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
			t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
		time.Sleep(backoff)
	}
	return -1, -1, lastErr
}