in transaction_manager.go [297:475]
// publishOffsetsToTxn attaches the given consumer-group offsets to the current
// transaction in two steps:
//
//  1. an AddOffsetsToTxnRequest sent to the transaction coordinator of
//     t.transactionalID;
//  2. a TxnOffsetCommitRequest sent to the coordinator of groupId.
//
// Each step is retried with a backoff while attempts remain
// (Producer.Transaction.Retry.Max, reset before each step). Abortable and
// fatal broker errors stop the retries and are routed through
// t.abortableErrorIfPossible / t.transitionTo.
//
// Return values:
//   - if step 1 fails, the original offsets and the last error;
//   - if step 2 succeeds for every partition, an empty topicPartitionOffsets
//     and a nil error;
//   - otherwise the offsets recorded as still pending (the partitions that
//     reported a retriable error in the last fully processed response, or the
//     original offsets if no response was processed) and the last error.
func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets, groupId string) (topicPartitionOffsets, error) {
	// Step 1: AddOffsetsToTxn.
	attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max

	// exec runs `run` until it reports that no retry is needed or the attempt
	// budget is exhausted, sleeping for the computed backoff between attempts.
	// err is the value returned when the loop body never executes.
	exec := func(run func() (bool, error), err error) error {
		for attemptsRemaining >= 0 {
			var retry bool
			retry, err = run()
			if !retry {
				return err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/add-offset-to-txn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return err
	}

	lastError := exec(func() (bool, error) {
		coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
		if err != nil {
			return true, err
		}

		request := &AddOffsetsToTxnRequest{
			TransactionalID: t.transactionalID,
			ProducerEpoch:   t.producerEpoch,
			ProducerID:      t.producerID,
			GroupID:         groupId,
		}
		if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
			// Version 2 adds the support for new error code PRODUCER_FENCED.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}

		response, err := coordinator.AddOffsetsToTxn(request)
		if err != nil {
			// The request itself failed: drop the connection and look the
			// transaction coordinator up again before retrying. Both calls are
			// best-effort; their errors are deliberately ignored.
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, err
		}
		if response == nil {
			// If no response is returned just retry.
			return true, ErrTxnUnableToParseResponse
		}

		switch response.Err {
		case ErrNoError:
			DebugLogger.Printf("txnmgr/add-offset-to-txn [%s] successful add-offset-to-txn with group %s %+v\n",
				t.transactionalID, groupId, response)
			// If no error, just exit.
			return false, nil
		case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
			// The coordinator is unavailable or has moved: refresh it
			// (best-effort), then retry.
			_ = coordinator.Close()
			_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
			return true, response.Err
		case ErrOffsetsLoadInProgress, ErrConcurrentTransactions:
			// Retry against the same coordinator.
			return true, response.Err
		case ErrUnknownProducerID, ErrInvalidProducerIDMapping:
			return false, t.abortableErrorIfPossible(response.Err)
		case ErrGroupAuthorizationFailed:
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, response.Err)
		default:
			// Others are fatal
			return false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, response.Err)
		}
	}, nil)

	if lastError != nil {
		return offsets, lastError
	}

	// Step 2: TxnOffsetCommit.
	// Note the result is not completed until the TxnOffsetCommit returns.
	// resultOffsets tracks the offsets still considered pending; it is updated
	// after every fully processed response.
	resultOffsets := offsets
	attemptsRemaining = t.client.Config().Producer.Transaction.Retry.Max

	// execTxnOffsetCommit is the retry loop for step 2; it additionally carries
	// the pending offsets returned by the last invocation of `run`.
	execTxnOffsetCommit := func(run func() (topicPartitionOffsets, bool, error), err error) (topicPartitionOffsets, error) {
		var r topicPartitionOffsets
		for attemptsRemaining >= 0 {
			var retry bool
			r, retry, err = run()
			if !retry {
				return r, err
			}
			backoff := t.computeBackoff(attemptsRemaining)
			Logger.Printf("txnmgr/txn-offset-commit [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
				t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
			time.Sleep(backoff)
			attemptsRemaining--
		}
		return r, err
	}

	return execTxnOffsetCommit(func() (topicPartitionOffsets, bool, error) {
		consumerGroupCoordinator, err := t.client.Coordinator(groupId)
		if err != nil {
			return resultOffsets, true, err
		}

		// Every attempt sends the complete offsets set, not only the
		// partitions that failed on the previous attempt.
		request := &TxnOffsetCommitRequest{
			TransactionalID: t.transactionalID,
			ProducerEpoch:   t.producerEpoch,
			ProducerID:      t.producerID,
			GroupID:         groupId,
			Topics:          offsets.mapToRequest(),
		}
		if t.client.Config().Version.IsAtLeast(V2_1_0_0) {
			// Version 2 adds the committed leader epoch.
			request.Version = 2
		} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
			// Version 1 is the same as version 0.
			request.Version = 1
		}

		responses, err := consumerGroupCoordinator.TxnOffsetCommit(request)
		if err != nil {
			// Best-effort coordinator refresh before retrying.
			_ = consumerGroupCoordinator.Close()
			_ = t.client.RefreshCoordinator(groupId)
			return resultOffsets, true, err
		}
		if responses == nil {
			return resultOffsets, true, ErrTxnUnableToParseResponse
		}

		var responseErrors []error
		failedTxn := topicPartitionOffsets{}
		// The coordinator refresh does not depend on the partition, so it is
		// performed at most once per response even when many partitions report
		// a coordinator-related error.
		coordinatorRefreshed := false
		for topic, partitionErrors := range responses.Topics {
			for _, partitionError := range partitionErrors {
				switch partitionError.Err {
				case ErrNoError:
					continue
				case ErrRequestTimedOut, ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
					// Retry after refreshing the group coordinator (best-effort).
					if !coordinatorRefreshed {
						_ = consumerGroupCoordinator.Close()
						_ = t.client.RefreshCoordinator(groupId)
						coordinatorRefreshed = true
					}
				case ErrUnknownTopicOrPartition, ErrOffsetsLoadInProgress:
					// Do nothing, just retry with the current coordinator.
				case ErrIllegalGeneration, ErrUnknownMemberId, ErrFencedInstancedId, ErrGroupAuthorizationFailed:
					return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, partitionError.Err)
				default:
					// Others are fatal
					return resultOffsets, false, t.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, partitionError.Err)
				}

				// Only retriable errors reach this point: remember the
				// partition as still pending and keep its error for reporting.
				tp := topicPartition{topic: topic, partition: partitionError.Partition}
				failedTxn[tp] = offsets[tp]
				responseErrors = append(responseErrors, partitionError.Err)
			}
		}

		resultOffsets = failedTxn
		if len(resultOffsets) == 0 {
			DebugLogger.Printf("txnmgr/txn-offset-commit [%s] successful txn-offset-commit with group %s\n",
				t.transactionalID, groupId)
			return resultOffsets, false, nil
		}
		return resultOffsets, true, Wrap(ErrTxnOffsetCommit, responseErrors...)
	}, nil)
}