in samoa-api/src/main/java/org/apache/samoa/tasks/PrequentialCVEvaluation.java [84:163]
/**
 * Builds the topology for this prequential cross-validation evaluation task.
 *
 * <p>The wiring, in order:
 * <ol>
 *   <li>a {@code PrequentialSourceProcessor} entrance processor configured from the
 *       stream, instance-limit and delay options;</li>
 *   <li>an {@code EvaluationDistributorProcessor} that receives the source stream;</li>
 *   <li>one {@code Learner} per fold, each fed by its own stream created from the
 *       distributor;</li>
 *   <li>an {@code EvaluatorCVProcessor} connected by key to every result stream of
 *       every learner.</li>
 * </ol>
 * The finished topology is stored in {@code prequentialTopology}.
 *
 * @throws IllegalArgumentException if the configured number of folds is smaller than one,
 *         or if a learner cannot be created from {@code learnerOption}
 */
public void init() {
  // Validate before touching the builder: the code below indexes ensemble[0], so a
  // fold count below one would otherwise fail late with an array exception, after
  // part of the topology has already been registered.
  final int numberFolds = this.foldNumberOption.getValue();
  if (numberFolds < 1) {
    throw new IllegalArgumentException("Number of folds must be at least 1, but was " + numberFolds);
  }

  // TODO remove the if statement
  // theoretically, dynamic binding will work here!
  // test later!
  // for now, the if statement is used by Storm
  if (builder == null) {
    builder = new TopologyBuilder();
    logger.debug("Successfully instantiating TopologyBuilder");

    builder.initTopology(evaluationNameOption.getValue());
    logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue());
  }

  // instantiate PrequentialSourceProcessor and its output stream
  // (sourcePiOutputStream)
  preqSource = new PrequentialSourceProcessor();
  preqSource.setStreamSource((InstanceStream) this.streamTrainOption.getValue());
  preqSource.setMaxNumInstances(instanceLimitOption.getValue());
  preqSource.setSourceDelay(sourceDelayOption.getValue());
  preqSource.setDelayBatchSize(batchDelayOption.getValue());
  builder.addEntranceProcessor(preqSource);
  logger.debug("Successfully instantiating PrequentialSourceProcessor");

  sourcePiOutputStream = builder.createStream(preqSource);

  // Add EvaluationDistributorProcessor
  distributorP = new EvaluationDistributorProcessor();
  distributorP.setNumberClassifiers(numberFolds);
  distributorP.setValidationMethodologyOption(this.validationMethodologyOption.getChosenIndex());
  distributorP.setRandomSeed(this.randomSeedOption.getValue());
  builder.addProcessor(distributorP, 1);
  builder.connectInputAllStream(sourcePiOutputStream, distributorP);

  // instantiate one classifier per fold
  ensemble = new Learner[numberFolds];
  for (int i = 0; i < numberFolds; i++) {
    try {
      ensemble[i] = (Learner) ClassOption.createObject(learnerOption.getValueAsCLIString(),
          learnerOption.getRequiredType());
    } catch (Exception e) {
      // Route the stack trace through the logger instead of stderr so it ends up
      // wherever the rest of the task's diagnostics go.
      logger.error("Unable to create classifiers for the distributed evaluation. Please check your CLI parameters",
          e);
      throw new IllegalArgumentException(e);
    }
    ensemble[i].init(builder, preqSource.getDataset(), 1); // sequential
  }
  logger.debug("Successfully instantiating Classifiers");

  // connect streams one-to-one with ensemble members (the type of connection does not matter)
  Stream[] ensembleStreams = new Stream[numberFolds];
  for (int i = 0; i < numberFolds; i++) {
    ensembleStreams[i] = builder.createStream(distributorP);
    builder.connectInputShuffleStream(ensembleStreams[i], ensemble[i].getInputProcessor());
  }
  distributorP.setOutputStreams(ensembleStreams);

  // Fall back to the learner's default evaluator when the configured one is not
  // compatible; the first ensemble member is used as the representative learner.
  PerformanceEvaluator evaluatorOptionValue = this.evaluatorOption.getValue();
  if (!PrequentialCVEvaluation.isLearnerAndEvaluatorCompatible(ensemble[0], evaluatorOptionValue)) {
    evaluatorOptionValue = getDefaultPerformanceEvaluatorForLearner(ensemble[0]);
  }
  evaluator = new EvaluatorCVProcessor.Builder(evaluatorOptionValue)
      .samplingFrequency(sampleFrequencyOption.getValue())
      .dumpFile(dumpFileOption.getFile())
      .foldNumber(numberFolds).build();

  builder.addProcessor(evaluator, 1);

  for (Learner member : ensemble) {
    for (Stream subResultStream : member.getResultStreams()) { // a learner can have multiple output streams
      this.builder.connectInputKeyStream(subResultStream, evaluator); // the key is the instance id to combine predictions
    }
  }
  logger.debug("Successfully instantiating EvaluatorProcessor");

  prequentialTopology = builder.build();
  logger.debug("Successfully building the topology");
}