samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/AdaptiveBagging.java [114:144]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    for (int i = 0; i < ensembleSize; i++) {
      ensembleStreams[i] = builder.createStream(distributorP);
      builder.connectInputShuffleStream(ensembleStreams[i], ensemble[i].getInputProcessor()); // connect streams one-to-one with ensemble members (the type of connection does not matter)
    }

    distributorP.setOutputStreams(ensembleStreams);
  }

  /** The builder. */
  private TopologyBuilder builder;

  @Override
  public void init(TopologyBuilder builder, Instances dataset, int parallelism) {
    this.builder = builder;
    this.dataset = dataset;
    this.setLayout();
  }

  @Override
  public Processor getInputProcessor() {
    return distributorP;
  }

  /*
   * (non-Javadoc)
   * 
   * @see samoa.learners.Learner#getResultStreams()
   */
  @Override
  public Set<Stream> getResultStreams() {
    return ImmutableSet.of(this.resultStream);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/Bagging.java [111:141]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    for (int i = 0; i < ensembleSize; i++) {
      ensembleStreams[i] = builder.createStream(distributorP);
      builder.connectInputShuffleStream(ensembleStreams[i], ensemble[i].getInputProcessor()); // connect streams one-to-one with ensemble members (the type of connection does not matter)
    }

    distributorP.setOutputStreams(ensembleStreams);
  }

  /** The builder. */
  private TopologyBuilder builder;

  @Override
  public void init(TopologyBuilder builder, Instances dataset, int parallelism) {
    this.builder = builder;
    this.dataset = dataset;
    this.setLayout();
  }

  @Override
  public Processor getInputProcessor() {
    return distributorP;
  }

  /*
   * (non-Javadoc)
   * 
   * @see samoa.learners.Learner#getResultStreams()
   */
  @Override
  public Set<Stream> getResultStreams() {
    return ImmutableSet.of(this.resultStream);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



