in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/AutoRebalanceLiveInstanceChangeListener.java [102:141]
/**
 * Creates the listener, copies its tuning parameters out of the controller configuration,
 * registers metrics and, when the configured auto-rebalance period is positive, schedules a
 * recurring whole-cluster rebalance task.
 *
 * @param helixMirrorMakerManager manager queried by the periodic task for the workload info
 *     retriever, the current live instances and the blacklisted instances
 * @param helixManager Helix manager kept in a field of this listener
 * @param controllerConf configuration supplying the limits, thresholds, delays and periods read
 *     below
 */
public AutoRebalanceLiveInstanceChangeListener(HelixMirrorMakerManager helixMirrorMakerManager,
    HelixManager helixManager, ControllerConf controllerConf) {
  _helixMirrorMakerManager = helixMirrorMakerManager;
  _helixManager = helixManager;

  // Snapshot the configuration values into fields.
  _maxWorkingInstances = controllerConf.getMaxWorkingInstances();
  _delayedAutoReblanceTimeInSeconds = controllerConf.getAutoRebalanceDelayInSeconds();
  _overloadedRatioThreshold = controllerConf.getAutoRebalanceWorkloadRatioThreshold();
  _maxDedicatedInstancesRatio = controllerConf.getMaxDedicatedLaggingInstancesRatio();
  _maxStuckPartitionMovements = controllerConf.getMaxStuckPartitionMovements();
  // The configuration expresses this in minutes; the field holds milliseconds.
  _movePartitionAfterStuckMillis =
      TimeUnit.MINUTES.toMillis(controllerConf.getMoveStuckPartitionAfterMinutes());

  LOGGER.info("Delayed Auto Reblance Time In Seconds: {}", _delayedAutoReblanceTimeInSeconds);
  registerMetrics();

  int autoRebalancePeriodInSeconds = controllerConf.getAutoRebalancePeriodInSeconds();
  final int minIntervalInSeconds = controllerConf.getAutoRebalanceMinIntervalInSeconds();

  // A non-positive period means no periodic rebalancing is scheduled.
  if (autoRebalancePeriodInSeconds <= 0) {
    return;
  }

  LOGGER.info("Trying to schedule auto rebalancing at rate " + autoRebalancePeriodInSeconds + " seconds");

  // Minimum time that must have elapsed since the last rebalance, in milliseconds.
  final long minIntervalMillis = 1000L * minIntervalInSeconds;

  final Runnable periodicRebalance = new Runnable() {
    @Override
    public void run() {
      try {
        // Skip this round until the workload info retriever reports it is initialized.
        if (!_helixMirrorMakerManager.getWorkloadInfoRetriever().isInitialized()) {
          return;
        }
        // Skip this round if the previous rebalance happened too recently.
        final long sinceLastRebalanceMillis =
            System.currentTimeMillis() - _lastRebalanceTimeMillis;
        if (sinceLastRebalanceMillis <= minIntervalMillis) {
          return;
        }
        rebalanceCurrentCluster(_helixMirrorMakerManager.getCurrentLiveInstances(),
            _helixMirrorMakerManager.getBlacklistedInstances(), false, false);
      } catch (Exception e) {
        // Log and carry on. NOTE(review): presumably the scheduler is a
        // ScheduledExecutorService, where an escaping exception would stop later runs - confirm.
        LOGGER.error("Got exception during periodically rebalancing the whole cluster! ", e);
      }
    }
  };

  // The first run waits for the larger of the startup delay and one full period.
  final long initialDelaySeconds =
      Math.max(_delayedAutoReblanceTimeInSeconds, autoRebalancePeriodInSeconds);
  _delayedScheuler.scheduleWithFixedDelay(
      periodicRebalance, initialDelaySeconds, autoRebalancePeriodInSeconds, TimeUnit.SECONDS);
}