in import/src/main/java/com/google/cloud/healthcare/imaging/dicomadapter/CStoreService.java [179:224]
/**
 * Runs {@code inputStream} through every processor in {@code processorList}.
 *
 * <p>An empty list is a no-op. A single processor is invoked directly on the calling thread with
 * a {@code null} output. With several processors, each one is wrapped in a
 * {@link StreamCallable} and submitted to {@code underlyingExecutor}; stage {@code i} is given a
 * piped input connected to the piped output handed to stage {@code i - 1}, and the last stage is
 * given a {@code null} output. The calling thread copies {@code inputStream} into the head pipe
 * and then waits for all stages to finish.
 *
 * <p>If the method exits abnormally (a stage failed, submission was rejected, or the calling
 * thread was interrupted), the stages that were already submitted are cancelled with
 * interruption so they do not stay blocked on their pipes.
 *
 * @param underlyingExecutor executor used to run the stages when there is more than one
 * @param inputStream source data; always read on the calling thread
 * @param processorList processors to apply, in order
 * @throws Throwable the cause of the first stage failure observed, or whatever the single
 *     processor, the executor, or the wait for completion throws
 */
private void processStream(Executor underlyingExecutor, InputStream inputStream,
    List<StreamProcessor> processorList) throws Throwable {
  if (processorList.size() == 1) {
    StreamProcessor singleProcessor = processorList.get(0);
    singleProcessor.process(inputStream, null);
  } else if (processorList.size() > 1) {
    List<StreamCallable> callableList = new ArrayList<>();
    PipedOutputStream pdvPipeOut = new PipedOutputStream();
    InputStream nextInputStream = new PipedInputStream(pdvPipeOut);
    for (int i = 0; i < processorList.size(); i++) {
      StreamProcessor processor = processorList.get(i);
      InputStream processorInput = nextInputStream;
      OutputStream processorOutput = null;
      // Every stage except the last gets a pipe whose read end feeds the next stage.
      if (i < processorList.size() - 1) {
        PipedOutputStream pipeOut = new PipedOutputStream();
        processorOutput = pipeOut;
        nextInputStream = new PipedInputStream(pipeOut);
      }
      callableList.add(new StreamCallable(processorInput, processorOutput, processor));
    }

    ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(underlyingExecutor);
    // Fully qualified so this block does not depend on an import it cannot see.
    List<java.util.concurrent.Future<Void>> futureList = new ArrayList<>(callableList.size());
    boolean completed = false;
    try {
      // Submission happens inside the try-with-resources so the head pipe is closed even when
      // the executor rejects a task; otherwise an already-submitted stage would wait forever
      // on a pipe that is never written to or closed.
      try (pdvPipeOut) {
        for (StreamCallable callable : callableList) {
          futureList.add(ecs.submit(callable));
        }
        // PDVInputStream is thread-locked
        StreamUtils.copy(inputStream, pdvPipeOut);
      } catch (IOException e) {
        // causes or is caused by exception in callables, no need to throw this up
        log.trace("Error copying inputStream to pdvPipeOut", e);
      }
      for (int i = 0; i < callableList.size(); i++) {
        ecs.take().get();
      }
      completed = true;
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      throw cause != null ? cause : e;
    } finally {
      if (!completed) {
        // Unblock stages still waiting on a pipe; a no-op for stages that already finished.
        for (java.util.concurrent.Future<Void> future : futureList) {
          future.cancel(true);
        }
      }
    }
  }
}