tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java [240:310]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
      iterations++;
      int numFullGroupsCreated = 0;
      for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
        group.clear();
        groupLocationSet.clear();
        String location = entry.getKey();
        LocationHolder holder = entry.getValue();
        SplitHolder splitHolder = holder.getUnprocessedHeadSplit();
        if (splitHolder == null) {
          // all splits on node processed
          continue;
        }
        int oldHeadIndex = holder.headIndex;
        long groupLength = 0;
        int groupNumSplits = 0;
        do {
          group.add(splitHolder);
          groupLength += splitHolder.split.getLength();
          groupNumSplits++;
          holder.incrementHeadIndex();
          splitHolder = holder.getUnprocessedHeadSplit();
        } while(splitHolder != null  
            && (!groupByLength || 
                (groupLength + splitHolder.split.getLength() <= lengthPerGroup))
            && (!groupByCount || 
                (groupNumSplits + 1 <= numSplitsInGroup)));

        if (holder.isEmpty() 
            && !allowSmallGroups
            && (!groupByLength || groupLength < lengthPerGroup/2)
            && (!groupByCount || groupNumSplits < numSplitsInGroup/2)) {
          // group too small, reset it
          holder.headIndex = oldHeadIndex;
          continue;
        }
        
        numFullGroupsCreated++;

        // One split group created
        String[] groupLocation = {location};
        if (location == emptyLocation) {
          groupLocation = null;
        } else if (doingRackLocal) {
          for (SplitHolder splitH : group) {
            String[] locations = splitH.split.getLocations();
            if (locations != null) {
              for (String loc : locations) {
                if (loc != null) {
                  groupLocationSet.add(loc);
                }
              }
            }
          }
          groupLocation = groupLocationSet.toArray(groupLocation);
        }
        TezGroupedSplit groupedSplit = 
            new TezGroupedSplit(group.size(), wrappedInputFormatName, 
                groupLocation, 
                // pass rack local hint directly to AM
                ((doingRackLocal && location != emptyLocation)?location:null));
        for (SplitHolder groupedSplitHolder : group) {
          groupedSplit.addSplit(groupedSplitHolder.split);
          Preconditions.checkState(groupedSplitHolder.isProcessed == false, 
              "Duplicates in grouping at location: " + location);
          groupedSplitHolder.isProcessed = true;
          splitsProcessed++;
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Grouped " + group.size()
              + " length: " + groupedSplit.getLength()
              + " split at: " + location);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java [237:307]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
      iterations++;
      int numFullGroupsCreated = 0;
      for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {
        group.clear();
        groupLocationSet.clear();
        String location = entry.getKey();
        LocationHolder holder = entry.getValue();
        SplitHolder splitHolder = holder.getUnprocessedHeadSplit();
        if (splitHolder == null) {
          // all splits on node processed
          continue;
        }
        int oldHeadIndex = holder.headIndex;
        long groupLength = 0;
        int groupNumSplits = 0;
        do {
          group.add(splitHolder);
          groupLength += splitHolder.split.getLength();
          groupNumSplits++;
          holder.incrementHeadIndex();
          splitHolder = holder.getUnprocessedHeadSplit();
        } while(splitHolder != null  
            && (!groupByLength || 
                (groupLength + splitHolder.split.getLength() <= lengthPerGroup))
            && (!groupByCount || 
                (groupNumSplits + 1 <= numSplitsInGroup)));

        if (holder.isEmpty() 
            && !allowSmallGroups
            && (!groupByLength || groupLength < lengthPerGroup/2)
            && (!groupByCount || groupNumSplits < numSplitsInGroup/2)) {
          // group too small, reset it
          holder.headIndex = oldHeadIndex;
          continue;
        }
        
        numFullGroupsCreated++;

        // One split group created
        String[] groupLocation = {location};
        if (location == emptyLocation) {
          groupLocation = null;
        } else if (doingRackLocal) {
          for (SplitHolder splitH : group) {
            String[] locations = splitH.split.getLocations();
            if (locations != null) {
              for (String loc : locations) {
                if (loc != null) {
                  groupLocationSet.add(loc);
                }
              }
            }
          }
          groupLocation = groupLocationSet.toArray(groupLocation);
        }
        TezGroupedSplit groupedSplit = 
            new TezGroupedSplit(group.size(), wrappedInputFormatName, 
                groupLocation,
                // pass rack local hint directly to AM
                ((doingRackLocal && location != emptyLocation)?location:null));
        for (SplitHolder groupedSplitHolder : group) {
          groupedSplit.addSplit(groupedSplitHolder.split);
          Preconditions.checkState(groupedSplitHolder.isProcessed == false,
              "Duplicates in grouping at location: " + location);
          groupedSplitHolder.isProcessed = true;
          splitsProcessed++;
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Grouped " + group.size()
              + " length: " + groupedSplit.getLength()
              + " split at: " + location);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



