func()

in transaction_manager.go [297:475]


func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets, groupId string) (topicPartitionOffsets, error) {
	// First AddOffsetsToTxn
	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
	exec := func(run func() (bool, error), err error) error {
		for attemptsRemaining >= 0 {
			var retry bool
			retry, err = run()
			if !retry {
				return err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/add-offset-to-txn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return err
	}
	lastError := exec(func() (bool, error) {
		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}
		request := &AddOffsetsToTxnRequest{
			TransactionalID: t.transactionalID,
			ProducerEpoch:   t.producerEpoch,
			ProducerID:      t.producerID,
			GroupID:         groupId,
		}
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		response, err := coordinator.AddOffsetsToTxn(request)
		if err != nil {
			// If an error occurred try to refresh current transaction coordinator.
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}
		if response == nil {
			// If no response is returned just retry.
			return true, ErrTxnUnableToParseResponse
		}
		if response.Err == ErrNoError {
			DebugLogger.Printf("txnmgr/add-offset-to-txn [%s] successful add-offset-to-txn with group %s %+v\n",
				t.transactionalID, groupId, response)
			// If no error, just exit.
			return false, nil
		}
		switch response.Err {
		case ErrConsumerCoordinatorNotAvailable:
			fallthrough
		case ErrNotCoordinatorForConsumer:
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			fallthrough
		case ErrOffsetsLoadInProgress:
			fallthrough
		case ErrConcurrentTransactions:
			// Retry
		case ErrUnknownProducerID:
			fallthrough
		case ErrInvalidProducerIDMapping:
			return false, t.abortableErrorIfPossible(response.Err)
		case ErrGroupAuthorizationFailed:
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
		default:
			// Others are fatal
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
		return true, response.Err
	}, nil)

	if lastError != nil {
		return offsets, lastError
	}

	resultOffsets := offsets
	// Then TxnOffsetCommit
	// note the result is not completed until the TxnOffsetCommit returns
	attemptsRemaining = t.client.Config().Producer.Transaction.Retry.Max
	execTxnOffsetCommit := func(run func() (topicPartitionOffsets, bool, error), err error) (topicPartitionOffsets, error) {
		var r topicPartitionOffsets
		for attemptsRemaining >= 0 {
			var retry bool
			r, retry, err = run()
			if !retry {
				return r, err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/txn-offset-commit [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return r, err
	}
	return execTxnOffsetCommit(func() (topicPartitionOffsets, bool, error) {
		consumerGroupCoordinator, err := t.client.Coordinator(groupId)
		if err != nil {
			return resultOffsets, true, err
		}
		request := &TxnOffsetCommitRequest{
			TransactionalID: t.transactionalID,
			ProducerEpoch:   t.producerEpoch,
			ProducerID:      t.producerID,
			GroupID:         groupId,
			Topics:          offsets.mapToRequest(),
		}
		if t.client.Config().Version.IsAtLeast(V2_1_0_0) {
			// Version 2 adds the committed leader epoch.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}
		responses, err := consumerGroupCoordinator.TxnOffsetCommit(request)
		if err != nil {
			_ = consumerGroupCoordinator.Close()
			_ = t.client.RefreshCoordinator(groupId)
			return resultOffsets, true, err
		}

		if responses == nil {
			return resultOffsets, true, ErrTxnUnableToParseResponse
		}

		var responseErrors []error
		failedTxn := topicPartitionOffsets{}
		for topic, partitionErrors := range responses.Topics {
			for _, partitionError := range partitionErrors {
				switch partitionError.Err {
				case ErrNoError:
					continue
				// If the topic is unknown or the coordinator is loading, retry with the current coordinator
				case ErrRequestTimedOut:
					fallthrough
				case ErrConsumerCoordinatorNotAvailable:
					fallthrough
				case ErrNotCoordinatorForConsumer:
					_ = consumerGroupCoordinator.Close()
					_ = t.client.RefreshCoordinator(groupId)
					fallthrough
				case ErrUnknownTopicOrPartition:
					fallthrough
				case ErrOffsetsLoadInProgress:
					// Do nothing just retry
				case ErrIllegalGeneration:
					fallthrough
				case ErrUnknownMemberId:
					fallthrough
				case ErrFencedInstancedId:
					fallthrough
				case ErrGroupAuthorizationFailed:
					return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, partitionError.Err)
				default:
					// Others are fatal
					return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, partitionError.Err)
				}
				tp := topicPartition{topic: topic, partition: partitionError.Partition}
				failedTxn[tp] = offsets[tp]
				responseErrors = append(responseErrors, partitionError.Err)
			}
		}

		resultOffsets = failedTxn

		if len(resultOffsets) == 0 {
			DebugLogger.Printf("txnmgr/txn-offset-commit [%s] successful txn-offset-commit with group %s\n",
				t.transactionalID, groupId)
			return resultOffsets, false, nil
		}
		return resultOffsets, true, Wrap(ErrTxnOffsetCommit, responseErrors...)
	}, nil)
}