in uReplicator-Worker/src/main/java/com/uber/stream/ureplicator/worker/helix/ManagerWorkerHelixHandler.java [148:201]
private synchronized void handleRouteAssignmentOnline(Message message) {
String[] clustersInfo = WorkerUtils.parseSrcDstCluster(message.getResourceName());
if (clustersInfo == null || clustersInfo.length != 3) {
LOGGER.info("Invalid clusters: {}", clustersInfo);
return;
}
String srcCluster = clustersInfo[1];
String dstCluster = clustersInfo[2];
String routeId = message.getPartitionName();
if (!validateRequest(srcCluster, dstCluster, routeId)) {
throw new IllegalArgumentException(String.format(
"Invalid handleRouteAssignmentOnline request, srcCluster: %s, dstCluster: %s, routeId: %s",
srcCluster, dstCluster, routeId));
}
if (workerInstance.isRunning()) {
if (StringUtils.isEmpty(currentSrcCluster) || StringUtils.isEmpty(currentDstCluster)) {
LOGGER.error("Previous worker instance failed to shutdown, attempt to shutdown again ");
// force clean shutdown previous instance before startup it again
workerInstance.cleanShutdown(true);
} else {
LOGGER.warn("Instance already online. srcCluster: {}, dstCluster: {}, routeId: {}",
srcCluster, dstCluster, routeId);
return;
}
}
// double check previous helix handler to make sure it has been shutdown
if (controllerWorkerHelixHandler != null) {
controllerWorkerHelixHandler.shutdown();
controllerWorkerHelixHandler = null;
}
LOGGER.info("Handling resource online {}", message.getResourceName());
String routeName = String.format("%s-%s-%s", srcCluster, dstCluster, routeId);
String helixCluster = WorkerUtils.getControllerWorkerHelixClusterName(routeName);
LOGGER.info("Join controller-worker cluster {}", helixCluster);
controllerWorkerHelixHandler = new ControllerWorkerHelixHandler(helixProps,
helixCluster, instanceId, srcCluster, dstCluster, routeId, federatedDeploymentName, workerInstance);
try {
controllerWorkerHelixHandler.start();
} catch (Exception e) {
LOGGER.error("Start controllerWorkerHelixHandler failed", e);
controllerWorkerHelixHandler.shutdown();
controllerWorkerHelixHandler = null;
// helix will mark this as ERROR and this can be captured by manager's validation job
throw new RuntimeException(e);
}
currentSrcCluster = srcCluster;
currentDstCluster = dstCluster;
currentRouteId = routeId;
LOGGER.info("Handling resource online {} finished", message.getResourceName());
}