in flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorterBuilder.java [244:450]
/**
 * Assembles an {@link ExternalSorter} from this builder's configuration.
 *
 * <p>The pages are taken from the segments supplied to the builder or, if none were supplied,
 * allocated from the memory manager. They are split into three pools:
 * <ul>
 *   <li>write buffers used while spilling;
 *   <li>buffers handed to the large record handler;
 *   <li>the remaining sort memory, divided over the in-memory sort buffers.
 * </ul>
 * The read, sort and spill stages are then created and wired together through shared circular
 * queues.
 *
 * <p>Side effects on the builder:
 * <ul>
 *   <li>{@code maxNumFileHandles} is lowered when memory is too small for the configured merge
 *       fan-in.
 *   <li>{@code numSortBuffers} is chosen here when it was not set (value below 1).
 *   <li>The write and large-record pages are removed from the end of the memory list. When
 *       segments were supplied explicitly, that list is the builder's own list.
 * </ul>
 *
 * @param readingStageFactory creates the stage that reads input records into the sort buffers
 * @return the assembled sorter
 * @throws MemoryAllocationException propagated from {@code memoryManager.allocatePages} when no
 *     segments were supplied and the pages cannot be allocated
 * @throws IllegalArgumentException if fewer than {@code MIN_NUM_WRITE_BUFFERS +
 *     MIN_NUM_SORT_MEM_SEGMENTS} pages are available
 */
private ExternalSorter<T> doBuild(ReadingStageFactory<T> readingStageFactory)
        throws MemoryAllocationException {
    // Remember whether this method owns the pages, so they can be returned if we fail early.
    final boolean allocatedHere = this.memorySegments == null;
    final List<MemorySegment> memory;
    if (allocatedHere) {
        memory =
                memoryManager.allocatePages(
                        parentTask, memoryManager.computeNumberOfPages(memoryFraction));
    } else {
        memory = this.memorySegments;
    }

    // adjust the memory quotas to the page size
    final int numPagesTotal = memory.size();
    if (numPagesTotal < MIN_NUM_WRITE_BUFFERS + MIN_NUM_SORT_MEM_SEGMENTS) {
        if (allocatedHere) {
            // The pages were allocated just above; hand them back so they do not leak when we
            // reject the configuration. Caller-supplied segments are left untouched.
            memoryManager.release(memory);
        }
        throw new IllegalArgumentException(
                "Too little memory provided to sorter to perform task. "
                        + "Required are at least "
                        + (MIN_NUM_WRITE_BUFFERS + MIN_NUM_SORT_MEM_SEGMENTS)
                        + " pages. Current page size is "
                        + memoryManager.getPageSize()
                        + " bytes.");
    }

    // determine how many buffers to use for writing
    final int numWriteBuffers;
    final int numLargeRecordBuffers;
    if (noSpillingMemory && !handleLargeRecords) {
        numWriteBuffers = 0;
        numLargeRecordBuffers = 0;
    } else {
        // One consumer for spill writes, plus two consumers' worth for large record handling.
        final int numConsumers = (noSpillingMemory ? 0 : 1) + (handleLargeRecords ? 2 : 0);

        // determine how many buffers we have when we do a full merge with maximal fan-in
        final int minBuffersForMerging =
                maxNumFileHandles + numConsumers * MIN_NUM_WRITE_BUFFERS;

        if (minBuffersForMerging > numPagesTotal) {
            // Not enough pages for the configured fan-in: give every consumer the minimum and
            // shrink the fan-in to whatever is left.
            numWriteBuffers = noSpillingMemory ? 0 : MIN_NUM_WRITE_BUFFERS;
            numLargeRecordBuffers = handleLargeRecords ? 2 * MIN_NUM_WRITE_BUFFERS : 0;

            maxNumFileHandles = numPagesTotal - numConsumers * MIN_NUM_WRITE_BUFFERS;
            LOG.debug(
                    "Reducing maximal merge fan-in to {} due to limited memory availability during merge",
                    maxNumFileHandles);
        } else {
            // We are free to choose. Make sure that we do not eat up too much memory for
            // writing: roughly 1% of the pages per consumer, clamped to [MIN, MAX].
            final int fractionalAuxBuffers = numPagesTotal / (numConsumers * 100);

            if (fractionalAuxBuffers >= MAX_NUM_WRITE_BUFFERS) {
                numWriteBuffers = noSpillingMemory ? 0 : MAX_NUM_WRITE_BUFFERS;
                numLargeRecordBuffers = handleLargeRecords ? 2 * MAX_NUM_WRITE_BUFFERS : 0;
            } else {
                // at least the lower bound
                numWriteBuffers =
                        noSpillingMemory
                                ? 0
                                : Math.max(MIN_NUM_WRITE_BUFFERS, fractionalAuxBuffers);
                numLargeRecordBuffers = handleLargeRecords ? 2 * MIN_NUM_WRITE_BUFFERS : 0;
            }
        }
    }

    final int sortMemPages = numPagesTotal - numWriteBuffers - numLargeRecordBuffers;
    final long sortMemory = ((long) sortMemPages) * memoryManager.getPageSize();

    // decide how many sort buffers to use: two once sort memory exceeds 100 MiB, else one
    if (numSortBuffers < 1) {
        numSortBuffers = sortMemory > 100 * 1024 * 1024 ? 2 : 1;
    }
    final int numSegmentsPerSortBuffer = sortMemPages / numSortBuffers;

    LOG.debug(
            "Instantiating sorter with {} pages of sorting memory (="
                    + "{} bytes total) divided over {} sort buffers ({} pages per buffer). "
                    + "Using {} buffers for writing sorted results and merging maximally {} streams at once. "
                    + "Using {} memory segments for large record spilling.",
            sortMemPages,
            sortMemory,
            numSortBuffers,
            numSegmentsPerSortBuffer,
            numWriteBuffers,
            maxNumFileHandles,
            numLargeRecordBuffers);

    // Move the write pages off the end of the memory list; what remains there is sort memory.
    final List<MemorySegment> writeMemory = new ArrayList<>(numWriteBuffers);
    for (int i = 0; i < numWriteBuffers; i++) {
        writeMemory.add(memory.remove(memory.size() - 1));
    }

    final LargeRecordHandler<T> largeRecordHandler;
    if (numLargeRecordBuffers > 0) {
        final List<MemorySegment> largeRecordMemory = new ArrayList<>(numLargeRecordBuffers);
        for (int i = 0; i < numLargeRecordBuffers; i++) {
            largeRecordMemory.add(memory.remove(memory.size() - 1));
        }

        largeRecordHandler =
                new LargeRecordHandler<>(
                        serializer,
                        comparator.duplicate(),
                        ioManager,
                        memoryManager,
                        largeRecordMemory,
                        parentTask,
                        maxNumFileHandles,
                        executionConfig);
    } else {
        largeRecordHandler = null;
    }

    // circular queues pass buffers between the threads
    final CircularQueues<T> circularQueues = new CircularQueues<>();

    final List<InMemorySorter<T>> inMemorySorters = new ArrayList<>(numSortBuffers);

    // Allocate the sort buffers and fill the empty (READ) queue with them. Each buffer gets
    // numSegmentsPerSortBuffer pages; the last one also takes any remainder.
    final Iterator<MemorySegment> segments = memory.iterator();
    for (int i = 0; i < numSortBuffers; i++) {
        final List<MemorySegment> sortSegments = new ArrayList<>(numSegmentsPerSortBuffer);
        int remaining = (i == numSortBuffers - 1) ? Integer.MAX_VALUE : numSegmentsPerSortBuffer;
        while (remaining > 0 && segments.hasNext()) {
            sortSegments.add(segments.next());
            remaining--;
        }

        final InMemorySorter<T> inMemorySorter = inMemorySorterFactory.create(sortSegments);
        inMemorySorters.add(inMemorySorter);

        final CircularElement<T> element =
                new CircularElement<>(i, inMemorySorter, sortSegments);
        circularQueues.send(StageRunner.SortStage.READ, element);
    }

    // Any failure in a stage completes the iterator future exceptionally.
    final ExceptionHandler<IOException> exceptionHandler =
            exception -> circularQueues.getIteratorFuture().completeExceptionally(exception);

    final SpillChannelManager spillChannelManager = new SpillChannelManager();

    // create the stage that reads the input channels; spilling is triggered once
    // startSpillingFraction of the sort memory is in use
    final StageRunner readingThread =
            readingStageFactory.createReadingStage(
                    exceptionHandler,
                    circularQueues,
                    largeRecordHandler,
                    ((long) (startSpillingFraction * sortMemory)));

    // create the stage that sorts the buffers
    final StageRunner sortingStage = new SortingThread<>(exceptionHandler, circularQueues);

    // create the stage that handles spilling to secondary storage, combining if requested
    final SpillingThread.SpillingBehaviour<T> spillingBehaviour;
    if (combineFunction != null) {
        spillingBehaviour =
                new CombiningSpillingBehaviour<>(
                        combineFunction, serializer, comparator, objectReuseEnabled, udfConfig);
    } else {
        spillingBehaviour = new DefaultSpillingBehaviour<>(objectReuseEnabled, serializer);
    }

    final StageRunner spillingStage =
            new SpillingThread<>(
                    exceptionHandler,
                    circularQueues,
                    memoryManager,
                    ioManager,
                    serializer,
                    comparator,
                    memory,
                    writeMemory,
                    maxNumFileHandles,
                    spillChannelManager,
                    largeRecordHandler,
                    spillingBehaviour,
                    MIN_NUM_WRITE_BUFFERS,
                    MAX_NUM_WRITE_BUFFERS);

    return new ExternalSorter<>(
            readingThread,
            sortingStage,
            spillingStage,
            memory,
            writeMemory,
            memoryManager,
            largeRecordHandler,
            spillChannelManager,
            inMemorySorters,
            circularQueues);
}