public void init()

in samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/HorizontalAMRulesRegressor.java [155:226]


  public void init(TopologyBuilder topologyBuilder, Instances dataset, int parallelism) {

    // Create MODEL PIs
    this.model = new AMRRuleSetProcessor.Builder(dataset)
        .noAnomalyDetection(noAnomalyDetectionOption.isSet())
        .multivariateAnomalyProbabilityThreshold(multivariateAnomalyProbabilityThresholdOption.getValue())
        .univariateAnomalyProbabilityThreshold(univariateAnomalyProbabilityThresholdOption.getValue())
        .anomalyNumberOfInstancesThreshold(anomalyNumInstThresholdOption.getValue())
        .unorderedRules(unorderedRulesOption.isSet())
        .voteType(votingTypeOption.getChosenIndex())
        .build();

    topologyBuilder.addProcessor(model, this.ruleSetParallelismOption.getValue());

    // MODEL PIs streams
    Stream forwardToRootStream = topologyBuilder.createStream(this.model);
    Stream forwardToLearnerStream = topologyBuilder.createStream(this.model);
    this.modelResultStream = topologyBuilder.createStream(this.model);

    this.model.setDefaultRuleStream(forwardToRootStream);
    this.model.setStatisticsStream(forwardToLearnerStream);
    this.model.setResultStream(this.modelResultStream);

    // Create DefaultRule PI
    AMRDefaultRuleProcessor root = new AMRDefaultRuleProcessor.Builder(dataset)
        .threshold(pageHinckleyThresholdOption.getValue())
        .alpha(pageHinckleyAlphaOption.getValue())
        .changeDetection(this.DriftDetectionOption.isSet())
        .predictionFunction(predictionFunctionOption.getChosenIndex())
        .constantLearningRatioDecay(constantLearningRatioDecayOption.isSet())
        .learningRatio(learningRatioOption.getValue())
        .splitConfidence(splitConfidenceOption.getValue())
        .tieThreshold(tieThresholdOption.getValue())
        .gracePeriod(gracePeriodOption.getValue())
        .numericObserver((FIMTDDNumericAttributeClassLimitObserver) numericObserverOption.getValue())
        .build();

    topologyBuilder.addProcessor(root);

    // Default Rule PI streams
    Stream newRuleStream = topologyBuilder.createStream(root);
    this.rootResultStream = topologyBuilder.createStream(root);

    root.setRuleStream(newRuleStream);
    root.setResultStream(this.rootResultStream);

    // Create Learner PIs
    AMRLearnerProcessor learner = new AMRLearnerProcessor.Builder(dataset)
        .splitConfidence(splitConfidenceOption.getValue())
        .tieThreshold(tieThresholdOption.getValue())
        .gracePeriod(gracePeriodOption.getValue())
        .noAnomalyDetection(noAnomalyDetectionOption.isSet())
        .multivariateAnomalyProbabilityThreshold(multivariateAnomalyProbabilityThresholdOption.getValue())
        .univariateAnomalyProbabilityThreshold(univariateAnomalyProbabilityThresholdOption.getValue())
        .anomalyNumberOfInstancesThreshold(anomalyNumInstThresholdOption.getValue())
        .build();

    topologyBuilder.addProcessor(learner, this.learnerParallelismOption.getValue());

    Stream predicateStream = topologyBuilder.createStream(learner);
    learner.setOutputStream(predicateStream);

    // Connect streams
    // to MODEL
    topologyBuilder.connectInputAllStream(newRuleStream, this.model);
    topologyBuilder.connectInputAllStream(predicateStream, this.model);
    // to ROOT
    topologyBuilder.connectInputShuffleStream(forwardToRootStream, root);
    // to LEARNER
    topologyBuilder.connectInputKeyStream(forwardToLearnerStream, learner);
    topologyBuilder.connectInputAllStream(newRuleStream, learner);
  }