static int internalEstimateEntryCountByBytesSize()

in managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCountEstimator.java [70:185]


    static int internalEstimateEntryCountByBytesSize(int maxEntries, long maxSizeBytes, Position readPosition,
                                                     NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
                                                             ledgersInfo,
                                                     Long lastLedgerId, long lastLedgerTotalEntries,
                                                     long lastLedgerTotalSize) {
        if (maxSizeBytes <= 0) {
            // If the specified maximum size is invalid (e.g., non-positive), return 1
            return 1;
        }

        // If the maximum size is Long.MAX_VALUE, return the maximum number of entries
        if (maxSizeBytes == Long.MAX_VALUE) {
            return maxEntries;
        }

        // Adjust the read position to ensure it falls within the valid range of available ledgers.
        // This handles special cases such as EARLIEST and LATEST positions by resetting them
        // to the first available ledger or the last active ledger, respectively.
        if (lastLedgerId != null && readPosition.getLedgerId() > lastLedgerId.longValue()) {
            readPosition = PositionFactory.create(lastLedgerId, Math.max(lastLedgerTotalEntries - 1, 0));
        } else if (lastLedgerId == null && readPosition.getLedgerId() > ledgersInfo.lastKey()) {
            Map.Entry<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> lastEntry = ledgersInfo.lastEntry();
            readPosition =
                    PositionFactory.create(lastEntry.getKey(), Math.max(lastEntry.getValue().getEntries() - 1, 0));
        } else if (readPosition.getLedgerId() < ledgersInfo.firstKey()) {
            readPosition = PositionFactory.create(ledgersInfo.firstKey(), 0);
        }

        long estimatedEntryCount = 0;
        long remainingBytesSize = maxSizeBytes;
        long currentAvgSize = 0;
        // Get a collection of ledger info starting from the read position
        Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersAfterReadPosition =
                ledgersInfo.tailMap(readPosition.getLedgerId(), true).values();

        // calculate the estimated entry count based on the remaining bytes and ledger metadata
        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersAfterReadPosition) {
            if (remainingBytesSize <= 0 || estimatedEntryCount >= maxEntries) {
                // Stop processing if there are no more bytes remaining to allocate for entries
                // or if the estimated entry count exceeds the maximum allowed entries
                break;
            }
            long ledgerId = ledgerInfo.getLedgerId();
            long ledgerTotalSize = ledgerInfo.getSize();
            long ledgerTotalEntries = ledgerInfo.getEntries();

            // Adjust ledger size and total entry count if this is the last active ledger since the
            // ledger metadata doesn't include the current ledger's size and entry count
            // the lastLedgerId is null in ReadOnlyManagedLedgerImpl
            if (lastLedgerId != null && ledgerId == lastLedgerId.longValue()
                    && lastLedgerTotalSize > 0 && lastLedgerTotalEntries > 0) {
                ledgerTotalSize = lastLedgerTotalSize;
                ledgerTotalEntries = lastLedgerTotalEntries;
            }

            // Skip processing ledgers that have no entries or size
            if (ledgerTotalEntries == 0 || ledgerTotalSize == 0) {
                continue;
            }

            // Update the average entry size based on the current ledger's size and entry count
            currentAvgSize = Math.max(1, ledgerTotalSize / ledgerTotalEntries)
                    + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;

            // Calculate the total size of this ledger, inclusive of bookkeeping overhead per entry
            long ledgerTotalSizeWithBkOverhead =
                    ledgerTotalSize + ledgerTotalEntries * BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;

            // If the remaining bytes are insufficient to read the full ledger, estimate the readable entries
            // or when the read position is beyond the first entry in the ledger
            if (remainingBytesSize < ledgerTotalSizeWithBkOverhead
                    || readPosition.getLedgerId() == ledgerId && readPosition.getEntryId() > 0) {
                long entryCount;
                if (readPosition.getLedgerId() == ledgerId && readPosition.getEntryId() > 0) {
                    entryCount = Math.max(ledgerTotalEntries - readPosition.getEntryId(), 1);
                } else {
                    entryCount = ledgerTotalEntries;
                }
                // Estimate how many entries can fit within the remaining bytes
                long entriesToRead = Math.min(Math.max(1, remainingBytesSize / currentAvgSize), entryCount);
                estimatedEntryCount += entriesToRead;
                remainingBytesSize -= entriesToRead * currentAvgSize;
            } else {
                // If the full ledger can be read, add all its entries to the count and reduce its size
                estimatedEntryCount += ledgerTotalEntries;
                remainingBytesSize -= ledgerTotalSizeWithBkOverhead;
            }
        }

        // Add any remaining bytes to the estimated entry count considering the current average entry size
        if (remainingBytesSize > 0 && estimatedEntryCount < maxEntries) {
            // need to find the previous non-empty ledger to find the average size
            if (currentAvgSize == 0) {
                Collection<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersBeforeReadPosition =
                        ledgersInfo.headMap(readPosition.getLedgerId(), false).descendingMap().values();
                for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgersBeforeReadPosition) {
                    long ledgerTotalSize = ledgerInfo.getSize();
                    long ledgerTotalEntries = ledgerInfo.getEntries();
                    // Skip processing ledgers that have no entries or size
                    if (ledgerTotalEntries == 0 || ledgerTotalSize == 0) {
                        continue;
                    }
                    // Update the average entry size based on the current ledger's size and entry count
                    currentAvgSize = Math.max(1, ledgerTotalSize / ledgerTotalEntries)
                            + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
                    break;
                }
            }
            if (currentAvgSize > 0) {
                estimatedEntryCount += remainingBytesSize / currentAvgSize;
            }
        }

        // Ensure at least one entry is always returned as the result
        return Math.max((int) Math.min(estimatedEntryCount, maxEntries), 1);
    }