in blobstore/src/main/java/org/jclouds/blobstore/strategy/internal/DeleteAllKeysInList.java [221:301]
private void deleteBlobsAndEmptyDirs(final String containerName,
ListContainerOptions options,
PageSet<? extends StorageMetadata> listing, final Semaphore semaphore,
final AtomicBoolean deleteFailure,
final Set<ListenableFuture<Void>> outstandingFutures)
throws TimeoutException {
for (final StorageMetadata md : listing) {
final String fullPath = parentIsFolder(options, md) ? options.getDir()
+ "/" + md.getName() : md.getName();
// Attempt to acquire a semaphore within the time limit. At least
// one outstanding future should complete within this period for the
// semaphore to be acquired.
try {
if (!semaphore.tryAcquire(maxTime, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timeout waiting for semaphore");
}
} catch (InterruptedException ie) {
logger.debug("Interrupted while deleting blobs");
Thread.currentThread().interrupt();
}
final ListenableFuture<Void> blobDelFuture;
switch (md.getType()) {
case FOLDER:
case BLOB:
blobDelFuture = executorService.submit(new Callable<Void>() {
@Override
public Void call() {
blobStore.removeBlob(containerName, fullPath);
return null;
}
});
break;
case RELATIVE_PATH:
blobDelFuture = deleteDirectory(options, containerName,
md.getName());
break;
case CONTAINER:
throw new IllegalArgumentException("Container type not supported");
default:
blobDelFuture = null;
}
// If a future to delete a blob/directory actually got created above,
// keep a reference of that in the outstandingFutures list. This is
// useful in case of a timeout exception. All outstanding futures can
// then be cancelled.
if (blobDelFuture != null) {
outstandingFutures.add(blobDelFuture);
// Add a callback to release the semaphore. This is required for
// other threads waiting to acquire a semaphore above to make
// progress.
Futures.addCallback(blobDelFuture, new FutureCallback<Object>() {
@Override
public void onSuccess(final Object o) {
outstandingFutures.remove(blobDelFuture);
semaphore.release();
}
@Override
public void onFailure(final Throwable t) {
// Make a note the fact that some blob/directory could not be
// deleted successfully. This is used for retrying later.
deleteFailure.set(true);
outstandingFutures.remove(blobDelFuture);
semaphore.release();
}
}, MoreExecutors.directExecutor());
} else {
// It is possible above to acquire a semaphore but not submit any
// task to the executorService. For e.g. if the listing contains
// an object of type 'FOLDER' and the ListContianerOptions are *not*
// recursive. In this case, there is no blobDelFuture and therefore
// no FutureCallback to release the semaphore. This semaphore is
// released here.
semaphore.release();
}
}
}