in oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java [206:349]
public void purgeBlobsDeleted(long before, @NotNull GarbageCollectableBlobStore blobStore) {
cancelled = false;
long start = clock.getTime();
LOG.info("Starting purge of blobs deleted before {}", before);
long numBlobsDeleted = 0;
long numChunksDeleted = 0;
File idTempDeleteFile = null;
BufferedWriter idTempDeleteWriter = null;
// If blob store support blob tracking
boolean blobIdsTracked = blobStore instanceof BlobTrackingStore;
if (blobIdsTracked) {
try {
idTempDeleteFile = File.createTempFile("idTempDelete", null, rootDirectory);
idTempDeleteWriter = Files.newWriter(idTempDeleteFile, Charsets.UTF_8);
} catch (Exception e) {
LOG.warn("Unable to open a writer to a temp file, will ignore tracker sync");
blobIdsTracked = false;
}
}
long lastCheckedBlobTimestamp = readLastCheckedBlobTimestamp();
long lastDeletedBlobTimestamp = lastCheckedBlobTimestamp;
String currInUseFileName = deletedBlobsFileWriter.inUseFileName;
deletedBlobsFileWriter.releaseInUseFile();
for (File deletedBlobListFile : FileUtils.listFiles(rootDirectory, blobFileNameFilter, null)) {
if (cancelled) {
break;
}
if (deletedBlobListFile.getName().equals(deletedBlobsFileWriter.inUseFileName)) {
continue;
}
LOG.debug("Purging blobs from {}", deletedBlobListFile);
long timestamp;
try {
timestamp = getTimestampFromBlobFileName(deletedBlobListFile.getName());
} catch (IllegalArgumentException iae) {
LOG.warn("Couldn't extract timestamp from filename - " + deletedBlobListFile, iae);
continue;
}
if (timestamp < before) {
LineIterator blobLineIter = null;
try {
blobLineIter = FileUtils.lineIterator(deletedBlobListFile);
while (blobLineIter.hasNext()) {
if (cancelled) {
break;
}
String deletedBlobLine = blobLineIter.next();
String[] parsedDeletedBlobIdLine = deletedBlobLine.split("\\|", 3);
if (parsedDeletedBlobIdLine.length != 3) {
LOG.warn("Unparseable line ({}) in file {}. It won't be retried.",
parsedDeletedBlobIdLine, deletedBlobListFile);
} else {
String deletedBlobId = parsedDeletedBlobIdLine[0];
try {
long blobDeletionTimestamp = Long.valueOf(parsedDeletedBlobIdLine[1]);
if (blobDeletionTimestamp < lastCheckedBlobTimestamp) {
continue;
}
if (blobDeletionTimestamp >= before) {
break;
}
lastDeletedBlobTimestamp = Math.max(lastDeletedBlobTimestamp, blobDeletionTimestamp);
List<String> chunkIds = Lists.newArrayList(blobStore.resolveChunks(deletedBlobId));
if (chunkIds.size() > 0) {
long deleted = blobStore.countDeleteChunks(chunkIds, 0);
if (deleted < 1) {
LOG.warn("Blob {} in file {} not deleted", deletedBlobId, deletedBlobListFile);
} else {
numBlobsDeleted++;
numChunksDeleted += deleted;
if (blobIdsTracked) {
// Save deleted chunkIds to a temporary file
for (String id : chunkIds) {
FileIOUtils.writeAsLine(idTempDeleteWriter, id, true);
}
}
}
}
} catch (NumberFormatException nfe) {
LOG.warn("Couldn't parse blobTimestamp(" + parsedDeletedBlobIdLine[1] +
"). deletedBlobLine - " + deletedBlobLine +
"; file - " + deletedBlobListFile.getName(), nfe);
} catch (DataStoreException dse) {
LOG.debug("Exception occurred while attempting to delete blob " + deletedBlobId, dse);
} catch (Exception e) {
LOG.warn("Exception occurred while attempting to delete blob " + deletedBlobId, e);
}
}
}
} catch (IOException ioe) {
//log error and continue
LOG.warn("Couldn't read deleted blob list file - " + deletedBlobListFile, ioe);
} finally {
LineIterator.closeQuietly(blobLineIter);
}
// OAK-6314 revealed that blobs appended might not be immediately available. So, we'd skip
// the file that was being processed when purge started - next cycle would re-process and
// delete
if (!deletedBlobListFile.getName().equals(currInUseFileName)) {
if (!deletedBlobListFile.delete()) {
LOG.warn("File {} couldn't be deleted while all blobs listed in it have been purged", deletedBlobListFile);
} else {
LOG.debug("File {} deleted", deletedBlobListFile);
}
}
} else {
LOG.debug("Skipping {} as its timestamp is newer than {}", deletedBlobListFile.getName(), before);
}
}
long startBlobTrackerSyncTime = clock.getTime();
// Synchronize deleted blob ids with the blob id tracker
try {
Closeables.close(idTempDeleteWriter, true);
if (blobIdsTracked && numBlobsDeleted > 0) {
BlobTracker tracker = ((BlobTrackingStore) blobStore).getTracker();
if (tracker != null) {
tracker.remove(idTempDeleteFile, Options.ACTIVE_DELETION);
}
}
} catch(Exception e) {
LOG.warn("Error refreshing tracked blob ids", e);
}
long endBlobTrackerSyncTime = clock.getTime();
LOG.info("Synchronizing changes with blob tracker took {} ms", endBlobTrackerSyncTime - startBlobTrackerSyncTime);
if (cancelled) {
LOG.info("Deletion run cancelled by user");
}
long end = clock.getTime();
LOG.info("Deleted {} blobs contained in {} chunks in {} ms", numBlobsDeleted, numChunksDeleted, end - start);
writeOutLastCheckedBlobTimestamp(lastDeletedBlobTimestamp);
}