private synchronized void handleRouteAssignmentOnline()

in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/helix/ManagerWorkerHelixHandler.java [148:201]


  private synchronized void handleRouteAssignmentOnline(Message message) {
    String[] clustersInfo = WorkerUtils.parseSrcDstCluster(message.getResourceName());
    if (clustersInfo == null || clustersInfo.length != 3) {
      LOGGER.info("Invalid clusters: {}", clustersInfo);
      return;
    }
    String srcCluster = clustersInfo[1];
    String dstCluster = clustersInfo[2];
    String routeId = message.getPartitionName();

    if (!validateRequest(srcCluster, dstCluster, routeId)) {
      throw new IllegalArgumentException(String.format(
          "Invalid handleRouteAssignmentOnline request, srcCluster: %s, dstCluster: %s, routeId: %s",
          srcCluster, dstCluster, routeId));
    }

    if (workerInstance.isRunning()) {
      if (StringUtils.isEmpty(currentSrcCluster) || StringUtils.isEmpty(currentDstCluster)) {
        LOGGER.error("Previous worker instance failed to shutdown, attempt to shutdown again ");
        // force clean shutdown previous instance before startup it again
        workerInstance.cleanShutdown(true);
      } else {
        LOGGER.warn("Instance already online. srcCluster: {}, dstCluster: {}, routeId: {}",
            srcCluster, dstCluster, routeId);
        return;
      }
    }
    // double check previous helix handler to make sure it has been shutdown
    if (controllerWorkerHelixHandler != null) {
      controllerWorkerHelixHandler.shutdown();
      controllerWorkerHelixHandler = null;
    }

    LOGGER.info("Handling resource online {}", message.getResourceName());
    String routeName = String.format("%s-%s-%s", srcCluster, dstCluster, routeId);
    String helixCluster = WorkerUtils.getControllerWorkerHelixClusterName(routeName);
    LOGGER.info("Join controller-worker cluster {}", helixCluster);

    controllerWorkerHelixHandler = new ControllerWorkerHelixHandler(helixProps,
        helixCluster, instanceId, srcCluster, dstCluster, routeId, federatedDeploymentName, workerInstance);
    try {
      controllerWorkerHelixHandler.start();
    } catch (Exception e) {
      LOGGER.error("Start controllerWorkerHelixHandler failed", e);
      controllerWorkerHelixHandler.shutdown();
      controllerWorkerHelixHandler = null;
      // helix will mark this as ERROR and this can be captured by manager's validation job
      throw new RuntimeException(e);
    }
    currentSrcCluster = srcCluster;
    currentDstCluster = dstCluster;
    currentRouteId = routeId;
    LOGGER.info("Handling resource online {} finished", message.getResourceName());
  }