func()

in consumer_group.go [268:443]


// newSession runs one JoinGroup/SyncGroup round against the group coordinator
// and, on success, returns a consumerGroupSession holding this member's claims.
//
// The steps are:
//  1. bail out early if ctx already carries an error;
//  2. look up the coordinator for c.groupID;
//  3. send a JoinGroup request and act on the response error code;
//  4. pick the balance strategy and, if this member was elected leader,
//     compute the distribution plan;
//  5. send a SyncGroup request and act on the response error code;
//  6. decode and sort the assigned partitions, then build the session.
//
// Response codes fall into three groups in both the join and the sync phase:
// codes that reset/replace c.memberID and immediately call newSession again,
// codes that go through retryNewSession (only while retries > 0), and codes
// that are returned to the caller as-is.
//
// NOTE(review): the "retry immediately" branches call newSession with the
// same retries value, so this function itself puts no bound on them.
//
// When c.metricRegistry is set, per-group join/sync total and failure
// counters are updated along the way.
func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	if ctx.Err() != nil {
		return nil, ctx.Err()
	}
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		if retries <= 0 {
			return nil, err
		}

		// NOTE(review): the trailing `true` presumably asks retryNewSession to
		// refresh the coordinator before retrying — confirm against its definition.
		return c.retryNewSession(ctx, topics, handler, retries, true)
	}

	// Counters stay nil (and are skipped below) when no registry is configured.
	var (
		metricRegistry          = c.metricRegistry
		consumerGroupJoinTotal  metrics.Counter
		consumerGroupJoinFailed metrics.Counter
		consumerGroupSyncTotal  metrics.Counter
		consumerGroupSyncFailed metrics.Counter
	)

	if metricRegistry != nil {
		consumerGroupJoinTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-total-%s", c.groupID), metricRegistry)
		consumerGroupJoinFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-failed-%s", c.groupID), metricRegistry)
		consumerGroupSyncTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-total-%s", c.groupID), metricRegistry)
		consumerGroupSyncFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-failed-%s", c.groupID), metricRegistry)
	}

	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if consumerGroupJoinTotal != nil {
		consumerGroupJoinTotal.Inc(1)
	}
	if err != nil {
		// Best-effort close: the request error is what gets reported, so a
		// failure to close is deliberately ignored.
		_ = coordinator.Close()
		if consumerGroupJoinFailed != nil {
			consumerGroupJoinFailed.Inc(1)
		}
		return nil, err
	}
	// The request itself succeeded; count a failure if the response carries
	// an error code (including the codes that are retried below).
	if !errors.Is(join.Err, ErrNoError) {
		if consumerGroupJoinFailed != nil {
			consumerGroupJoinFailed.Inc(1)
		}
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration:
		// reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress:
		// retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrMemberIdRequired:
		// from JoinGroupRequest v4 onwards (due to KIP-394) if the client starts
		// with an empty member id, it needs to get the assigned id from the
		// response and send another join request with that id to actually join the
		// group
		c.memberID = join.MemberId
		return c.newSession(ctx, topics, handler, retries)
	case ErrFencedInstancedId:
		if c.groupInstanceId != nil {
			Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId)
		}
		return nil, join.Err
	default:
		return nil, join.Err
	}

	// An explicitly configured strategy wins; otherwise look up the protocol
	// named in the join response among the configured group strategies.
	var strategy BalanceStrategy
	var ok bool
	if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy == nil {
		strategy, ok = c.findStrategy(join.GroupProtocol, c.config.Consumer.Group.Rebalance.GroupStrategies)
		if !ok {
			// this case shouldn't happen in practice, since the leader will choose the protocol
			// that all the members support
			return nil, fmt.Errorf("unable to find selected strategy: %s", join.GroupProtocol)
		}
	}

	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	var members map[string]ConsumerGroupMemberMetadata
	var allSubscribedTopicPartitions map[string][]int32
	var allSubscribedTopics []string
	if join.LeaderId == join.MemberId {
		members, err = join.GetMembers()
		if err != nil {
			return nil, err
		}

		allSubscribedTopicPartitions, allSubscribedTopics, plan, err = c.balance(strategy, members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group
	// For non-leaders, members and plan are still their zero values here.
	syncGroupResponse, err := c.syncGroupRequest(coordinator, members, plan, join.GenerationId, strategy)
	if consumerGroupSyncTotal != nil {
		consumerGroupSyncTotal.Inc(1)
	}
	if err != nil {
		// Best-effort close, same as for a failed join request.
		_ = coordinator.Close()
		if consumerGroupSyncFailed != nil {
			consumerGroupSyncFailed.Inc(1)
		}
		return nil, err
	}
	if !errors.Is(syncGroupResponse.Err, ErrNoError) {
		if consumerGroupSyncFailed != nil {
			consumerGroupSyncFailed.Inc(1)
		}
	}

	switch syncGroupResponse.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration:
		// reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress:
		// retry after backoff
		if retries <= 0 {
			return nil, syncGroupResponse.Err
		}
		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrFencedInstancedId:
		if c.groupInstanceId != nil {
			// This is the sync phase, so name SyncGroup (not JoinGroup) in the log.
			Logger.Printf("SyncGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId)
		}
		return nil, syncGroupResponse.Err
	default:
		return nil, syncGroupResponse.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32
	if len(syncGroupResponse.MemberAssignment) > 0 {
		// Named `assignment` to avoid shadowing the outer `members` map, which
		// holds group member metadata rather than this member's assignment.
		assignment, err := syncGroupResponse.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = assignment.Topics

		// in the case of stateful balance strategies, hold on to the returned
		// assignment metadata, otherwise, reset the statically defined consumer
		// group metadata
		if assignment.UserData != nil {
			c.userData = assignment.UserData
		} else {
			c.userData = c.config.Consumer.Group.Member.UserData
		}

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	session, err := newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
	if err != nil {
		return nil, err
	}

	// only the leader needs to check whether there are newly-added partitions in order to trigger a rebalance
	if join.LeaderId == join.MemberId {
		go c.loopCheckPartitionNumbers(allSubscribedTopicPartitions, allSubscribedTopics, session)
	}

	return session, nil
}