in streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java [210:266]
public StreamsTask createConnectedTask(int timeout) {
StreamsTask task;
if(this.processor != null) {
if(this.numTasks > 1) {
task = new StreamsProcessorTask(SerializationUtil.cloneBySerialization(this.processor), streamConfig);
task.addInputQueue(this.inQueue);
for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
task.addOutputQueue(q);
}
} else {
task = new StreamsProcessorTask(this.processor, streamConfig);
task.addInputQueue(this.inQueue);
for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
task.addOutputQueue(q);
}
}
}
else if(this.writer != null) {
if(this.numTasks > 1) {
task = new StreamsPersistWriterTask(SerializationUtil.cloneBySerialization(this.writer), streamConfig);
task.addInputQueue(this.inQueue);
} else {
task = new StreamsPersistWriterTask(this.writer, streamConfig);
task.addInputQueue(this.inQueue);
}
}
else if(this.provider != null) {
StreamsProvider prov;
if(this.numTasks > 1) {
prov = SerializationUtil.cloneBySerialization(this.provider);
} else {
prov = this.provider;
}
if(this.dateRange == null && this.sequence == null)
task = new StreamsProviderTask(prov, this.perpetual, streamConfig);
else if(this.sequence != null)
task = new StreamsProviderTask(prov, this.sequence, streamConfig);
else
task = new StreamsProviderTask(prov, this.dateRange[0], this.dateRange[1], streamConfig);
//Adjust the timeout if necessary
if(timeout != 0) {
((StreamsProviderTask)task).setTimeout(timeout);
}
for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
task.addOutputQueue(q);
}
}
else {
throw new InvalidStreamException("Underlying StreamComponoent was NULL.");
}
if(task != null) {
tasks.add(task);
}
return task;
}