public void init()

in samoa-api/src/main/java/org/apache/samoa/tasks/PrequentialCVEvaluation.java [84:163]


  /**
   * Assembles the prequential cross-validation topology and stores the built result in
   * {@code prequentialTopology}.
   *
   * <p>The wiring performed here is, in order: a {@code PrequentialSourceProcessor} entrance
   * processor, an {@code EvaluationDistributorProcessor} fed by the source stream, one
   * {@code Learner} per fold (each fed by its own stream created from the distributor), and a
   * single {@code EvaluatorCVProcessor} that receives every result stream of every learner.
   *
   * <p>If no {@code builder} has been set beforehand, a new {@code TopologyBuilder} is created and
   * initialized with the configured evaluation name.
   *
   * @throws IllegalArgumentException if the configured number of folds is smaller than 1, or if a
   *     learner cannot be instantiated from the configured learner option
   */
  public void init() {
    // Fail fast on an unusable fold count, before the (possibly shared) builder is touched.
    // Without this check a value of 0 only fails much later on ensemble[0], and a negative
    // value fails on the array allocation, both with far less helpful exceptions.
    final int foldCount = this.foldNumberOption.getValue();
    if (foldCount < 1) {
      throw new IllegalArgumentException("The number of folds must be at least 1, but was " + foldCount);
    }

    // TODO remove the if statement
    // theoretically, dynamic binding will work here!
    // test later!
    // for now, the if statement is used by Storm

    if (builder == null) {
      builder = new TopologyBuilder();
      logger.debug("Successfully instantiating TopologyBuilder");

      builder.initTopology(evaluationNameOption.getValue());
      logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue());
    }

    // instantiate PrequentialSourceProcessor and its output stream
    // (sourcePiOutputStream)
    preqSource = new PrequentialSourceProcessor();
    preqSource.setStreamSource((InstanceStream) this.streamTrainOption.getValue());
    preqSource.setMaxNumInstances(instanceLimitOption.getValue());
    preqSource.setSourceDelay(sourceDelayOption.getValue());
    preqSource.setDelayBatchSize(batchDelayOption.getValue());
    builder.addEntranceProcessor(preqSource);
    logger.debug("Successfully instantiating PrequentialSourceProcessor");

    sourcePiOutputStream = builder.createStream(preqSource);

    // Add EvaluationDistributorProcessor, configured with one classifier per fold
    distributorP = new EvaluationDistributorProcessor();
    distributorP.setNumberClassifiers(foldCount);
    distributorP.setValidationMethodologyOption(this.validationMethodologyOption.getChosenIndex());
    distributorP.setRandomSeed(this.randomSeedOption.getValue());
    builder.addProcessor(distributorP, 1);
    builder.connectInputAllStream(sourcePiOutputStream, distributorP);

    // instantiate one classifier per fold from the same CLI description
    ensemble = new Learner[foldCount];
    for (int i = 0; i < foldCount; i++) {
      try {
        ensemble[i] = (Learner) ClassOption.createObject(learnerOption.getValueAsCLIString(),
                learnerOption.getRequiredType());
      } catch (Exception e) {
        // Hand the cause to the logger so the stack trace goes through the logging framework
        // rather than straight to stderr.
        logger.error("Unable to create classifiers for the distributed evaluation. Please check your CLI parameters", e);
        throw new IllegalArgumentException(e);
      }
      ensemble[i].init(builder, preqSource.getDataset(), 1); // sequential
    }
    logger.debug("Successfully instantiating Classifiers");

    // create one distributor output stream per ensemble member
    Stream[] ensembleStreams = new Stream[foldCount];
    for (int i = 0; i < foldCount; i++) {
      ensembleStreams[i] = builder.createStream(distributorP);
      // connect streams one-to-one with ensemble members (the type of connection does not matter)
      builder.connectInputShuffleStream(ensembleStreams[i], ensemble[i].getInputProcessor());
    }
    distributorP.setOutputStreams(ensembleStreams);

    // Use the configured evaluator unless it is incompatible with the learner type, in which
    // case fall back to the default evaluator for that learner. The first ensemble member is
    // used for the check because every member is created from the same learner option.
    PerformanceEvaluator evaluatorOptionValue = this.evaluatorOption.getValue();
    if (!PrequentialCVEvaluation.isLearnerAndEvaluatorCompatible(ensemble[0], evaluatorOptionValue)) {
      evaluatorOptionValue = getDefaultPerformanceEvaluatorForLearner(ensemble[0]);
    }
    evaluator = new EvaluatorCVProcessor.Builder(evaluatorOptionValue)
            .samplingFrequency(sampleFrequencyOption.getValue())
            .dumpFile(dumpFileOption.getFile())
            .foldNumber(foldCount).build();

    builder.addProcessor(evaluator, 1);

    for (Learner member : ensemble) {
      for (Stream subResultStream : member.getResultStreams()) { // a learner can have multiple output streams
        this.builder.connectInputKeyStream(subResultStream, evaluator); // the key is the instance id to combine predictions
      }
    }

    logger.debug("Successfully instantiating EvaluatorProcessor");

    prequentialTopology = builder.build();
    logger.debug("Successfully building the topology");
  }