func()

in transaction_manager.go [695:739]


// finishTransaction ends the current transaction, committing it when commit
// is true and aborting it otherwise. t.mutex is held for the whole call.
//
// It returns t.lastError without ending the transaction when the current
// status carries ProducerTxnFlagInError (commit) or ProducerTxnFlagFatalError
// (abort), and again if the fatal flag is set after offsets were published.
// When no partitions are part of the current transaction it returns the
// result of completeTransaction. Otherwise the returned error comes from
// publishOffsetsToTxn, endTxn or initializeTransactions, whichever fails
// first.
func (t *transactionManager) finishTransaction(commit bool) error {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	// A commit is refused when the in-error flag is set; an abort is only
	// refused when the fatal-error flag is set.
	if commit {
		if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
			return t.lastError
		}
	} else if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
		return t.lastError
	}

	// Nothing was sent in this transaction, so there is nothing to end.
	if len(t.partitionsInCurrentTxn) == 0 {
		return t.completeTransaction()
	}

	// Capture the flag before any of the calls below get a chance to run.
	needsEpochBump := t.epochBumpRequired

	// Offsets are only published on commit; an abort has no need for them.
	if commit {
		for groupID, pending := range t.offsetsInCurrentTxn {
			remaining, err := t.publishOffsetsToTxn(pending, groupID)
			if err == nil {
				// This group is done; drop it from the pending set.
				delete(t.offsetsInCurrentTxn, groupID)
				continue
			}
			// Store what publishOffsetsToTxn handed back for this group
			// before reporting the failure.
			t.offsetsInCurrentTxn[groupID] = remaining
			return err
		}
	}

	// Re-check the status: the fatal flag may be set by now.
	if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
		return t.lastError
	}

	// With an invalid producer ID mapping the end-transaction call is
	// skipped and the producer goes straight to re-initialization.
	if errors.Is(t.lastError, ErrInvalidProducerIDMapping) {
		return t.initializeTransactions()
	}

	if err := t.endTxn(commit); err != nil {
		return err
	}
	if !needsEpochBump {
		return nil
	}

	// Reset pid and epoch, as an epoch bump was required.
	return t.initializeTransactions()
}