in managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java [2351:2514]
public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallback callback, Object ctx) {
if (isClosed()) {
callback.deleteFailed(new ManagedLedgerException
.CursorAlreadyClosedException("Cursor was already closed"), ctx);
return;
}
Position newMarkDeletePosition = null;
lock.writeLock().lock();
try {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}",
ledger.getName(), name, positions, individualDeletedMessages, markDeletePosition);
}
for (Position pos : positions) {
Position position = requireNonNull(pos);
if (ledger.getLastConfirmedEntry().compareTo(position) < 0) {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} "
+ "for cursor [{}]", ledger.getName(), position, ledger.getLastConfirmedEntry(), name);
}
callback.deleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
return;
}
if (internalIsMessageDeleted(position)) {
if (batchDeletedIndexes != null) {
batchDeletedIndexes.remove(position);
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
}
continue;
}
long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position);
if (ackSet == null) {
if (batchDeletedIndexes != null) {
batchDeletedIndexes.remove(position);
}
// Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will
// make the RangeSet recognize the "continuity" between adjacent Positions.
// Before https://github.com/apache/pulsar/pull/21105 is merged, the range does not support crossing
// multi ledgers, so the first position's entryId maybe "-1".
Position previousPosition;
if (position.getEntryId() == 0) {
previousPosition = PositionFactory.create(position.getLedgerId(), -1);
} else {
previousPosition = ledger.getPreviousPosition(position);
}
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId());
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Individually deleted messages: {}", ledger.getName(), name,
individualDeletedMessages);
}
} else if (batchDeletedIndexes != null) {
final var givenBitSet = BitSet.valueOf(ackSet);
final var bitSet = batchDeletedIndexes.computeIfAbsent(position, __ -> givenBitSet);
if (givenBitSet != bitSet) {
bitSet.and(givenBitSet);
}
if (bitSet.isEmpty()) {
Position previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),
previousPosition.getEntryId(),
position.getLedgerId(), position.getEntryId());
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
batchDeletedIndexes.remove(position);
}
}
}
if (individualDeletedMessages.isEmpty()) {
// No changes to individually deleted messages, so nothing to do at this point
return;
}
// If the lower bound of the range set is the current mark delete position, then we can trigger a new
// mark-delete to the upper bound of the first range segment
Range<Position> range = individualDeletedMessages.firstRange();
// If the upper bound is before the mark-delete position, we need to move ahead as these
// individualDeletedMessages are now irrelevant
if (range.upperEndpoint().compareTo(markDeletePosition) <= 0) {
individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId());
range = individualDeletedMessages.firstRange();
}
if (range == null) {
// The set was completely cleaned up now
return;
}
// If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
.getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", ledger.getName(),
name, range);
}
newMarkDeletePosition = range.upperEndpoint();
}
if (newMarkDeletePosition != null) {
newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition);
} else {
newMarkDeletePosition = markDeletePosition;
}
} catch (Exception e) {
log.warn("[{}] [{}] Error while updating individualDeletedMessages [{}]", ledger.getName(), name,
e.getMessage(), e);
callback.deleteFailed(getManagedLedgerException(e), ctx);
return;
} finally {
boolean empty = individualDeletedMessages.isEmpty();
lock.writeLock().unlock();
if (empty) {
callback.deleteComplete(ctx);
}
}
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
isDirty = true;
updateLastMarkDeleteEntryToLatest(newMarkDeletePosition, null);
callback.deleteComplete(ctx);
return;
}
try {
Map<String, Long> properties = lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties
: Collections.emptyMap();
internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
callback.deleteComplete(ctx);
}
@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
callback.deleteFailed(exception, ctx);
}
}, ctx);
} catch (Exception e) {
log.warn("[{}] [{}] Error doing asyncDelete [{}]", ledger.getName(), name, e.getMessage(), e);
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor asyncDelete error, counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
}
callback.deleteFailed(new ManagedLedgerException(e), ctx);
}
}