in pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java [1648:1781]
/**
 * Runs a single ownership-monitor pass over every entry of the service unit table view.
 *
 * <p>The pass is skipped (with a log line) when the channel-owner check fails or reports that this
 * broker is not the owner, when {@code brokers} is null or empty, or when the metadata state is not
 * {@code Stable}.
 *
 * <p>Otherwise each entry is classified by the first rule that matches:
 * <ol>
 *   <li>{@code Owned} with a blank or non-active destination broker: the destination broker is
 *       collected for cleanup;</li>
 *   <li>in-flight with a non-blank, non-active source broker: the source broker is collected;</li>
 *   <li>in-flight with a non-blank, non-active destination broker: the destination broker is
 *       collected;</li>
 *   <li>in-flight for longer than {@code inFlightStateWaitingTimeInMillis}: the service unit is
 *       collected as timed out;</li>
 *   <li>not in an active state for longer than {@code stateTombstoneDelayTimeInMillis}: the service
 *       unit is tombstoned asynchronously.</li>
 * </ol>
 *
 * <p>After the scan, every collected broker is passed to {@code handleBrokerDeletionEvent}, every
 * timed-out service unit gets an ownership override, the table view is flushed, the cumulative
 * cleanup counters are updated and, when debugging is on or anything was cleaned, a summary is logged.
 *
 * @param brokers the brokers treated as active for this pass; a null or empty list skips the pass
 */
protected void monitorOwnerships(List<String> brokers) {
    boolean leader;
    try {
        leader = isChannelOwner();
    } catch (Exception ex) {
        log.error("Failed to monitor ownerships", ex);
        return;
    }
    if (!leader) {
        log.warn("This broker is not the leader now. Skipping ownership monitor.");
        return;
    }

    if (brokers == null || brokers.isEmpty()) {
        log.error("no active brokers found. Skipping ownership monitor.");
        return;
    }

    var currentMetadataState = getMetadataState();
    if (currentMetadataState != Stable) {
        log.warn("metadata state:{} is not Stable. Skipping ownership monitor.", currentMetadataState);
        return;
    }

    var verbose = debug();
    if (verbose) {
        log.info("Started the ownership monitor run for activeBrokerCount:{}", brokers.size());
    }

    final long startNanos = System.nanoTime();
    final Set<String> downBrokers = new HashSet<>();
    final Set<String> liveBrokers = new HashSet<>(brokers);
    final Map<String, ServiceUnitStateData> staleInFlightUnits = new HashMap<>();
    int tombstonedCnt = 0;
    int orphanCnt = 0;
    // Baseline so that error counts reported by this pass are relative to its start.
    final long errorCntBaseline = totalCleanupErrorCnt.get();
    final long nowMillis = System.currentTimeMillis();

    for (var entry : tableview.entrySet()) {
        final String unit = entry.getKey();
        final ServiceUnitStateData data = entry.getValue();
        final String dst = data.dstBroker();
        final String src = data.sourceBroker();
        final var state = data.state();

        if (state == Owned) {
            // NOTE(review): a blank (possibly null) dst is added to the set as-is and later handed to
            // handleBrokerDeletionEvent -- confirm that method tolerates a blank/null broker.
            if (StringUtils.isBlank(dst) || !liveBrokers.contains(dst)) {
                downBrokers.add(dst);
                continue;
            }
        }

        if (isInFlightState(state)) {
            // The source broker is examined before the destination broker.
            if (StringUtils.isNotBlank(src) && !liveBrokers.contains(src)) {
                downBrokers.add(src);
                continue;
            }
            if (StringUtils.isNotBlank(dst) && !liveBrokers.contains(dst)) {
                downBrokers.add(dst);
                continue;
            }
            if (nowMillis - data.timestamp() > inFlightStateWaitingTimeInMillis) {
                staleInFlightUnits.put(unit, data);
                continue;
            }
        }

        boolean tombstoneDue = !isActiveState(state)
                && nowMillis - data.timestamp() > stateTombstoneDelayTimeInMillis;
        if (!tombstoneDue) {
            continue;
        }
        log.info("Found semi-terminal states to tombstone"
                + " serviceUnit:{}, stateData:{}", unit, data);
        tombstoneAsync(unit).whenComplete((ignored, failure) -> {
            if (failure == null) {
                return;
            }
            log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, "
                            + "cleanupErrorCnt:{}.",
                    unit, data,
                    totalCleanupErrorCnt.incrementAndGet() - errorCntBaseline, failure);
        });
        tombstonedCnt++;
    }

    for (String downBroker : downBrokers) {
        handleBrokerDeletionEvent(downBroker);
    }

    // staleInFlightUnits are the in-flight ones although their src and dst brokers are known to
    // be active.
    if (!staleInFlightUnits.isEmpty()) {
        final List<CompletableFuture<Void>> pendingOverrides = new ArrayList<>();
        int remaining = staleInFlightUnits.size();
        for (var orphan : staleInFlightUnits.entrySet()) {
            remaining--;
            pendingOverrides.add(overrideOwnership(orphan.getKey(), orphan.getValue(), null, false));
            // The second argument is true only for the last timed-out service unit.
            tryWaitForOverrides(pendingOverrides, remaining == 0);
            orphanCnt++;
        }
    }

    try {
        tableview.flush(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS);
    } catch (Exception ex) {
        // A failed flush is logged only; the counters and the summary below are still processed.
        log.error("Failed to flush the in-flight messages.", ex);
    }

    if (tombstonedCnt > 0) {
        this.totalServiceUnitTombstoneCleanupCnt += tombstonedCnt;
    }
    if (orphanCnt > 0) {
        this.totalOrphanServiceUnitCleanupCnt += orphanCnt;
    }

    final boolean anyCleanup = tombstonedCnt > 0 || orphanCnt > 0;
    if (!verbose && !anyCleanup) {
        return;
    }

    // Whole milliseconds, held in a double as before so the logged representation is unchanged.
    double elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    log.info("Completed the ownership monitor run in {} ms. "
                    + "Scheduled cleanups for inactive brokers:{}. inactiveBrokerCount:{}. "
                    + "Published cleanups for orphan service units, orphanServiceUnitCleanupCnt:{}. "
                    + "Tombstoned semi-terminal state service units, serviceUnitTombstoneCleanupCnt:{}. "
                    + "Approximate cleanupErrorCnt:{}, metrics:{}. ",
            elapsedMillis,
            downBrokers, downBrokers.size(),
            orphanCnt,
            tombstonedCnt,
            totalCleanupErrorCnt.get() - errorCntBaseline,
            printCleanupMetrics());
}