Pair analyzeMessage()

in compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/runtime/SkewRunTimePass.java [112:195]


  /**
   * Splits the partitions of a shuffle edge into per-destination-task key ranges of roughly equal size.
   *
   * <p>The count of every key in {@code keyToCountMap} is added to the partition that {@code partitioner}
   * assigns it to, giving a size per partition key in {@code [0, numOfPartitions)}. The partition keys are
   * then cut into {@code dstParallelism} contiguous, half-open ranges, greedily choosing each cut so the
   * accumulated size is as close as possible to {@code totalSize / dstParallelism * (taskIndex + 1)}.
   * The last task receives all remaining partitions. A task index is recorded as skewed when
   * {@code containsSkewedSize} reports that its range holds one of the sizes returned by
   * {@code getTopNLargeKeySizes}.
   *
   * @param keyToCountMap   count associated with each key.
   * @param partitioner     maps a key to its partition key.
   * @param numOfPartitions number of partition keys; must be positive.
   * @param dstParallelism  number of destination tasks; must be positive.
   * @return the key ranges (element {@code i} is the range of task {@code i}) paired with the indices of the
   *         tasks whose range contains a skewed size.
   * @throws IllegalArgumentException if {@code numOfPartitions} or {@code dstParallelism} is not positive.
   */
  Pair<PartitionSetProperty, ResourceAntiAffinityProperty> analyzeMessage(final Map<Object, Long> keyToCountMap,
                                                                          final HashPartitioner partitioner,
                                                                          final int numOfPartitions,
                                                                          final int dstParallelism) {
    if (numOfPartitions <= 0) {
      throw new IllegalArgumentException("numOfPartitions must be positive: " + numOfPartitions);
    }
    if (dstParallelism <= 0) {
      throw new IllegalArgumentException("dstParallelism must be positive: " + dstParallelism);
    }
    final int lastKey = numOfPartitions - 1;

    // Aggregate the counts per each "partition key" assigned by the partitioner.
    final Map<Integer, Long> partitionKeyToPartitionCount = new HashMap<>();
    for (final Map.Entry<Object, Long> entry : keyToCountMap.entrySet()) {
      partitionKeyToPartitionCount.merge(partitioner.partition(entry.getKey()), entry.getValue(), Long::sum);
    }

    // Size of every partition key in [0, lastKey]; keys without any count are treated as empty.
    final List<Long> partitionSizeList = new ArrayList<>(numOfPartitions);
    for (int i = 0; i <= lastKey; i++) {
      partitionSizeList.add(partitionKeyToPartitionCount.getOrDefault(i, 0L));
    }

    // Identify skewed sizes, which are the top numSkewedKeys sizes.
    final List<Long> topNSizes = getTopNLargeKeySizes(partitionSizeList);
    LOG.info("Top {} sizes: {}", numSkewedKeys, topNSizes);

    // Calculate the ideal size for each destination task.
    final long totalSize = partitionSizeList.stream().mapToLong(Long::longValue).sum();
    final long idealSizePerTask = totalSize / dstParallelism;

    // The range under construction is [startingKey, finishingKey);
    // currentAccumulatedSize is the total size of keys [0, finishingKey).
    int startingKey = 0;
    int finishingKey = 1;
    long currentAccumulatedSize = partitionSizeList.get(startingKey);
    long prevAccumulatedSize = 0L;
    final ArrayList<KeyRange> keyRanges = new ArrayList<>();
    final HashSet<Integer> skewedTaskIndices = new HashSet<>();

    // Every task except the last one gets the range whose end is nearest to its ideal accumulated size.
    for (int dstTaskIndex = 0; dstTaskIndex < dstParallelism - 1; dstTaskIndex++) {
      final long idealAccumulatedSize = idealSizePerTask * (dstTaskIndex + 1);

      // By adding partition sizes, reach the ideal accumulated size (never past the last partition key).
      while (currentAccumulatedSize < idealAccumulatedSize && finishingKey <= lastKey) {
        currentAccumulatedSize += partitionSizeList.get(finishingKey);
        finishingKey++;
      }

      // Go one step back if we came too far. Only a key inside this task's own range may be dropped:
      // when the range is empty, finishingKey - 1 belongs to the previous range (or is -1).
      if (finishingKey > startingKey) {
        final long lastIncludedSize = partitionSizeList.get(finishingKey - 1);
        final long diffFromIdeal = currentAccumulatedSize - idealAccumulatedSize;
        final long diffFromIdealOneStepBack = idealAccumulatedSize - (currentAccumulatedSize - lastIncludedSize);
        if (diffFromIdeal > diffFromIdealOneStepBack) {
          finishingKey--;
          currentAccumulatedSize -= lastIncludedSize;
        }
      }

      if (containsSkewedSize(partitionSizeList, topNSizes, startingKey, finishingKey)) {
        skewedTaskIndices.add(dstTaskIndex);
      }
      keyRanges.add(HashRange.of(startingKey, finishingKey));
      LOG.debug("KeyRange {}~{}, Size {}", startingKey, finishingKey - 1,
        currentAccumulatedSize - prevAccumulatedSize);

      prevAccumulatedSize = currentAccumulatedSize;
      startingKey = finishingKey;
    }

    // Last task: it takes the rest, [startingKey, lastKey + 1). The skew check must cover that same range.
    final int lastTaskIndex = dstParallelism - 1;
    final int endKeyExclusive = lastKey + 1;
    if (containsSkewedSize(partitionSizeList, topNSizes, startingKey, endKeyExclusive)) {
      skewedTaskIndices.add(lastTaskIndex);
    }
    keyRanges.add(HashRange.of(startingKey, endKeyExclusive));
    // prevAccumulatedSize is the total size of keys [0, startingKey), so the rest is the remainder.
    LOG.debug("KeyRange {}~{}, Size {}", startingKey, lastKey, totalSize - prevAccumulatedSize);

    return Pair.of(PartitionSetProperty.of(keyRanges), ResourceAntiAffinityProperty.of(skewedTaskIndices));
  }