private ExternalSorter doBuild()

in flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorterBuilder.java [244:450]


    /**
     * Assembles an {@link ExternalSorter} from the settings collected by this builder.
     *
     * <p>The method proceeds in four steps:
     *
     * <ol>
     *   <li>Obtain the memory pages: the pre-set {@code memorySegments} if present, otherwise
     *       pages allocated from the {@code memoryManager} for {@code memoryFraction}.
     *   <li>Partition the pages into write buffers, large-record buffers and sort memory.
     *   <li>Split the sort memory into {@code numSortBuffers} in-memory sorters and hand each one
     *       to the {@link StageRunner.SortStage#READ} queue.
     *   <li>Create the reading, sorting and spilling stages and wrap everything in the sorter.
     * </ol>
     *
     * <p>Side effects on the builder: {@code maxNumFileHandles} is lowered when there are not
     * enough pages for a merge with the configured fan-in, and {@code numSortBuffers} is set to 1
     * or 2 when it was configured with a value below 1. If {@code memorySegments} was supplied,
     * the write and large-record pages are removed from that very list instance.
     *
     * @param readingStageFactory factory that creates the stage reading the input
     * @return the fully wired sorter
     * @throws MemoryAllocationException declared for the page allocation through the memory
     *     manager
     * @throws IllegalArgumentException if fewer than {@code MIN_NUM_WRITE_BUFFERS +
     *     MIN_NUM_SORT_MEM_SEGMENTS} pages are available
     */
    private ExternalSorter<T> doBuild(ReadingStageFactory<T> readingStageFactory)
            throws MemoryAllocationException {

        // use the explicitly provided segments if there are any, otherwise allocate our share
        final List<MemorySegment> memory;
        if (this.memorySegments != null) {
            memory = this.memorySegments;
        } else {
            memory =
                    memoryManager.allocatePages(
                            parentTask, memoryManager.computeNumberOfPages(memoryFraction));
        }

        // adjust the memory quotas to the page size
        final int numPagesTotal = memory.size();

        if (numPagesTotal < MIN_NUM_WRITE_BUFFERS + MIN_NUM_SORT_MEM_SEGMENTS) {
            throw new IllegalArgumentException(
                    "Too little memory provided to sorter to perform task. "
                            + "Required are at least "
                            + (MIN_NUM_WRITE_BUFFERS + MIN_NUM_SORT_MEM_SEGMENTS)
                            + " pages. Current page size is "
                            + memoryManager.getPageSize()
                            + " bytes.");
        }

        // determine how many buffers to use for writing
        final int numWriteBuffers;
        final int numLargeRecordBuffers;

        if (noSpillingMemory && !handleLargeRecords) {
            // nothing is ever written out, so all pages can go to sorting
            numWriteBuffers = 0;
            numLargeRecordBuffers = 0;
        } else {
            // spilling counts as one consumer of write buffers, large record handling as two;
            // at least one of them is active in this branch, so numConsumers >= 1
            int numConsumers = (noSpillingMemory ? 0 : 1) + (handleLargeRecords ? 2 : 0);

            // determine how many buffers we have when we do a full merge with maximal fan-in
            final int minBuffersForMerging =
                    maxNumFileHandles + numConsumers * MIN_NUM_WRITE_BUFFERS;

            if (minBuffersForMerging > numPagesTotal) {
                // not enough pages for the configured fan-in: give every consumer the minimum
                // and shrink the fan-in to whatever is left
                numWriteBuffers = noSpillingMemory ? 0 : MIN_NUM_WRITE_BUFFERS;
                numLargeRecordBuffers = handleLargeRecords ? 2 * MIN_NUM_WRITE_BUFFERS : 0;

                maxNumFileHandles = numPagesTotal - numConsumers * MIN_NUM_WRITE_BUFFERS;
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Reducing maximal merge fan-in to "
                                    + maxNumFileHandles
                                    + " due to limited memory availability during merge");
                }
            } else {
                // we are free to choose. make sure that we do not eat up too much memory for
                // writing: aim for one page out of 100 per consumer
                final int fractionalAuxBuffers = numPagesTotal / (numConsumers * 100);

                if (fractionalAuxBuffers >= MAX_NUM_WRITE_BUFFERS) {
                    numWriteBuffers = noSpillingMemory ? 0 : MAX_NUM_WRITE_BUFFERS;
                    numLargeRecordBuffers = handleLargeRecords ? 2 * MAX_NUM_WRITE_BUFFERS : 0;
                } else {
                    numWriteBuffers =
                            noSpillingMemory
                                    ? 0
                                    : Math.max(
                                            MIN_NUM_WRITE_BUFFERS,
                                            fractionalAuxBuffers); // at least the lower bound

                    // the large record buffers stay at their minimum in this branch
                    numLargeRecordBuffers = handleLargeRecords ? 2 * MIN_NUM_WRITE_BUFFERS : 0;
                }
            }
        }

        final int sortMemPages = numPagesTotal - numWriteBuffers - numLargeRecordBuffers;
        final long sortMemory = ((long) sortMemPages) * memoryManager.getPageSize();

        // decide how many sort buffers to use: two above 100 MiB of sort memory, one otherwise
        if (numSortBuffers < 1) {
            if (sortMemory > 100 * 1024 * 1024) {
                numSortBuffers = 2;
            } else {
                numSortBuffers = 1;
            }
        }
        final int numSegmentsPerSortBuffer = sortMemPages / numSortBuffers;

        // guarded so that the message is only formatted when it is actually logged
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    String.format(
                            "Instantiating sorter with %d pages of sorting memory (="
                                    + "%d bytes total) divided over %d sort buffers (%d pages per buffer). Using %d"
                                    + " buffers for writing sorted results and merging maximally %d streams at once. "
                                    + "Using %d memory segments for large record spilling.",
                            sortMemPages,
                            sortMemory,
                            numSortBuffers,
                            numSegmentsPerSortBuffer,
                            numWriteBuffers,
                            maxNumFileHandles,
                            numLargeRecordBuffers));
        }

        List<MemorySegment> writeMemory = new ArrayList<>(numWriteBuffers);

        LargeRecordHandler<T> largeRecordHandler;
        // move some pages from the sort memory to the write memory (taken from the list's tail)
        for (int i = 0; i < numWriteBuffers; i++) {
            writeMemory.add(memory.remove(memory.size() - 1));
        }
        if (numLargeRecordBuffers > 0) {
            // the large record handler gets its own pages, again taken from the tail
            List<MemorySegment> mem = new ArrayList<>(numLargeRecordBuffers);
            for (int i = 0; i < numLargeRecordBuffers; i++) {
                mem.add(memory.remove(memory.size() - 1));
            }

            largeRecordHandler =
                    new LargeRecordHandler<>(
                            serializer,
                            comparator.duplicate(),
                            ioManager,
                            memoryManager,
                            mem,
                            parentTask,
                            maxNumFileHandles,
                            executionConfig);
        } else {
            largeRecordHandler = null;
        }

        // circular queues pass buffers between the threads
        final CircularQueues<T> circularQueues = new CircularQueues<>();

        final List<InMemorySorter<T>> inMemorySorters = new ArrayList<>(numSortBuffers);

        // allocate the sort buffers and fill empty queue with them
        final Iterator<MemorySegment> segments = memory.iterator();
        for (int i = 0; i < numSortBuffers; i++) {
            // grab some memory; the last sort buffer takes every page that is still left, which
            // includes the remainder of the integer division above
            final List<MemorySegment> sortSegments = new ArrayList<>(numSegmentsPerSortBuffer);
            for (int k = (i == numSortBuffers - 1 ? Integer.MAX_VALUE : numSegmentsPerSortBuffer);
                    k > 0 && segments.hasNext();
                    k--) {
                sortSegments.add(segments.next());
            }

            final InMemorySorter<T> inMemorySorter = inMemorySorterFactory.create(sortSegments);
            inMemorySorters.add(inMemorySorter);

            // add to empty queue
            CircularElement<T> element = new CircularElement<>(i, inMemorySorter, sortSegments);
            circularQueues.send(StageRunner.SortStage.READ, element);
        }

        // exception handling: a failure in any stage fails the future of the result iterator
        ExceptionHandler<IOException> exceptionHandler =
                exception -> circularQueues.getIteratorFuture().completeExceptionally(exception);

        SpillChannelManager spillChannelManager = new SpillChannelManager();

        // create the stage that reads the input; the last argument is the number of bytes of
        // sort memory derived from startSpillingFraction
        StageRunner readingThread =
                readingStageFactory.createReadingStage(
                        exceptionHandler,
                        circularQueues,
                        largeRecordHandler,
                        ((long) (startSpillingFraction * sortMemory)));

        // create the stage that sorts the buffers
        StageRunner sortingStage = new SortingThread<>(exceptionHandler, circularQueues);

        // create the stage that handles spilling to secondary storage; a combine function, if
        // set, selects the combining behaviour
        final SpillingThread.SpillingBehaviour<T> spillingBehaviour;
        if (combineFunction != null) {
            spillingBehaviour =
                    new CombiningSpillingBehaviour<>(
                            combineFunction, serializer, comparator, objectReuseEnabled, udfConfig);
        } else {
            spillingBehaviour = new DefaultSpillingBehaviour<>(objectReuseEnabled, serializer);
        }

        StageRunner spillingStage =
                new SpillingThread<>(
                        exceptionHandler,
                        circularQueues,
                        memoryManager,
                        ioManager,
                        serializer,
                        comparator,
                        memory,
                        writeMemory,
                        maxNumFileHandles,
                        spillChannelManager,
                        largeRecordHandler,
                        spillingBehaviour,
                        MIN_NUM_WRITE_BUFFERS,
                        MAX_NUM_WRITE_BUFFERS);

        return new ExternalSorter<>(
                readingThread,
                sortingStage,
                spillingStage,
                memory,
                writeMemory,
                memoryManager,
                largeRecordHandler,
                spillChannelManager,
                inMemorySorters,
                circularQueues);
    }