public void scaleCurrentCluster()

in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [951:1096]


  /**
   * Rebalances the current cluster in two passes per pipeline:
   * <ol>
   *   <li>Splits overloaded routes: while a route exceeds {@code _maxNumPartitionsPerRoute},
   *       move its largest topic either onto the lightest existing route that can fit it or
   *       onto a freshly created route.</li>
   *   <li>Scales workers per route toward the controller-reported expected worker count,
   *       bounded below by {@code _initMaxNumWorkersPerRoute} (or a per-route override).</li>
   * </ol>
   *
   * @throws Exception if creating a new route fails during rebalancing; the whole scale
   *         operation is abandoned in that case.
   */
  public void scaleCurrentCluster() throws Exception {
    int oldTotalNumWorker = 0;
    int newTotalNumWorker = 0;
    Map<String, Integer> _routeWorkerOverrides = getRouteWorkerOverride();
    for (String pipeline : _pipelineToInstanceMap.keySet()) {
      LOGGER.info("Start rescale pipeline: {}", pipeline);
      PriorityQueue<InstanceTopicPartitionHolder> newItphQueue = new PriorityQueue<>(1,
          InstanceTopicPartitionHolder.totalWorkloadComparator(_controllerWorkloadSnapshot.getPipelineWorkloadMap()));
      // TODO: what if routeId is not continuous
      int nextRouteId = _pipelineToInstanceMap.get(pipeline).size();
      for (InstanceTopicPartitionHolder itph : _pipelineToInstanceMap.get(pipeline)) {
        if (itph.getTotalNumPartitions() > _maxNumPartitionsPerRoute) {
          LOGGER.info(
              "Checking route {} with controller {} and topics {} since it exceeds maxNumPartitionsPerRoute {}",
              itph.getRouteString(), itph.getInstanceName(), itph.getServingTopicPartitionSet(),
              _maxNumPartitionsPerRoute);
          while (itph.getTotalNumPartitions() > _maxNumPartitionsPerRoute) {
            // Only one topic left, do nothing
            if (itph.getNumServingTopicPartitions() == 1) {
              LOGGER.info("Only one topic {} in route {}, do nothing",
                  itph.getServingTopicPartitionSet().iterator().next(), itph.getRouteString());
              break;
            }

            // Get the topic with largest number of partitions.
            // NOTE(review): getPartition() is used here as the topic's partition count —
            // the sentinel (-1) guarantees any real topic replaces it.
            TopicPartition tpToMove = new TopicPartition("tmp", -1);
            for (TopicPartition tp : itph.getServingTopicPartitionSet()) {
              if (tp.getPartition() > tpToMove.getPartition()) {
                tpToMove = tp;
              }
            }

            // If existing lightest route cannot fit the largest topic to move,
            // create a brand-new route for it; otherwise reuse the lightest route.
            if (newItphQueue.isEmpty() ||
                newItphQueue.peek().getTotalNumPartitions() + tpToMove.getPartition()
                    > _initMaxNumPartitionsPerRoute) {
              try {
                InstanceTopicPartitionHolder newHolder = createNewRoute(pipeline, nextRouteId);

                _helixAdmin.setResourceIdealState(_helixClusterName, tpToMove.getTopic(),
                    IdealStateBuilder.resetCustomIdealStateFor(
                        _helixAdmin.getResourceIdealState(_helixClusterName, tpToMove.getTopic()),
                        tpToMove.getTopic(), itph.getRouteString(), newHolder.getRouteString(),
                        newHolder.getInstanceName()));

                itph.removeTopicPartition(tpToMove);
                newHolder.addTopicPartition(tpToMove);
                newItphQueue.add(newHolder);
                nextRouteId++;

              } catch (Exception e) {
                LOGGER.error("Got exception when create a new route when rebalancing, abandon!", e);
                throw new Exception(
                    "Got exception when create a new route when rebalancing, abandon!", e);
              }
            } else {
              InstanceTopicPartitionHolder newHolder = newItphQueue.poll();

              _helixAdmin.setResourceIdealState(_helixClusterName, tpToMove.getTopic(),
                  IdealStateBuilder.resetCustomIdealStateFor(
                      _helixAdmin.getResourceIdealState(_helixClusterName, tpToMove.getTopic()),
                      tpToMove.getTopic(), itph.getRouteString(), newHolder.getRouteString(),
                      newHolder.getInstanceName()));
              itph.removeTopicPartition(tpToMove);
              newHolder.addTopicPartition(tpToMove);
              newItphQueue.add(newHolder);
            }
          }
        }
        newItphQueue.add(itph);
      }

      // After moving topics, scale workers based on workload
      int rescaleFailedCount = 0;
      for (InstanceTopicPartitionHolder itph : newItphQueue) {
        oldTotalNumWorker += itph.getWorkerSet().size();
        String routeString = itph.getRouteString();
        // Per-route override can only raise the minimum worker count, never lower it.
        int initWorkerCount = _initMaxNumWorkersPerRoute;
        if (_routeWorkerOverrides.containsKey(routeString)
            && _routeWorkerOverrides.get(routeString) > initWorkerCount) {
          initWorkerCount = _routeWorkerOverrides.get(routeString);
        }

        try {
          boolean isFailed = _controllerWorkloadSnapshot.getFailedPipelines().contains(itph.getRouteString());
          if (isFailed) {
            // FIX: the format string previously had a single %s for two arguments,
            // silently dropping the route and mislabeling the instance name as the route.
            LOGGER.error(String.format(
                "Get workload error on controller %s for route %s. No change on number of workers",
                itph.getInstanceName(), itph.getRouteString()));
            continue;
          }
          ControllerWorkloadInfo workloadInfo = _controllerWorkloadSnapshot.getPipelineWorkloadMap().get(itph.getRouteString());

          if (workloadInfo == null || !workloadInfo.isAutoBalancingEnabled()) {
            LOGGER.warn("Skip scaling worker for route {} because of auto balancing is disabled on controller", routeString);
            continue;
          }
          // workloadInfo is guaranteed non-null here (guarded by the continue above).
          if (workloadInfo.getNumOfExpectedWorkers() != 0) {
            int expectedNumWorkers = workloadInfo.getNumOfExpectedWorkers();
            LOGGER.info("Current {} workers in route {}, expect {} workers",
                itph.getWorkerSet().size(), itph.getRouteString(), expectedNumWorkers);
            int actualExpectedNumWorkers = getActualExpectedNumWorkers(expectedNumWorkers,
                initWorkerCount);
            LOGGER.info("Current {} workers in route {}, actual expect {} workers",
                itph.getWorkerSet().size(), itph.getRouteString(), actualExpectedNumWorkers);

            if (actualExpectedNumWorkers > itph.getWorkerSet().size()) {
              LOGGER
                  .info("Current {} workers in route {}, actual expect {} workers, add {} workers",
                      itph.getWorkerSet().size(), itph.getRouteString(), actualExpectedNumWorkers,
                      actualExpectedNumWorkers - itph.getWorkerSet().size());
              _workerHelixManager.addWorkersToMirrorMaker(itph, itph.getRoute().getTopic(),
                  itph.getRoute().getPartition(),
                  actualExpectedNumWorkers - itph.getWorkerSet().size());
            }

            if (actualExpectedNumWorkers < itph.getWorkerSet().size()) {
              // _numOfWorkersBatchSize is the smallest unit for scaling workers down in a
              // safe way, so shrinking happens gradually over successive rescale passes.
              LOGGER.info(
                  "Current {} workers in route {}, actual expect {} workers, remove {} workers",
                  itph.getWorkerSet().size(), itph.getRouteString(), actualExpectedNumWorkers,
                  _numOfWorkersBatchSize);
              _workerHelixManager.removeWorkersToMirrorMaker(itph, itph.getRoute().getTopic(),
                  itph.getRoute().getPartition(), _numOfWorkersBatchSize);
            }
            newTotalNumWorker += actualExpectedNumWorkers;
          } else {
            LOGGER
                .warn("Get workload route: {} returns 0. No change on number of workers",
                    itph.getRouteString());
            newTotalNumWorker += itph.getWorkerSet().size();
            rescaleFailedCount++;
          }
        } catch (Exception e) {
          // FIX: rescaleFailedCount was previously incremented twice in this catch block,
          // double-counting every failed route; count each failure exactly once.
          rescaleFailedCount++;
          LOGGER.error(String.format(
              "Get workload error when connecting to %s for route %s. No change on number of workers",
              itph.getInstanceName(), itph.getRouteString()), e);
          newTotalNumWorker += itph.getWorkerSet().size();
        }
      }
      _pipelineToInstanceMap.put(pipeline, newItphQueue);
      // NOTE(review): this adjusts the gauge to this pipeline's failure count; since it runs
      // once per pipeline, the last pipeline's count wins — confirm this is intended.
      _rescaleFailedCount.inc(rescaleFailedCount - _rescaleFailedCount.getCount());
    }
    LOGGER
        .info("oldTotalNumWorker: {}, newTotalNumWorker: {}", oldTotalNumWorker, newTotalNumWorker);
  }