in managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java [2719:2930]
void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
if (!factory.isMetadataServiceAvailable()) {
// Defer trimming of ledger if we cannot connect to metadata service
promise.completeExceptionally(new MetaStoreException("Metadata service is not available"));
return;
}
// Ensure only one trimming operation is active
if (!trimmerMutex.tryLock()) {
scheduleDeferredTrimming(isTruncate, promise);
return;
}
List<LedgerInfo> ledgersToDelete = new ArrayList<>();
List<LedgerInfo> offloadedLedgersToDelete = new ArrayList<>();
Optional<OffloadPolicies> optionalOffloadPolicies = getOffloadPoliciesIfAppendable();
synchronized (this) {
if (log.isDebugEnabled()) {
log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(),
TOTAL_SIZE_UPDATER.get(this));
}
State currentState = STATE_UPDATER.get(this);
if (currentState == State.Closed) {
log.debug("[{}] Ignoring trimming request since the managed ledger was already closed", name);
trimmerMutex.unlock();
promise.completeExceptionally(new ManagedLedgerAlreadyClosedException("Can't trim closed ledger"));
return;
}
// Allow for FencedForDeletion
if (currentState == State.Fenced) {
log.debug("[{}] Ignoring trimming request since the managed ledger was already fenced", name);
trimmerMutex.unlock();
promise.completeExceptionally(new ManagedLedgerFencedException("Can't trim fenced ledger"));
return;
}
long slowestReaderLedgerId = -1;
final LazyLoadableValue<Long> slowestNonDurationLedgerId =
new LazyLoadableValue(() -> getTheSlowestNonDurationReadPosition().getLedgerId());
final long retentionSizeInMB = config.getRetentionSizeInMB();
final long retentionTimeMs = config.getRetentionTimeMillis();
final long totalSizeOfML = TOTAL_SIZE_UPDATER.get(this);
if (!cursors.hasDurableCursors()) {
// At this point the lastLedger will be pointing to the
// ledger that has just been closed, therefore the +1 to
// include lastLedger in the trimming.
slowestReaderLedgerId = currentLedger.getId() + 1;
} else {
Position slowestReaderPosition = cursors.getSlowestReaderPosition();
if (slowestReaderPosition != null) {
// The slowest reader position is the mark delete position.
// If the slowest reader position point the last entry in the ledger x,
// the slowestReaderLedgerId should be x + 1 and the ledger x could be deleted.
LedgerInfo ledgerInfo = ledgers.get(slowestReaderPosition.getLedgerId());
if (ledgerInfo != null && ledgerInfo.getLedgerId() != currentLedger.getId()
&& ledgerInfo.getEntries() == slowestReaderPosition.getEntryId() + 1) {
slowestReaderLedgerId = slowestReaderPosition.getLedgerId() + 1;
} else {
slowestReaderLedgerId = slowestReaderPosition.getLedgerId();
}
} else {
promise.completeExceptionally(new ManagedLedgerException("Couldn't find reader position"));
trimmerMutex.unlock();
return;
}
}
if (log.isDebugEnabled()) {
log.debug("[{}] Slowest consumer ledger id: {}", name, slowestReaderLedgerId);
}
long totalSizeToDelete = 0;
// skip ledger if retention constraint met
Iterator<LedgerInfo> ledgerInfoIterator =
ledgers.headMap(slowestReaderLedgerId, false).values().iterator();
while (ledgerInfoIterator.hasNext()){
LedgerInfo ls = ledgerInfoIterator.next();
// currentLedger can not be deleted
if (ls.getLedgerId() == currentLedger.getId()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
ls.getLedgerId());
}
break;
}
// if truncate, all ledgers besides currentLedger are going to be deleted
if (isTruncate) {
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger {} will be truncated with ts {}",
name, ls.getLedgerId(), ls.getTimestamp());
}
ledgersToDelete.add(ls);
continue;
}
totalSizeToDelete += ls.getSize();
boolean overRetentionQuota = isLedgerRetentionOverSizeQuota(retentionSizeInMB, totalSizeOfML,
totalSizeToDelete);
boolean expired = hasLedgerRetentionExpired(retentionTimeMs, ls.getTimestamp());
if (log.isDebugEnabled()) {
log.debug(
"[{}] Checking ledger {} -- time-old: {} sec -- "
+ "expired: {} -- over-quota: {} -- current-ledger: {}",
name, ls.getLedgerId(), (clock.millis() - ls.getTimestamp()) / 1000.0, expired,
overRetentionQuota, currentLedger.getId());
}
if (expired || overRetentionQuota) {
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger {} has expired or over quota, expired is: {}, ts: {}, "
+ "overRetentionQuota is: {}, ledge size: {}",
name, ls.getLedgerId(), expired, ls.getTimestamp(), overRetentionQuota, ls.getSize());
}
ledgersToDelete.add(ls);
} else {
// once retention constraint has been met, skip check
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
}
releaseReadHandleIfNoLongerRead(ls.getLedgerId(), slowestNonDurationLedgerId.getValue());
break;
}
}
while (ledgerInfoIterator.hasNext()) {
LedgerInfo ls = ledgerInfoIterator.next();
if (!releaseReadHandleIfNoLongerRead(ls.getLedgerId(), slowestNonDurationLedgerId.getValue())) {
break;
}
}
for (LedgerInfo ls : ledgers.values()) {
if (isOffloadedNeedsDelete(ls.getOffloadContext(), optionalOffloadPolicies)
&& !ledgersToDelete.contains(ls)) {
log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", name,
ls.getLedgerId());
offloadedLedgersToDelete.add(ls);
}
}
if (ledgersToDelete.isEmpty() && offloadedLedgersToDelete.isEmpty()) {
trimmerMutex.unlock();
promise.complete(null);
return;
}
if (currentState == State.CreatingLedger // Give up now and schedule a new trimming
|| !metadataMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
scheduleDeferredTrimming(isTruncate, promise);
trimmerMutex.unlock();
return;
}
try {
advanceCursorsIfNecessary(ledgersToDelete);
} catch (LedgerNotExistException e) {
log.info("First non deleted Ledger is not found, stop trimming");
metadataMutex.unlock();
trimmerMutex.unlock();
return;
}
doDeleteLedgers(ledgersToDelete);
for (LedgerInfo ls : offloadedLedgersToDelete) {
LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
String driverName = OffloadUtils.getOffloadDriverName(ls,
config.getLedgerOffloader().getOffloadDriverName());
Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
config.getLedgerOffloader().getOffloadDriverMetadata());
OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);
ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
}
if (log.isDebugEnabled()) {
log.debug("[{}] Updating of ledgers list after trimming", name);
}
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
ledgersStat = stat;
metadataMutex.unlock();
trimmerMutex.unlock();
for (LedgerInfo ls : ledgersToDelete) {
log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
asyncDeleteLedger(ls.getLedgerId(), ls);
}
for (LedgerInfo ls : offloadedLedgersToDelete) {
log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
ls.getSize());
asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
}
promise.complete(null);
}
@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
metadataMutex.unlock();
trimmerMutex.unlock();
handleBadVersion(e);
promise.completeExceptionally(e);
}
});
}
}