in managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java [1203:1420]
/**
 * Calculates the backlog of every cursor of an offline topic and accumulates the results into
 * {@code offlineTopicStats}.
 *
 * <p>The cursor names are listed from the metadata store. For each cursor the last acknowledged position is
 * taken from the cursor metadata when no cursor ledger is recorded (cursor ledger id {@code -1}); otherwise it
 * is read from the last-add-confirmed entry of the cursor ledger. Cursor ledgers whose last-add-confirmed entry
 * id is {@code LedgerHandle.INVALID_ENTRY_ID} are remembered and, when {@code accurate} is set, retried through
 * {@code tryGetMDPosition} after the asynchronous phase has finished.
 *
 * @param topicName topic whose persistence naming encoding is used as the managed ledger name
 * @param ledgers ledgers of the managed ledger; the last entry of the map supplies the last ledger position,
 *                and nothing is done when the map is empty
 * @param offlineTopicStats receives the per-cursor details and the summed message backlog
 * @param accurate when {@code true}, waits without a time limit and retries cursor ledgers whose
 *                 last-add-confirmed was unavailable; otherwise each wait is bounded by
 *                 {@code META_READ_TIMEOUT_SECONDS}
 * @param digestType digest type used to open the cursor ledgers
 * @param password password used to open the cursor ledgers
 * @throws Exception if obtaining the BookKeeper client fails or the calling thread is interrupted while waiting
 */
public void calculateCursorBacklogs(final TopicName topicName,
        final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers,
        final PersistentOfflineTopicStats offlineTopicStats, boolean accurate,
        BookKeeper.DigestType digestType, byte[] password) throws Exception {
    if (ledgers.isEmpty()) {
        return;
    }
    String managedLedgerName = topicName.getPersistenceNamingEncoding();
    MetaStore store = getMetaStore();
    BookKeeper bk = getBookKeeper().get();
    final CountDownLatch allCursorsCounter = new CountDownLatch(1);
    // backlog value stored for a cursor whose position could not be read
    final long errorInReadingCursor = -1;
    final var ledgerRetryMap = new ConcurrentHashMap<String, Long>();
    final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
    final Position lastLedgerPosition =
            PositionFactory.create(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
    if (log.isDebugEnabled()) {
        log.debug("[{}] Last ledger position {}", managedLedgerName, lastLedgerPosition);
    }
    store.getCursors(managedLedgerName, new MetaStore.MetaStoreCallback<List<String>>() {
        @Override
        public void operationComplete(List<String> cursors, Stat v) {
            // Load existing cursors
            if (log.isDebugEnabled()) {
                log.debug("[{}] Found {} cursors", managedLedgerName, cursors.size());
            }
            if (cursors.isEmpty()) {
                allCursorsCounter.countDown();
                return;
            }
            // one count per cursor; every terminal path of a cursor's processing must count down exactly once
            final CountDownLatch cursorCounter = new CountDownLatch(cursors.size());
            for (final String cursorName : cursors) {
                // determine subscription position from cursor ledger
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Loading cursor {}", managedLedgerName, cursorName);
                }
                final MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo> cursorInfoCallback =
                        new MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>() {
                    @Override
                    public void operationComplete(MLDataFormats.ManagedCursorInfo info, Stat stat) {
                        final long cursorLedgerId = info.getCursorsLedgerId();
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Cursor {} meta-data read ledger id {}", managedLedgerName,
                                    cursorName, cursorLedgerId);
                        }
                        if (cursorLedgerId != -1) {
                            // The ledger id is captured here rather than read from the handle: the handle
                            // is not usable when the open fails.
                            AsyncCallback.OpenCallback cursorLedgerOpenCb =
                                    (rc, lh, ctx1) -> cursorLedgerOpened(rc, lh, cursorLedgerId);
                            bk.asyncOpenLedgerNoRecovery(cursorLedgerId, digestType, password,
                                    cursorLedgerOpenCb, null);
                        } else {
                            // no cursor ledger: the mark-delete position stored in the metadata is used
                            try {
                                Position lastAckedMessagePosition = PositionFactory.create(
                                        info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());
                                addCursorBacklogToOfflineStats(managedLedgerName, cursorName, cursorLedgerId,
                                        lastAckedMessagePosition, lastLedgerPosition, ledgers,
                                        offlineTopicStats);
                            } finally {
                                // count down even if the backlog computation throws, so waiters are released
                                cursorCounter.countDown();
                            }
                        }
                    }

                    /** Handles the outcome of opening the cursor ledger {@code ledgerId}. */
                    private void cursorLedgerOpened(int rc, LedgerHandle lh, long ledgerId) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Opened cursor ledger {} for cursor {}. rc={}", managedLedgerName,
                                    ledgerId, cursorName, rc);
                        }
                        if (rc != BKException.Code.OK) {
                            // lh must not be touched on this path
                            log.warn("[{}] Error opening metadata ledger {} for cursor {}: {}",
                                    managedLedgerName, ledgerId, cursorName, BKException.getMessage(rc));
                            cursorCounter.countDown();
                            return;
                        }
                        long lac = lh.getLastAddConfirmed();
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Cursor {} LAC {} read from ledger {}", managedLedgerName,
                                    cursorName, lac, ledgerId);
                        }
                        if (lac == LedgerHandle.INVALID_ENTRY_ID) {
                            // save the ledger id and cursor to retry outside of this call back
                            // since we are trying to read the same cursor ledger, we will block until
                            // this current callback completes, since an attempt to read the entry
                            // will block behind this current operation to complete
                            ledgerRetryMap.put(cursorName, ledgerId);
                            log.info("[{}] Cursor {} LAC {} read from ledger {}", managedLedgerName,
                                    cursorName, lac, ledgerId);
                            cursorCounter.countDown();
                            return;
                        }
                        final long entryId = lac;
                        // read last acked message position for subscription
                        lh.asyncReadEntries(entryId, entryId, new AsyncCallback.ReadCallback() {
                            @Override
                            public void readComplete(int readRc, LedgerHandle readLh,
                                    Enumeration<LedgerEntry> seq, Object ctx) {
                                try {
                                    if (log.isDebugEnabled()) {
                                        log.debug("readComplete rc={} entryId={}", readRc, entryId);
                                    }
                                    if (readRc != BKException.Code.OK) {
                                        log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}",
                                                managedLedgerName, ledgerId, cursorName,
                                                BKException.getMessage(readRc));
                                        // indicate that this cursor should be excluded
                                        offlineTopicStats.addCursorDetails(cursorName, errorInReadingCursor,
                                                ledgerId);
                                        return;
                                    }
                                    LedgerEntry entry = seq.nextElement();
                                    MLDataFormats.PositionInfo positionInfo;
                                    try {
                                        positionInfo = MLDataFormats.PositionInfo.parseFrom(entry.getEntry());
                                    } catch (InvalidProtocolBufferException e) {
                                        log.warn("[{}] Error reading position from metadata ledger {} for cursor "
                                                + "{}: {}", managedLedgerName, ledgerId, cursorName, e);
                                        offlineTopicStats.addCursorDetails(cursorName, errorInReadingCursor,
                                                ledgerId);
                                        return;
                                    }
                                    final Position lastAckedMessagePosition = PositionFactory.create(
                                            positionInfo.getLedgerId(), positionInfo.getEntryId());
                                    if (log.isDebugEnabled()) {
                                        log.debug("[{}] Cursor {} MD {} read last ledger position {}",
                                                managedLedgerName, cursorName, lastAckedMessagePosition,
                                                lastLedgerPosition);
                                    }
                                    addCursorBacklogToOfflineStats(managedLedgerName, cursorName, ledgerId,
                                            lastAckedMessagePosition, lastLedgerPosition, ledgers,
                                            offlineTopicStats);
                                } finally {
                                    cursorCounter.countDown();
                                }
                            }
                        }, null);
                    }

                    @Override
                    public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                        log.warn("[{}] Unable to obtain cursor ledger for cursor {}: {}", managedLedgerName,
                                cursorName, e);
                        cursorCounter.countDown();
                    }
                };
                store.asyncGetCursorInfo(managedLedgerName, cursorName, cursorInfoCallback);
            } // for every cursor find backlog
            try {
                if (accurate) {
                    cursorCounter.await();
                } else {
                    cursorCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                }
            } catch (InterruptedException e) {
                // keep the interrupt visible to whoever owns this thread
                Thread.currentThread().interrupt();
                log.warn("[{}] Error reading subscription positions{}", managedLedgerName, e);
            } finally {
                allCursorsCounter.countDown();
            }
        }

        @Override
        public void operationFailed(ManagedLedgerException.MetaStoreException e) {
            log.warn("[{}] Failed to get the cursors list", managedLedgerName, e);
            allCursorsCounter.countDown();
        }
    });
    if (accurate) {
        allCursorsCounter.await();
    } else {
        allCursorsCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }
    // go through ledgers where LAC was -1
    if (accurate && !ledgerRetryMap.isEmpty()) {
        ledgerRetryMap.forEach((cursorName, ledgerId) -> {
            if (log.isDebugEnabled()) {
                log.debug("Cursor {} Ledger {} Trying to obtain MD from BkAdmin", cursorName, ledgerId);
            }
            Position lastAckedMessagePosition = tryGetMDPosition(bk, ledgerId, cursorName);
            if (lastAckedMessagePosition == null) {
                log.warn("[{}] Cursor {} read from ledger {}. Unable to determine cursor position",
                        managedLedgerName, cursorName, ledgerId);
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Cursor {} read from ledger using bk admin {}. position {}", managedLedgerName,
                        cursorName, ledgerId, lastAckedMessagePosition);
            }
            addCursorBacklogToOfflineStats(managedLedgerName, cursorName, ledgerId, lastAckedMessagePosition,
                    lastLedgerPosition, ledgers, offlineTopicStats);
        });
    }
}

/**
 * Counts the entries in the range from {@code lastAckedMessagePosition} (exclusive) to
 * {@code lastLedgerPosition} (inclusive) and records the result for {@code cursorName} in
 * {@code offlineTopicStats}, also adding it to the topic-wide message backlog.
 */
private void addCursorBacklogToOfflineStats(String managedLedgerName, String cursorName, long cursorLedgerId,
        Position lastAckedMessagePosition, Position lastLedgerPosition,
        NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers,
        PersistentOfflineTopicStats offlineTopicStats) {
    Range<Position> range = Range.openClosed(lastAckedMessagePosition, lastLedgerPosition);
    if (log.isDebugEnabled()) {
        log.debug("[{}] Calculating backlog for cursor {} using range {}", managedLedgerName, cursorName,
                range);
    }
    long cursorBacklog = getNumberOfEntries(range, ledgers);
    // NOTE(review): this may be reached from callbacks of different cursors; the += below is not atomic.
    // Confirm whether PersistentOfflineTopicStats needs external synchronization.
    offlineTopicStats.messageBacklog += cursorBacklog;
    offlineTopicStats.addCursorDetails(cursorName, cursorBacklog, cursorLedgerId);
}