protected void monitorOwnerships()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java [1648:1781]


    /**
     * Runs one pass of the ownership monitor over the service unit state table view.
     *
     * <p>The pass is skipped (with a log line) when this broker is not the channel owner or that check throws,
     * when {@code brokers} is null or empty, or when the metadata state is not {@code Stable}.
     *
     * <p>Otherwise every table view entry is classified; the first matching rule wins:
     * <ol>
     *   <li>{@code Owned} with a blank destination broker, or one that is not in {@code brokers}:
     *       the destination broker is collected as inactive;</li>
     *   <li>in-flight with a non-blank source broker that is not in {@code brokers}:
     *       the source broker is collected as inactive;</li>
     *   <li>in-flight with a non-blank destination broker that is not in {@code brokers}:
     *       the destination broker is collected as inactive;</li>
     *   <li>in-flight for longer than {@code inFlightStateWaitingTimeInMillis}:
     *       the service unit is collected as a timed-out (orphan) service unit;</li>
     *   <li>not in an active state and older than {@code stateTombstoneDelayTimeInMillis}:
     *       the service unit is tombstoned asynchronously.</li>
     * </ol>
     *
     * <p>After the scan, each collected broker is passed to {@code handleBrokerDeletionEvent}, ownership of each
     * timed-out service unit is overridden, the table view is flushed, the cumulative cleanup counters are
     * updated, and a summary is logged when debug is enabled or anything was cleaned.
     *
     * @param brokers the brokers currently considered active; a null or empty list skips the run
     */
    protected void monitorOwnerships(List<String> brokers) {
        try {
            if (!isChannelOwner()) {
                log.warn("This broker is not the leader now. Skipping ownership monitor.");
                return;
            }
        } catch (Exception e) {
            log.error("Failed to monitor ownerships", e);
            return;
        }

        if (brokers == null || brokers.isEmpty()) {
            log.error("no active brokers found. Skipping ownership monitor.");
            return;
        }

        var metadataState = getMetadataState();
        if (metadataState != Stable) {
            log.warn("metadata state:{} is not Stable. Skipping ownership monitor.", metadataState);
            return;
        }

        var debug = debug();
        if (debug) {
            log.info("Started the ownership monitor run for activeBrokerCount:{}", brokers.size());
        }

        long startTime = System.nanoTime();
        Set<String> inactiveBrokers = new HashSet<>();
        // Copied into a set so the per-entry membership checks below do not scan the list each time.
        Set<String> activeBrokers = new HashSet<>(brokers);
        Map<String, ServiceUnitStateData> timedOutInFlightStateServiceUnits = new HashMap<>();
        int serviceUnitTombstoneCleanupCnt = 0;
        int orphanServiceUnitCleanupCnt = 0;
        // Snapshot of the shared error counter so this run can report only the errors it observed.
        long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
        long now = System.currentTimeMillis();
        for (var etr : tableview.entrySet()) {
            String serviceUnit = etr.getKey();
            ServiceUnitStateData stateData = etr.getValue();
            String dstBroker = stateData.dstBroker();
            String srcBroker = stateData.sourceBroker();
            var state = stateData.state();

            if (state == Owned && (StringUtils.isBlank(dstBroker) || !activeBrokers.contains(dstBroker))) {
                // NOTE(review): a blank (possibly null) dstBroker is also added here and later handed to
                // handleBrokerDeletionEvent -- confirm that handler tolerates a blank broker id.
                inactiveBrokers.add(dstBroker);
                continue;
            }

            if (isInFlightState(state)) {
                // The source broker is checked before the destination broker; at most one broker is
                // collected per entry.
                if (StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) {
                    inactiveBrokers.add(srcBroker);
                    continue;
                }
                if (StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) {
                    inactiveBrokers.add(dstBroker);
                    continue;
                }
                if (now - stateData.timestamp() > inFlightStateWaitingTimeInMillis) {
                    timedOutInFlightStateServiceUnits.put(serviceUnit, stateData);
                    continue;
                }
                // An in-flight entry that has not timed out still falls through to the tombstone check below.
            }

            if (!isActiveState(state) && now - stateData.timestamp() > stateTombstoneDelayTimeInMillis) {
                log.info("Found semi-terminal states to tombstone"
                        + " serviceUnit:{}, stateData:{}", serviceUnit, stateData);
                tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
                    if (e != null) {
                        log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, "
                                        + "cleanupErrorCnt:{}.",
                                serviceUnit, stateData,
                                totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e);
                    }
                });
                // Counted when the tombstone is requested, regardless of whether it later succeeds.
                serviceUnitTombstoneCleanupCnt++;
            }
        }

        for (String inactiveBroker : inactiveBrokers) {
            handleBrokerDeletionEvent(inactiveBroker);
        }

        // timedOutInFlightStateServiceUnits are the in-flight ones although their src and dst brokers are known to
        // be active.
        List<CompletableFuture<Void>> overrideFutures = new ArrayList<>();
        var iter = timedOutInFlightStateServiceUnits.entrySet().iterator();
        while (iter.hasNext()) {
            var etr = iter.next();
            var orphanServiceUnit = etr.getKey();
            var orphanData = etr.getValue();
            overrideFutures.add(overrideOwnership(orphanServiceUnit, orphanData, null, false));
            // The second argument is true only for the last timed-out service unit.
            tryWaitForOverrides(overrideFutures, !iter.hasNext());
            orphanServiceUnitCleanupCnt++;
        }

        try {
            tableview.flush(OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS);
        } catch (Exception e) {
            // A failed flush is logged only; the counters and the summary below are still produced.
            log.error("Failed to flush the in-flight messages.", e);
        }

        boolean cleaned = false;
        if (serviceUnitTombstoneCleanupCnt > 0) {
            this.totalServiceUnitTombstoneCleanupCnt += serviceUnitTombstoneCleanupCnt;
            cleaned = true;
        }

        if (orphanServiceUnitCleanupCnt > 0) {
            this.totalOrphanServiceUnitCleanupCnt += orphanServiceUnitCleanupCnt;
            cleaned = true;
        }

        if (debug || cleaned) {
            // toMillis returns whole milliseconds, so the elapsed time is kept as a long.
            long monitorTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
            log.info("Completed the ownership monitor run in {} ms. "
                            + "Scheduled cleanups for inactive brokers:{}. inactiveBrokerCount:{}. "
                            + "Published cleanups for orphan service units, orphanServiceUnitCleanupCnt:{}. "
                            + "Tombstoned semi-terminal state service units, serviceUnitTombstoneCleanupCnt:{}. "
                            + "Approximate cleanupErrorCnt:{}, metrics:{}. ",
                    monitorTime,
                    inactiveBrokers, inactiveBrokers.size(),
                    orphanServiceUnitCleanupCnt,
                    serviceUnitTombstoneCleanupCnt,
                    totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
                    printCleanupMetrics());
        }

    }