public synchronized void addWorkersToMirrorMaker()

in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/WorkerHelixManager.java [208:254]


  public synchronized void addWorkersToMirrorMaker(InstanceTopicPartitionHolder controller, String pipeline,
      int routeId, int numWorkersToAdd) throws Exception {
    LOGGER.info("Trying to add {} workers to route: {}@{}", numWorkersToAdd, pipeline, routeId);
    _lock.lock();
    try {
      if (_availableWorkerList.size() == 0) {
          LOGGER.warn("No available worker!");
          return;
      }
      List<String> instances = new ArrayList<>();
      for (int i = 0; i < numWorkersToAdd && i < _availableWorkerList.size(); i++) {
        instances.add(_availableWorkerList.get(i));
      }

      LOGGER.info("Add {} instance to route {}: {}", instances.size(), pipeline + SEPARATOR + routeId, instances);
      if (_helixAdmin.getResourceIdealState(_helixClusterName, pipeline) == null) {
        // this can happen when manager-controller idealstate is finished but manager-worker idealstate not exist
        if (!isPipelineExisted(pipeline)) {
          setEmptyResourceConfig(pipeline);
          _helixAdmin.addResource(_helixClusterName, pipeline,
              IdealStateBuilder.buildCustomIdealStateFor(pipeline, String.valueOf(routeId), instances));
        } else {
          _helixAdmin.setResourceIdealState(_helixClusterName, pipeline,
              IdealStateBuilder.expandCustomIdealStateFor(_helixAdmin.getResourceIdealState(_helixClusterName, pipeline),
                  pipeline, String.valueOf(routeId), instances, _conf.getMaxNumWorkersPerRoute()));
        }
      } else if (_helixAdmin.getResourceIdealState(_helixClusterName, pipeline).getPartitionSet().contains(String.valueOf(routeId))) {
        LOGGER.info("Topic {} Partition {} exists", pipeline, routeId);
        _helixAdmin.setResourceIdealState(_helixClusterName, pipeline,
            IdealStateBuilder.expandInstanceCustomIdealStateFor(_helixAdmin.getResourceIdealState(_helixClusterName, pipeline),
                    pipeline, String.valueOf(routeId), instances, _conf.getMaxNumWorkersPerRoute()));
      } else {
        LOGGER.info("Topic {} Partition {} does not exist", pipeline, routeId);
        _helixAdmin.setResourceIdealState(_helixClusterName, pipeline,
            IdealStateBuilder.expandCustomIdealStateFor(_helixAdmin.getResourceIdealState(_helixClusterName, pipeline),
                pipeline, String.valueOf(routeId), instances, _conf.getMaxNumWorkersPerRoute()));
      }

      TopicPartition route = new TopicPartition(pipeline, routeId);
      _routeToInstanceMap.putIfAbsent(route, new ArrayList<>());
      _routeToInstanceMap.get(route).addAll(instances);
      _availableWorkerList.removeAll(instances);
      controller.addWorkers(instances);
    } finally {
      _lock.unlock();
    }
  }