func()

in balance_strategy.go [172:271]


// Plan builds a BalanceStrategyPlan for the supplied group members over the
// supplied topics (topic name -> partition IDs).
//
// The work happens in stages: the movement tracker on s is reset, any prior
// assignment state is loaded via prepopulateCurrentAssignments, the candidate
// consumer/partition mappings are built, assignments that are no longer valid
// are pruned, and s.balance is then invoked on the result. Every member ends
// up with an entry in the returned plan, even when it receives no partitions.
//
// If prepopulateCurrentAssignments fails, its error is returned together with
// a nil plan.
func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
	// start each planning run with an empty record of partition movements
	s.movements = partitionMovements{
		Movements:                 make(map[topicPartitionAssignment]consumerPair),
		PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
	}

	// load the assignment state carried in the members' userdata
	currentAssignment, prevAssignment, err := prepopulateCurrentAssignments(members)
	if err != nil {
		return nil, err
	}

	// this has to be captured now: the membership loop below adds an empty
	// entry for every member that has none, after which the map is no longer empty
	isFreshAssignment := len(currentAssignment) == 0

	// every known topic partition starts with an empty (non-nil) list of candidate consumers
	consumersByPartition := make(map[topicPartitionAssignment][]string)
	for topicName, partitionIDs := range topics {
		for _, id := range partitionIDs {
			tp := topicPartitionAssignment{Topic: topicName, Partition: id}
			consumersByPartition[tp] = []string{}
		}
	}

	// for each member, collect the partitions it could be given, and register
	// the member as a candidate consumer of each of those partitions
	partitionsByConsumer := make(map[string][]topicPartitionAssignment, len(members))
	for memberID, meta := range members {
		candidates := make([]topicPartitionAssignment, 0)
		for _, subscribed := range meta.Topics {
			partitionIDs, known := topics[subscribed]
			if !known {
				// subscriptions to topics missing from the supplied topics map are skipped
				continue
			}
			for _, id := range partitionIDs {
				tp := topicPartitionAssignment{Topic: subscribed, Partition: id}
				candidates = append(candidates, tp)
				consumersByPartition[tp] = append(consumersByPartition[tp], memberID)
			}
		}
		partitionsByConsumer[memberID] = candidates

		// make sure the member is tracked in currentAssignment, with an empty assignment if it is new
		if _, tracked := currentAssignment[memberID]; !tracked {
			currentAssignment[memberID] = make([]topicPartitionAssignment, 0)
		}
	}

	// record which member currently holds each partition, and work out which
	// partitions still need an owner
	currentPartitionConsumers := make(map[topicPartitionAssignment]string, len(currentAssignment))
	unvisited := make(map[topicPartitionAssignment]bool, len(consumersByPartition))
	for tp := range consumersByPartition {
		unvisited[tp] = true
	}
	var unassignedPartitions []topicPartitionAssignment
	for memberID, owned := range currentAssignment {
		var kept []topicPartitionAssignment
		for _, tp := range owned {
			// A partition that is not in the supplied topics at all (likely
			// because its topic was deleted) is dropped from the member.
			if _, exists := consumersByPartition[tp]; !exists {
				continue
			}
			delete(unvisited, tp)
			currentPartitionConsumers[tp] = memberID

			// the member keeps the partition only while it is still subscribed
			// to the topic; otherwise the partition goes back into the pool
			if strsContains(members[memberID].Topics, tp.Topic) {
				kept = append(kept, tp)
			} else {
				unassignedPartitions = append(unassignedPartitions, tp)
			}
		}
		currentAssignment[memberID] = kept
	}
	// partitions that no member claimed also need an owner
	for tp := range unvisited {
		unassignedPartitions = append(unassignedPartitions, tp)
	}

	// order the topic partitions by reassignment priority
	sortedPartitions := sortPartitions(currentAssignment, prevAssignment, isFreshAssignment, consumersByPartition, partitionsByConsumer)

	// Only valid partition-to-consumer assignments remain in currentAssignment
	// at this point. The remaining step is to hand out unassignedPartitions so
	// that the overall assignment is as balanced as possible, which is
	// delegated to s.balance.

	// member IDs in ascending order of how many partitions each already holds
	sortedCurrentSubscriptions := sortMemberIDsByPartitionAssignments(currentAssignment)
	s.balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, partitionsByConsumer, consumersByPartition, currentPartitionConsumers)

	// turn the final assignment into a plan
	plan := make(BalanceStrategyPlan, len(currentAssignment))
	for memberID, assigned := range currentAssignment {
		if len(assigned) == 0 {
			// members without partitions still get an (empty) entry in the plan
			plan[memberID] = make(map[string][]int32)
			continue
		}
		for _, tp := range assigned {
			plan.Add(memberID, tp.Topic, tp.Partition)
		}
	}
	return plan, nil
}