in samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/HorizontalAMRulesRegressor.java [155:226]
public void init(TopologyBuilder topologyBuilder, Instances dataset, int parallelism) {
// Create MODEL PIs
this.model = new AMRRuleSetProcessor.Builder(dataset)
.noAnomalyDetection(noAnomalyDetectionOption.isSet())
.multivariateAnomalyProbabilityThreshold(multivariateAnomalyProbabilityThresholdOption.getValue())
.univariateAnomalyProbabilityThreshold(univariateAnomalyProbabilityThresholdOption.getValue())
.anomalyNumberOfInstancesThreshold(anomalyNumInstThresholdOption.getValue())
.unorderedRules(unorderedRulesOption.isSet())
.voteType(votingTypeOption.getChosenIndex())
.build();
topologyBuilder.addProcessor(model, this.ruleSetParallelismOption.getValue());
// MODEL PIs streams
Stream forwardToRootStream = topologyBuilder.createStream(this.model);
Stream forwardToLearnerStream = topologyBuilder.createStream(this.model);
this.modelResultStream = topologyBuilder.createStream(this.model);
this.model.setDefaultRuleStream(forwardToRootStream);
this.model.setStatisticsStream(forwardToLearnerStream);
this.model.setResultStream(this.modelResultStream);
// Create DefaultRule PI
AMRDefaultRuleProcessor root = new AMRDefaultRuleProcessor.Builder(dataset)
.threshold(pageHinckleyThresholdOption.getValue())
.alpha(pageHinckleyAlphaOption.getValue())
.changeDetection(this.DriftDetectionOption.isSet())
.predictionFunction(predictionFunctionOption.getChosenIndex())
.constantLearningRatioDecay(constantLearningRatioDecayOption.isSet())
.learningRatio(learningRatioOption.getValue())
.splitConfidence(splitConfidenceOption.getValue())
.tieThreshold(tieThresholdOption.getValue())
.gracePeriod(gracePeriodOption.getValue())
.numericObserver((FIMTDDNumericAttributeClassLimitObserver) numericObserverOption.getValue())
.build();
topologyBuilder.addProcessor(root);
// Default Rule PI streams
Stream newRuleStream = topologyBuilder.createStream(root);
this.rootResultStream = topologyBuilder.createStream(root);
root.setRuleStream(newRuleStream);
root.setResultStream(this.rootResultStream);
// Create Learner PIs
AMRLearnerProcessor learner = new AMRLearnerProcessor.Builder(dataset)
.splitConfidence(splitConfidenceOption.getValue())
.tieThreshold(tieThresholdOption.getValue())
.gracePeriod(gracePeriodOption.getValue())
.noAnomalyDetection(noAnomalyDetectionOption.isSet())
.multivariateAnomalyProbabilityThreshold(multivariateAnomalyProbabilityThresholdOption.getValue())
.univariateAnomalyProbabilityThreshold(univariateAnomalyProbabilityThresholdOption.getValue())
.anomalyNumberOfInstancesThreshold(anomalyNumInstThresholdOption.getValue())
.build();
topologyBuilder.addProcessor(learner, this.learnerParallelismOption.getValue());
Stream predicateStream = topologyBuilder.createStream(learner);
learner.setOutputStream(predicateStream);
// Connect streams
// to MODEL
topologyBuilder.connectInputAllStream(newRuleStream, this.model);
topologyBuilder.connectInputAllStream(predicateStream, this.model);
// to ROOT
topologyBuilder.connectInputShuffleStream(forwardToRootStream, root);
// to LEARNER
topologyBuilder.connectInputKeyStream(forwardToLearnerStream, learner);
topologyBuilder.connectInputAllStream(newRuleStream, learner);
}