in pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java [318:681]
public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
Map<String, Long> recentlyUnloadedBundles,
Map<String, Long> recentlyUnloadedBrokers) {
final var conf = context.brokerConfiguration();
decisionCache.clear();
stats.clear();
Map<String, BrokerLookupData> availableBrokers;
try {
availableBrokers = context.brokerRegistry().getAvailableBrokerLookupDataAsync()
.get(context.brokerConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
counter.update(Failure, Unknown);
log.warn("Failed to fetch available brokers. Stop unloading.", e);
return decisionCache;
}
try {
final var loadStore = context.brokerLoadDataStore();
stats.setLoadDataStore(loadStore);
boolean debugMode = ExtensibleLoadManagerImpl.debug(conf, log);
var skipReason = stats.update(
context.brokerLoadDataStore(), availableBrokers, recentlyUnloadedBrokers, conf);
if (skipReason.isPresent()) {
if (debugMode) {
log.warn(CANNOT_CONTINUE_UNLOAD_MSG
+ " Skipped the load stat update. Reason:{}.",
skipReason.get());
}
counter.update(Skip, skipReason.get());
return decisionCache;
}
counter.updateLoadData(stats.avg, stats.std);
if (debugMode) {
log.info("brokers' load stats:{}", stats);
}
// skip metrics
int numOfBrokersWithEmptyLoadData = 0;
int numOfBrokersWithFewBundles = 0;
final double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
boolean transfer = conf.isLoadBalancerTransferEnabled();
if (stats.std() > targetStd
|| isUnderLoaded(context, stats.peekMinBroker(), stats)
|| isOverLoaded(context, stats.peekMaxBroker(), stats.avg)) {
unloadConditionHitCount++;
} else {
unloadConditionHitCount = 0;
}
if (unloadConditionHitCount <= conf.getLoadBalancerSheddingConditionHitCountThreshold()) {
if (debugMode) {
log.info(CANNOT_CONTINUE_UNLOAD_MSG
+ " Shedding condition hit count:{} is less than or equal to the threshold:{}.",
unloadConditionHitCount, conf.getLoadBalancerSheddingConditionHitCountThreshold());
}
counter.update(Skip, HitCount);
return decisionCache;
}
while (true) {
if (!stats.hasTransferableBrokers()) {
if (debugMode) {
log.info(CANNOT_CONTINUE_UNLOAD_MSG
+ " Exhausted target transfer brokers.");
}
break;
}
UnloadDecision.Reason reason;
if (stats.std() > targetStd) {
reason = Overloaded;
} else if (isUnderLoaded(context, stats.peekMinBroker(), stats)) {
reason = Underloaded;
if (debugMode) {
log.info(String.format("broker:%s is underloaded:%s although "
+ "load std:%.2f <= targetStd:%.2f. "
+ "Continuing unload for this underloaded broker.",
stats.peekMinBroker(),
context.brokerLoadDataStore().get(stats.peekMinBroker()).get(),
stats.std(), targetStd));
}
} else if (isOverLoaded(context, stats.peekMaxBroker(), stats.avg)) {
reason = Overloaded;
if (debugMode) {
log.info(String.format("broker:%s is overloaded:%s although "
+ "load std:%.2f <= targetStd:%.2f. "
+ "Continuing unload for this overloaded broker.",
stats.peekMaxBroker(),
context.brokerLoadDataStore().get(stats.peekMaxBroker()).get(),
stats.std(), targetStd));
}
} else {
if (debugMode) {
log.info(CANNOT_CONTINUE_UNLOAD_MSG
+ "The overall cluster load meets the target, std:{} <= targetStd:{}."
+ "minBroker:{} is not underloaded. maxBroker:{} is not overloaded.",
stats.std(), targetStd, stats.peekMinBroker(), stats.peekMaxBroker());
}
break;
}
String maxBroker = stats.pollMaxBroker();
String minBroker = stats.peekMinBroker();
Optional<BrokerLoadData> maxBrokerLoadData = context.brokerLoadDataStore().get(maxBroker);
Optional<BrokerLoadData> minBrokerLoadData = context.brokerLoadDataStore().get(minBroker);
if (maxBrokerLoadData.isEmpty()) {
log.error(String.format(CANNOT_UNLOAD_BROKER_MSG
+ " MaxBrokerLoadData is empty.", maxBroker));
numOfBrokersWithEmptyLoadData++;
continue;
}
if (minBrokerLoadData.isEmpty()) {
log.error("Can't transfer load to broker:{}. MinBrokerLoadData is empty.", minBroker);
numOfBrokersWithEmptyLoadData++;
continue;
}
double maxLoad = maxBrokerLoadData.get().getWeightedMaxEMA();
double minLoad = minBrokerLoadData.get().getWeightedMaxEMA();
double offload = (maxLoad - minLoad) / 2;
BrokerLoadData brokerLoadData = maxBrokerLoadData.get();
double maxBrokerThroughput = brokerLoadData.getMsgThroughputIn()
+ brokerLoadData.getMsgThroughputOut();
double minBrokerThroughput = minBrokerLoadData.get().getMsgThroughputIn()
+ minBrokerLoadData.get().getMsgThroughputOut();
double offloadThroughput = maxBrokerThroughput * offload / maxLoad;
if (debugMode) {
log.info(String.format(
"Attempting to shed load from broker:%s%s, which has the max resource "
+ "usage:%.2f%%, targetStd:%.2f,"
+ " -- Trying to offload %.2f%%, %.2f KByte/s of traffic.",
maxBroker, transfer ? " to broker:" + minBroker : "",
maxLoad * 100,
targetStd,
offload * 100,
offloadThroughput / KB
));
}
double trafficMarkedToOffload = 0;
double trafficMarkedToGain = 0;
Optional<TopBundlesLoadData> bundlesLoadData = context.topBundleLoadDataStore().get(maxBroker);
if (bundlesLoadData.isEmpty() || bundlesLoadData.get().getTopBundlesLoadData().isEmpty()) {
if (debugMode) {
log.info(String.format(CANNOT_UNLOAD_BROKER_MSG
+ " TopBundlesLoadData is empty.", maxBroker));
}
numOfBrokersWithEmptyLoadData++;
continue;
}
var maxBrokerTopBundlesLoadData = bundlesLoadData.get().getTopBundlesLoadData();
if (maxBrokerTopBundlesLoadData.size() == 1) {
numOfBrokersWithFewBundles++;
log.warn(String.format(CANNOT_UNLOAD_BROKER_MSG
+ " Sole namespace bundle:%s is overloading the broker. ",
maxBroker, maxBrokerTopBundlesLoadData.iterator().next()));
continue;
}
Optional<TopBundlesLoadData> minBundlesLoadData = context.topBundleLoadDataStore().get(minBroker);
var minBrokerTopBundlesLoadDataIter =
minBundlesLoadData.isPresent() ? minBundlesLoadData.get().getTopBundlesLoadData().iterator() :
null;
if (maxBrokerTopBundlesLoadData.isEmpty()) {
numOfBrokersWithFewBundles++;
log.warn(String.format(CANNOT_UNLOAD_BROKER_MSG
+ " Broker overloaded despite having no bundles", maxBroker));
continue;
}
int remainingTopBundles = maxBrokerTopBundlesLoadData.size();
for (var e : maxBrokerTopBundlesLoadData) {
String bundle = e.bundleName();
if (channel != null && !channel.isOwner(bundle, maxBroker)) {
if (debugMode) {
log.warn(String.format(CANNOT_UNLOAD_BUNDLE_MSG
+ " MaxBroker:%s is not the owner.", bundle, maxBroker));
}
continue;
}
if (recentlyUnloadedBundles.containsKey(bundle)) {
if (debugMode) {
log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
+ " Bundle has been recently unloaded at ts:%d.",
bundle, recentlyUnloadedBundles.get(bundle)));
}
continue;
}
if (!isTransferable(context, availableBrokers, bundle, maxBroker, Optional.of(minBroker))) {
if (debugMode) {
log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
+ " This unload can't meet "
+ "affinity(isolation) or anti-affinity group policies.", bundle));
}
continue;
}
if (remainingTopBundles <= 1) {
if (debugMode) {
log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
+ " The remaining bundles in TopBundlesLoadData from the maxBroker:%s is"
+ " less than or equal to 1.",
bundle, maxBroker));
}
break;
}
var bundleData = e.stats();
double maxBrokerBundleThroughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut;
if (maxBrokerBundleThroughput == 0) {
if (debugMode) {
log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
+ " It has zero throughput.", bundle));
}
continue;
}
boolean swap = false;
List<Unload> minToMaxUnloads = new ArrayList<>();
double minBrokerBundleSwapThroughput = 0.0;
if (trafficMarkedToOffload - trafficMarkedToGain + maxBrokerBundleThroughput > offloadThroughput) {
// see if we can swap bundles from min to max broker to balance better.
if (transfer && minBrokerTopBundlesLoadDataIter != null) {
var maxBrokerNewThroughput =
maxBrokerThroughput - trafficMarkedToOffload + trafficMarkedToGain
- maxBrokerBundleThroughput;
var minBrokerNewThroughput =
minBrokerThroughput + trafficMarkedToOffload - trafficMarkedToGain
+ maxBrokerBundleThroughput;
while (minBrokerTopBundlesLoadDataIter.hasNext()) {
var minBrokerBundleData = minBrokerTopBundlesLoadDataIter.next();
if (!isTransferable(context, availableBrokers,
minBrokerBundleData.bundleName(), minBroker, Optional.of(maxBroker))) {
continue;
}
var minBrokerBundleThroughput =
minBrokerBundleData.stats().msgThroughputIn
+ minBrokerBundleData.stats().msgThroughputOut;
if (minBrokerBundleThroughput == 0) {
continue;
}
var maxBrokerNewThroughputTmp = maxBrokerNewThroughput + minBrokerBundleThroughput;
var minBrokerNewThroughputTmp = minBrokerNewThroughput - minBrokerBundleThroughput;
if (maxBrokerNewThroughputTmp < maxBrokerThroughput
&& minBrokerNewThroughputTmp < maxBrokerThroughput) {
minToMaxUnloads.add(new Unload(minBroker,
minBrokerBundleData.bundleName(), Optional.of(maxBroker)));
maxBrokerNewThroughput = maxBrokerNewThroughputTmp;
minBrokerNewThroughput = minBrokerNewThroughputTmp;
minBrokerBundleSwapThroughput += minBrokerBundleThroughput;
if (minBrokerNewThroughput <= maxBrokerNewThroughput
&& maxBrokerNewThroughput < maxBrokerThroughput * 0.75) {
swap = true;
break;
}
}
}
}
if (!swap) {
if (debugMode) {
log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
+ " The traffic to unload:%.2f - gain:%.2f = %.2f KByte/s is "
+ "greater than the target :%.2f KByte/s.",
bundle,
(trafficMarkedToOffload + maxBrokerBundleThroughput) / KB,
trafficMarkedToGain / KB,
(trafficMarkedToOffload - trafficMarkedToGain + maxBrokerBundleThroughput) / KB,
offloadThroughput / KB));
}
break;
}
}
Unload unload;
if (transfer) {
if (swap) {
minToMaxUnloads.forEach(minToMaxUnload -> {
if (debugMode) {
log.info("Decided to gain bundle:{} from min broker:{}",
minToMaxUnload.serviceUnit(), minToMaxUnload.sourceBroker());
}
var decision = new UnloadDecision();
decision.setUnload(minToMaxUnload);
decision.succeed(reason);
decisionCache.add(decision);
});
if (debugMode) {
log.info(String.format(
"Total traffic %.2f KByte/s to transfer from min broker:%s to max broker:%s.",
minBrokerBundleSwapThroughput / KB, minBroker, maxBroker));
trafficMarkedToGain += minBrokerBundleSwapThroughput;
}
}
unload = new Unload(maxBroker, bundle, Optional.of(minBroker));
} else {
unload = new Unload(maxBroker, bundle);
}
var decision = new UnloadDecision();
decision.setUnload(unload);
decision.succeed(reason);
decisionCache.add(decision);
trafficMarkedToOffload += maxBrokerBundleThroughput;
remainingTopBundles--;
if (debugMode) {
log.info(String.format("Decided to unload bundle:%s, throughput:%.2f KByte/s."
+ " The traffic marked to unload:%.2f - gain:%.2f = %.2f KByte/s."
+ " Target:%.2f KByte/s.",
bundle, maxBrokerBundleThroughput / KB,
trafficMarkedToOffload / KB,
trafficMarkedToGain / KB,
(trafficMarkedToOffload - trafficMarkedToGain) / KB,
offloadThroughput / KB));
}
}
if (trafficMarkedToOffload > 0) {
var adjustedOffload =
(trafficMarkedToOffload - trafficMarkedToGain) * maxLoad / maxBrokerThroughput;
stats.offload(maxLoad, minLoad, adjustedOffload);
if (debugMode) {
log.info(
String.format("brokers' load stats:%s, after offload{max:%.2f, min:%.2f, offload:%.2f}",
stats, maxLoad, minLoad, adjustedOffload));
}
} else {
numOfBrokersWithFewBundles++;
log.warn(String.format(CANNOT_UNLOAD_BROKER_MSG
+ " There is no bundle that can be unloaded in top bundles load data. "
+ "Consider splitting bundles owned by the broker "
+ "to make each bundle serve less traffic "
+ "or increasing loadBalancerMaxNumberOfBundlesInBundleLoadReport"
+ " to report more bundles in the top bundles load data.", maxBroker));
}
} // while end
if (debugMode) {
log.info("decisionCache:{}", decisionCache);
}
if (decisionCache.isEmpty()) {
UnloadDecision.Reason reason;
if (numOfBrokersWithEmptyLoadData > 0) {
reason = NoLoadData;
} else if (numOfBrokersWithFewBundles > 0) {
reason = NoBundles;
} else {
reason = HitCount;
}
counter.update(Skip, reason);
} else {
unloadConditionHitCount = 0;
}
} catch (Throwable e) {
log.error("Failed to process unloading. ", e);
this.counter.update(Failure, Unknown);
}
return decisionCache;
}