in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [535:635]
public synchronized void updateCurrentStatus() {
_lock.lock();
try {
long currTimeMs = System.currentTimeMillis();
if (currTimeMs - lastUpdateTimeMs < _conf.getUpdateStatusCoolDownMs()) {
LOGGER.info("Only {} ms since last updateCurrentStatus, wait for next one",
currTimeMs - lastUpdateTimeMs);
return;
}
LOGGER.info("Trying to run controller updateCurrentStatus");
_workerHelixManager.updateCurrentStatus();
// Map<InstanceName, InstanceTopicPartitionHolder>
Map<String, InstanceTopicPartitionHolder> instanceMap = new HashMap<>();
// Map<TopicName, Map<Pipeline, Instance>>
Map<String, Map<String, InstanceTopicPartitionHolder>> currTopicToPipelineInstanceMap = new HashMap<>();
// Map<Pipeline, PriorityQueue<Instance>>
Map<String, PriorityQueue<InstanceTopicPartitionHolder>> currPipelineToInstanceMap = new HashMap<>();
// Set<InstanceName>
List<String> currAvailableControllerList = new ArrayList<>();
Map<TopicPartition, List<String>> workerRouteToInstanceMap = _workerHelixManager
.getWorkerRouteToInstanceMap();
// Map<Instance, Set<Pipeline>> from IdealState
Map<String, Set<TopicPartition>> instanceToTopicPartitionsMap = HelixUtils
.getInstanceToTopicPartitionsMap(_helixManager,
_kafkaValidationManager.getClusterToObserverMap());
List<String> liveInstances = HelixUtils.liveInstances(_helixManager);
currAvailableControllerList.addAll(liveInstances);
int assignedControllerCount = 0;
for (String instanceId : instanceToTopicPartitionsMap.keySet()) {
Set<TopicPartition> topicPartitions = instanceToTopicPartitionsMap.get(instanceId);
// TODO: one instance suppose to have only one route
for (TopicPartition tp : topicPartitions) {
String topicName = tp.getTopic();
if (topicName.startsWith(SEPARATOR)) {
currPipelineToInstanceMap.putIfAbsent(topicName, new PriorityQueue<>(1,
InstanceTopicPartitionHolder.totalWorkloadComparator(_controllerWorkloadSnapshot.getPipelineWorkloadMap())));
InstanceTopicPartitionHolder itph = new InstanceTopicPartitionHolder(instanceId, tp);
if (workerRouteToInstanceMap.get(tp) != null) {
itph.addWorkers(workerRouteToInstanceMap.get(tp));
}
currPipelineToInstanceMap.get(topicName).add(itph);
instanceMap.put(instanceId, itph);
currAvailableControllerList.remove(instanceId);
assignedControllerCount++;
}
}
for (TopicPartition tp : topicPartitions) {
String topicName = tp.getTopic();
if (!topicName.startsWith(SEPARATOR)) {
if (instanceMap.containsKey(instanceId)) {
instanceMap.get(instanceId).addTopicPartition(tp);
currTopicToPipelineInstanceMap.putIfAbsent(topicName, new ConcurrentHashMap<>());
currTopicToPipelineInstanceMap.get(tp.getTopic())
.put(getPipelineFromRoute(tp.getPipeline()),
instanceMap.get(instanceId));
}
}
}
}
Map<String, HostAndPort> pipelineHostInfoMap = new HashMap<>();
for (InstanceTopicPartitionHolder holder : instanceMap.values()) {
try {
pipelineHostInfoMap.put(holder.getRouteString(), getHostInfo(holder.getInstanceName()));
} catch (Exception e) {
LOGGER.warn("Failed to find hostInfo for instanceId {}", holder.getInstanceName());
}
}
_controllerWorkloadSnapshot.updatePipelineHostInfoMap(pipelineHostInfoMap);
_controllerWorkloadSnapshot.refreshWorkloadInfo();
_pipelineToInstanceMap = currPipelineToInstanceMap;
_topicToPipelineInstanceMap = currTopicToPipelineInstanceMap;
_availableControllerList = currAvailableControllerList;
if (_helixManager.isLeader()) {
_availableController.inc(_availableControllerList.size() - _availableController.getCount());
_availableWorker
.inc(_workerHelixManager.getAvailableWorkerList().size() - _availableWorker.getCount());
_assignedControllerCount.inc(assignedControllerCount - _assignedControllerCount.getCount());
}
// Validation
validateInstanceToTopicPartitionsMap(instanceToTopicPartitionsMap, instanceMap);
//LOGGER.info("For controller _pipelineToInstanceMap: {}", _pipelineToInstanceMap);
//LOGGER.info("For controller _topicToPipelineInstanceMap: {}", _topicToPipelineInstanceMap);
LOGGER.info("For controller {} available", _availableControllerList.size());
lastUpdateTimeMs = System.currentTimeMillis();
} catch (Exception e) {
LOGGER.error("Got exception in updateCurrentStatus", e);
} finally {
_lock.unlock();
}
}