in streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java [358:397]
/**
 * Stops the tasks belonging to {@code comp} once none of its upstream tasks are running,
 * waits for them to finish, and then recurses into every downstream component.
 *
 * <p>A component with no entry in {@code streamTasks} is treated as a provider: it has no
 * tasks to stop here, but its downstream components are still visited.
 *
 * <p>NOTE(review): a component reachable through several parents is visited once per path,
 * so its tasks may be stopped and waited on more than once — confirm this is acceptable.
 *
 * @param comp        component whose tasks should be stopped
 * @param streamTasks tasks keyed by component id
 * @throws InterruptedException declared for callers; the wait below sleeps uninterruptibly
 */
private void shutDownTask(StreamComponent comp, Map<String, List<StreamsTask>> streamTasks) throws InterruptedException {
    List<StreamsTask> tasks = streamTasks.get(comp.getId());
    if (tasks != null) { // not a StreamProvider
        boolean parentsShutDown = true;
        parentScan:
        for (StreamComponent parent : comp.getUpStreamComponents()) {
            List<StreamsTask> parentTasks = streamTasks.get(parent.getId());
            // if parentTasks == null, the parent is a provider and is not running anymore
            if (parentTasks == null) {
                continue;
            }
            for (StreamsTask parentTask : parentTasks) {
                if (parentTask.isRunning()) {
                    // one running parent is enough to postpone this component's shutdown
                    parentsShutDown = false;
                    break parentScan;
                }
            }
        }
        if (parentsShutDown) {
            // Signal every task first so they all wind down in parallel.
            for (StreamsTask task : tasks) {
                task.stopTask();
                // no data to process, interrupt the blocking queue; skip tasks that have no
                // registered future instead of failing the whole shutdown with an NPE
                if (task.isWaiting() && this.futures.containsKey(task)) {
                    this.futures.get(task).cancel(true);
                }
            }
            // Then give each task up to the configured timeout to actually terminate.
            for (StreamsTask task : tasks) {
                // Track the budget in milliseconds: dividing by 1000 up front would truncate
                // sub-second timeouts to zero and skip the wait entirely.
                long remainingMs = localRuntimeConfiguration.getTaskTimeoutMs();
                while (remainingMs > 0 && task.isRunning()) {
                    long sleepMs = Math.min(1000L, remainingMs);
                    Uninterruptibles.sleepUninterruptibly(sleepMs, TimeUnit.MILLISECONDS);
                    remainingMs -= sleepMs;
                }
                if (task.isRunning()) {
                    LOGGER.warn("Task {} failed to terminate in allotted timeframe", task);
                }
            }
        }
    }
    Collection<StreamComponent> children = comp.getDownStreamComponents();
    if (children != null) {
        for (StreamComponent child : children) {
            shutDownTask(child, streamTasks);
        }
    }
}