public synchronized void updateCurrentStatus()

in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [535:635]


  /**
   * Rebuilds this manager's in-memory view of controller assignments and publishes it to
   * {@code _pipelineToInstanceMap}, {@code _topicToPipelineInstanceMap} and
   * {@code _availableControllerList}.
   *
   * <p>The call is a no-op (apart from an info log) when less than
   * {@code _conf.getUpdateStatusCoolDownMs()} milliseconds have elapsed since the last run that
   * completed without an exception. The worker manager's status is refreshed first, then the
   * instance-to-topic-partition assignment is read through {@code HelixUtils} and split into
   * route entries (topic name starts with {@code SEPARATOR}) and regular topic entries.
   *
   * <p>The method holds {@code _lock} for its whole duration (in addition to being
   * {@code synchronized}) and always releases it. Any exception raised while refreshing is caught
   * and logged here; it is not propagated to the caller, and in that case the last-update
   * timestamp is left unchanged.
   */
  public synchronized void updateCurrentStatus() {
    _lock.lock();
    try {
      long currTimeMs = System.currentTimeMillis();
      if (currTimeMs - lastUpdateTimeMs < _conf.getUpdateStatusCoolDownMs()) {
        LOGGER.info("Only {} ms since last updateCurrentStatus, wait for next one",
            currTimeMs - lastUpdateTimeMs);
        return;
      }
      LOGGER.info("Trying to run controller updateCurrentStatus");

      _workerHelixManager.updateCurrentStatus();

      // Map<InstanceName, InstanceTopicPartitionHolder>
      Map<String, InstanceTopicPartitionHolder> instanceMap = new HashMap<>();
      // Map<TopicName, Map<Pipeline, Instance>>
      Map<String, Map<String, InstanceTopicPartitionHolder>> currTopicToPipelineInstanceMap = new HashMap<>();
      // Map<Pipeline, PriorityQueue<Instance>>
      Map<String, PriorityQueue<InstanceTopicPartitionHolder>> currPipelineToInstanceMap = new HashMap<>();
      // List<InstanceName>: starts as every live instance, assigned ones are removed below
      List<String> currAvailableControllerList = new ArrayList<>();

      Map<TopicPartition, List<String>> workerRouteToInstanceMap = _workerHelixManager
          .getWorkerRouteToInstanceMap();
      // Map<InstanceName, Set<TopicPartition>> (original note: built from IdealState)
      Map<String, Set<TopicPartition>> instanceToTopicPartitionsMap = HelixUtils
          .getInstanceToTopicPartitionsMap(_helixManager,
              _kafkaValidationManager.getClusterToObserverMap());

      List<String> liveInstances = HelixUtils.liveInstances(_helixManager);
      currAvailableControllerList.addAll(liveInstances);

      int assignedControllerCount = 0;
      for (Map.Entry<String, Set<TopicPartition>> assignment : instanceToTopicPartitionsMap.entrySet()) {
        String instanceId = assignment.getKey();
        Set<TopicPartition> topicPartitions = assignment.getValue();

        // First pass: route entries (topic name starts with SEPARATOR). These create the holder
        // for the instance, so they must be processed before the regular topics below.
        // TODO: one instance suppose to have only one route
        for (TopicPartition tp : topicPartitions) {
          String topicName = tp.getTopic();
          if (!topicName.startsWith(SEPARATOR)) {
            continue;
          }
          // computeIfAbsent builds the queue and its comparator only the first time a route is seen.
          PriorityQueue<InstanceTopicPartitionHolder> routeInstances = currPipelineToInstanceMap
              .computeIfAbsent(topicName, key -> new PriorityQueue<>(1,
                  InstanceTopicPartitionHolder.totalWorkloadComparator(
                      _controllerWorkloadSnapshot.getPipelineWorkloadMap())));
          InstanceTopicPartitionHolder itph = new InstanceTopicPartitionHolder(instanceId, tp);
          List<String> routeWorkers = workerRouteToInstanceMap.get(tp);
          if (routeWorkers != null) {
            itph.addWorkers(routeWorkers);
          }
          routeInstances.add(itph);
          instanceMap.put(instanceId, itph);
          currAvailableControllerList.remove(instanceId);
          assignedControllerCount++;
        }

        // Second pass: regular topics are attached to the holder created above. Instances that
        // had no route entry have no holder, and their topics are skipped.
        InstanceTopicPartitionHolder assignedHolder = instanceMap.get(instanceId);
        if (assignedHolder == null) {
          continue;
        }
        for (TopicPartition tp : topicPartitions) {
          String topicName = tp.getTopic();
          if (topicName.startsWith(SEPARATOR)) {
            continue;
          }
          assignedHolder.addTopicPartition(tp);
          currTopicToPipelineInstanceMap
              .computeIfAbsent(topicName, key -> new ConcurrentHashMap<>())
              .put(getPipelineFromRoute(tp.getPipeline()), assignedHolder);
        }
      }

      // Resolve host info per assigned instance; a failed lookup only skips that instance.
      Map<String, HostAndPort> pipelineHostInfoMap = new HashMap<>();
      for (InstanceTopicPartitionHolder holder : instanceMap.values()) {
        try {
          pipelineHostInfoMap.put(holder.getRouteString(), getHostInfo(holder.getInstanceName()));
        } catch (Exception e) {
          // Pass the exception as the trailing argument so the cause is logged with the warning.
          LOGGER.warn("Failed to find hostInfo for instanceId {}", holder.getInstanceName(), e);
        }
      }
      _controllerWorkloadSnapshot.updatePipelineHostInfoMap(pipelineHostInfoMap);
      _controllerWorkloadSnapshot.refreshWorkloadInfo();

      // Publish the freshly built view.
      _pipelineToInstanceMap = currPipelineToInstanceMap;
      _topicToPipelineInstanceMap = currTopicToPipelineInstanceMap;
      _availableControllerList = currAvailableControllerList;

      if (_helixManager.isLeader()) {
        // Each metric is incremented by (new value - current count), i.e. moved to the new value.
        _availableController.inc(_availableControllerList.size() - _availableController.getCount());
        _availableWorker
            .inc(_workerHelixManager.getAvailableWorkerList().size() - _availableWorker.getCount());
        _assignedControllerCount.inc(assignedControllerCount - _assignedControllerCount.getCount());
      }

      // Validation
      validateInstanceToTopicPartitionsMap(instanceToTopicPartitionsMap, instanceMap);

      LOGGER.info("For controller {} available", _availableControllerList.size());

      // Only reached when the refresh finished without throwing, so failures are retried
      // on the next call instead of waiting out the cool-down.
      lastUpdateTimeMs = System.currentTimeMillis();
    } catch (Exception e) {
      LOGGER.error("Got exception in updateCurrentStatus", e);
    } finally {
      _lock.unlock();
    }
  }