in samoa-api/src/main/java/org/apache/samoa/learners/classifiers/rules/distributed/AMRulesAggregatorProcessor.java [142:211]
/**
 * Processes one instance event against the rule set and the default rule.
 *
 * <p>The rules in {@code ruleSet} are visited in iteration order. Each rule that covers the
 * instance may contribute a vote to the combined prediction (when the event is flagged for
 * testing) and may be trained with the instance (when the event is flagged for training and
 * the instance is not reported as an anomaly for that rule). When {@code unorderedRules} is
 * {@code false}, prediction and training each stop after the first rule that handled them.
 *
 * <p>If no rule produced a vote and the event is flagged for testing, the default rule
 * provides the prediction. If no rule was trained and the event is flagged for training,
 * the default rule is trained and, every {@code gracePeriod} instances, is given the chance
 * to expand into a new rule.
 *
 * <p>A result event is put on {@code resultStream} only for events flagged for testing.
 *
 * @param instanceEvent the event carrying the instance together with its testing/training flags
 */
private void processInstanceEvent(InstanceContentEvent instanceEvent) {
  Instance instance = instanceEvent.getInstance();
  boolean predictionCovered = false;
  boolean trainingCovered = false;
  boolean continuePrediction = instanceEvent.isTesting();
  boolean continueTraining = instanceEvent.isTraining();

  ErrorWeightedVote errorWeightedVote = newErrorWeightedVote();
  Iterator<PassiveRule> ruleIterator = this.ruleSet.iterator();
  // Stop scanning as soon as neither prediction nor training still needs a rule.
  while (ruleIterator.hasNext() && (continuePrediction || continueTraining)) {
    PassiveRule rule = ruleIterator.next();
    if (!rule.isCovering(instance)) {
      continue;
    }

    if (continuePrediction) {
      // Mark the prediction as covered only when a vote is really recorded, so that a
      // training-only event never triggers a result built from an empty vote.
      predictionCovered = true;
      double[] vote = rule.getPrediction(instance);
      double error = rule.getCurrentError();
      errorWeightedVote.addVote(vote, error);
      if (!this.unorderedRules) {
        // Ordered rules: only the first covering rule predicts.
        continuePrediction = false;
      }
    }

    if (continueTraining && !isAnomaly(instance, rule)) {
      trainingCovered = true;
      rule.updateStatistics(instance);
      // Send instance to statistics PIs
      sendInstanceToRule(instance, rule.getRuleNumberID());
      if (!this.unorderedRules) {
        // Ordered rules: only the first covering, non-anomalous rule is trained.
        continueTraining = false;
      }
    }
  }

  if (predictionCovered) {
    // Combined prediction
    ResultContentEvent rce = newResultContentEvent(errorWeightedVote.computeWeightedVote(), instanceEvent);
    resultStream.put(rce);
  } else if (instanceEvent.isTesting()) {
    // predict with default rule
    double[] vote = defaultRule.getPrediction(instance);
    ResultContentEvent rce = newResultContentEvent(vote, instanceEvent);
    resultStream.put(rce);
  }

  if (!trainingCovered && instanceEvent.isTraining()) {
    // train default rule with this instance
    defaultRule.updateStatistics(instance);
    if (defaultRule.getInstancesSeen() % this.gracePeriod == 0.0) {
      if (defaultRule.tryToExpand(this.splitConfidence, this.tieThreshold)) {
        // Build the replacement default rule BEFORE splitting and renumbering the current
        // one: it reuses the current rule's ID, learning node and other-branch statistics.
        ActiveRule newDefaultRule = newRule(defaultRule.getRuleNumberID(),
            (RuleActiveRegressionNode) defaultRule.getLearningNode(),
            ((RuleActiveRegressionNode) defaultRule.getLearningNode()).getStatisticsOtherBranchSplit()); // other branch
        defaultRule.split();
        defaultRule.setRuleNumberID(++ruleNumberID);
        // The expanded rule joins the rule set as a passive copy.
        this.ruleSet.add(new PassiveRule(this.defaultRule));
        // send to statistics PI
        sendAddRuleEvent(defaultRule.getRuleNumberID(), this.defaultRule);
        defaultRule = newDefaultRule;
      }
    }
  }
}