public AutoRebalanceLiveInstanceChangeListener()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [102:141]


  /**
   * Creates the listener, copies its tuning parameters out of {@code controllerConf}, registers
   * metrics and, when the configured auto-rebalance period is positive, schedules a recurring
   * rebalance of the whole cluster.
   *
   * <p>The recurring task is submitted to {@code _delayedScheuler} with a fixed delay of
   * {@code getAutoRebalancePeriodInSeconds()} seconds between runs. Its first run is delayed by the
   * larger of the delayed-auto-rebalance time and that period. On each run it calls
   * {@code rebalanceCurrentCluster} only if the workload info retriever reports that it is
   * initialized and more than {@code getAutoRebalanceMinIntervalInSeconds()} seconds have passed
   * since {@code _lastRebalanceTimeMillis}.
   *
   * @param helixMirrorMakerManager manager the periodic task queries for the workload info
   *     retriever, the current live instances and the blacklisted instances
   * @param helixManager Helix manager kept in {@code _helixManager}
   * @param controllerConf configuration from which the thresholds, delays and periods are read
   */
  public AutoRebalanceLiveInstanceChangeListener(HelixMirrorMakerManager helixMirrorMakerManager,
      HelixManager helixManager, ControllerConf controllerConf) {
    _helixMirrorMakerManager = helixMirrorMakerManager;
    _helixManager = helixManager;
    _maxWorkingInstances = controllerConf.getMaxWorkingInstances();
    _delayedAutoReblanceTimeInSeconds = controllerConf.getAutoRebalanceDelayInSeconds();
    _overloadedRatioThreshold = controllerConf.getAutoRebalanceWorkloadRatioThreshold();
    _maxDedicatedInstancesRatio = controllerConf.getMaxDedicatedLaggingInstancesRatio();
    _maxStuckPartitionMovements = controllerConf.getMaxStuckPartitionMovements();
    _movePartitionAfterStuckMillis = TimeUnit.MINUTES
        .toMillis(controllerConf.getMoveStuckPartitionAfterMinutes());
    LOGGER.info("Delayed Auto Reblance Time In Seconds: {}", _delayedAutoReblanceTimeInSeconds);
    registerMetrics();

    int autoRebalancePeriodInSeconds = controllerConf.getAutoRebalancePeriodInSeconds();
    final int minIntervalInSeconds = controllerConf.getAutoRebalanceMinIntervalInSeconds();
    // Converted once here instead of on every scheduled run.
    final long minIntervalMillis = TimeUnit.SECONDS.toMillis(minIntervalInSeconds);
    // A non-positive period disables periodic rebalancing: nothing is scheduled.
    if (autoRebalancePeriodInSeconds > 0) {
      LOGGER.info("Trying to schedule auto rebalancing at rate {} seconds",
          autoRebalancePeriodInSeconds);
      _delayedScheuler.scheduleWithFixedDelay(
          new Runnable() {
            @Override
            public void run() {
              try {
                // Skip this round until workload info is available and the minimum interval
                // since the last rebalance has elapsed.
                if (_helixMirrorMakerManager.getWorkloadInfoRetriever().isInitialized()
                    && System.currentTimeMillis() - _lastRebalanceTimeMillis
                    > minIntervalMillis) {
                  rebalanceCurrentCluster(_helixMirrorMakerManager.getCurrentLiveInstances(),
                      _helixMirrorMakerManager.getBlacklistedInstances(), false, false);
                }
              } catch (Exception e) {
                // Log and continue. NOTE(review): _delayedScheuler is presumably a
                // ScheduledExecutorService, where an escaping exception would cancel all
                // subsequent runs - confirm.
                LOGGER
                    .error("Got exception during periodically rebalancing the whole cluster! ", e);
              }
            }
          }, Math.max(_delayedAutoReblanceTimeInSeconds, autoRebalancePeriodInSeconds),
          autoRebalancePeriodInSeconds,
          TimeUnit.SECONDS);
    }
  }