in transaction_manager.go [695:739]
func (t *transactionManager) finishTransaction(commit bool) error {
t.mutex.Lock()
defer t.mutex.Unlock()
// Ensure no error when committing or aborting
if commit && t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
return t.lastError
} else if !commit && t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
return t.lastError
}
// if no records has been sent don't do anything.
if len(t.partitionsInCurrentTxn) == 0 {
return t.completeTransaction()
}
epochBump := t.epochBumpRequired
// If we're aborting the transaction, so there should be no need to add offsets.
if commit && len(t.offsetsInCurrentTxn) > 0 {
for group, offsets := range t.offsetsInCurrentTxn {
newOffsets, err := t.publishOffsetsToTxn(offsets, group)
if err != nil {
t.offsetsInCurrentTxn[group] = newOffsets
return err
}
delete(t.offsetsInCurrentTxn, group)
}
}
if t.currentTxnStatus()&ProducerTxnFlagFatalError != 0 {
return t.lastError
}
if !errors.Is(t.lastError, ErrInvalidProducerIDMapping) {
err := t.endTxn(commit)
if err != nil {
return err
}
if !epochBump {
return nil
}
}
// reset pid and epoch if needed.
return t.initializeTransactions()
}