in src/main/java/build/buildfarm/cas/cfc/CASFileCache.java [761:897]
/**
 * Creates a {@link Write} for the blob named by {@code key} whose completion is reported through
 * {@code future}.
 *
 * <p>The returned write issues one output stream at a time: {@code getOutput} waits on the close
 * notification of the previously issued stream before creating the next one. A listener is
 * registered on {@code future} (direct executor) that calls {@code reset()} once it completes.
 *
 * @param key digest and identifier of the blob being written
 * @param future future exposed unchanged by {@code getFuture()}; its completion marks the write
 *     complete
 * @return the new write
 */
Write newWrite(BlobWriteKey key, ListenableFuture<Long> future) {
  Write write =
      new Write() {
        // delegate stream of the most recently issued output; null until the first getOutput
        CancellableOutputStream out = null;
        // set by reset() and by getCommittedSize() when nothing has been committed; cleared
        // whenever a new output is issued, and handed to createUniqueWriteOutput
        boolean isReset = false;
        // completed by the callback given to the most recently issued output; null until the
        // first getOutput
        SettableFuture<Void> closedFuture = null;
        // cached size of the on-disk write file while no stream is held, -1 when not cached
        long fileCommittedSize = -1;

        /**
         * Cancels the current output stream, if any, and marks this write as reset. A failure to
         * cancel is logged and otherwise ignored; the reset flag is set regardless.
         */
        @Override
        public synchronized void reset() {
          try {
            if (out != null) {
              out.cancel();
            }
          } catch (IOException e) {
            logger.log(
                Level.SEVERE,
                "could not reset write "
                    + DigestUtil.toString(key.getDigest())
                    + ":"
                    + key.getIdentifier(),
                e);
          } finally {
            isReset = true;
          }
        }

        /**
         * Returns the committed size of this write. As a side effect, a write with no committed
         * bytes and no stream issued yet is flagged as reset.
         */
        @Override
        public synchronized long getCommittedSize() {
          long committedSize = getCommittedSizeFromOutOrDisk();
          if (committedSize == 0 && out == null) {
            isReset = true;
          }
          return committedSize;
        }

        /**
         * Returns the digest's full size when the write is complete, otherwise the size reported
         * by the current stream or the on-disk write file.
         */
        long getCommittedSizeFromOutOrDisk() {
          if (isComplete()) {
            return key.getDigest().getSizeBytes();
          }
          return getCommittedSizeFromOut();
        }

        /**
         * Returns the number of bytes written through the current stream, or, when no stream has
         * been issued, the (cached) size of the write file on disk. A file whose size cannot be
         * read is treated as having size 0.
         */
        synchronized long getCommittedSizeFromOut() {
          if (out == null) {
            if (fileCommittedSize < 0) {
              // we need to cache this from disk until an out stream is acquired
              String blobKey = getKey(key.getDigest(), false);
              Path blobKeyPath = getPath(blobKey);
              try {
                fileCommittedSize =
                    Files.size(blobKeyPath.resolveSibling(blobKey + "." + key.getIdentifier()));
              } catch (IOException e) {
                // best effort: an unreadable (e.g. missing) write file counts as nothing written
                fileCommittedSize = 0;
              }
            }
            return fileCommittedSize;
          }
          return out.getWritten();
        }

        /**
         * Returns true when the write's future is done, or when no issued stream is still
         * awaiting its close notification and the digest is reported by {@code containsLocal}.
         */
        @Override
        public synchronized boolean isComplete() {
          return getFuture().isDone()
              || ((closedFuture == null || closedFuture.isDone())
                  && containsLocal(key.getDigest(), /* result=*/ null, (ignored) -> {}));
        }

        /**
         * Returns a future for the next output stream. When no issued stream is pending close,
         * the stream is created immediately (an {@link IOException} becomes a failed future);
         * otherwise the request is retried after the pending close notification.
         */
        @Override
        public synchronized ListenableFuture<FeedbackOutputStream> getOutputFuture(
            long deadlineAfter, TimeUnit deadlineAfterUnits, Runnable onReadyHandler) {
          if (closedFuture == null || closedFuture.isDone()) {
            try {
              // this isn't great, and will block when there are multiple requesters
              return immediateFuture(
                  getOutput(deadlineAfter, deadlineAfterUnits, onReadyHandler));
            } catch (IOException e) {
              return immediateFailedFuture(e);
            }
          }
          return transformAsync(
              closedFuture,
              result -> getOutputFuture(deadlineAfter, deadlineAfterUnits, onReadyHandler),
              directExecutor());
        }

        /**
         * Issues a new output stream for this write, blocking until any previously issued stream
         * has signalled its close.
         *
         * @throws IOException if waiting on the previous stream fails or is interrupted (the
         *     thread's interrupt status is restored in the latter case), or if the stream cannot
         *     be created
         */
        @Override
        public synchronized FeedbackOutputStream getOutput(
            long deadlineAfter, TimeUnit deadlineAfterUnits, Runnable onReadyHandler)
            throws IOException {
          // caller will be the exclusive owner of this write stream. all other requests
          // will block until it is returned via a close.
          if (closedFuture != null) {
            try {
              closedFuture.get();
            } catch (ExecutionException e) {
              throw new IOException(e.getCause());
            } catch (InterruptedException e) {
              // the interrupt is being converted to an IOException; restore the flag so that
              // callers further up the stack can still observe the interruption
              Thread.currentThread().interrupt();
              throw new IOException(e);
            }
          }
          SettableFuture<Void> outClosedFuture = SettableFuture.create();
          UniqueWriteOutputStream uniqueOut =
              createUniqueWriteOutput(
                  out,
                  key.getDigest(),
                  UUID.fromString(key.getIdentifier()),
                  () -> outClosedFuture.set(null),
                  this::isComplete,
                  isReset);
          commitOpenState(uniqueOut.delegate(), outClosedFuture);
          return uniqueOut;
        }

        /**
         * Records the newly issued stream and its close notification, and invalidates the state
         * derived while no stream was held.
         */
        private void commitOpenState(
            CancellableOutputStream out, SettableFuture<Void> closedFuture) {
          // transition the Write to an open state, and modify all internal state required
          // atomically
          // this function must. not. throw.
          this.out = out;
          this.closedFuture = closedFuture;
          // they will likely write to this, so we can no longer assume isReset.
          // might want to subscribe to a write event on the stream
          isReset = false;
          // our cached file committed size is now invalid
          fileCommittedSize = -1;
        }

        /** Returns the future supplied to {@code newWrite}. */
        @Override
        public ListenableFuture<Long> getFuture() {
          return future;
        }
      };
  // once the write's future completes, cancel any open stream and flag the write as reset
  write.getFuture().addListener(write::reset, directExecutor());
  return write;
}