private List removeOverloadedParitions()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [642:753]


  /**
   * Removes partitions from instances whose weighted workload exceeds the allowed maximum and
   * returns the removed partitions.
   *
   * <p>The allowed maximum is derived from the average weighted workload per instance, computed
   * over the partitions in {@code partitionsToBeAssigned} plus the partitions currently served by
   * {@code orderedInstances}. Partitions whose weight is not positive are ignored when computing
   * workload and are never removed.
   *
   * <p>Side effects: removed partitions are detached from their instance via
   * {@code removeTopicPartition}; every inspected instance is polled out of
   * {@code orderedInstances} and re-inserted before returning so the set is re-sorted.
   *
   * @param orderedInstances instances to inspect; polled from the tail (last element first)
   * @param partitionsToBeAssigned partitions not yet assigned; they only contribute to the average
   * @param pinnedPartitions partitions that must not be removed; may be {@code null}
   * @param forced when {@code true} and {@code _overloadedRatioThreshold > 1.0}, the maximum is the
   *     average itself instead of the average multiplied by the threshold
   * @param weighter supplies the weight of each partition
   * @return the partitions removed from overloaded instances; empty when nothing was removed
   */
  private List<TopicPartition> removeOverloadedParitions(
      TreeSet<InstanceTopicPartitionHolder> orderedInstances,
      List<TopicPartition> partitionsToBeAssigned, Set<TopicPartition> pinnedPartitions,
      boolean forced,
      ITopicWorkloadWeighter weighter) {
    List<TopicPartition> overloaded = new ArrayList<>();
    WorkloadInfoRetriever retriever = _helixMirrorMakerManager.getWorkloadInfoRetriever();
    TopicWorkload totalWorkload = new TopicWorkload(0, 0, 0);

    boolean anyCounted =
        accumulateWeightedWorkload(partitionsToBeAssigned, totalWorkload, retriever, weighter) > 0;

    // instancesPartitionsCount records how many partitions count towards workload on each instance
    Map<String, Integer> instancesPartitionsCount = new HashMap<>();
    for (InstanceTopicPartitionHolder instance : orderedInstances) {
      int counted = accumulateWeightedWorkload(
          instance.getServingTopicPartitionSet(), totalWorkload, retriever, weighter);
      instancesPartitionsCount.put(instance.getInstanceName(), counted);
      if (counted > 0) {
        anyCounted = true;
      }
    }
    // Nothing contributes to workload, or there is no instance to take partitions from. The
    // second check also keeps the average below from being divided by a zero instance count.
    if (!anyCounted || orderedInstances.isEmpty()) {
      return overloaded;
    }

    int instanceCount = orderedInstances.size();
    TopicWorkload averageWorkload = new TopicWorkload(
        totalWorkload.getBytesPerSecond() / instanceCount,
        totalWorkload.getMsgsPerSecond() / instanceCount);

    // Adjust the average by excluding instances that hold a single counted partition which
    // exceeds the average workload, because that workload cannot be divided across workers.
    int excludeInstances = 0;
    for (InstanceTopicPartitionHolder instance : orderedInstances) {
      if (instancesPartitionsCount.get(instance.getInstanceName()) == 1) {
        for (TopicPartition tp : instance.getServingTopicPartitionSet()) {
          double weight = weighter.partitionWeight(tp);
          if (weight > 0) {
            TopicWorkload tw = retriever.topicWorkload(tp.getTopic());
            // NOTE(review): this compares the topic's TopicWorkload as a whole with the average,
            // while the amount subtracted below is weight * per-partition rate. Confirm whether
            // the comparison is meant to use the partition's weighted share instead.
            if (tw.compareTotal(averageWorkload) > 0) {
              excludeInstances++;
              totalWorkload.setMsgsPerSecond(
                  totalWorkload.getMsgsPerSecond() - weight * tw.getMsgsPerSecondPerPartition());
              totalWorkload.setBytesPerSecond(
                  totalWorkload.getBytesPerSecond() - weight * tw.getBytesPerSecondPerPartition());
            }
            // only the single counted partition of this instance is relevant
            break;
          }
        }
      }
    }
    int numInstances = orderedInstances.size() - excludeInstances;
    // when every instance was excluded, the unadjusted average is kept
    if (numInstances > 0) {
      averageWorkload = new TopicWorkload(totalWorkload.getBytesPerSecond() / numInstances,
          totalWorkload.getMsgsPerSecond() / numInstances);
    }

    TopicWorkload maxWorkload = (forced && _overloadedRatioThreshold > 1.0) ? averageWorkload
        : new TopicWorkload(averageWorkload.getBytesPerSecond() * _overloadedRatioThreshold,
            averageWorkload.getMsgsPerSecond() * _overloadedRatioThreshold);

    List<InstanceTopicPartitionHolder> processedInstances = new ArrayList<>();
    while (!orderedInstances.isEmpty()) {
      // Instances are taken out of the set while they are modified and re-added afterwards.
      InstanceTopicPartitionHolder highest = orderedInstances.pollLast();
      processedInstances.add(highest);
      if (instancesPartitionsCount.get(highest.getInstanceName()) <= 1) {
        // no need to rebalance to other worker because it is a single or no partition
        continue;
      }
      TopicWorkload workerWorkload = highest.totalWorkload(retriever, weighter);
      if (workerWorkload.compareTotal(maxWorkload) <= 0) {
        // The first multi-partition instance within the limit ends the scan.
        // NOTE(review): presumably the set is ordered so the remaining instances carry less
        // workload; confirm against the set's comparator.
        break;
      }
      TopicWorkload diff = new TopicWorkload(
          workerWorkload.getBytesPerSecond() - maxWorkload.getBytesPerSecond(),
          workerWorkload.getMsgsPerSecond() - maxWorkload.getMsgsPerSecond());
      TopicWorkload workloadToRemove = new TopicWorkload(0, 0, 0);
      List<TopicPartition> partitions = new ArrayList<>(highest.getServingTopicPartitionSet());
      Collections.shuffle(partitions); // choose random partitions
      for (TopicPartition tp : partitions) {
        double weight = weighter.partitionWeight(tp);
        boolean pinned = (pinnedPartitions != null) && pinnedPartitions.contains(tp);
        if (weight == 0 || pinned) {
          continue;
        }
        TopicWorkload tpWorkload = retriever.topicWorkload(tp.getTopic());
        workloadToRemove.add(weight * tpWorkload.getBytesPerSecondPerPartition(),
            weight * tpWorkload.getMsgsPerSecondPerPartition());
        highest.removeTopicPartition(tp);
        overloaded.add(tp);
        // stop once the removed workload covers the excess over the maximum
        if (workloadToRemove.compareTotal(diff) >= 0) {
          break;
        }
      }
    }
    // re-sort the set after removing partitions
    orderedInstances.addAll(processedInstances);

    return overloaded;
  }

  /**
   * Adds the weighted per-partition workload of every partition with a positive weight to
   * {@code accumulator}.
   *
   * @param partitions partitions to inspect
   * @param accumulator workload total that is updated in place
   * @param retriever source of the per-topic workload
   * @param weighter supplies the weight of each partition
   * @return the number of partitions that had a positive weight
   */
  private int accumulateWeightedWorkload(
      Iterable<? extends TopicPartition> partitions,
      TopicWorkload accumulator,
      WorkloadInfoRetriever retriever,
      ITopicWorkloadWeighter weighter) {
    int counted = 0;
    for (TopicPartition tp : partitions) {
      double weight = weighter.partitionWeight(tp);
      // weight == 0 means the partition is not considered for workload
      if (weight > 0) {
        TopicWorkload tw = retriever.topicWorkload(tp.getTopic());
        accumulator.add(weight * tw.getBytesPerSecondPerPartition(),
            weight * tw.getMsgsPerSecondPerPartition());
        counted++;
      }
    }
    return counted;
  }