private boolean assignPartitions()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [594:630]


  private boolean assignPartitions(TreeSet<InstanceTopicPartitionHolder> orderedInstances,
      List<TopicPartition> partitionsToBeAssigned) {
    if (orderedInstances.isEmpty() || partitionsToBeAssigned.isEmpty()) {
      return false;
    }
    // sort partitions based on workload in reverse order (high -> low)
    Collections.sort(partitionsToBeAssigned,
        Collections.reverseOrder(
            TopicPartition
                .getWorkloadComparator(_helixMirrorMakerManager.getWorkloadInfoRetriever())));
    // assign partitions of the same topic to different workers if possible

    List<TopicPartition> sameTopic = new ArrayList<>();
    List<InstanceTopicPartitionHolder> lowestInstances = new ArrayList<>();
    for (int i = 0; i < partitionsToBeAssigned.size(); ) {
      sameTopic.clear();
      lowestInstances.clear();

      TopicPartition tp = partitionsToBeAssigned.get(i);
      sameTopic.add(tp);
      i++;
      while (i < partitionsToBeAssigned.size() && partitionsToBeAssigned.get(i).getTopic()
          .equals(tp.getTopic())) {
        sameTopic.add(partitionsToBeAssigned.get(i));
        i++;
      }

      while (!orderedInstances.isEmpty() && lowestInstances.size() < sameTopic.size()) {
        lowestInstances.add(orderedInstances.pollFirst());
      }
      for (int j = 0; j < sameTopic.size(); j++) {
        lowestInstances.get(j % lowestInstances.size()).addTopicPartition(sameTopic.get(j));
      }
      orderedInstances.addAll(lowestInstances);
    }
    return true;
  }