in bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTask.java [374:484]
/**
 * Consumes the result of reading the metadata of {@code ledgerInRange} and, when the ledger is
 * closed and non-empty, issues one {@code asyncGetListOfEntriesOfLedger} request per bookie that
 * is expected to hold at least one entry of the ledger.
 *
 * <p>The ledger is skipped (reported to {@code mcbForThisLedgerRange} with
 * {@code BKException.Code.OK}) when its metadata no longer exists on the metadata server, when it
 * is not closed yet, or when it is closed without entries. Any other metadata read failure is
 * reported to {@code mcbForThisLedgerRange} with the exception code of that failure.
 *
 * @param metadataVer versioned metadata of the ledger; only dereferenced when {@code exception}
 *                    is {@code null}
 * @param exception   failure of the metadata read, or {@code null} when the read succeeded
 */
public void accept(Versioned<LedgerMetadata> metadataVer, Throwable exception) {
    if (exception != null) {
        if (BKException
                .getExceptionCode(exception) == BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ignoring replicas check of already deleted ledger {}",
                        ledgerInRange);
            }
            // A deleted ledger is not an error for the range-level callback.
            mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
        } else {
            LOG.warn("Unable to read the ledger: {} information", ledgerInRange, exception);
            mcbForThisLedgerRange.processResult(BKException.getExceptionCode(exception), null, null);
        }
        return;
    }

    final LedgerMetadata metadata = metadataVer.getValue();
    if (!metadata.isClosed()) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ledger: {} is not yet closed, "
                    + "so skipping the replicas check analysis for now",
                    ledgerInRange);
        }
        mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
        return;
    }

    final long lastEntryId = metadata.getLastEntryId();
    if (lastEntryId == -1) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ledger: {} is closed but it doesn't has any entries, "
                    + "so skipping the replicas check", ledgerInRange);
        }
        mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
        return;
    }

    final int writeQuorumSize = metadata.getWriteQuorumSize();
    final int ackQuorumSize = metadata.getAckQuorumSize();
    final int ensembleSize = metadata.getEnsembleSize();
    final RoundRobinDistributionSchedule distributionSchedule = new RoundRobinDistributionSchedule(
            writeQuorumSize, ackQuorumSize, ensembleSize);

    /*
     * Segments are accessed by index below (current and next segment), so
     * they are copied into a random-access list; an index lookup on a linked
     * list would walk the list on every access.
     */
    final List<Entry<Long, ? extends List<BookieId>>> segments = new ArrayList<>(
            metadata.getAllEnsembles().entrySet());
    final int numOfSegments = segments.size();

    /*
     * since there are multiple segments, MultiCallback should be
     * created for (ensembleSize * segments.size()) calls.
     *
     * NOTE(review): bookies that need a remote call are acknowledged by
     * GetListOfEntriesOfLedgerCallbackForReplicasCheck, which is presumably
     * expected to report once per segment info handed to it - confirm
     * against that class.
     */
    final MultiCallback mcbForThisLedger = new MultiCallback(ensembleSize * numOfSegments,
            mcbForThisLedgerRange, null, BKException.Code.OK, BKException.Code.ReadException);
    final HashMap<BookieId, List<BookieExpectedToContainSegmentInfo>> bookiesSegmentInfoMap = new HashMap<>();

    for (int segmentNum = 0; segmentNum < numOfSegments; segmentNum++) {
        final Entry<Long, ? extends List<BookieId>> segmentEnsemble = segments.get(segmentNum);
        final List<BookieId> ensembleOfSegment = segmentEnsemble.getValue();
        final long startEntryIdOfSegment = segmentEnsemble.getKey();
        final boolean lastSegment = (segmentNum == (numOfSegments - 1));

        /*
         * Segment can be empty. If last segment is empty, then
         * startEntryIdOfSegment of it will be greater than lastEntryId
         * of the ledger. If the segment in middle is empty, then its
         * startEntry will be same as startEntry of the following
         * segment.
         */
        final long lastEntryIdOfSegment;
        final boolean emptySegment;
        if (lastSegment) {
            lastEntryIdOfSegment = lastEntryId;
            emptySegment = startEntryIdOfSegment > lastEntryId;
        } else {
            final long startEntryIdOfNextSegment = segments.get(segmentNum + 1).getKey();
            lastEntryIdOfSegment = startEntryIdOfNextSegment - 1;
            emptySegment = startEntryIdOfSegment == startEntryIdOfNextSegment;
        }

        for (int bookieIndex = 0; bookieIndex < ensembleOfSegment.size(); bookieIndex++) {
            final BookieId bookieInEnsemble = ensembleOfSegment.get(bookieIndex);
            final BitSet entriesStripedToThisBookie = emptySegment ? EMPTY_BITSET
                    : distributionSchedule.getEntriesStripedToTheBookie(bookieIndex, startEntryIdOfSegment,
                            lastEntryIdOfSegment);
            if (entriesStripedToThisBookie.cardinality() == 0) {
                /*
                 * if no entry is expected to contain in this bookie,
                 * then there is no point in making
                 * getListOfEntriesOfLedger call for this bookie. So
                 * instead callback with success result.
                 */
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "For ledger: {}, in Segment: {}, no entry is expected to contain in"
                                    + " this bookie: {}. So skipping getListOfEntriesOfLedger call",
                            ledgerInRange, segmentEnsemble, bookieInEnsemble);
                }
                mcbForThisLedger.processResult(BKException.Code.OK, null, null);
                continue;
            }
            // Group the expected segments by bookie so each bookie is queried only once.
            bookiesSegmentInfoMap
                    .computeIfAbsent(bookieInEnsemble, bookie -> new ArrayList<>())
                    .add(new BookieExpectedToContainSegmentInfo(startEntryIdOfSegment,
                            lastEntryIdOfSegment, segmentEnsemble, entriesStripedToThisBookie));
        }
    }

    // One asynchronous request per bookie, carrying all the segments that bookie should contain.
    for (Entry<BookieId, List<BookieExpectedToContainSegmentInfo>> bookiesSegmentInfoTuple :
            bookiesSegmentInfoMap.entrySet()) {
        final BookieId bookieInEnsemble = bookiesSegmentInfoTuple.getKey();
        final List<BookieExpectedToContainSegmentInfo> bookieSegmentInfoList = bookiesSegmentInfoTuple
                .getValue();
        admin.asyncGetListOfEntriesOfLedger(bookieInEnsemble, ledgerInRange)
                .whenComplete(new GetListOfEntriesOfLedgerCallbackForReplicasCheck(ledgerInRange, ensembleSize,
                        writeQuorumSize, ackQuorumSize, bookieInEnsemble, bookieSegmentInfoList,
                        ledgersWithMissingEntries, ledgersWithUnavailableBookies, mcbForThisLedger));
    }
}