in transaction_manager.go [761:894]
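// publishTxnPartitions sends the partitions pending in the current transaction
// to the transaction coordinator via AddPartitionsToTxn, retrying transient
// failures with a configurable backoff.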
func (t *transactionManager) publishTxnPartitions() error {
t.partitionInTxnLock.Lock()
defer t.partitionInTxnLock.Unlock()
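	// Nothing to do if the transaction is already in an error state or if no
	// partitions are waiting to be added.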
if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
return t.lastError
}
if len(t.pendingPartitionsInCurrentTxn) == 0 {
return nil
}
	// Remove the partitions from the pending set regardless of the result. We use the presence
	// of partitions in the pending set to know when it is not safe to send batches. However, if
	// the partitions failed to be added and we enter an error state, we expect the batches to be
	// aborted anyway. In this case, we must still be able to send the batches that are being
	// retried for partitions that were successfully added.
removeAllPartitionsOnFatalOrAbortedError := func() {
t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
}
	// We only want to reduce the backoff when retrying the first AddPartitionsToTxn request that
	// errored out due to a CONCURRENT_TRANSACTIONS error, since this means that the previous
	// transaction is still completing and we don't want to wait too long before trying to start
	// the new one.
	//
	// This is only a temporary fix; the long-term solution is being tracked in
	// https://issues.apache.org/jira/browse/KAFKA-5482
retryBackoff := t.client.Config().Producer.Transaction.Retry.Backoff
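	// computeBackoff delegates to the user-supplied BackoffFunc when one is
	// configured; otherwise it returns retryBackoff, which may have been
	// reduced after a CONCURRENT_TRANSACTIONS error (see below).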
computeBackoff := func(attemptsRemaining int) time.Duration {
if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil {
maxRetries := t.client.Config().Producer.Transaction.Retry.Max
retries := maxRetries - attemptsRemaining
return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries)
}
return retryBackoff
}
attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
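	// exec invokes run until it reports that no retry is needed or the retry
	// budget is exhausted, sleeping for the computed backoff between attempts.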
exec := func(run func() (bool, error), err error) error {
for attemptsRemaining >= 0 {
var retry bool
retry, err = run()
if !retry {
return err
}
backoff := computeBackoff(attemptsRemaining)
Logger.Printf("txnmgr/add-partition-to-txn retrying after %dms... (%d attempts remaining) (%s)\n", backoff/time.Millisecond, attemptsRemaining, err)
time.Sleep(backoff)
attemptsRemaining--
}
return err
}
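	// Each attempt resolves the transaction coordinator, sends an
	// AddPartitionsToTxn request for the partitions still pending, and
	// inspects the per-partition error codes in the response.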
return exec(func() (bool, error) {
coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
if err != nil {
return true, err
}
request := &AddPartitionsToTxnRequest{
TransactionalID: t.transactionalID,
ProducerID: t.producerID,
ProducerEpoch: t.producerEpoch,
TopicPartitions: t.pendingPartitionsInCurrentTxn.mapToRequest(),
}
if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 2 adds support for the new error code PRODUCER_FENCED.
request.Version = 2
} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
// Version 1 is the same as version 0.
request.Version = 1
}
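		// Send the request; on a transport error, assume the coordinator may
		// have moved, so close it and force a refresh before the next attempt.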
addPartResponse, err := coordinator.AddPartitionsToTxn(request)
if err != nil {
_ = coordinator.Close()
_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
return true, err
}
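		// A nil response without a transport error cannot be parsed; retry.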
if addPartResponse == nil {
return true, ErrTxnUnableToParseResponse
}
		// Remove partitions that were successfully added from the pending set;
		// collect per-partition errors for everything else.
var responseErrors []error
for topic, results := range addPartResponse.Errors {
for _, response := range results {
tp := topicPartition{topic: topic, partition: response.Partition}
				switch response.Err {
				case ErrNoError:
					// Mark the partition as added to the transaction and skip
					// recording an error for it.
					t.partitionsInCurrentTxn[tp] = struct{}{}
					delete(t.pendingPartitionsInCurrentTxn, tp)
					continue
				case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
					// The coordinator moved; refresh it and retry this topicPartition.
					_ = coordinator.Close()
					_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
				case ErrUnknownTopicOrPartition, ErrOffsetsLoadInProgress:
					// Transient error; retry this topicPartition.
				case ErrConcurrentTransactions:
					// The previous transaction is still completing. If nothing
					// has been added to the current transaction yet, shorten
					// the backoff so we don't wait too long before retrying
					// (see the comment above).
					if len(t.partitionsInCurrentTxn) == 0 && retryBackoff > addPartitionsRetryBackoff {
						retryBackoff = addPartitionsRetryBackoff
					}
				case ErrOperationNotAttempted, ErrTopicAuthorizationFailed:
					// Abortable errors: the transaction can still be aborted cleanly.
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
				case ErrUnknownProducerID, ErrInvalidProducerIDMapping:
					// The producer ID is no longer valid; abort if still possible.
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.abortableErrorIfPossible(response.Err)
				default:
					// Fatal errors: the transaction manager cannot recover.
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
				}
responseErrors = append(responseErrors, response.Err)
}
}
		// Done: if no partitions remain pending, the add succeeded; otherwise
		// surface the collected errors and retry.
if len(t.pendingPartitionsInCurrentTxn) == 0 {
			DebugLogger.Printf("txnmgr/add-partition-to-txn [%s] successfully added partitions to txn %+v\n",
				t.transactionalID, addPartResponse)
return false, nil
}
return true, Wrap(ErrAddPartitionsToTxn, responseErrors...)
}, nil)
}