public InstanceTopicPartitionHolder createNewRoute()

in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [1110:1177]


  public InstanceTopicPartitionHolder createNewRoute(String pipeline, int routeId) throws Exception {
    if (_availableControllerList.isEmpty()) {
      LOGGER.info("No available controller!");
      throw new Exception("No available controller!");
    }

    if (_workerHelixManager.getAvailableWorkerList().isEmpty()) {
      LOGGER.info("No available worker!");
      throw new Exception("No available worker!");
    }

    String instanceName = _availableControllerList.get(0);
    InstanceTopicPartitionHolder instance = new InstanceTopicPartitionHolder(instanceName,
        new TopicPartition(pipeline, routeId));
    if (!isPipelineExisted(pipeline)) {
      setEmptyResourceConfig(pipeline);
      LOGGER
          .info("Create new pipeline {} partition {} to instance {}", pipeline, routeId, instance);
      _helixAdmin.addResource(_helixClusterName, pipeline,
          IdealStateBuilder.buildCustomIdealStateFor(pipeline, String.valueOf(routeId), instance));
    } else {
      LOGGER.info("Expanding pipeline {} new partition {} to instance {}", pipeline, routeId,
          instance);
      _helixAdmin.setResourceIdealState(_helixClusterName, pipeline,
          IdealStateBuilder.expandCustomIdealStateFor(
              _helixAdmin.getResourceIdealState(_helixClusterName, pipeline),
              pipeline, String.valueOf(routeId), instance));
      LOGGER.info("New IdealState: {}",
          _helixAdmin.getResourceIdealState(_helixClusterName, pipeline));
    }

    String[] srcDst = pipeline.split(SEPARATOR);
    String routeString = String.format("%s-%s-%s", srcDst[1], srcDst[2], routeId);
    String controllerWorkerHelixClusterName = "controller-worker-" + routeString;
    HelixManager spectator = HelixManagerFactory.getZKHelixManager(controllerWorkerHelixClusterName,
        _instanceId,
        InstanceType.SPECTATOR,
        _helixZkURL);

    long ts1 = System.currentTimeMillis();
    while (true) {
      try {
        spectator.connect();
        break;
      } catch (Exception e) {
        // Do nothing
      }

      if (System.currentTimeMillis() - ts1 > 60000) {
        throw new Exception(String.format("Controller %s failed to set up new route cluster %s!",
            instanceName, controllerWorkerHelixClusterName));
      }
      Thread.sleep(1000);
    }

    _availableControllerList.remove(instanceName);
    _pipelineToInstanceMap.put(pipeline, new PriorityQueue<>(1,
        InstanceTopicPartitionHolder.totalWorkloadComparator(_controllerWorkloadSnapshot.getPipelineWorkloadMap())));
    _pipelineToInstanceMap.get(pipeline).add(instance);
    _assignedControllerCount.inc();
    _workerHelixManager.addTopicToMirrorMaker(instance, pipeline, routeId);

    // register metrics
    maybeRegisterMetrics(routeString);

    spectator.disconnect();
    return instance;
  }