in pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java [74:289]
public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarService pulsar) {
decisionCache.clear();
namespaceBundleCount.clear();
splittingBundles.clear();
final ServiceConfiguration conf = pulsar.getConfiguration();
int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions();
long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate();
long maxBundleBandwidth = conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * LoadManagerShared.MIBI;
long maxSplitCount = conf.getLoadBalancerMaxNumberOfBundlesToSplitPerCycle();
long splitConditionHitCountThreshold = conf.getLoadBalancerNamespaceBundleSplitConditionHitCountThreshold();
boolean debug = log.isDebugEnabled() || conf.isLoadBalancerDebugModeEnabled();
var channel = ServiceUnitStateChannelImpl.get(pulsar);
for (var etr : channel.getOwnershipEntrySet()) {
var eData = etr.getValue();
if (eData.state() == ServiceUnitState.Splitting) {
String bundle = etr.getKey();
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
splittingBundles.put(bundle, bundleRange);
}
}
Map<String, NamespaceBundleStats> bundleStatsMap = pulsar.getBrokerService().getBundleStats();
NamespaceBundleFactory namespaceBundleFactory =
pulsar.getNamespaceService().getNamespaceBundleFactory();
// clean splitConditionHitCounts
splitConditionHitCounts.keySet().retainAll(bundleStatsMap.keySet());
for (var entry : bundleStatsMap.entrySet()) {
final String bundle = entry.getKey();
final NamespaceBundleStats stats = entry.getValue();
if (stats.topics < 2) {
if (debug) {
log.info(String.format(CANNOT_SPLIT_BUNDLE_MSG
+ " The topic count is less than 2.", bundle));
}
continue;
}
if (!channel.isOwner(bundle)) {
if (debug) {
log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG
+ " This broker is not the owner.", bundle));
}
continue;
}
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
if (!namespaceBundleFactory
.canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) {
if (debug) {
log.info(String.format(CANNOT_SPLIT_BUNDLE_MSG
+ " Invalid bundle range:%s.", bundle, bundleRange));
}
counter.update(Failure, Unknown);
continue;
}
double totalMessageRate = stats.msgRateIn + stats.msgRateOut;
double totalMessageThroughput = stats.msgThroughputIn + stats.msgThroughputOut;
int totalSessionCount = stats.consumerCount + stats.producerCount;
SplitDecision.Reason reason = Unknown;
if (stats.topics > maxBundleTopics) {
reason = Topics;
} else if (maxBundleSessions > 0 && (totalSessionCount > maxBundleSessions)) {
reason = Sessions;
} else if (totalMessageRate > maxBundleMsgRate) {
reason = MsgRate;
} else if (totalMessageThroughput > maxBundleBandwidth) {
reason = Bandwidth;
}
if (reason != Unknown) {
splitConditionHitCounts.put(bundle, splitConditionHitCounts.getOrDefault(bundle, 0) + 1);
} else {
splitConditionHitCounts.remove(bundle);
}
if (splitConditionHitCounts.getOrDefault(bundle, 0) <= splitConditionHitCountThreshold) {
if (debug) {
log.info(String.format(
CANNOT_SPLIT_BUNDLE_MSG
+ " Split condition hit count: %d is"
+ " less than or equal to threshold: %d. "
+ "Topics: %d/%d, "
+ "Sessions: (%d+%d)/%d, "
+ "Message Rate: %.2f/%d (msgs/s), "
+ "Message Throughput: %.2f/%d (MB/s).",
bundle,
splitConditionHitCounts.getOrDefault(bundle, 0),
splitConditionHitCountThreshold,
stats.topics, maxBundleTopics,
stats.producerCount, stats.consumerCount, maxBundleSessions,
totalMessageRate, maxBundleMsgRate,
totalMessageThroughput / LoadManagerShared.MIBI,
maxBundleBandwidth / LoadManagerShared.MIBI
));
}
continue;
}
final String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
try {
final int bundleCount = pulsar.getNamespaceService()
.getBundleCount(NamespaceName.get(namespace));
if ((bundleCount + namespaceBundleCount.getOrDefault(namespace, 0))
>= maxBundleCount) {
if (debug) {
log.info(String.format(CANNOT_SPLIT_BUNDLE_MSG + " Namespace:%s has too many bundles:%d",
bundle, namespace, bundleCount));
}
continue;
}
} catch (Exception e) {
counter.update(Failure, Unknown);
log.warn("Failed to get bundle count in namespace:{}", namespace, e);
continue;
}
var ranges = bundleRange.split("_");
var foundSplittingBundle = false;
for (var etr : splittingBundles.entrySet()) {
var splittingBundle = etr.getKey();
if (splittingBundle.startsWith(namespace)) {
var splittingBundleRange = etr.getValue();
if (splittingBundleRange.startsWith(ranges[0])
|| splittingBundleRange.endsWith(ranges[1])) {
if (debug) {
log.info(String.format(CANNOT_SPLIT_BUNDLE_MSG
+ " (parent) bundle:%s is in Splitting state.", bundle, splittingBundle));
}
foundSplittingBundle = true;
break;
}
}
}
if (foundSplittingBundle) {
continue;
}
if (debug) {
log.info(String.format(
"Splitting bundle: %s. "
+ "Topics: %d/%d, "
+ "Sessions: (%d+%d)/%d, "
+ "Message Rate: %.2f/%d (msgs/s), "
+ "Message Throughput: %.2f/%d (MB/s)",
bundle,
stats.topics, maxBundleTopics,
stats.producerCount, stats.consumerCount, maxBundleSessions,
totalMessageRate, maxBundleMsgRate,
totalMessageThroughput / LoadManagerShared.MIBI,
maxBundleBandwidth / LoadManagerShared.MIBI
));
}
var decision = new SplitDecision();
var namespaceService = pulsar.getNamespaceService();
var namespaceBundle = namespaceService.getNamespaceBundleFactory()
.getBundle(namespaceName, bundleRange);
NamespaceBundleSplitAlgorithm algorithm =
namespaceService.getNamespaceBundleSplitAlgorithmByName(
conf.getDefaultNamespaceBundleSplitAlgorithm());
List<Long> splitBoundary = null;
try {
splitBoundary = namespaceService
.getSplitBoundary(namespaceBundle, null, algorithm)
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
} catch (Throwable e) {
counter.update(Failure, Unknown);
log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " Failed to get split boundaries.", bundle, e));
continue;
}
if (splitBoundary == null) {
counter.update(Failure, Unknown);
log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " The split boundaries is null.", bundle));
continue;
}
if (splitBoundary.size() != 1) {
counter.update(Failure, Unknown);
log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG + " The size of split boundaries is not 1. "
+ "splitBoundary:%s", bundle, splitBoundary));
continue;
}
var parentRange = namespaceBundle.getKeyRange();
var leftChildBundle = namespaceBundleFactory.getBundle(namespaceBundle.getNamespaceObject(),
NamespaceBundleFactory.getRange(parentRange.lowerEndpoint(), splitBoundary.get(0)));
var rightChildBundle = namespaceBundleFactory.getBundle(namespaceBundle.getNamespaceObject(),
NamespaceBundleFactory.getRange(splitBoundary.get(0), parentRange.upperEndpoint()));
Map<String, Optional<String>> splitServiceUnitToDestBroker = Map.of(
leftChildBundle.getBundleRange(), Optional.empty(),
rightChildBundle.getBundleRange(), Optional.empty());
decision.setSplit(new Split(bundle, context.brokerRegistry().getBrokerId(), splitServiceUnitToDestBroker));
decision.succeed(reason);
decisionCache.add(decision);
int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0);
namespaceBundleCount.put(namespace, bundleNum + 1);
splitConditionHitCounts.remove(bundle);
// Clear namespace bundle-cache
namespaceBundleFactory.invalidateBundleCache(NamespaceName.get(namespaceName));
if (decisionCache.size() == maxSplitCount) {
if (debug) {
log.info(CANNOT_CONTINUE_SPLIT_MSG
+ "Too many bundles split in this cycle {} / {}.",
decisionCache.size(), maxSplitCount);
}
break;
}
}
return decisionCache;
}