in managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java [70:185]
static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeBytes, Position readPosition,
                                                 NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
                                                         ledgersInfo,
                                                 Long lastLedgerId, long lastLedgerTotalEntries,
                                                 long lastLedgerTotalSize) {
    // Estimates how many entries, starting at readPosition, fit into maxSizeBytes, using the per-ledger
    // size/entry-count metadata in ledgersInfo plus a fixed per-entry read overhead.
    //
    //   maxEntries             upper bound for the returned estimate
    //   maxSizeBytes           byte budget; non-positive yields 1, Long.MAX_VALUE yields maxEntries
    //   readPosition           position the read starts from; clamped into the range of known ledgers
    //   ledgersInfo            ledger metadata keyed by ledger id
    //   lastLedgerId           id of the last active ledger, or null (as in ReadOnlyManagedLedgerImpl)
    //   lastLedgerTotalEntries entry count of the last active ledger (overrides its metadata when > 0)
    //   lastLedgerTotalSize    byte size of the last active ledger (overrides its metadata when > 0)
    //
    // The result is never smaller than 1 (except that maxEntries itself is returned unchanged for
    // maxSizeBytes == Long.MAX_VALUE).
    if (maxSizeBytes <= 0) {
        // If the specified maximum size is invalid (e.g., non-positive), return 1
        return 1;
    }
    // If the maximum size is Long.MAX_VALUE, return the maximum number of entries
    if (maxSizeBytes == Long.MAX_VALUE) {
        return maxEntries;
    }
    // Without any ledger metadata there is nothing to base an estimate on, and firstKey()/lastKey()
    // below would throw NoSuchElementException. Fall back to the minimum result of one entry, which
    // is also what the estimation yields when every known ledger is empty.
    if (ledgersInfo.isEmpty()) {
        return 1;
    }
    // Adjust the read position to ensure it falls within the valid range of available ledgers.
    // This handles special cases such as EARLIEST and LATEST positions by resetting them
    // to the first available ledger or the last active ledger, respectively.
    if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) {
        readPosition = PositionFactory.create(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0));
    } else if (lastLedgerId == null && readPosition.getLedgerId() > ledgersInfo.lastKey()) {
        Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
        readPosition =
                PositionFactory.create(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
    } else if (readPosition.getLedgerId() < ledgersInfo.firstKey()) {
        readPosition = PositionFactory.create(ledgersInfo.firstKey(), 0);
    }

    long estimatedEntryCount = 0;
    long remainingBytesSize = maxSizeBytes;
    // Average entry size (including read overhead) of the most recently visited non-empty ledger;
    // 0 means no non-empty ledger has been seen yet.
    long currentAvgSize = 0;
    // Get a collection of ledger info starting from the read position
    Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersAfterReadPosition =
            ledgersInfo.tailMap(readPosition.getLedgerId(), true).values();
    // calculate the estimated entry count based on the remaining bytes and ledger metadata
    for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersAfterReadPosition) {
        if (remainingBytesSize <= 0 || estimatedEntryCount >= maxEntries) {
            // Stop processing if there are no more bytes remaining to allocate for entries
            // or if the estimated entry count exceeds the maximum allowed entries
            break;
        }
        long ledgerId = ledgerInfo.getLedgerId();
        long ledgerTotalSize = ledgerInfo.getSize();
        long ledgerTotalEntries = ledgerInfo.getEntries();
        // Adjust ledger size and total entry count if this is the last active ledger since the
        // ledger metadata doesn't include the current ledger's size and entry count
        // the lastLedgerId is null in ReadOnlyManagedLedgerImpl
        if (lastLedgerId != null && ledgerId == lastLedgerId.longValue()
                && lastLedgerTotalSize > 0 && lastLedgerTotalEntries > 0) {
            ledgerTotalSize = lastLedgerTotalSize;
            ledgerTotalEntries = lastLedgerTotalEntries;
        }
        // Skip processing ledgers that have no entries or size
        if (ledgerTotalEntries == 0 || ledgerTotalSize == 0) {
            continue;
        }
        // Update the average entry size based on the current ledger's size and entry count
        currentAvgSize = Math.max(1, ledgerTotalSize / ledgerTotalEntries)
                + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
        // Calculate the total size of this ledger, inclusive of bookkeeping overhead per entry
        long ledgerTotalSizeWithBkOverhead =
                ledgerTotalSize + ledgerTotalEntries * BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
        // True when the read starts inside this ledger, past its first entry, so only the
        // tail of the ledger is eligible to be counted.
        boolean readStartsInsideLedger =
                readPosition.getLedgerId() == ledgerId && readPosition.getEntryId() > 0;
        // If the remaining bytes are insufficient to read the full ledger, estimate the readable entries
        // or when the read position is beyond the first entry in the ledger
        if (remainingBytesSize < ledgerTotalSizeWithBkOverhead || readStartsInsideLedger) {
            // Number of entries available in this ledger from the read position onwards (at least 1)
            long entryCount = readStartsInsideLedger
                    ? Math.max(ledgerTotalEntries - readPosition.getEntryId(), 1)
                    : ledgerTotalEntries;
            // Estimate how many entries can fit within the remaining bytes
            long entriesToRead = Math.min(Math.max(1, remainingBytesSize / currentAvgSize), entryCount);
            estimatedEntryCount += entriesToRead;
            remainingBytesSize -= entriesToRead * currentAvgSize;
        } else {
            // If the full ledger can be read, add all its entries to the count and reduce its size
            estimatedEntryCount += ledgerTotalEntries;
            remainingBytesSize -= ledgerTotalSizeWithBkOverhead;
        }
    }

    // Add any remaining bytes to the estimated entry count considering the current average entry size
    if (remainingBytesSize > 0 && estimatedEntryCount < maxEntries) {
        // need to find the previous non-empty ledger to find the average size
        if (currentAvgSize == 0) {
            Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersBeforeReadPosition =
                    ledgersInfo.headMap(readPosition.getLedgerId(), false).descendingMap().values();
            for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersBeforeReadPosition) {
                long ledgerTotalSize = ledgerInfo.getSize();
                long ledgerTotalEntries = ledgerInfo.getEntries();
                // Skip processing ledgers that have no entries or size
                if (ledgerTotalEntries == 0 || ledgerTotalSize == 0) {
                    continue;
                }
                // Update the average entry size based on the current ledger's size and entry count
                currentAvgSize = Math.max(1, ledgerTotalSize / ledgerTotalEntries)
                        + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
                break;
            }
        }
        // If no non-empty ledger exists at all, the average stays 0 and nothing is added here
        if (currentAvgSize > 0) {
            estimatedEntryCount += remainingBytesSize / currentAvgSize;
        }
    }
    // Ensure at least one entry is always returned as the result
    return Math.max((int) Math.min(estimatedEntryCount, maxEntries), 1);
}