public boolean process()

in samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java [118:181]


  public boolean process(ContentEvent event) {

    // Poll the blocking queue shared between ModelAggregator and the time-out
    // threads
    Long timedOutSplitId = timedOutSplittingNodes.poll();
    if (timedOutSplitId != null) { // time out has been reached!
      SplittingNodeInfo splittingNode = splittingNodes.get(timedOutSplitId);
      if (splittingNode != null) {
        this.splittingNodes.remove(timedOutSplitId);
        this.continueAttemptToSplit(splittingNode.activeLearningNode, splittingNode.foundNode);

      }

    }

    // Receive a new instance from source
    if (event instanceof InstancesContentEvent) {
      InstancesContentEvent instancesEvent = (InstancesContentEvent) event;
      this.processInstanceContentEvent(instancesEvent);
      // Send information to local-statistic PI
      // for each of the nodes
      if (this.foundNodeSet != null) {
        for (FoundNode foundNode : this.foundNodeSet) {
          ActiveLearningNode leafNode = (ActiveLearningNode) foundNode.getNode();
          AttributeBatchContentEvent[] abce = leafNode.getAttributeBatchContentEvent();
          if (abce != null) {
            for (int i = 0; i < this.dataset.numAttributes() - 1; i++) {
              this.sendToAttributeStream(abce[i]);
            }
          }
          leafNode.setAttributeBatchContentEvent(null);
          // this.sendToControlStream(event); //split information
          // See if we can ask for splits
          if (!leafNode.isSplitting()) {
            double weightSeen = leafNode.getWeightSeen();
            // check whether it is the time for splitting
            if (weightSeen - leafNode.getWeightSeenAtLastSplitEvaluation() >= this.gracePeriod) {
              attemptToSplit(leafNode, foundNode);
            }
          }
        }
      }
      this.foundNodeSet = null;
    } else if (event instanceof LocalResultContentEvent) {
      LocalResultContentEvent lrce = (LocalResultContentEvent) event;
      Long lrceSplitId = lrce.getSplitId();
      SplittingNodeInfo splittingNodeInfo = splittingNodes.get(lrceSplitId);

      if (splittingNodeInfo != null) { // if null, that means
        // activeLearningNode has been
        // removed by timeout thread
        ActiveLearningNode activeLearningNode = splittingNodeInfo.activeLearningNode;

        activeLearningNode.addDistributedSuggestions(lrce.getBestSuggestion(), lrce.getSecondBestSuggestion());

        if (activeLearningNode.isAllSuggestionsCollected()) {
          splittingNodeInfo.scheduledFuture.cancel(false);
          this.splittingNodes.remove(lrceSplitId);
          this.continueAttemptToSplit(activeLearningNode, splittingNodeInfo.foundNode);
        }
      }
    }
    return false;
  }