in samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java [118:181]
/**
 * Handles a single incoming event.
 *
 * <p>Before looking at the event itself, one entry (if any) is polled from the
 * queue shared with the time-out threads; a split that timed out while still
 * registered in {@code splittingNodes} is unregistered and its split attempt
 * is continued.
 *
 * <p>The event is then dispatched on its runtime type:
 * <ul>
 * <li>{@link InstancesContentEvent}: the instances are processed, the pending
 * attribute batches of every node in {@code foundNodeSet} are sent to the
 * attribute stream, and a split attempt is started for each node that is not
 * already splitting and has gained at least {@code gracePeriod} weight since
 * its last split evaluation.</li>
 * <li>{@link LocalResultContentEvent}: the suggestions are added to the node
 * registered under the event's split id; once all suggestions are collected
 * the time-out task is cancelled and the split attempt is continued.</li>
 * </ul>
 * Any other event type is ignored.
 *
 * @param event the event to handle
 * @return always {@code false}
 */
public boolean process(ContentEvent event) {
  // Check the queue fed by the time-out threads; at most one entry is taken per call.
  Long expiredSplitId = timedOutSplittingNodes.poll();
  if (expiredSplitId != null) {
    // A time-out fired. The entry may already be gone from splittingNodes,
    // in which case there is nothing left to do for it.
    SplittingNodeInfo expiredInfo = splittingNodes.get(expiredSplitId);
    if (expiredInfo != null) {
      this.splittingNodes.remove(expiredSplitId);
      this.continueAttemptToSplit(expiredInfo.activeLearningNode, expiredInfo.foundNode);
    }
  }

  if (event instanceof InstancesContentEvent) {
    // New instances arrived from the source.
    this.processInstanceContentEvent((InstancesContentEvent) event);

    if (this.foundNodeSet != null) {
      for (FoundNode located : this.foundNodeSet) {
        ActiveLearningNode leaf = (ActiveLearningNode) located.getNode();

        // Forward the per-attribute batches to the local-statistic PI, then clear them.
        AttributeBatchContentEvent[] batches = leaf.getAttributeBatchContentEvent();
        if (batches != null) {
          int attributeIndex = 0;
          while (attributeIndex < this.dataset.numAttributes() - 1) {
            this.sendToAttributeStream(batches[attributeIndex]);
            attributeIndex++;
          }
        }
        leaf.setAttributeBatchContentEvent(null);

        // A node that is already splitting is not asked again.
        if (leaf.isSplitting()) {
          continue;
        }

        // Ask for a split once enough weight has accumulated since the last evaluation.
        double seenWeight = leaf.getWeightSeen();
        if (seenWeight - leaf.getWeightSeenAtLastSplitEvaluation() >= this.gracePeriod) {
          attemptToSplit(leaf, located);
        }
      }
    }

    this.foundNodeSet = null;
    return false;
  }

  if (!(event instanceof LocalResultContentEvent)) {
    return false;
  }

  LocalResultContentEvent localResult = (LocalResultContentEvent) event;
  Long resultSplitId = localResult.getSplitId();
  SplittingNodeInfo pendingSplit = splittingNodes.get(resultSplitId);
  if (pendingSplit == null) {
    // No entry for this split id: the node has already been removed by the time-out handling.
    return false;
  }

  ActiveLearningNode splitCandidate = pendingSplit.activeLearningNode;
  splitCandidate.addDistributedSuggestions(localResult.getBestSuggestion(), localResult.getSecondBestSuggestion());
  if (splitCandidate.isAllSuggestionsCollected()) {
    // Every suggestion is in: stop the time-out task, unregister the split and finish it.
    pendingSplit.scheduledFuture.cancel(false);
    this.splittingNodes.remove(resultSplitId);
    this.continueAttemptToSplit(splitCandidate, pendingSplit.foundNode);
  }
  return false;
}