in balance_strategy.go [172:271]
// Plan builds a partition assignment plan for the given group members and
// topic partitions. Assignments that members already hold are retained when
// the partition still exists and the member is still subscribed to its topic;
// every other partition is handed to s.balance as unassigned.
//
// members maps a member ID to its group metadata, and topics maps a topic
// name to its partition IDs. An error is returned only when
// prepopulateCurrentAssignments fails.
func (s *stickyBalanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
	// start every run with empty partition movement bookkeeping
	s.movements = partitionMovements{
		Movements:                 make(map[topicPartitionAssignment]consumerPair),
		PartitionMovementsByTopic: make(map[string]map[consumerPair]map[topicPartitionAssignment]bool),
	}

	// recover the assignment state the members reported in their userdata
	owned, previous, err := prepopulateCurrentAssignments(members)
	if err != nil {
		return nil, err
	}

	// no recovered state at all means this is a completely fresh assignment;
	// this has to be evaluated before empty entries are added for members below
	fresh := len(owned) == 0

	// seed the partition -> eligible members index with every known partition
	eligibleMembers := make(map[topicPartitionAssignment][]string)
	for topic, ids := range topics {
		for _, id := range ids {
			eligibleMembers[topicPartitionAssignment{Topic: topic, Partition: id}] = []string{}
		}
	}

	// build the member -> eligible partitions index and fill in the reverse
	// index at the same time
	eligiblePartitions := make(map[string][]topicPartitionAssignment, len(members))
	for memberID, meta := range members {
		candidates := make([]topicPartitionAssignment, 0)
		for _, subscribed := range meta.Topics {
			ids, known := topics[subscribed]
			if !known {
				// subscriptions to topics absent from the topics map are ignored
				continue
			}
			for _, id := range ids {
				tp := topicPartitionAssignment{Topic: subscribed, Partition: id}
				candidates = append(candidates, tp)
				eligibleMembers[tp] = append(eligibleMembers[tp], memberID)
			}
		}
		eligiblePartitions[memberID] = candidates

		// members without recovered state still need an (empty) entry
		if _, tracked := owned[memberID]; !tracked {
			owned[memberID] = make([]topicPartitionAssignment, 0)
		}
	}

	// record the current owner of every partition that still exists, and
	// collect the partitions that have to be (re)assigned
	ownerOf := make(map[topicPartitionAssignment]string, len(owned))
	unseen := make(map[topicPartitionAssignment]bool, len(eligibleMembers))
	for tp := range eligibleMembers {
		unseen[tp] = true
	}
	var pending []topicPartitionAssignment
	for memberID, held := range owned {
		subscriptions := members[memberID].Topics
		var retained []topicPartitionAssignment
		for _, tp := range held {
			if _, alive := eligibleMembers[tp]; !alive {
				// the partition no longer exists at all (likely because its
				// topic was deleted), so it is simply dropped from the member
				continue
			}
			delete(unseen, tp)
			ownerOf[tp] = memberID
			if strsContains(subscriptions, tp.Topic) {
				retained = append(retained, tp)
			} else {
				// the member is not subscribed to this topic, so the
				// partition has to be assigned again
				pending = append(pending, tp)
			}
		}
		owned[memberID] = retained
	}
	// partitions that no member currently holds are unassigned as well
	for tp := range unseen {
		pending = append(pending, tp)
	}

	// order the topic partitions by their priority for reassignment
	ordered := sortPartitions(owned, previous, fresh, eligibleMembers, eligiblePartitions)

	// All valid partition-to-member assignments have been kept at this point;
	// what remains is to distribute the pending partitions so the result is as
	// balanced as possible. byLoad lists the members in ascending order of how
	// many partitions are already assigned to them.
	byLoad := sortMemberIDsByPartitionAssignments(owned)
	s.balance(owned, previous, ordered, pending, byLoad, eligiblePartitions, eligibleMembers, ownerOf)

	// turn the balanced assignment into the returned plan
	plan := make(BalanceStrategyPlan, len(owned))
	for memberID, held := range owned {
		if len(held) == 0 {
			// members with nothing assigned still get an (empty) plan entry
			plan[memberID] = make(map[string][]int32)
			continue
		}
		for _, tp := range held {
			plan.Add(memberID, tp.Topic, tp.Partition)
		}
	}
	return plan, nil
}