private static void loadFilesIntoCAS()

in src/main/java/build/buildfarm/Executor.java [232:357]


  /**
   * Uploads every blob under {@code blobsDir} that the CAS reports as missing.
   *
   * <p>Small blobs (&lt; 1MB) are packed into {@link BatchUpdateBlobsRequest}s spread across a
   * fixed set of buckets, each kept under the ~2MB default gRPC message-size budget; when no
   * bucket can accept a request, the fullest bucket is flushed and reused. Blobs of 1MB or more
   * are streamed individually over the ByteStream Write API.
   *
   * @param instanceName remote-execution instance name stamped on every request
   * @param channel gRPC channel to the CAS server
   * @param blobsDir directory whose files are named {@code <hash>_<sizeBytes>}
   * @throws Exception on I/O failure, a truncated blob file, or a rejected upload
   */
  private static void loadFilesIntoCAS(String instanceName, Channel channel, Path blobsDir)
      throws Exception {
    ContentAddressableStorageBlockingStub casStub =
        ContentAddressableStorageGrpc.newBlockingStub(channel);
    List<Digest> missingDigests = findMissingBlobs(instanceName, blobsDir, casStub);
    UUID uploadId = UUID.randomUUID();

    final int bucketCount = 128;
    // Strict upper bound on a batch request's payload; matches the common gRPC message limit.
    final long batchSizeLimit = Size.mbToBytes(2);
    // Approximate proto framing overhead charged per request entry.
    final int perRequestOverhead = 48;

    int[] bucketSizes = new int[bucketCount];
    BatchUpdateBlobsRequest.Builder[] buckets = new BatchUpdateBlobsRequest.Builder[bucketCount];
    for (int i = 0; i < bucketCount; i++) {
      bucketSizes[i] = 0;
      buckets[i] = BatchUpdateBlobsRequest.newBuilder().setInstanceName(instanceName);
    }

    ByteStreamStub bsStub = ByteStreamGrpc.newStub(channel);
    for (Digest missingDigest : missingDigests) {
      Path path = blobsDir.resolve(missingDigest.getHash() + "_" + missingDigest.getSizeBytes());
      if (missingDigest.getSizeBytes() < Size.mbToBytes(1)) {
        Request request =
            Request.newBuilder()
                .setDigest(missingDigest)
                .setData(ByteString.copyFrom(Files.readAllBytes(path)))
                .build();
        int size = (int) missingDigest.getSizeBytes() + perRequestOverhead;
        // Find the emptiest bucket that can still hold this request; also track the
        // fullest bucket as the flush candidate in case none can.
        int maxBucketSize = 0;
        long minBucketSize = batchSizeLimit + 1;
        int maxBucketIndex = 0;
        int minBucketIndex = -1;
        for (int i = 0; i < bucketCount; i++) {
          if (bucketSizes[i] + size < batchSizeLimit && bucketSizes[i] < minBucketSize) {
            minBucketSize = bucketSizes[i];
            minBucketIndex = i;
          }
          if (bucketSizes[i] > maxBucketSize) {
            maxBucketSize = bucketSizes[i];
            maxBucketIndex = i;
          }
        }
        if (minBucketIndex < 0) {
          // Every bucket is full: flush the fullest one and restart it with this request.
          flushBatch(casStub, buckets[maxBucketIndex].build());
          bucketSizes[maxBucketIndex] = size;
          buckets[maxBucketIndex] =
              BatchUpdateBlobsRequest.newBuilder()
                  .setInstanceName(instanceName)
                  .addRequests(request);
        } else {
          bucketSizes[minBucketIndex] += size;
          buckets[minBucketIndex].addRequests(request);
        }
      } else {
        streamBlob(bsStub, instanceName, uploadId, missingDigest, path);
      }
    }
    // Flush any buckets still holding requests.
    for (int i = 0; i < bucketCount; i++) {
      if (bucketSizes[i] > 0) {
        flushBatch(casStub, buckets[i].build());
      }
    }
  }

  /** Sends one batch update, verifies every per-blob status is OK, and reports timing. */
  private static void flushBatch(
      ContentAddressableStorageBlockingStub casStub, BatchUpdateBlobsRequest batchRequest) {
    Stopwatch stopwatch = Stopwatch.createStarted();
    BatchUpdateBlobsResponse batchResponse = casStub.batchUpdateBlobs(batchRequest);
    long usecs = stopwatch.elapsed(MICROSECONDS);
    checkState(
        batchResponse.getResponsesList().stream()
            .allMatch(response -> Code.forNumber(response.getStatus().getCode()) == Code.OK),
        "batch update response contained a non-OK status");
    System.out.println(
        "Updated " + batchRequest.getRequestsCount() + " blobs in " + (usecs / 1000.0) + "ms");
  }

  /**
   * Streams one large blob to the CAS via the ByteStream Write API in 64KB chunks.
   *
   * @throws Exception if reading the file fails, the file is shorter than its digest claims,
   *     or the server rejects the write
   */
  private static void streamBlob(
      ByteStreamStub bsStub, String instanceName, UUID uploadId, Digest digest, Path path)
      throws Exception {
    Stopwatch stopwatch = Stopwatch.createStarted();
    SettableFuture<WriteResponse> writtenFuture = SettableFuture.create();
    StreamObserver<WriteRequest> requestObserver =
        bsStub.write(
            new StreamObserver<WriteResponse>() {
              @Override
              public void onNext(WriteResponse response) {
                writtenFuture.set(response);
              }

              @Override
              public void onCompleted() {}

              @Override
              public void onError(Throwable t) {
                writtenFuture.setException(t);
              }
            });
    HashCode hash = HashCode.fromString(digest.getHash());
    String resourceName = uploadResourceName(instanceName, uploadId, hash, digest.getSizeBytes());
    try (InputStream in = Files.newInputStream(path)) {
      boolean first = true;
      long writtenBytes = 0;
      byte[] buf = new byte[64 * 1024];
      while (writtenBytes != digest.getSizeBytes()) {
        int len = in.read(buf);
        if (len < 0) {
          // The file is shorter than its digest claims; fail instead of passing a
          // negative length to ByteString.copyFrom (or looping forever).
          throw new IllegalStateException(
              "premature EOF reading "
                  + path
                  + " at offset "
                  + writtenBytes
                  + " of "
                  + digest.getSizeBytes()
                  + " bytes");
        }
        WriteRequest.Builder request = WriteRequest.newBuilder();
        if (first) {
          // The resource name is only required on the first message of a write.
          request.setResourceName(resourceName);
        }
        request.setData(ByteString.copyFrom(buf, 0, len)).setWriteOffset(writtenBytes);
        if (writtenBytes + len == digest.getSizeBytes()) {
          request.setFinishWrite(true);
        }
        requestObserver.onNext(request.build());
        writtenBytes += len;
        first = false;
      }
      // Half-close the client side so the server can complete the call cleanly.
      requestObserver.onCompleted();
    } catch (Exception e) {
      // Cancel the stream so the server does not wait indefinitely for more data.
      requestObserver.onError(e);
      throw e;
    }
    writtenFuture.get();
    System.out.println(
        "Wrote long "
            + DigestUtil.toString(digest)
            + " in "
            + (stopwatch.elapsed(MICROSECONDS) / 1000.0)
            + "ms");
  }