in apis/openstack-swift/src/main/java/org/jclouds/openstack/swift/v1/blobstore/RegionScopedSwiftBlobStore.java [815:887]
/**
 * Streams the content of a blob through a pipe that is filled by ranged downloads running on {@code executor}.
 *
 * <p>The object's length is read with a body-less request and split into ranges of
 * {@code getMinimumMultipartPartSize()} bytes. Two tasks are submitted to {@code executor}: a scheduler that
 * submits one {@code BlobStreamDownloader} per range, and a writer that waits for each part in range order and
 * copies it into the pipe. The writer occupies an executor thread while it waits, so the executor has to be able
 * to run the scheduler and the part downloads alongside it (a single-threaded executor would stall).
 *
 * <p>If a part download fails, or the parts cannot be scheduled, the read end of the pipe is closed so that the
 * caller's next read fails with an {@link IOException} instead of blocking or seeing a truncated stream.
 *
 * @param container container holding the blob
 * @param name name of the blob to stream
 * @param executor executor that runs the scheduler, the writer and every part download
 * @return the read end of the pipe; it reaches end of stream once the last part has been written
 * @throws RuntimeException if the pipe cannot be created
 */
public InputStream streamBlob(final String container, final String name, final ExecutorService executor) {
   final ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);

   // Determine download buffer size, smaller means less memory usage; larger is faster as long as threads are
   // saturated
   final long partSize = getMinimumMultipartPartSize();

   // User will receive the Input end of the piped stream
   final PipedOutputStream output;
   final PipedInputStream input;
   try {
      output = new PipedOutputStream();
      // The pipe buffers up to five parts, capped at the largest size an int can express
      input = new PipedInputStream(output, (int) Math.min(Integer.MAX_VALUE, partSize * 5));
   } catch (IOException e) {
      throw new RuntimeException(e);
   }

   // The total length of the file to download is needed to determine ranges
   // It has to be obtainable without downloading the whole file
   // NOTE(review): if getWithoutBody can return null for a missing object, this chain fails with a
   // NullPointerException -- confirm against the ObjectApi contract.
   final long contentLength = api
         .getObjectApi(regionId, container)
         .getWithoutBody(name)
         .getPayload()
         .getContentMetadata()
         .getContentLength();

   // Used to communicate between the producer and consumer threads; futures are queued in range order
   final LinkedBlockingQueue<ListenableFuture<byte[]>> results = new LinkedBlockingQueue<ListenableFuture<byte[]>>();

   // Consumer: copies each downloaded part into the pipe, in order
   final ListenableFuture<?> writer = listeningExecutor.submit(new Runnable() {
      @Override
      public void run() {
         try {
            for (long from = 0; from < contentLength; from += partSize) {
               logger.debug(Thread.currentThread() + " writing to output");
               // take() blocks until the next part has been scheduled; it never returns null
               output.write(results.take().get());
            }
         } catch (Exception e) {
            if (e instanceof InterruptedException) {
               // Keep the interrupt visible to whoever owns this thread
               Thread.currentThread().interrupt();
            }
            logger.debug(e.toString());
            // Close pipe so client is notified of an exception
            Closeables2.closeQuietly(input);
            throw new RuntimeException(e);
         } finally {
            // Finished writing results to stream
            Closeables2.closeQuietly(output);
         }
      }
   });

   // Producer: schedules one ranged download per part
   final Runnable scheduler = new Runnable() {
      @Override
      public void run() {
         try {
            // Loop through ranges within the file
            for (long from = 0; from < contentLength; from += partSize) {
               long to = Math.min(from + partSize, contentLength) - 1;
               BlobStreamDownloader b = new BlobStreamDownloader(container, name, from, to);
               results.add(listeningExecutor.submit(b));
            }
         } catch (RuntimeException e) {
            logger.debug(e.toString());
            // Without this the writer would wait forever for parts that will never be queued
            abortStreamBlob(writer, input, output);
            throw e;
         }
      }
   };
   try {
      listeningExecutor.submit(scheduler);
   } catch (RuntimeException e) {
      // The scheduler itself was rejected, so no part will ever reach the writer
      abortStreamBlob(writer, input, output);
      throw e;
   }
   return input;
}

// Stops the writer task of streamBlob and fails the caller's reads when the parts cannot be scheduled.
private void abortStreamBlob(ListenableFuture<?> writer, PipedInputStream input, PipedOutputStream output) {
   // Interrupts the writer if it is already waiting on the queue; keeps it from starting otherwise
   writer.cancel(true);
   // Close the read end first so the caller gets an IOException rather than a clean end of stream
   Closeables2.closeQuietly(input);
   Closeables2.closeQuietly(output);
}