public void asyncDelete()

in managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java [2351:2514]


    /**
     * Asynchronously acknowledges (individually deletes) the given positions on this cursor.
     *
     * <p>While holding the cursor write lock, every position is recorded either in
     * {@code individualDeletedMessages} (the whole entry is acknowledged) or in {@code batchDeletedIndexes}
     * (the position carries an ack-set, i.e. only part of a batch is acknowledged). If the first deleted range
     * then reaches the current mark-delete position, the mark-delete position is advanced. After the lock is
     * released the new mark-delete position is either persisted through {@code internalAsyncMarkDelete} or, when
     * the mark-delete rate limiter rejects the operation, only remembered and the cursor is flagged dirty.
     *
     * <p>The outcome of the locked section is reported to the callback after the write lock has been released,
     * and only when no mark-delete follows. When a mark-delete follows, the callback is notified by that step
     * instead, so a single invocation no longer reports both a failure and a completion, nor two completions.
     *
     * @param positions positions to delete; a {@code null} element is reported through {@code deleteFailed}
     * @param callback  notified through {@code deleteComplete} or {@code deleteFailed}
     * @param ctx       opaque context object handed back to the callback
     */
    public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallback callback, Object ctx) {
        if (isClosed()) {
            callback.deleteFailed(new ManagedLedgerException
                    .CursorAlreadyClosedException("Cursor was already closed"), ctx);
            return;
        }

        Position newMarkDeletePosition = null;

        // Outcome of the locked section. At most one of the two is set; both stay unset when the method has to
        // continue with the mark-delete step below, which then owns the callback notification.
        ManagedLedgerException failure = null;
        boolean completeWithoutMarkDelete = false;

        lock.writeLock().lock();

        try {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}",
                        ledger.getName(), name, positions, individualDeletedMessages, markDeletePosition);
            }

            for (Position pos : positions) {
                Position position = requireNonNull(pos);
                if (ledger.getLastConfirmedEntry().compareTo(position) < 0) {
                    if (log.isDebugEnabled()) {
                        log.debug(
                            "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} "
                            + "for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
                    }
                    // Reported from the finally block, once the lock has been released.
                    failure = new ManagedLedgerException("Invalid mark deleted position");
                    return;
                }

                if (internalIsMessageDeleted(position)) {
                    // The whole entry is already acknowledged, so per-batch-index state is no longer needed.
                    if (batchDeletedIndexes != null) {
                        batchDeletedIndexes.remove(position);
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
                    }
                    continue;
                }
                long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position);
                if (ackSet == null) {
                    // No ack-set attached: the whole entry is being acknowledged.
                    if (batchDeletedIndexes != null) {
                        batchDeletedIndexes.remove(position);
                    }
                    // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will
                    // make the RangeSet recognize the "continuity" between adjacent Positions.
                    // Before https://github.com/apache/pulsar/pull/21105 is merged, the range does not support crossing
                    // multi ledgers, so the first position's entryId maybe "-1".
                    Position previousPosition;
                    if (position.getEntryId() == 0) {
                        previousPosition = PositionFactory.create(position.getLedgerId(), -1);
                    } else {
                        previousPosition = ledger.getPreviousPosition(position);
                    }
                    individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
                        previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId());
                    MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);

                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
                            individualDeletedMessages);
                    }
                } else if (batchDeletedIndexes != null) {
                    // Partial batch acknowledgement: intersect the stored bit set with the given one. When no
                    // bit set was stored yet, the given one is stored as-is (same instance, hence the identity
                    // check before and-ing).
                    final var givenBitSet = BitSet.valueOf(ackSet);
                    final var bitSet = batchDeletedIndexes.computeIfAbsent(position, __ -> givenBitSet);
                    if (givenBitSet != bitSet) {
                        bitSet.and(givenBitSet);
                    }
                    if (bitSet.isEmpty()) {
                        // No bit left set: treat the entry as fully acknowledged.
                        // NOTE(review): unlike the whole-entry branch above, entryId == 0 is not special-cased
                        // here - confirm that a range reaching into the previous ledger is intended.
                        Position previousPosition = ledger.getPreviousPosition(position);
                        individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
                            previousPosition.getEntryId(),
                            position.getLedgerId(), position.getEntryId());
                        MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
                        batchDeletedIndexes.remove(position);
                    }
                }
            }

            if (individualDeletedMessages.isEmpty()) {
                // No changes to individually deleted messages, so nothing to do at this point
                completeWithoutMarkDelete = true;
                return;
            }

            // If the lower bound of the range set is the current mark delete position, then we can trigger a new
            // mark-delete to the upper bound of the first range segment
            Range<Position> range = individualDeletedMessages.firstRange();

            // If the upper bound is before the mark-delete position, we need to move ahead as these
            // individualDeletedMessages are now irrelevant
            if (range.upperEndpoint().compareTo(markDeletePosition) <= 0) {
                individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(),
                        markDeletePosition.getEntryId());
                range = individualDeletedMessages.firstRange();
            }

            if (range == null) {
                // The set was completely cleaned up now
                completeWithoutMarkDelete = true;
                return;
            }

            // If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
            if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
                    .getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {

                if (log.isDebugEnabled()) {
                    log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", ledger.getName(),
                            name, range);
                }

                newMarkDeletePosition = range.upperEndpoint();
            }

            if (newMarkDeletePosition != null) {
                newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition);
            } else {
                // Nothing contiguous with the mark-delete position: keep it where it is.
                newMarkDeletePosition = markDeletePosition;
            }
        } catch (Exception e) {
            log.warn("[{}] [{}] Error while updating individualDeletedMessages [{}]", ledger.getName(), name,
                    e.getMessage(), e);
            failure = getManagedLedgerException(e);
            return;
        } finally {
            lock.writeLock().unlock();
            // Single notification point for the locked section, outside the lock. Nothing is reported here when
            // the method goes on to the mark-delete step, which notifies the callback itself.
            if (failure != null) {
                callback.deleteFailed(failure, ctx);
            } else if (completeWithoutMarkDelete) {
                callback.deleteComplete(ctx);
            }
        }

        // Apply rate limiting to mark-delete operations
        if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
            // The position is only remembered here; flag the cursor dirty instead of persisting right away.
            isDirty = true;
            updateLastMarkDeleteEntryToLatest(newMarkDeletePosition, null);
            callback.deleteComplete(ctx);
            return;
        }

        try {
            // Carry over the properties of the last mark-delete entry, if there is one.
            Map<String, Long> properties = lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties
                    : Collections.emptyMap();

            internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() {
                @Override
                public void markDeleteComplete(Object ctx) {
                    callback.deleteComplete(ctx);
                }

                @Override
                public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                    callback.deleteFailed(exception, ctx);
                }

            }, ctx);

        } catch (Exception e) {
            log.warn("[{}] [{}] Error doing asyncDelete [{}]", ledger.getName(), name, e.getMessage(), e);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer {} cursor asyncDelete error, counters: consumed {} mdPos {} rdPos {}",
                        ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
            }
            callback.deleteFailed(new ManagedLedgerException(e), ctx);
        }
    }