in compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/runtime/SkewRunTimePass.java [112:195]
Pair<PartitionSetProperty, ResourceAntiAffinityProperty> analyzeMessage(final Map<Object, Long> keyToCountMap,
                                                                        final HashPartitioner partitioner,
                                                                        final int numOfPartitions,
                                                                        final int dstParallelism) {
  // Splits the partition-key space [0, numOfPartitions) into dstParallelism contiguous, end-exclusive key
  // ranges whose aggregated counts are as close as possible to (totalSize / dstParallelism) each, and records
  // the indices of destination tasks whose range contains one of the top-numSkewedKeys partition sizes.
  //
  // keyToCountMap:   per-key counts (presumably record counts; assumed non-null and non-negative).
  // partitioner:     maps a key to its partition key; assumed to return values in [0, numOfPartitions).
  // numOfPartitions: number of partition keys; must be positive.
  // dstParallelism:  number of destination tasks; must be positive.
  // Returns the key range of each destination task (list index == task index) paired with the set of
  // destination task indices whose ranges contain a skewed partition size.
  // Throws IllegalArgumentException if numOfPartitions or dstParallelism is not positive.
  if (numOfPartitions <= 0) {
    throw new IllegalArgumentException("numOfPartitions must be positive: " + numOfPartitions);
  }
  if (dstParallelism <= 0) {
    throw new IllegalArgumentException("dstParallelism must be positive: " + dstParallelism);
  }
  final int lastKey = numOfPartitions - 1;

  // Aggregate the counts per each "partition key" assigned by the partitioner.
  final Map<Integer, Long> partitionKeyToPartitionCount = new HashMap<>();
  for (final Map.Entry<Object, Long> entry : keyToCountMap.entrySet()) {
    final int partitionKey = partitioner.partition(entry.getKey());
    partitionKeyToPartitionCount.merge(partitionKey, entry.getValue(), Long::sum);
  }
  final List<Long> partitionSizeList = new ArrayList<>(numOfPartitions);
  for (int i = 0; i <= lastKey; i++) {
    partitionSizeList.add(partitionKeyToPartitionCount.getOrDefault(i, 0L));
  }

  // Identify skewed sizes, which are the top numSkewedKeys largest partition sizes.
  final List<Long> topNSizes = getTopNLargeKeySizes(partitionSizeList);
  LOG.info("Top {} sizes: {}", numSkewedKeys, topNSizes);

  // Calculate the ideal size for each destination task.
  final long totalSize = partitionSizeList.stream().mapToLong(Long::longValue).sum();
  final long idealSizePerTask = totalSize / dstParallelism;

  // Invariant: currentAccumulatedSize == sum of partition sizes in [0, finishingKey).
  int startingKey = 0;
  int finishingKey = 1;
  long currentAccumulatedSize = partitionSizeList.get(startingKey);
  long prevAccumulatedSize = 0L;
  final ArrayList<KeyRange> keyRanges = new ArrayList<>(dstParallelism);
  final HashSet<Integer> skewedTaskIndices = new HashSet<>();

  // Every task but the last gets the range whose accumulated size is nearest to its ideal accumulated size.
  for (int dstTaskIndex = 0; dstTaskIndex < dstParallelism - 1; dstTaskIndex++) {
    final long idealAccumulatedSize = idealSizePerTask * (dstTaskIndex + 1);
    // For non-last tasks idealAccumulatedSize <= totalSize (given non-negative counts),
    // so this loop stops before finishingKey exceeds lastKey + 1.
    while (currentAccumulatedSize < idealAccumulatedSize) {
      currentAccumulatedSize += partitionSizeList.get(finishingKey);
      finishingKey++;
    }
    // Go one step back if that lands closer to the ideal, but never before startingKey: stepping past the
    // range start would steal the previous task's partition, invert the range, or read index -1.
    if (finishingKey > startingKey) {
      final long lastPartitionSize = partitionSizeList.get(finishingKey - 1);
      final long diffFromIdeal = currentAccumulatedSize - idealAccumulatedSize;
      final long diffFromIdealOneStepBack = idealAccumulatedSize - (currentAccumulatedSize - lastPartitionSize);
      if (diffFromIdeal > diffFromIdealOneStepBack) {
        finishingKey--;
        currentAccumulatedSize -= lastPartitionSize;
      }
    }
    if (containsSkewedSize(partitionSizeList, topNSizes, startingKey, finishingKey)) {
      skewedTaskIndices.add(dstTaskIndex);
    }
    keyRanges.add(HashRange.of(startingKey, finishingKey));
    LOG.debug("KeyRange {}~{}, Size {}", startingKey, finishingKey - 1,
        currentAccumulatedSize - prevAccumulatedSize);
    prevAccumulatedSize = currentAccumulatedSize;
    startingKey = finishingKey;
  }

  // The last task takes all remaining partition keys: [startingKey, lastKey + 1).
  // The skew check must cover that same range (previously it was given the empty range ending at startingKey).
  final int lastTaskIndex = dstParallelism - 1;
  if (containsSkewedSize(partitionSizeList, topNSizes, startingKey, lastKey + 1)) {
    skewedTaskIndices.add(lastTaskIndex);
  }
  keyRanges.add(HashRange.of(startingKey, lastKey + 1));
  // prevAccumulatedSize is the sum of [0, startingKey), so the remainder is exactly the last range's size.
  LOG.debug("KeyRange {}~{}, Size {}", startingKey, lastKey, totalSize - prevAccumulatedSize);

  return Pair.of(PartitionSetProperty.of(keyRanges), ResourceAntiAffinityProperty.of(skewedTaskIndices));
}