in src/main/java/build/buildfarm/cas/cfc/CASFileCache.java [2524:2699]
/**
 * Opens a size-guarded, hash-verified output stream that writes the blob for {@code key} to a
 * per-write temporary file and, on a complete and matching {@code close()}, links it into the
 * cache and registers it in {@code storage}.
 *
 * <p>The temporary file is a sibling of {@code getPath(key)} named {@code key + "." + writeId}.
 * When {@code isReset} is false and that file already exists, its current content is replayed
 * through the hashing stream so the running hash covers the bytes already on disk, and the
 * returned stream reports the existing size from {@code getWritten()}.
 *
 * @param key cache key of the blob; the temporary file name is expected to start with the
 *     content hash (checked in {@code commit()})
 * @param writeId identifier that distinguishes this write's temporary file from others
 * @param writeWinner consulted only when this write inserted the entry; when it yields true,
 *     {@code onInsert} is run
 * @param blobSizeInBytes exact expected size of the blob; writes beyond it are rejected
 * @param isExecutable forwarded to {@code setReadOnlyPerms} when the file is committed
 * @param onInsert callback run after a winning insert; a {@link RuntimeException} from it is
 *     rethrown wrapped in an {@link IOException}
 * @param requiresDischarge forwarded to {@code charge}; presumably set there to tell the caller
 *     whether a discharge is owed (verify against {@code charge})
 * @param isReset when true, any existing temporary file content is ignored and the write starts
 *     at offset 0
 * @return {@code DUPLICATE_OUTPUT_STREAM} when {@code charge} declines the request, otherwise a
 *     new stream positioned at the committed size
 * @throws EntryLimitException if {@code blobSizeInBytes} exceeds {@code maxEntrySizeInBytes}
 * @throws IOException if the temporary file cannot be opened or replayed
 * @throws InterruptedException declared for the calls made here (presumably {@code charge})
 */
private CancellableOutputStream putOrReferenceGuarded(
    String key,
    UUID writeId,
    Supplier<Boolean> writeWinner,
    long blobSizeInBytes,
    boolean isExecutable,
    Runnable onInsert,
    AtomicBoolean requiresDischarge,
    boolean isReset)
    throws IOException, InterruptedException {
  if (blobSizeInBytes > maxEntrySizeInBytes) {
    throw new EntryLimitException(blobSizeInBytes, maxEntrySizeInBytes);
  }

  if (!charge(key, blobSizeInBytes, requiresDischarge)) {
    return DUPLICATE_OUTPUT_STREAM;
  }

  String writeKey = key + "." + writeId;
  Path writePath = getPath(key).resolveSibling(writeKey);
  final long committedSize;
  final HashingOutputStream hashOut;
  if (!isReset && Files.exists(writePath)) {
    // Resume: feed the existing bytes back through the hashing stream so the hash state matches
    // what is already on disk. The SkipOutputStream is given committedSize and is checked to be
    // fully skipped afterwards; presumably it drops those bytes instead of re-appending them.
    // NOTE(review): if the replay or the checkState fails, the APPEND stream opened here is never
    // closed - confirm whether the caller cleans up.
    committedSize = Files.size(writePath);
    try (InputStream in = Files.newInputStream(writePath)) {
      SkipOutputStream skipStream =
          new SkipOutputStream(Files.newOutputStream(writePath, APPEND), committedSize);
      hashOut = digestUtil.newHashingOutputStream(skipStream);
      ByteStreams.copy(in, hashOut);
      checkState(skipStream.isSkipped());
    }
  } else {
    committedSize = 0;
    hashOut = digestUtil.newHashingOutputStream(Files.newOutputStream(writePath, CREATE));
  }
  return new CancellableOutputStream(hashOut) {
    // total bytes accounted to this write, including any resumed from disk
    long written = committedSize;
    final Digest expectedDigest = keyToDigest(key, blobSizeInBytes, digestUtil);

    @Override
    public long getWritten() {
      return written;
    }

    @Override
    public Path getPath() {
      return writePath;
    }

    /** Abandons the write: closes the stream, removes the temporary file, releases the charge. */
    @Override
    public void cancel() throws IOException {
      try {
        try {
          hashOut.close();
        } finally {
          // remove the partial file even when the close itself failed
          Files.delete(writePath);
        }
      } finally {
        dischargeAndNotify(blobSizeInBytes);
      }
    }

    @Override
    public void write(int b) throws IOException {
      if (written >= blobSizeInBytes) {
        throw new IOException(
            format("attempted overwrite at %d by 1 byte for %s", written, writeKey));
      }
      hashOut.write(b);
      written++;
    }

    @Override
    public void write(byte[] b) throws IOException {
      write(b, 0, b.length);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      if (written + len > blobSizeInBytes) {
        throw new IOException(
            format("attempted overwrite at %d by %d bytes for %s", written, len, writeKey));
      }
      hashOut.write(b, off, len);
      written += len;
    }

    /**
     * Closes the underlying stream and then either rejects the content (oversize), reports it as
     * incomplete (undersize, temporary file and charge are left in place), or commits it.
     */
    @Override
    public void close() throws IOException {
      // has some trouble with multiple closes, fortunately we have something above to handle this
      long size = getWritten();
      hashOut.close(); // should probably discharge here as well

      if (size > blobSizeInBytes) {
        String hash = hashOut.hash().toString();
        try {
          Files.delete(writePath);
        } finally {
          dischargeAndNotify(blobSizeInBytes);
        }
        Digest actual = Digest.newBuilder().setHash(hash).setSizeBytes(size).build();
        throw new DigestMismatchException(actual, expectedDigest);
      }

      if (size != blobSizeInBytes) {
        throw new IncompleteBlobException(writePath, key, size, blobSizeInBytes);
      }

      commit();
    }

    /**
     * Verifies the hash, makes the file read-only, hard-links it to the key's path, and
     * registers the entry; on a lost race it takes a reference on the existing entry instead.
     */
    void commit() throws IOException {
      String hash = hashOut.hash().toString();
      String fileName = writePath.getFileName().toString();
      if (!fileName.startsWith(hash)) {
        // NOTE(review): unlike the oversize case in close(), the temporary file is not deleted
        // here - confirm whether that is intentional.
        dischargeAndNotify(blobSizeInBytes);
        Digest actual = Digest.newBuilder().setHash(hash).setSizeBytes(getWritten()).build();
        throw new DigestMismatchException(actual, expectedDigest);
      }
      try {
        setReadOnlyPerms(writePath, isExecutable, fileStore);
      } catch (IOException e) {
        dischargeAndNotify(blobSizeInBytes);
        throw e;
      }

      Entry entry = new Entry(key, blobSizeInBytes, Deadline.after(10, SECONDS));

      Entry existingEntry = null;
      boolean inserted = false;
      try {
        // the enclosing cache's getPath(String), not this stream's getPath()
        Files.createLink(CASFileCache.this.getPath(key), writePath);
        existingEntry = storage.putIfAbsent(key, entry);
        inserted = existingEntry == null;
      } catch (FileAlreadyExistsException e) {
        logger.log(
            Level.FINE, "file already exists for " + key + ", nonexistent entry will fail");
      } finally {
        // the temporary name is always removed; the charge is kept only by a successful insert
        Files.delete(writePath);
        if (!inserted) {
          dischargeAndNotify(blobSizeInBytes);
        }
      }

      int attempts = 10;
      if (!inserted) {
        // The file exists but its entry may not be registered yet: poll briefly for it.
        while (existingEntry == null && attempts-- != 0) {
          existingEntry = storage.get(key);
          if (existingEntry != null) {
            // found it, no reason to wait another interval
            break;
          }
          try {
            MILLISECONDS.sleep(10);
          } catch (InterruptedException intEx) {
            // keep the interruption visible to callers that only see the IOException
            Thread.currentThread().interrupt();
            throw new IOException(intEx);
          }
        }

        if (existingEntry == null) {
          throw new IOException("existing entry did not appear for " + key);
        }
      }

      if (existingEntry != null) {
        logger.log(Level.FINE, "lost the race to insert " + key);
        if (!referenceIfExists(key)) {
          // we would lose our accountability and have a presumed reference if we returned
          throw new IllegalStateException("storage conflict with existing key for " + key);
        }
      } else if (writeWinner.get()) {
        logger.log(Level.FINE, "won the race to insert " + key);
        try {
          onInsert.run();
        } catch (RuntimeException e) {
          throw new IOException(e);
        }
      } else {
        logger.log(Level.FINE, "did not win the race to insert " + key);
      }
    }
  };
}