public void accept()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTask.java [374:484]


        /**
         * Handles the outcome of reading the metadata of {@code ledgerInRange} and, when
         * the ledger is closed and non-empty, fans out one
         * {@code asyncGetListOfEntriesOfLedger} request per bookie that is expected to
         * hold at least one entry of the ledger.
         *
         * <p>Outcomes reported directly to {@code mcbForThisLedgerRange}:
         * <ul>
         * <li>metadata read failed because the ledger no longer exists: {@code OK}
         * (nothing to check);</li>
         * <li>metadata read failed for any other reason: the corresponding exception
         * code;</li>
         * <li>ledger is not closed, or is closed with no entries: {@code OK}
         * (check skipped).</li>
         * </ul>
         * Otherwise completion is reported through a per-ledger {@code MultiCallback}
         * that chains to {@code mcbForThisLedgerRange}.
         *
         * @param metadataVer versioned ledger metadata; only read when {@code exception}
         *                    is {@code null}
         * @param exception   failure of the metadata read, or {@code null} on success
         */
        public void accept(Versioned<LedgerMetadata> metadataVer, Throwable exception) {
            if (exception != null) {
                final int exceptionCode = BKException.getExceptionCode(exception);
                if (exceptionCode == BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
                    // A ledger deleted in the meantime is not an error for this check.
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Ignoring replicas check of already deleted ledger {}",
                                ledgerInRange);
                    }
                    mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
                    return;
                }
                LOG.warn("Unable to read the ledger: {} information", ledgerInRange, exception);
                mcbForThisLedgerRange.processResult(exceptionCode, null, null);
                return;
            }

            LedgerMetadata metadata = metadataVer.getValue();
            if (!metadata.isClosed()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Ledger: {} is not yet closed, "
                                    + "so skipping the replicas check analysis for now",
                            ledgerInRange);
                }
                mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
                return;
            }

            final long lastEntryId = metadata.getLastEntryId();
            if (lastEntryId == -1) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Ledger: {} is closed but it doesn't has any entries, "
                            + "so skipping the replicas check", ledgerInRange);
                }
                mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
                return;
            }

            int writeQuorumSize = metadata.getWriteQuorumSize();
            int ackQuorumSize = metadata.getAckQuorumSize();
            int ensembleSize = metadata.getEnsembleSize();
            RoundRobinDistributionSchedule distributionSchedule = new RoundRobinDistributionSchedule(writeQuorumSize,
                    ackQuorumSize, ensembleSize);
            /*
             * Segments are accessed by index below (current and next segment),
             * so keep them in a random-access list; an index lookup on a linked
             * list would walk the list on every access.
             */
            List<Entry<Long, ? extends List<BookieId>>> segments = new ArrayList<>(
                    metadata.getAllEnsembles().entrySet());
            final int numSegments = segments.size();
            /*
             * since there are multiple segments, MultiCallback should be
             * created for (ensembleSize * segments.size()) calls. Slots for
             * which no entry is expected are completed right here in the loop;
             * the remaining ones are presumably completed by the
             * GetListOfEntriesOfLedgerCallbackForReplicasCheck instances
             * created at the end of this method (defined elsewhere).
             */
            MultiCallback mcbForThisLedger = new MultiCallback(ensembleSize * numSegments,
                    mcbForThisLedgerRange, null, BKException.Code.OK, BKException.Code.ReadException);
            // For every bookie: the segments (and the entries within them) it is expected to store.
            HashMap<BookieId, List<BookieExpectedToContainSegmentInfo>> bookiesSegmentInfoMap =
                    new HashMap<BookieId, List<BookieExpectedToContainSegmentInfo>>();
            for (int segmentNum = 0; segmentNum < numSegments; segmentNum++) {
                final Entry<Long, ? extends List<BookieId>> segmentEnsemble = segments.get(segmentNum);
                final List<BookieId> ensembleOfSegment = segmentEnsemble.getValue();
                final long startEntryIdOfSegment = segmentEnsemble.getKey();
                final boolean lastSegment = (segmentNum == (numSegments - 1));
                // A non-last segment ends right before the first entry of the following segment.
                final long lastEntryIdOfSegment = lastSegment ? lastEntryId
                        : segments.get(segmentNum + 1).getKey() - 1;
                /*
                 * Segment can be empty. If last segment is empty, then
                 * startEntryIdOfSegment of it will be greater than lastEntryId
                 * of the ledger. If the segment in middle is empty, then its
                 * startEntry will be same as startEntry of the following
                 * segment.
                 */
                final boolean emptySegment = lastSegment ? (startEntryIdOfSegment > lastEntryId)
                        : (startEntryIdOfSegment == segments.get(segmentNum + 1).getKey());
                for (int bookieIndex = 0; bookieIndex < ensembleOfSegment.size(); bookieIndex++) {
                    final BookieId bookieInEnsemble = ensembleOfSegment.get(bookieIndex);
                    final BitSet entriesStripedToThisBookie = emptySegment ? EMPTY_BITSET
                            : distributionSchedule.getEntriesStripedToTheBookie(bookieIndex, startEntryIdOfSegment,
                            lastEntryIdOfSegment);
                    if (entriesStripedToThisBookie.cardinality() == 0) {
                        /*
                         * if no entry is expected to contain in this bookie,
                         * then there is no point in making
                         * getListOfEntriesOfLedger call for this bookie. So
                         * instead callback with success result.
                         */
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(
                                    "For ledger: {}, in Segment: {}, no entry is expected to contain in"
                                            + " this bookie: {}. So skipping getListOfEntriesOfLedger call",
                                    ledgerInRange, segmentEnsemble, bookieInEnsemble);
                        }
                        mcbForThisLedger.processResult(BKException.Code.OK, null, null);
                        continue;
                    }
                    bookiesSegmentInfoMap
                            .computeIfAbsent(bookieInEnsemble, bookie -> new ArrayList<>())
                            .add(new BookieExpectedToContainSegmentInfo(startEntryIdOfSegment,
                                    lastEntryIdOfSegment, segmentEnsemble, entriesStripedToThisBookie));
                }
            }
            /*
             * One request per bookie, regardless of how many segments that
             * bookie participates in; the callback receives all of the bookie's
             * segment infos at once.
             */
            for (Entry<BookieId, List<BookieExpectedToContainSegmentInfo>> bookiesSegmentInfoTuple :
                    bookiesSegmentInfoMap.entrySet()) {
                final BookieId bookieInEnsemble = bookiesSegmentInfoTuple.getKey();
                final List<BookieExpectedToContainSegmentInfo> bookieSegmentInfoList = bookiesSegmentInfoTuple
                        .getValue();
                admin.asyncGetListOfEntriesOfLedger(bookieInEnsemble, ledgerInRange)
                        .whenComplete(new GetListOfEntriesOfLedgerCallbackForReplicasCheck(ledgerInRange, ensembleSize,
                                writeQuorumSize, ackQuorumSize, bookieInEnsemble, bookieSegmentInfoList,
                                ledgersWithMissingEntries, ledgersWithUnavailableBookies, mcbForThisLedger));
            }
        }