public Set findBundlesForUnloading()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java [318:681]


    /**
     * Decides which bundles should be unloaded from the most loaded brokers, optionally transferring them
     * to (and swapping bundles with) the least loaded broker.
     *
     * <p>The shared {@code decisionCache} is cleared on entry, filled with the decisions made in this run and
     * returned. The returned set is empty when:
     * <ul>
     *   <li>the available brokers cannot be fetched within the metadata store operation timeout,</li>
     *   <li>the load stats update reports a skip reason,</li>
     *   <li>the shedding condition has not yet been hit more than
     *       {@code loadBalancerSheddingConditionHitCountThreshold} times in a row, or</li>
     *   <li>no bundle qualified for unloading.</li>
     * </ul>
     * Any {@link Throwable} raised while computing the decisions is logged and counted as a failure; the
     * decisions collected up to that point are still returned.
     *
     * @param context                 provides the broker configuration, broker registry and load data stores
     * @param recentlyUnloadedBundles bundle name to unload timestamp; bundles present in this map are skipped
     * @param recentlyUnloadedBrokers recently unloaded brokers, forwarded to the load stats update
     * @return the decision cache holding the unload decisions of this run (possibly empty)
     */
    public Set<UnloadDecision> findBundlesForUnloading(LoadManagerContext context,
                                                  Map<String, Long> recentlyUnloadedBundles,
                                                  Map<String, Long> recentlyUnloadedBrokers) {
        final var conf = context.brokerConfiguration();
        decisionCache.clear();
        stats.clear();
        Map<String, BrokerLookupData> availableBrokers;
        try {
            availableBrokers = context.brokerRegistry().getAvailableBrokerLookupDataAsync()
                    .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        } catch (ExecutionException | InterruptedException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt status so the calling thread can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            counter.update(Failure, Unknown);
            log.warn("Failed to fetch available brokers. Stop unloading.", e);
            return decisionCache;
        }

        try {
            final var loadStore = context.brokerLoadDataStore();
            stats.setLoadDataStore(loadStore);
            boolean debugMode = ExtensibleLoadManagerImpl.debug(conf, log);

            var skipReason = stats.update(loadStore, availableBrokers, recentlyUnloadedBrokers, conf);
            if (skipReason.isPresent()) {
                if (debugMode) {
                    log.warn(CANNOT_CONTINUE_UNLOAD_MSG
                                    + " Skipped the load stat update. Reason:{}.",
                            skipReason.get());
                }
                counter.update(Skip, skipReason.get());
                return decisionCache;
            }
            counter.updateLoadData(stats.avg, stats.std);

            if (debugMode) {
                log.info("brokers' load stats:{}", stats);
            }

            // skip metrics
            int numOfBrokersWithEmptyLoadData = 0;
            int numOfBrokersWithFewBundles = 0;

            final double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
            boolean transfer = conf.isLoadBalancerTransferEnabled();

            // Count consecutive runs in which the shedding condition holds; any run where it does not
            // hold resets the counter.
            if (stats.std() > targetStd
                    || isUnderLoaded(context, stats.peekMinBroker(), stats)
                    || isOverLoaded(context, stats.peekMaxBroker(), stats.avg)) {
                unloadConditionHitCount++;
            } else {
                unloadConditionHitCount = 0;
            }

            if (unloadConditionHitCount <= conf.getLoadBalancerSheddingConditionHitCountThreshold()) {
                if (debugMode) {
                    log.info(CANNOT_CONTINUE_UNLOAD_MSG
                                    + " Shedding condition hit count:{} is less than or equal to the threshold:{}.",
                            unloadConditionHitCount, conf.getLoadBalancerSheddingConditionHitCountThreshold());
                }
                counter.update(Skip, HitCount);
                return decisionCache;
            }

            while (true) {
                if (!stats.hasTransferableBrokers()) {
                    if (debugMode) {
                        log.info(CANNOT_CONTINUE_UNLOAD_MSG
                                + " Exhausted target transfer brokers.");
                    }
                    break;
                }

                // Determine why (and whether) another unload round is needed.
                UnloadDecision.Reason reason;
                if (stats.std() > targetStd) {
                    reason = Overloaded;
                } else if (isUnderLoaded(context, stats.peekMinBroker(), stats)) {
                    reason = Underloaded;
                    if (debugMode) {
                        // orElse(null) keeps a missing load data entry from aborting the run in debug mode only.
                        log.info(String.format("broker:%s is underloaded:%s although "
                                        + "load std:%.2f <= targetStd:%.2f. "
                                        + "Continuing unload for this underloaded broker.",
                                stats.peekMinBroker(),
                                context.brokerLoadDataStore().get(stats.peekMinBroker()).orElse(null),
                                stats.std(), targetStd));
                    }
                } else if (isOverLoaded(context, stats.peekMaxBroker(), stats.avg)) {
                    reason = Overloaded;
                    if (debugMode) {
                        // orElse(null) keeps a missing load data entry from aborting the run in debug mode only.
                        log.info(String.format("broker:%s is overloaded:%s although "
                                        + "load std:%.2f <= targetStd:%.2f. "
                                        + "Continuing unload for this overloaded broker.",
                                stats.peekMaxBroker(),
                                context.brokerLoadDataStore().get(stats.peekMaxBroker()).orElse(null),
                                stats.std(), targetStd));
                    }
                } else {
                    if (debugMode) {
                        log.info(CANNOT_CONTINUE_UNLOAD_MSG
                                        + "The overall cluster load meets the target, std:{} <= targetStd:{}."
                                        + "minBroker:{} is not underloaded. maxBroker:{} is not overloaded.",
                                stats.std(), targetStd, stats.peekMinBroker(), stats.peekMaxBroker());
                    }
                    break;
                }

                // The max broker is polled while the min broker is only peeked.
                String maxBroker = stats.pollMaxBroker();
                String minBroker = stats.peekMinBroker();
                Optional<BrokerLoadData> maxBrokerLoadData = context.brokerLoadDataStore().get(maxBroker);
                Optional<BrokerLoadData> minBrokerLoadData = context.brokerLoadDataStore().get(minBroker);
                if (maxBrokerLoadData.isEmpty()) {
                    log.error(String.format(CANNOT_UNLOAD_BROKER_MSG
                            + " MaxBrokerLoadData is empty.", maxBroker));
                    numOfBrokersWithEmptyLoadData++;
                    continue;
                }
                if (minBrokerLoadData.isEmpty()) {
                    log.error("Can't transfer load to broker:{}. MinBrokerLoadData is empty.", minBroker);
                    numOfBrokersWithEmptyLoadData++;
                    continue;
                }
                BrokerLoadData maxData = maxBrokerLoadData.get();
                BrokerLoadData minData = minBrokerLoadData.get();
                double maxLoad = maxData.getWeightedMaxEMA();
                double minLoad = minData.getWeightedMaxEMA();
                // Aim to move half of the load difference between the two brokers.
                double offload = (maxLoad - minLoad) / 2;
                double maxBrokerThroughput = maxData.getMsgThroughputIn() + maxData.getMsgThroughputOut();
                double minBrokerThroughput = minData.getMsgThroughputIn() + minData.getMsgThroughputOut();
                // Convert the load fraction to offload into a throughput budget on the max broker.
                double offloadThroughput = maxBrokerThroughput * offload / maxLoad;

                if (debugMode) {
                    log.info(String.format(
                            "Attempting to shed load from broker:%s%s, which has the max resource "
                                    + "usage:%.2f%%, targetStd:%.2f,"
                                    + " -- Trying to offload %.2f%%, %.2f KByte/s of traffic.",
                            maxBroker, transfer ? " to broker:" + minBroker : "",
                            maxLoad * 100,
                            targetStd,
                            offload * 100,
                            offloadThroughput / KB
                    ));
                }

                // Throughput leaving the max broker, and throughput it gains back through swaps.
                double trafficMarkedToOffload = 0;
                double trafficMarkedToGain = 0;

                Optional<TopBundlesLoadData> bundlesLoadData = context.topBundleLoadDataStore().get(maxBroker);
                if (bundlesLoadData.isEmpty() || bundlesLoadData.get().getTopBundlesLoadData().isEmpty()) {
                    if (debugMode) {
                        log.info(String.format(CANNOT_UNLOAD_BROKER_MSG
                                + " TopBundlesLoadData is empty.", maxBroker));
                    }
                    numOfBrokersWithEmptyLoadData++;
                    continue;
                }

                // Known to be non-empty here because of the check above.
                var maxBrokerTopBundlesLoadData = bundlesLoadData.get().getTopBundlesLoadData();
                if (maxBrokerTopBundlesLoadData.size() == 1) {
                    numOfBrokersWithFewBundles++;
                    log.warn(String.format(CANNOT_UNLOAD_BROKER_MSG
                                    + " Sole namespace bundle:%s is overloading the broker. ",
                            maxBroker, maxBrokerTopBundlesLoadData.iterator().next()));
                    continue;
                }
                // The iterator is shared by all bundles of this max broker, so each min-broker bundle is
                // considered for a swap at most once per round. It is null when the min broker has no
                // top bundles load data.
                Optional<TopBundlesLoadData> minBundlesLoadData = context.topBundleLoadDataStore().get(minBroker);
                var minBrokerTopBundlesLoadDataIter =
                        minBundlesLoadData.isPresent() ? minBundlesLoadData.get().getTopBundlesLoadData().iterator() :
                                null;

                int remainingTopBundles = maxBrokerTopBundlesLoadData.size();
                for (var e : maxBrokerTopBundlesLoadData) {
                    String bundle = e.bundleName();
                    if (channel != null && !channel.isOwner(bundle, maxBroker)) {
                        if (debugMode) {
                            log.warn(String.format(CANNOT_UNLOAD_BUNDLE_MSG
                                    + " MaxBroker:%s is not the owner.", bundle, maxBroker));
                        }
                        continue;
                    }
                    if (recentlyUnloadedBundles.containsKey(bundle)) {
                        if (debugMode) {
                            log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
                                            + " Bundle has been recently unloaded at ts:%d.",
                                    bundle, recentlyUnloadedBundles.get(bundle)));
                        }
                        continue;
                    }
                    if (!isTransferable(context, availableBrokers, bundle, maxBroker, Optional.of(minBroker))) {
                        if (debugMode) {
                            log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
                                    + " This unload can't meet "
                                    + "affinity(isolation) or anti-affinity group policies.", bundle));
                        }
                        continue;
                    }
                    // Always leave at least one of the reported top bundles on the max broker.
                    if (remainingTopBundles <= 1) {
                        if (debugMode) {
                            log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
                                            + " The remaining bundles in TopBundlesLoadData from the maxBroker:%s is"
                                            + " less than or equal to 1.",
                                    bundle, maxBroker));
                        }
                        break;
                    }

                    var bundleData = e.stats();
                    double maxBrokerBundleThroughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut;
                    if (maxBrokerBundleThroughput == 0) {
                        if (debugMode) {
                            log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
                                    + " It has zero throughput.", bundle));
                        }
                        continue;
                    }
                    boolean swap = false;
                    List<Unload> minToMaxUnloads = new ArrayList<>();
                    double minBrokerBundleSwapThroughput = 0.0;
                    if (trafficMarkedToOffload - trafficMarkedToGain + maxBrokerBundleThroughput > offloadThroughput) {
                        // Unloading this bundle would exceed the budget:
                        // see if we can swap bundles from min to max broker to balance better.
                        if (transfer && minBrokerTopBundlesLoadDataIter != null) {
                            // Projected throughputs if this bundle moved from the max to the min broker.
                            var maxBrokerNewThroughput =
                                    maxBrokerThroughput - trafficMarkedToOffload + trafficMarkedToGain
                                            - maxBrokerBundleThroughput;
                            var minBrokerNewThroughput =
                                    minBrokerThroughput + trafficMarkedToOffload - trafficMarkedToGain
                                            + maxBrokerBundleThroughput;
                            while (minBrokerTopBundlesLoadDataIter.hasNext()) {
                                var minBrokerBundleData = minBrokerTopBundlesLoadDataIter.next();
                                if (!isTransferable(context, availableBrokers,
                                        minBrokerBundleData.bundleName(), minBroker, Optional.of(maxBroker))) {
                                    continue;
                                }
                                var minBrokerBundleThroughput =
                                        minBrokerBundleData.stats().msgThroughputIn
                                                + minBrokerBundleData.stats().msgThroughputOut;
                                if (minBrokerBundleThroughput == 0) {
                                    continue;
                                }
                                var maxBrokerNewThroughputTmp = maxBrokerNewThroughput + minBrokerBundleThroughput;
                                var minBrokerNewThroughputTmp = minBrokerNewThroughput - minBrokerBundleThroughput;
                                // Accept the candidate only if both brokers stay below the max broker's
                                // current throughput.
                                if (maxBrokerNewThroughputTmp < maxBrokerThroughput
                                        && minBrokerNewThroughputTmp < maxBrokerThroughput) {
                                    minToMaxUnloads.add(new Unload(minBroker,
                                            minBrokerBundleData.bundleName(), Optional.of(maxBroker)));
                                    maxBrokerNewThroughput = maxBrokerNewThroughputTmp;
                                    minBrokerNewThroughput = minBrokerNewThroughputTmp;
                                    minBrokerBundleSwapThroughput += minBrokerBundleThroughput;
                                    // The swap is good enough once the min broker is no more loaded than the
                                    // max broker and the max broker dropped below 75% of its throughput.
                                    if (minBrokerNewThroughput <= maxBrokerNewThroughput
                                            && maxBrokerNewThroughput < maxBrokerThroughput * 0.75) {
                                        swap = true;
                                        break;
                                    }
                                }
                            }
                        }
                        if (!swap) {
                            if (debugMode) {
                                log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
                                                + " The traffic to unload:%.2f - gain:%.2f = %.2f KByte/s is "
                                                + "greater than the target :%.2f KByte/s.",
                                        bundle,
                                        (trafficMarkedToOffload + maxBrokerBundleThroughput) / KB,
                                        trafficMarkedToGain / KB,
                                        (trafficMarkedToOffload - trafficMarkedToGain + maxBrokerBundleThroughput) / KB,
                                        offloadThroughput / KB));
                            }
                            break;
                        }
                    }
                    Unload unload;
                    if (transfer) {
                        if (swap) {
                            for (Unload minToMaxUnload : minToMaxUnloads) {
                                if (debugMode) {
                                    log.info("Decided to gain bundle:{} from min broker:{}",
                                            minToMaxUnload.serviceUnit(), minToMaxUnload.sourceBroker());
                                }
                                var swapDecision = new UnloadDecision();
                                swapDecision.setUnload(minToMaxUnload);
                                swapDecision.succeed(reason);
                                decisionCache.add(swapDecision);
                            }
                            if (debugMode) {
                                log.info(String.format(
                                        "Total traffic %.2f KByte/s to transfer from min broker:%s to max broker:%s.",
                                        minBrokerBundleSwapThroughput / KB, minBroker, maxBroker));
                            }
                            // Account for the gained traffic regardless of debug mode. This used to happen only
                            // inside the debug branch, which made the decisions depend on the logging setting.
                            trafficMarkedToGain += minBrokerBundleSwapThroughput;
                        }
                        unload = new Unload(maxBroker, bundle, Optional.of(minBroker));
                    } else {
                        unload = new Unload(maxBroker, bundle);
                    }
                    var decision = new UnloadDecision();
                    decision.setUnload(unload);
                    decision.succeed(reason);
                    decisionCache.add(decision);
                    trafficMarkedToOffload += maxBrokerBundleThroughput;
                    remainingTopBundles--;

                    if (debugMode) {
                        log.info(String.format("Decided to unload bundle:%s, throughput:%.2f KByte/s."
                                        + " The traffic marked to unload:%.2f - gain:%.2f = %.2f KByte/s."
                                        + " Target:%.2f KByte/s.",
                                bundle, maxBrokerBundleThroughput / KB,
                                trafficMarkedToOffload / KB,
                                trafficMarkedToGain / KB,
                                (trafficMarkedToOffload - trafficMarkedToGain) / KB,
                                offloadThroughput / KB));
                    }
                }
                if (trafficMarkedToOffload > 0) {
                    // Convert the net throughput moved back into a load fraction and apply it to the stats.
                    var adjustedOffload =
                            (trafficMarkedToOffload - trafficMarkedToGain) * maxLoad / maxBrokerThroughput;
                    stats.offload(maxLoad, minLoad, adjustedOffload);
                    if (debugMode) {
                        log.info(
                                String.format("brokers' load stats:%s, after offload{max:%.2f, min:%.2f, offload:%.2f}",
                                        stats, maxLoad, minLoad, adjustedOffload));
                    }
                } else {
                    numOfBrokersWithFewBundles++;
                    log.warn(String.format(CANNOT_UNLOAD_BROKER_MSG
                            + " There is no bundle that can be unloaded in top bundles load data. "
                            + "Consider splitting bundles owned by the broker "
                            + "to make each bundle serve less traffic "
                            + "or increasing loadBalancerMaxNumberOfBundlesInBundleLoadReport"
                            + " to report more bundles in the top bundles load data.", maxBroker));
                }

            } // while end

            if (debugMode) {
                log.info("decisionCache:{}", decisionCache);
            }

            if (decisionCache.isEmpty()) {
                // Report the most specific reason for producing no decisions.
                UnloadDecision.Reason reason;
                if (numOfBrokersWithEmptyLoadData > 0) {
                    reason = NoLoadData;
                } else if (numOfBrokersWithFewBundles > 0) {
                    reason = NoBundles;
                } else {
                    reason = HitCount;
                }
                counter.update(Skip, reason);
            } else {
                // Decisions were made, so the consecutive hit count starts over.
                unloadConditionHitCount = 0;
            }

        } catch (Throwable e) {
            log.error("Failed to process unloading. ", e);
            this.counter.update(Failure, Unknown);
        }
        return decisionCache;
    }