in consumer_group.go [268:443]
// newSession joins the consumer group through the group coordinator, syncs
// the group assignment and returns a new consumerGroupSession covering the
// partitions claimed by this member.
//
// The flow is:
//  1. look up the group coordinator;
//  2. send a JoinGroup request (counting joins/failures when a metric
//     registry is configured);
//  3. if this member was elected leader, decode the member metadata and
//     compute the assignment plan with the selected BalanceStrategy;
//  4. send a SyncGroup request (counting syncs/failures) and decode this
//     member's assignment, sorting each topic's partitions;
//  5. create the session and, on the leader only, start
//     loopCheckPartitionNumbers in a goroutine.
//
// retries is the remaining budget for retriable failures: a failed
// coordinator lookup and ErrNotCoordinatorForConsumer /
// ErrRebalanceInProgress / ErrOffsetsLoadInProgress are handed to
// retryNewSession (with refreshCoordinator=true). When the budget is
// exhausted, the underlying error is returned. ErrUnknownMemberId,
// ErrIllegalGeneration and ErrMemberIdRequired instead recurse immediately
// with the same retries value, so they do not consume the budget.
func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	if ctx.Err() != nil {
		return nil, ctx.Err()
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		if retries <= 0 {
			return nil, err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	}

	// Counters are only registered when a metric registry is configured; the
	// nil checks below make metric updates no-ops otherwise.
	var (
		metricRegistry          = c.metricRegistry
		consumerGroupJoinTotal  metrics.Counter
		consumerGroupJoinFailed metrics.Counter
		consumerGroupSyncTotal  metrics.Counter
		consumerGroupSyncFailed metrics.Counter
	)
	if metricRegistry != nil {
		consumerGroupJoinTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-total-%s", c.groupID), metricRegistry)
		consumerGroupJoinFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-failed-%s", c.groupID), metricRegistry)
		consumerGroupSyncTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-total-%s", c.groupID), metricRegistry)
		consumerGroupSyncFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-failed-%s", c.groupID), metricRegistry)
	}

	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if consumerGroupJoinTotal != nil {
		consumerGroupJoinTotal.Inc(1)
	}
	if err != nil {
		// Best-effort close: the request error is what the caller needs.
		_ = coordinator.Close()
		if consumerGroupJoinFailed != nil {
			consumerGroupJoinFailed.Inc(1)
		}
		return nil, err
	}
	if !errors.Is(join.Err, ErrNoError) {
		if consumerGroupJoinFailed != nil {
			consumerGroupJoinFailed.Inc(1)
		}
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration:
		// reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress:
		// retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrMemberIdRequired:
		// from JoinGroupRequest v4 onwards (due to KIP-394) if the client starts
		// with an empty member id, it needs to get the assigned id from the
		// response and send another join request with that id to actually join the
		// group
		c.memberID = join.MemberId
		return c.newSession(ctx, topics, handler, retries)
	case ErrFencedInstancedId:
		if c.groupInstanceId != nil {
			Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId)
		}
		return nil, join.Err
	default:
		return nil, join.Err
	}

	// An explicitly configured Rebalance.Strategy takes precedence; otherwise
	// use the protocol the group selected out of GroupStrategies.
	var strategy BalanceStrategy
	var ok bool
	if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy == nil {
		strategy, ok = c.findStrategy(join.GroupProtocol, c.config.Consumer.Group.Rebalance.GroupStrategies)
		if !ok {
			// this case shouldn't happen in practice, since the leader will choose the protocol
			// that all the members support
			return nil, fmt.Errorf("unable to find selected strategy: %s", join.GroupProtocol)
		}
	}

	// Prepare distribution plan if we joined as the leader. Non-leaders send
	// an empty plan (nil members/plan) in their SyncGroup request.
	var plan BalanceStrategyPlan
	var members map[string]ConsumerGroupMemberMetadata
	var allSubscribedTopicPartitions map[string][]int32
	var allSubscribedTopics []string
	isLeader := join.LeaderId == join.MemberId
	if isLeader {
		members, err = join.GetMembers()
		if err != nil {
			return nil, err
		}

		allSubscribedTopicPartitions, allSubscribedTopics, plan, err = c.balance(strategy, members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group
	syncGroupResponse, err := c.syncGroupRequest(coordinator, members, plan, join.GenerationId, strategy)
	if consumerGroupSyncTotal != nil {
		consumerGroupSyncTotal.Inc(1)
	}
	if err != nil {
		// Best-effort close: the request error is what the caller needs.
		_ = coordinator.Close()
		if consumerGroupSyncFailed != nil {
			consumerGroupSyncFailed.Inc(1)
		}
		return nil, err
	}
	if !errors.Is(syncGroupResponse.Err, ErrNoError) {
		if consumerGroupSyncFailed != nil {
			consumerGroupSyncFailed.Inc(1)
		}
	}
	switch syncGroupResponse.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration:
		// reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress:
		// retry after backoff
		if retries <= 0 {
			return nil, syncGroupResponse.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrFencedInstancedId:
		if c.groupInstanceId != nil {
			Logger.Printf("SyncGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId)
		}
		return nil, syncGroupResponse.Err
	default:
		return nil, syncGroupResponse.Err
	}

	// Retrieve and sort claims. An empty MemberAssignment leaves claims nil
	// and c.userData unchanged.
	var claims map[string][]int32
	if len(syncGroupResponse.MemberAssignment) > 0 {
		assignment, err := syncGroupResponse.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = assignment.Topics

		// in the case of stateful balance strategies, hold on to the returned
		// assignment metadata, otherwise, reset the statically defined consumer
		// group metadata
		if assignment.UserData != nil {
			c.userData = assignment.UserData
		} else {
			c.userData = c.config.Consumer.Group.Member.UserData
		}

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	session, err := newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
	if err != nil {
		return nil, err
	}

	// only the leader needs to check whether there are newly-added partitions in order to trigger a rebalance
	if isLeader {
		go c.loopCheckPartitionNumbers(allSubscribedTopicPartitions, allSubscribedTopics, session)
	}

	return session, nil
}