private void handleRouteAssignmentOnline()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/ManagerControllerHelix.java [107:186]


  /**
   * Brings a route assignment online by configuring and starting a controller instance for it.
   *
   * <p>If a controller instance already exists, no new instance is created: the call returns
   * normally only when the requested route matches the current one and the existing instance
   * reports that it is started. Otherwise the source and destination clusters are validated
   * against the controller configuration, the configuration is updated for the route, and a new
   * {@code ControllerInstance} is created and started. The current route fields are recorded
   * only after a successful start.
   *
   * @param srcCluster name of the source cluster of the route
   * @param dstCluster name of the destination cluster of the route
   * @param routePartition partition identifier of the route
   * @throws IllegalArgumentException if a controller instance exists for a different route, if
   *     the source and destination clusters are equal, if either cluster is not listed in the
   *     configuration, or if no ZooKeeper path is configured for either cluster
   * @throws IllegalStateException if a controller instance exists for the same route but is not
   *     started
   * @throws RuntimeException if starting the new controller instance fails; the original failure
   *     is attached as the cause
   */
  private void handleRouteAssignmentOnline(String srcCluster, String dstCluster, String routePartition) {
    if (_currentControllerInstance != null) {
      if (!(srcCluster.equals(_currentSrcCluster) && dstCluster.equals(_currentDstCluster)
          && routePartition.equals(_currentRoutePartition))) {
        String msg = String.format(
            "Invalid route partition assignment. Current route src=%s, dst=%s, partition=%s; new route src=%s, dst=%s, partition=%s",
            _currentSrcCluster, _currentDstCluster, _currentRoutePartition, srcCluster, dstCluster, routePartition);
        LOGGER.error(msg);
        throw new IllegalArgumentException(msg);
      } else {
        if (_currentControllerInstance.isStarted()) {
          // Same route requested again while it is already running: nothing to do.
          LOGGER.info("Controller has already been started");
        } else {
          String msg = "Controller has already been initiated but not started yet";
          LOGGER.error(msg);
          throw new IllegalStateException(msg);
        }
      }
      return;
    }

    // validate src and dst clusters in configuration
    if (srcCluster.equals(dstCluster)) {
      String msg = String.format("The source cluster %s cannot be the same as destination cluster", srcCluster);
      LOGGER.error(msg);
      throw new IllegalArgumentException(msg);
    }
    if (!_controllerConf.getSourceClusters().contains(srcCluster)) {
      String msg = String.format("The cluster %s is not a valid source cluster", srcCluster);
      LOGGER.error(msg);
      throw new IllegalArgumentException(msg);
    }
    if (!_controllerConf.getDestinationClusters().contains(dstCluster)) {
      String msg = String.format("The cluster %s is not a valid destination cluster", dstCluster);
      LOGGER.error(msg);
      throw new IllegalArgumentException(msg);
    }

    // set corresponding zkpath for src and dst clusters
    String srcKafkaZkPath = (String) _controllerConf.getProperty(CONFIG_KAFKA_CLUSTER_KEY_PREFIX + srcCluster);
    if (srcKafkaZkPath == null) {
      String msg = "Failed to find configuration of ZooKeeper path for source cluster " + srcCluster;
      LOGGER.error(msg);
      throw new IllegalArgumentException(msg);
    }
    _controllerConf.setSrcKafkaZkPath(srcKafkaZkPath);
    String destKafkaZkPath = (String) _controllerConf.getProperty(CONFIG_KAFKA_CLUSTER_KEY_PREFIX + dstCluster);
    if (destKafkaZkPath == null) {
      String msg = "Failed to find configuration of ZooKeeper path for destination cluster " + dstCluster;
      LOGGER.error(msg);
      throw new IllegalArgumentException(msg);
    }
    _controllerConf.setDestKafkaZkPath(destKafkaZkPath);

    // Route-specific settings derived from the cluster names and partition.
    String clusterName = CONTROLLER_WORKER_HELIX_PREFIX + srcCluster + "-" + dstCluster + "-" + routePartition;
    _controllerConf.setHelixClusterName(clusterName);
    _controllerConf.setEnableSrcKafkaValidation("true");
    _controllerConf.setGroupId("ureplicator-" + srcCluster + "-" + dstCluster);
    _controllerConf.setSourceCluster(srcCluster);
    _controllerConf.setDestinationCluster(dstCluster);

    _currentControllerInstance = new ControllerInstance(this, _controllerConf);
    LOGGER.info("Starting controller instance for route {}", clusterName);
    try {
      _currentControllerInstance.start();
    } catch (Exception e) {
      String msg = "Failed to start controller instance. Roll back.";
      LOGGER.error(msg, e);
      if (_currentControllerInstance.stop()) {
        _currentControllerInstance = null;
      } else {
        // The instance reference is kept when stop() reports failure, so later calls
        // enter the "already exists" branch at the top of this method.
        LOGGER.error("Failed to stop the controller instance.");
      }
      // Attach the original failure so callers keep its type and stack trace.
      throw new RuntimeException(msg, e);
    }
    // Record the active route only once the instance has started successfully.
    _currentSrcCluster = srcCluster;
    _currentDstCluster = dstCluster;
    _currentRoutePartition = routePartition;
    LOGGER.info("Successfully started controller instance for route {}", clusterName);
  }