in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/core/ControllerHelixManager.java [951:1096]
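/**
 * Rescales the current cluster in two phases: first, for every pipeline, routes whose total
 * partition count exceeds _maxNumPartitionsPerRoute are split by moving their largest topics
 * onto the lightest route (or a newly created one); second, each route's worker count is
 * adjusted toward the controller-reported expectation.
 */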
public void scaleCurrentCluster() throws Exception {
int oldTotalNumWorker = 0;
int newTotalNumWorker = 0;
Map<String, Integer> routeWorkerOverrides = getRouteWorkerOverride();
for (String pipeline : _pipelineToInstanceMap.keySet()) {
LOGGER.info("Start rescale pipeline: {}", pipeline);
PriorityQueue<InstanceTopicPartitionHolder> newItphQueue = new PriorityQueue<>(1,
InstanceTopicPartitionHolder.totalWorkloadComparator(_controllerWorkloadSnapshot.getPipelineWorkloadMap()));
// TODO: what if routeId is not continuous
int nextRouteId = _pipelineToInstanceMap.get(pipeline).size();
for (InstanceTopicPartitionHolder itph : _pipelineToInstanceMap.get(pipeline)) {
if (itph.getTotalNumPartitions() > _maxNumPartitionsPerRoute) {
LOGGER.info(
"Checking route {} with controller {} and topics {} since it exceeds maxNumPartitionsPerRoute {}",
itph.getRouteString(), itph.getInstanceName(), itph.getServingTopicPartitionSet(),
_maxNumPartitionsPerRoute);
while (itph.getTotalNumPartitions() > _maxNumPartitionsPerRoute) {
// Only one topic left, do nothing
if (itph.getNumServingTopicPartitions() == 1) {
LOGGER.info("Only one topic {} in route {}, do nothing",
itph.getServingTopicPartitionSet().iterator().next(), itph.getRouteString());
break;
}
// Find the serving topic with the largest partition count; within a route,
// TopicPartition.getPartition() holds the topic's number of partitions
TopicPartition tpToMove = new TopicPartition("tmp", -1);
for (TopicPartition tp : itph.getServingTopicPartitionSet()) {
if (tp.getPartition() > tpToMove.getPartition()) {
tpToMove = tp;
}
}
// If even the lightest route in the new queue cannot absorb this topic without exceeding
// _initMaxNumPartitionsPerRoute, create a new route for it
if (newItphQueue.isEmpty() ||
newItphQueue.peek().getTotalNumPartitions() + tpToMove.getPartition()
> _initMaxNumPartitionsPerRoute) {
try {
InstanceTopicPartitionHolder newHolder = createNewRoute(pipeline, nextRouteId);
_helixAdmin.setResourceIdealState(_helixClusterName, tpToMove.getTopic(),
IdealStateBuilder.resetCustomIdealStateFor(
_helixAdmin.getResourceIdealState(_helixClusterName, tpToMove.getTopic()),
tpToMove.getTopic(), itph.getRouteString(), newHolder.getRouteString(),
newHolder.getInstanceName()));
itph.removeTopicPartition(tpToMove);
newHolder.addTopicPartition(tpToMove);
newItphQueue.add(newHolder);
nextRouteId++;
} catch (Exception e) {
LOGGER.error("Got exception when creating a new route during rebalancing, abandoning!", e);
throw new Exception(
"Got exception when creating a new route during rebalancing, abandoning!", e);
}
} else {
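// The lightest route can absorb this topic without breaching the cap; pack it there and re-queue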
InstanceTopicPartitionHolder newHolder = newItphQueue.poll();
_helixAdmin.setResourceIdealState(_helixClusterName, tpToMove.getTopic(),
IdealStateBuilder.resetCustomIdealStateFor(
_helixAdmin.getResourceIdealState(_helixClusterName, tpToMove.getTopic()),
tpToMove.getTopic(), itph.getRouteString(), newHolder.getRouteString(),
newHolder.getInstanceName()));
itph.removeTopicPartition(tpToMove);
newHolder.addTopicPartition(tpToMove);
newItphQueue.add(newHolder);
}
}
}
newItphQueue.add(itph);
}
// After moving topics, scale workers based on workload
int rescaleFailedCount = 0;
for (InstanceTopicPartitionHolder itph : newItphQueue) {
oldTotalNumWorker += itph.getWorkerSet().size();
String routeString = itph.getRouteString();
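// A per-route override may raise the worker floor above the default _initMaxNumWorkersPerRoute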
int initWorkerCount = _initMaxNumWorkersPerRoute;
if (routeWorkerOverrides.containsKey(routeString)
&& routeWorkerOverrides.get(routeString) > initWorkerCount) {
initWorkerCount = routeWorkerOverrides.get(routeString);
}
try {
boolean isFailed = _controllerWorkloadSnapshot.getFailedPipelines().contains(itph.getRouteString());
if (isFailed) {
LOGGER.error(String.format(
"Got workload error on route %s (controller %s). No change on number of workers",
itph.getRouteString(), itph.getInstanceName()));
continue;
}
ControllerWorkloadInfo workloadInfo = _controllerWorkloadSnapshot.getPipelineWorkloadMap().get(itph.getRouteString());
if (workloadInfo == null || !workloadInfo.isAutoBalancingEnabled()) {
LOGGER.warn("Skip scaling workers for route {} because workload info is missing or auto balancing is disabled on the controller", routeString);
continue;
}
if (workloadInfo.getNumOfExpectedWorkers() != 0) {
int expectedNumWorkers = workloadInfo.getNumOfExpectedWorkers();
LOGGER.info("Current {} workers in route {}, expect {} workers",
itph.getWorkerSet().size(), itph.getRouteString(), expectedNumWorkers);
int actualExpectedNumWorkers = getActualExpectedNumWorkers(expectedNumWorkers,
initWorkerCount);
LOGGER.info("Current {} workers in route {}, actual expect {} workers",
itph.getWorkerSet().size(), itph.getRouteString(), actualExpectedNumWorkers);
if (actualExpectedNumWorkers > itph.getWorkerSet().size()) {
LOGGER.info("Current {} workers in route {}, actual expect {} workers, add {} workers",
itph.getWorkerSet().size(), itph.getRouteString(), actualExpectedNumWorkers,
actualExpectedNumWorkers - itph.getWorkerSet().size());
_workerHelixManager.addWorkersToMirrorMaker(itph, itph.getRoute().getTopic(),
itph.getRoute().getPartition(),
actualExpectedNumWorkers - itph.getWorkerSet().size());
}
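// Unlike scale-up, which jumps straight to the target, scale-down removes at most one batch per cycle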
if (actualExpectedNumWorkers < itph.getWorkerSet().size()) {
// _numOfWorkersBatchSize is the smallest unit for safely scaling down workers
LOGGER.info(
"Current {} workers in route {}, actual expect {} workers, remove {} workers",
itph.getWorkerSet().size(), itph.getRouteString(), actualExpectedNumWorkers,
_numOfWorkersBatchSize);
_workerHelixManager.removeWorkersToMirrorMaker(itph, itph.getRoute().getTopic(),
itph.getRoute().getPartition(), _numOfWorkersBatchSize);
}
newTotalNumWorker += actualExpectedNumWorkers;
} else {
LOGGER.warn("Got workload for route {} reporting 0 expected workers. No change on number of workers",
itph.getRouteString());
newTotalNumWorker += itph.getWorkerSet().size();
rescaleFailedCount++;
}
} catch (Exception e) {
LOGGER.error(String.format(
"Got workload error when connecting to %s for route %s. No change on number of workers",
itph.getInstanceName(), itph.getRouteString()), e);
newTotalNumWorker += itph.getWorkerSet().size();
rescaleFailedCount++;
}
}
_pipelineToInstanceMap.put(pipeline, newItphQueue);
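// inc() by the delta so the counter's value ends up equal to this pipeline's failure count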
_rescaleFailedCount.inc(rescaleFailedCount - _rescaleFailedCount.getCount());
}
LOGGER.info("oldTotalNumWorker: {}, newTotalNumWorker: {}", oldTotalNumWorker, newTotalNumWorker);
}
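
// The helper getActualExpectedNumWorkers used above is defined outside this excerpt. As a rough
// guide only, here is a minimal sketch of the clamping such a helper is responsible for; the name
// clampExpectedWorkers and the maxWorkersPerRoute parameter are illustrative assumptions, not the
// actual uReplicator implementation.
private static int clampExpectedWorkers(int expectedNumWorkers, int initWorkerCount,
int maxWorkersPerRoute) {
// Never drop below the per-route floor (_initMaxNumWorkersPerRoute or its override).
if (expectedNumWorkers <= initWorkerCount) {
return initWorkerCount;
}
// Never exceed the assumed per-route ceiling.
return Math.min(expectedNumWorkers, maxWorkersPerRoute);
}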