func()

in transaction_manager.go [761:894]


func (t *transactionManager) publishTxnPartitions() error {
	t.partitionInTxnLock.Lock()
	defer t.partitionInTxnLock.Unlock()

	if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
		return t.lastError
	}

	if len(t.pendingPartitionsInCurrentTxn) == 0 {
		return nil
	}

	// Remove the partitions from the pending set regardless of the result. We use the presence
	// of partitions in the pending set to know when it is not safe to send batches. However, if
	// the partitions failed to be added and we enter an error state, we expect the batches to be
	// aborted anyway. In this case, we must be able to continue sending the batches which are in
	// retry for partitions that were successfully added.
	removeAllPartitionsOnFatalOrAbortedError := func() {
		t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
	}

	// We only want to reduce the backoff when retrying the first AddPartition which errored out due to a
	// CONCURRENT_TRANSACTIONS error since this means that the previous transaction is still completing and
	// we don't want to wait too long before trying to start the new one.
	//
	// This is only a temporary fix, the long term solution is being tracked in
	// https://issues.apache.org/jira/browse/KAFKA-5482
	retryBackoff := t.client.Config().Producer.Transaction.Retry.Backoff
	computeBackoff := func(attemptsRemaining int) time.Duration {
		if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil {
			maxRetries := t.client.Config().Producer.Transaction.Retry.Max
			retries := maxRetries - attemptsRemaining
			return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries)
		}
		return retryBackoff
	}
	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max

	exec := func(run func() (bool, error), err error) error {
		for attemptsRemaining >= 0 {
			var retry bool
			retry, err = run()
			if !retry {
				return err
			}
			backoff := computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/add-partition-to-txn retrying after %dms... (%d attempts remaining) (%s)\n", backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return err
	}
	return exec(func() (bool, error) {
		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}
		request := &AddPartitionsToTxnRequest{
			TransactionalID: t.transactionalID,
			ProducerID:      t.producerID,
			ProducerEpoch:   t.producerEpoch,
			TopicPartitions: t.pendingPartitionsInCurrentTxn.mapToRequest(),
		}
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		addPartResponse, err := coordinator.AddPartitionsToTxn(request)
		if err != nil {
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}

		if addPartResponse == nil {
			return true, ErrTxnUnableToParseResponse
		}

		// remove from the list partitions that have been successfully updated
		var responseErrors []error
		for topic, results := range addPartResponse.Errors {
			for _, response := range results {
				tp := topicPartition{topic: topic, partition: response.Partition}
				switch response.Err {
				case ErrNoError:
					// Mark partition as added to transaction
					t.partitionsInCurrentTxn[tp] = struct{}{}
					delete(t.pendingPartitionsInCurrentTxn, tp)
					continue
				case ErrConsumerCoordinatorNotAvailable:
					fallthrough
				case ErrNotCoordinatorForConsumer:
					_ = coordinator.Close()
					_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
					fallthrough
				case ErrUnknownTopicOrPartition:
					fallthrough
				case ErrOffsetsLoadInProgress:
					// Retry topicPartition
				case ErrConcurrentTransactions:
					if len(t.partitionsInCurrentTxn) == 0 && retryBackoff > addPartitionsRetryBackoff {
						retryBackoff = addPartitionsRetryBackoff
					}
				case ErrOperationNotAttempted:
					fallthrough
				case ErrTopicAuthorizationFailed:
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
				case ErrUnknownProducerID:
					fallthrough
				case ErrInvalidProducerIDMapping:
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.abortableErrorIfPossible(response.Err)
				// Fatal errors
				default:
					removeAllPartitionsOnFatalOrAbortedError()
					return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
				}
				responseErrors = append(responseErrors, response.Err)
			}
		}

		// handle end
		if len(t.pendingPartitionsInCurrentTxn) == 0 {
			DebugLogger.Printf("txnmgr/add-partition-to-txn [%s] successful to add partitions txn %+v\n",
				t.transactionalID, addPartResponse)
			return false, nil
		}
		return true, Wrap(ErrAddPartitionsToTxn, responseErrors...)
	}, nil)
}