in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [728:928]
/**
 * Reacts to a change in the set of live controller / worker instances.
 *
 * <p>When auto-balancing is enabled, or {@code forceBalance} is set, this method:
 * <ol>
 *   <li>replaces every route controller that is no longer live with one taken from
 *   {@code _availableControllerList}, waits (bounded) for it to come online, and moves the
 *   topics the old controller served onto it;</li>
 *   <li>repairs assignments where an instance serves topics but no route, by moving those
 *   topics to the instance that currently owns the route;</li>
 *   <li>hands offline workers to {@code replaceWorkerInMirrorMaker}.</li>
 * </ol>
 * Unless {@code onlyCheckOffline} is set, it then refreshes the current status (if that has not
 * already happened) and, when auto-scaling is enabled, calls {@code scaleCurrentCluster()}.
 *
 * <p>The whole method runs while holding both this object's monitor and {@code _lock}.
 *
 * @param onlyCheckOffline if true, return right after handling offline instances
 * @param forceBalance if true, handle offline instances even when auto-balancing is disabled
 * @throws Exception propagated from the Helix / status-update calls made by this method
 */
public synchronized void handleLiveInstanceChange(boolean onlyCheckOffline, boolean forceBalance)
    throws Exception {
  // Upper bound on how long we wait for a replacement controller to come online, and the
  // polling interval used while waiting (testing showed ~100 ms is typical).
  final long controllerOnlineTimeoutMs = 30000L;
  final long controllerOnlinePollMs = 100L;
  _lock.lock();
  try {
    LOGGER.info("handleLiveInstanceChange, onlyCheckOffline : {}, forceBalance : {}",
        onlyCheckOffline, forceBalance);
    // Check if any controller in route is down
    Map<String, Set<TopicPartition>> instanceToTopicPartitionsMap = HelixUtils
        .getInstanceToTopicPartitionsMap(_helixManager,
            _kafkaValidationManager.getClusterToObserverMap());
    List<String> liveInstances = HelixUtils.liveInstances(_helixManager);
    List<String> instanceToReplace = new ArrayList<>();
    boolean routeControllerDown = false;
    // Check if any worker in route is down
    boolean routeWorkerDown = false;
    if (isAutoBalancingEnabled() || forceBalance) {
      for (String instanceName : instanceToTopicPartitionsMap.keySet()) {
        if (!liveInstances.contains(instanceName)) {
          routeControllerDown = true;
          instanceToReplace.add(instanceName);
        }
      }
      LOGGER.info("Controller need to replace: {}", instanceToReplace);
      // Make sure controller status is up-to-date
      updateCurrentStatus();

      // Happy scenario: the offline instance holds a route (topic starting with SEPARATOR).
      for (String instance : instanceToReplace) {
        for (TopicPartition tpOrRoute : instanceToTopicPartitionsMap.get(instance)) {
          if (!tpOrRoute.getTopic().startsWith(SEPARATOR)) {
            continue;
          }
          String pipeline = tpOrRoute.getTopic();
          int routeId = tpOrRoute.getPartition();
          if (_availableControllerList.isEmpty()) {
            LOGGER.warn("no available controller to process the route {}@{}", pipeline, routeId);
            break;
          }
          String newInstanceName = _availableControllerList.remove(0);
          LOGGER.info("Controller {} in route {}@{} will be replaced by {}", instance, pipeline,
              routeId, newInstanceName);

          // Collect the topic partitions the old controller served for this pipeline.
          List<TopicPartition> tpToReassign = new ArrayList<>();
          PriorityQueue<InstanceTopicPartitionHolder> itphList =
              _pipelineToInstanceMap.get(pipeline);
          if (itphList == null) {
            LOGGER.warn("Pipeline {} not found in current status, no topics to reassign",
                pipeline);
          } else {
            for (InstanceTopicPartitionHolder itph : itphList) {
              if (itph.getInstanceName().equals(instance)) {
                tpToReassign.addAll(itph.getServingTopicPartitionSet());
                // TODO: is it possible to have different route on same host?
                break;
              }
            }
          }

          // Helix doesn't guarantee the order of execution, so we have to wait for the new
          // controller to be online before reassigning topics. This might cause long rebalance
          // time, so the wait is bounded.
          _helixAdmin.setResourceIdealState(_helixClusterName, pipeline,
              IdealStateBuilder.resetCustomIdealStateFor(
                  _helixAdmin.getResourceIdealState(_helixClusterName, pipeline),
                  pipeline, String.valueOf(routeId), newInstanceName));
          long ts1 = System.currentTimeMillis();
          while (!isControllerOnline(newInstanceName, pipeline, String.valueOf(routeId))) {
            if (System.currentTimeMillis() - ts1 > controllerOnlineTimeoutMs) {
              LOGGER.warn("Controller {} in route {}@{} is not online after {} ms, "
                      + "reassigning topics anyway", newInstanceName, pipeline, routeId,
                  controllerOnlineTimeoutMs);
              break;
            }
            try {
              Thread.sleep(controllerOnlinePollMs);
            } catch (InterruptedException e) {
              // Preserve the interrupt for callers; stop waiting (another sleep would throw
              // immediately) and proceed with the reassignment as on timeout.
              Thread.currentThread().interrupt();
              LOGGER.warn("Interrupted while waiting for controller {} in route {}@{} to be online",
                  newInstanceName, pipeline, routeId, e);
              break;
            }
          }
          long ts2 = System.currentTimeMillis();
          LOGGER.info("Controller {} in route {}@{} is replaced by {}, it took {} ms",
              instance, pipeline, routeId, newInstanceName, ts2 - ts1);
          for (TopicPartition tp : tpToReassign) {
            _helixAdmin.setResourceIdealState(_helixClusterName, tp.getTopic(),
                IdealStateBuilder.resetCustomIdealStateFor(
                    _helixAdmin.getResourceIdealState(_helixClusterName, tp.getTopic()),
                    tp.getTopic(), pipeline + SEPARATOR + routeId, newInstanceName));
          }
          LOGGER.info(
              "Controller {} in route {}@{} is replaced by {}, topics are reassigned, it took {} ms",
              instance, pipeline, routeId, newInstanceName, System.currentTimeMillis() - ts2);
          break;
        }
      }

      // Failure scenario: instance doesn't contain route topic
      // e.g. route and the topic in that route are not assigned to the same host
      // In this case, assume the instance of the route is correct and reassign the topic to
      // that host
      for (Map.Entry<String, Set<TopicPartition>> entry
          : instanceToTopicPartitionsMap.entrySet()) {
        String instance = entry.getKey();
        Set<TopicPartition> topicPartitionSet = entry.getValue();
        if (topicPartitionSet.isEmpty()) {
          continue;
        }
        boolean foundRoute = false;
        for (TopicPartition tp : topicPartitionSet) {
          if (tp.getTopic().startsWith(SEPARATOR)) {
            foundRoute = true;
            break;
          }
        }
        if (foundRoute) {
          continue;
        }
        routeControllerDown = true;
        // Find the host for its route (route is taken from the first topic partition)
        String route = topicPartitionSet.iterator().next().getPipeline();
        String routePipeline = getPipelineFromRoute(route);
        PriorityQueue<InstanceTopicPartitionHolder> routeHolders =
            routePipeline == null ? null : _pipelineToInstanceMap.get(routePipeline);
        String instanceForRoute = null;
        if (routeHolders != null) {
          for (InstanceTopicPartitionHolder itph : routeHolders) {
            if (itph.getRouteString().equals(route)) {
              instanceForRoute = itph.getInstanceName();
              break;
            }
          }
        }
        LOGGER.info("Need to reassign: {} from {} to {}", topicPartitionSet, instance,
            instanceForRoute);
        if (instanceForRoute == null) {
          // Writing a null instance into the ideal state would corrupt the assignment.
          LOGGER.warn("No instance found for route {}, skip reassigning {} from {}", route,
              topicPartitionSet, instance);
          continue;
        }
        for (TopicPartition tp : topicPartitionSet) {
          _helixAdmin.setResourceIdealState(_helixClusterName, tp.getTopic(),
              IdealStateBuilder.resetCustomIdealStateFor(
                  _helixAdmin.getResourceIdealState(_helixClusterName, tp.getTopic()),
                  tp.getTopic(), route, instanceForRoute));
        }
      }
      if (routeControllerDown) {
        updateCurrentStatus();
      }

      // Workers: any offline worker that still has an assignment gets replaced.
      HelixManager workerManager = _workerHelixManager.getHelixManager();
      Map<String, Set<TopicPartition>> workerInstanceToTopicPartitionsMap = HelixUtils
          .getInstanceToTopicPartitionsMap(workerManager, null);
      List<String> workerLiveInstances = HelixUtils.liveInstances(workerManager);
      Map<String, List<String>> workerPipelineToRouteIdToReplace = new HashMap<>();
      List<String> workerToReplace = new ArrayList<>();
      for (Map.Entry<String, Set<TopicPartition>> entry
          : workerInstanceToTopicPartitionsMap.entrySet()) {
        String instanceName = entry.getKey();
        if (workerLiveInstances.contains(instanceName)) {
          continue;
        }
        if (entry.getValue().isEmpty()) {
          // Nothing assigned, nothing to replace; iterator().next() would throw here.
          LOGGER.info("Offline worker {} has no assignment, skip it", instanceName);
          continue;
        }
        routeWorkerDown = true;
        TopicPartition route = entry.getValue().iterator().next();
        workerPipelineToRouteIdToReplace
            .computeIfAbsent(route.getTopic(), k -> new ArrayList<>())
            .add(String.valueOf(route.getPartition()));
        workerToReplace.add(instanceName);
        LOGGER.info("Worker changed: {} for {}", instanceName, route);
      }
      if (!routeWorkerDown) {
        LOGGER.info("No worker in route is changed, do nothing!");
      } else {
        LOGGER.info("Worker need to replace: {}, {}", workerToReplace,
            workerPipelineToRouteIdToReplace);
        // Make sure worker status is up-to-date
        if (!routeControllerDown) {
          updateCurrentStatus();
        }
        _workerHelixManager
            .replaceWorkerInMirrorMaker(workerPipelineToRouteIdToReplace, workerToReplace);
        updateCurrentStatus();
      }
    } else {
      LOGGER.info("AutoBalancing is disabled, do nothing");
    }
    if (onlyCheckOffline) {
      return;
    }
    LOGGER.info("Start rebalancing current cluster");
    // Haven't run updateCurrentStatus() before
    if (!routeControllerDown && !routeWorkerDown) {
      updateCurrentStatus();
    }
    if (isAutoScalingEnabled()) {
      scaleCurrentCluster();
    } else {
      LOGGER.info("AutoScaling is disabled, do nothing");
    }
  } finally {
    _lock.unlock();
  }
}