in src/main/java/build/buildfarm/cas/cfc/CASFileCache.java [2045:2210]
/**
 * Ensures the directory identified by {@code digest} is present at {@code path}, reusing an
 * existing cache entry when possible and otherwise expiring it and fetching the tree again.
 *
 * <p>The work proceeds in these stages:
 *
 * <ol>
 *   <li>While holding this instance's monitor, look up the directory entry. If it exists, every
 *       indexed input has its reference incremented; when all inputs are present and {@code
 *       directoryEntryExists} confirms the entry, {@code path} is returned immediately with
 *       those references retained. Otherwise the references taken so far are released and the
 *       entry is expired.
 *   <li>Remove the digest from the directories index.
 *   <li>Fetch the directory contents, collecting input keys and per-file put futures.
 *   <li>Disable write access on {@code path} and index the collected inputs.
 *   <li>On any failure of the preceding stages, roll back: deindex, release input references and
 *       remove {@code path}, then fail with the original error.
 *   <li>On success, register a new {@code DirectoryEntry} in {@code directoryStorage}.
 * </ol>
 *
 * <p>NOTE(review): the method name and the first log message suggest the caller already holds a
 * per-directory lock for {@code path} — confirm against the caller.
 *
 * @param path location at which the directory is to exist
 * @param digest digest of the root directory
 * @param directoriesByDigest directory messages keyed by digest, consulted for the root (unless
 *     the digest size is zero) and passed through to the fetch
 * @param service executor on which every asynchronous stage runs
 * @return a future yielding {@code path} once the directory is available; it fails with a {@code
 *     PutDirectoryException} when any input put fails, or with the error raised by the
 *     chmod/index stage, in both cases after rollback has been attempted
 * @throws IOException propagated from the synchronous inspection/expiry phase
 */
private ListenableFuture<Path> putDirectorySynchronized(
    Path path, Digest digest, Map<Digest, Directory> directoriesByDigest, ExecutorService service)
    throws IOException {
  logger.log(Level.FINE, format("directory %s has been locked", path.getFileName()));
  ListenableFuture<Void> expireFuture;
  synchronized (this) {
    DirectoryEntry e = directoryStorage.get(digest);
    if (e == null) {
      // nothing cached for this digest, so there is nothing to expire
      expireFuture = immediateFuture(null);
    } else {
      // take a reference on every indexed input; remember which ones were taken so they can
      // be released again if the entry turns out to be unusable
      ImmutableList.Builder<String> inputsBuilder = ImmutableList.builder();
      for (String input : directoriesIndex.directoryEntries(digest)) {
        Entry fileEntry = storage.get(input);
        if (fileEntry == null) {
          logger.log(
              Level.SEVERE,
              format(
                  "CASFileCache::putDirectory(%s) exists, but input %s does not, purging it with fire and resorting to fetch",
                  DigestUtil.toString(digest), input));
          // mark the entry as unusable; falls through to release + expire below
          e = null;
          break;
        }
        if (fileEntry.incrementReference()) {
          unreferencedEntryCount--;
        }
        checkNotNull(input);
        inputsBuilder.add(input);
      }
      if (e != null) {
        logger.log(Level.FINE, format("found existing entry for %s", path.getFileName()));
        if (directoryEntryExists(path, e, directoriesByDigest)) {
          // cache hit: the references taken above stay held for the caller
          return immediateFuture(path);
        }
        logger.log(
            Level.SEVERE,
            format(
                "directory %s does not exist in cache, purging it with fire and resorting to fetch",
                path.getFileName()));
      }
      // the existing entry cannot be used: undo the references taken above and expire it
      decrementReferencesSynchronized(inputsBuilder.build(), ImmutableList.of());
      expireFuture = expireDirectory(digest, service);
      logger.log(Level.FINE, format("expiring existing entry for %s", path.getFileName()));
    }
  }
  // drop the digest from the index once expiry has completed
  ListenableFuture<Void> deindexFuture =
      transformAsync(
          expireFuture,
          result -> {
            try {
              directoriesIndex.remove(digest);
            } catch (IOException e) {
              return immediateFailedFuture(e);
            }
            return immediateFuture(null);
          },
          service);
  // populated by fetchDirectory; read later by the index and rollback stages
  ImmutableList.Builder<String> inputsBuilder = ImmutableList.builder();
  ListenableFuture<Void> fetchFuture =
      transformAsync(
          deindexFuture,
          result -> {
            logger.log(Level.FINE, format("expiry complete, fetching %s", path.getFileName()));
            ImmutableList.Builder<ListenableFuture<Path>> putFuturesBuilder =
                ImmutableList.builder();
            fetchDirectory(
                path, digest, directoriesByDigest, inputsBuilder, putFuturesBuilder, service);
            ImmutableList<ListenableFuture<Path>> putFutures = putFuturesBuilder.build();
            // is this better suited for whenAllComplete?
            return transformAsync(
                successfulAsList(putFutures),
                paths -> {
                  ImmutableList.Builder<Throwable> failures = ImmutableList.builder();
                  boolean failed = false;
                  for (int i = 0; i < paths.size(); i++) {
                    Path putPath = paths.get(i);
                    if (putPath == null) {
                      // successfulAsList substitutes null for a failed future; query the
                      // matching future to recover the actual failure
                      failed = true;
                      try {
                        putFutures.get(i).get();
                        // should never get here
                      } catch (Throwable t) {
                        failures.add(t);
                      }
                    }
                  }
                  if (failed) {
                    return immediateFailedFuture(
                        new PutDirectoryException(path, digest, failures.build()));
                  }
                  return immediateFuture(null);
                },
                service);
          },
          service);
  ListenableFuture<Void> chmodAndIndexFuture =
      transformAsync(
          fetchFuture,
          (result) -> {
            try {
              disableAllWriteAccess(path);
            } catch (IOException e) {
              logger.log(Level.SEVERE, "error while disabling write permissions on " + path, e);
              return immediateFailedFuture(e);
            }
            try {
              directoriesIndex.put(digest, inputsBuilder.build());
            } catch (IOException e) {
              logger.log(Level.SEVERE, "error while indexing " + path, e);
              return immediateFailedFuture(e);
            }
            return immediateFuture(null);
          },
          service);
  ListenableFuture<Void> rollbackFuture =
      catchingAsync(
          chmodAndIndexFuture,
          Throwable.class,
          e -> {
            ImmutableList<String> inputs = inputsBuilder.build();
            // Every rollback step is best-effort: a failure in one step is attached to the
            // original error as suppressed so the remaining steps still run and the caller
            // still sees the root cause.
            try {
              directoriesIndex.remove(digest);
            } catch (IOException indexException) {
              logger.log(
                  Level.SEVERE,
                  "error during index removal after fetch failure of " + path,
                  indexException);
              e.addSuppressed(indexException);
            }
            synchronized (this) {
              try {
                decrementReferencesSynchronized(inputs, ImmutableList.of());
              } catch (IOException ioEx) {
                e.addSuppressed(ioEx);
              }
            }
            try {
              logger.log(Level.FINE, "removing directory to roll back " + path);
              Directories.remove(path);
            } catch (IOException removeException) {
              logger.log(
                  Level.SEVERE,
                  "error during directory removal after fetch failure of " + path,
                  removeException);
              e.addSuppressed(removeException);
            }
            return immediateFailedFuture(e);
          },
          service);
  return transform(
      rollbackFuture,
      (results) -> {
        logger.log(
            Level.FINE, format("directory fetch complete, inserting %s", path.getFileName()));
        DirectoryEntry e =
            new DirectoryEntry(
                // might want to have this treatment ahead of this
                digest.getSizeBytes() == 0
                    ? Directory.getDefaultInstance()
                    : directoriesByDigest.get(digest),
                Deadline.after(10, SECONDS));
        directoryStorage.put(digest, e);
        return path;
      },
      service);
}