public InputStream streamBlob()

in apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java [815:887]


   public InputStream streamBlob(final String container, final String name, final ExecutorService executor) {

      final ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
      // User will receive the Input end of the piped stream
      final PipedOutputStream output;
      final PipedInputStream input;
      try {
         output = new PipedOutputStream();
         input = new PipedInputStream(output,
               getMinimumMultipartPartSize() * 5 > Integer.MAX_VALUE ?
                     Integer.MAX_VALUE : (int) getMinimumMultipartPartSize() * 5);
      } catch (IOException e) {
         throw new RuntimeException(e);
      }

      // The total length of the file to download is needed to determine ranges
      // It has to be obtainable without downloading the whole file
      final long contentLength = api
            .getObjectApi(regionId, container)
            .getWithoutBody(name)
            .getPayload()
            .getContentMetadata()
            .getContentLength();

      // Determine download buffer size, smaller means less memory usage; larger is faster as long as threads are saturated
      final long partSize = getMinimumMultipartPartSize();

      // Used to communicate between the producer and consumer threads
      final LinkedBlockingQueue<ListenableFuture<byte[]>> results = new LinkedBlockingQueue<ListenableFuture<byte[]>>();

      listeningExecutor.submit(new Runnable() {
         @Override
         public void run() {
            ListenableFuture<byte[]> result;
            long from;
            try {
               for (from = 0; from < contentLength; from = from + partSize) {
                  logger.debug(Thread.currentThread() + " writing to output");
                  result = results.take();
                  if (result == null) {
                     output.close();
                     input.close();
                     throw new RuntimeException("Error downloading file part to stream");
                  }
                  output.write(result.get());
               }
            } catch (Exception e) {
               logger.debug(e.toString());
               // Close pipe so client is notified of an exception
               Closeables2.closeQuietly(input);
               throw new RuntimeException(e);
            } finally {
               // Finished writing results to stream
               Closeables2.closeQuietly(output);
            }
         }
      });

      listeningExecutor.submit(new Runnable() {
         @Override
         public void run() {
            long from;
            long to;
            // Loop through ranges within the file
            for (from = 0; from < contentLength; from = from + partSize) {
               to = (from + partSize >= contentLength) ? contentLength - 1 : from + partSize - 1;
               BlobStreamDownloader b = new BlobStreamDownloader(container, name, from, to);
               results.add(listeningExecutor.submit(b));
            }
         }
      });
      return input;
   }