public StreamsTask createConnectedTask()

in streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java [210:266]


  public StreamsTask createConnectedTask(int timeout) {
    StreamsTask task;
    if(this.processor != null) {
      if(this.numTasks > 1) {
        task =  new StreamsProcessorTask(SerializationUtil.cloneBySerialization(this.processor), streamConfig);
        task.addInputQueue(this.inQueue);
        for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
          task.addOutputQueue(q);
        }
      } else {
        task = new StreamsProcessorTask(this.processor, streamConfig);
        task.addInputQueue(this.inQueue);
        for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
          task.addOutputQueue(q);
        }
      }
    }
    else if(this.writer != null) {
      if(this.numTasks > 1) {
        task = new StreamsPersistWriterTask(SerializationUtil.cloneBySerialization(this.writer), streamConfig);
        task.addInputQueue(this.inQueue);
      } else {
        task = new StreamsPersistWriterTask(this.writer, streamConfig);
        task.addInputQueue(this.inQueue);
      }
    }
    else if(this.provider != null) {
      StreamsProvider prov;
      if(this.numTasks > 1) {
        prov = SerializationUtil.cloneBySerialization(this.provider);
      } else {
        prov = this.provider;
      }
      if(this.dateRange == null && this.sequence == null)
        task = new StreamsProviderTask(prov, this.perpetual, streamConfig);
      else if(this.sequence != null)
        task = new StreamsProviderTask(prov, this.sequence, streamConfig);
      else
        task = new StreamsProviderTask(prov, this.dateRange[0], this.dateRange[1], streamConfig);
      //Adjust the timeout if necessary
      if(timeout != 0) {
        ((StreamsProviderTask)task).setTimeout(timeout);
      }
      for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
        task.addOutputQueue(q);
      }
    }
    else {
      throw new InvalidStreamException("Underlying StreamComponoent was NULL.");
    }

    if(task != null) {
      tasks.add(task);
    }

    return task;
  }