public File createSortedStoreFile()

in oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java [355:554]


    public File createSortedStoreFile() throws IOException {
        int numberOfThreads = 1 + numberOfTransformThreads + 1 + 1; // 1 download + N transform + 1 sort-and-save + 1 merge
        ThreadMonitor threadMonitor = new ThreadMonitor();
        var threadFactory = new ThreadMonitor.AutoRegisteringThreadFactory(threadMonitor, new ThreadFactoryBuilder().setDaemon(true).build());
        ExecutorService threadPool = Executors.newFixedThreadPool(numberOfThreads, threadFactory);
        MongoDocumentFilter documentFilter = new MongoDocumentFilter(filteredPath, suffixesToSkip);
        NodeDocumentCodec nodeDocumentCodec = new NodeDocumentCodec(docStore, Collection.NODES, documentFilter, MongoClientSettings.getDefaultCodecRegistry());
        // This executor lets us wait on all the tasks at once: poll()/take() returns whichever task completes
        // (or fails) first, so a failure in any task aborts the whole pipeline promptly. Waiting on individual
        // Future instances would block on one task at a time, with no easy way to detect that one of the others
        // had already failed. The raw type below is deliberate: the tasks return heterogeneous result types,
        // which are dispatched with instanceof checks in the main loop. A standalone sketch of this
        // wait-and-abort pattern follows the method.
        ExecutorCompletionService ecs = new ExecutorCompletionService<>(threadPool);
        try {
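            // Pipeline topology: one download task feeds mongoDocQueue, numberOfTransformThreads transform
            // tasks convert the raw Mongo documents into node state entries, one sort-and-save task writes
            // sorted intermediate files, and one merge task combines them into the final flat file store.
            // The stages are connected by the bounded queues created below, which provide backpressure.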
            // download -> transform threads
            ArrayBlockingQueue<RawBsonDocument[]> mongoDocQueue = new ArrayBlockingQueue<>(mongoDocQueueSize);

            // transform <-> sort and save threads
            // Queue with empty buffers, used by the transform task
            ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue = new ArrayBlockingQueue<>(nseBuffersCount);
            // Queue with buffers filled by the transform task, used by the sort and save task. +1 for the SENTINEL
            ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue = new ArrayBlockingQueue<>(nseBuffersCount + 1);

            // Queue between sort-and-save thread and the merge-sorted-files thread
            ArrayBlockingQueue<Path> sortedFilesQueue = new ArrayBlockingQueue<>(64);

            TransformStageStatistics transformStageStatistics = new TransformStageStatistics();

            // Create empty buffers
            for (int i = 0; i < nseBuffersCount; i++) {
                // No limits on the number of entries, only on their total size. This might be revised later.
                emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(nseBuffersSizeBytes, Integer.MAX_VALUE));
            }
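            // The whole pool is allocated up front and merely circulates between the two queues: transform
            // threads block on emptyBatchesQueue until the sort-and-save task recycles a buffer, which keeps
            // the memory used by the transform stage bounded. A standalone sketch of this recycling scheme
            // follows the method.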

            INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:START] Starting to build FFS");
            Stopwatch start = Stopwatch.createStarted();

            Future<PipelinedMongoDownloadTask.Result> downloadFuture = ecs.submit(new PipelinedMongoDownloadTask(
                    mongoClientURI,
                    docStore,
                    (int) (mongoDocBatchMaxSizeMB * FileUtils.ONE_MB),
                    mongoDocBatchMaxNumberOfDocuments,
                    mongoDocQueue,
                    pathFilters,
                    statisticsProvider,
                    indexingReporter,
                    threadFactory
            ));
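            // A single download task streams batches of raw BSON documents into mongoDocQueue, bounded both
            // by the batch size in bytes and by the number of documents per batch.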

            ArrayList<Future<PipelinedTransformTask.Result>> transformFutures = new ArrayList<>(numberOfTransformThreads);
            for (int i = 0; i < numberOfTransformThreads; i++) {
                NodeStateEntryWriter entryWriter = new NodeStateEntryWriter(blobStore);
                transformFutures.add(ecs.submit(new PipelinedTransformTask(
                        docStore,
                        documentNodeStore,
                        nodeDocumentCodec,
                        rootRevision,
                        this.getPathPredicate(),
                        entryWriter,
                        mongoDocQueue,
                        emptyBatchesQueue,
                        nonEmptyBatchesQueue,
                        transformStageStatistics
                )));
            }
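            // Each transform thread gets its own NodeStateEntryWriter, but all of them share the document
            // queue and the two buffer queues, competing for incoming batches and for empty buffers.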

            Future<PipelinedSortBatchTask.Result> sortBatchFuture = ecs.submit(new PipelinedSortBatchTask(
                    this.getStoreDir().toPath(),
                    pathComparator,
                    this.getAlgorithm(),
                    emptyBatchesQueue,
                    nonEmptyBatchesQueue,
                    sortedFilesQueue,
                    statisticsProvider,
                    indexingReporter
            ));
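            // The single sort-and-save task takes filled buffers from nonEmptyBatchesQueue, writes each as a
            // sorted intermediate file (publishing its path on sortedFilesQueue), and returns the buffer to
            // emptyBatchesQueue.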

            PipelinedMergeSortTask mergeSortTask = new PipelinedMergeSortTask(
                    this.getStoreDir().toPath(),
                    pathComparator,
                    this.getAlgorithm(),
                    sortedFilesQueue,
                    statisticsProvider,
                    indexingReporter);

            Future<PipelinedMergeSortTask.Result> mergeSortFuture = ecs.submit(mergeSortTask);
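            // The merge task may merge intermediate files eagerly while the earlier stages are still running;
            // eager merging is switched off once the download completes (see stopEagerMerging() below).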

            Path flatFileStore = null;
            try {
                LOG.info("Waiting for tasks to complete");
                int tasksFinished = 0;
                int transformTasksFinished = 0;
                boolean monitorQueues = true;
                threadMonitor.start();
                while (tasksFinished < numberOfThreads) {
                    // Wait with a timeout to print statistics periodically
                    Future<?> completedTask = ecs.poll(60, TimeUnit.SECONDS);
                    if (completedTask == null) {
                        // Timeout waiting for a task to complete
                        if (monitorQueues) {
                            try {
                                threadMonitor.printStatistics();
                                printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, false);
                            } catch (Exception e) {
                                LOG.warn("Error while logging queue sizes", e);
                            }
                            LOG.info("Documents filtered: docsFiltered: {}, longPathsFiltered: {}, filteredRenditionsTotal (top 10): {}",
                                    documentFilter.getSkippedFields(), documentFilter.getLongPathSkipped(), documentFilter.formatTopK(10));
                        }
                    } else {
                        try {
                            Object result = completedTask.get();
                            if (result instanceof PipelinedMongoDownloadTask.Result) {
                                PipelinedMongoDownloadTask.Result downloadResult = (PipelinedMongoDownloadTask.Result) result;
                                LOG.info("Download finished. Documents downloaded: {}", downloadResult.getDocumentsDownloaded());
                                mergeSortTask.stopEagerMerging();
                                downloadFuture = null;

                            } else if (result instanceof PipelinedTransformTask.Result) {
                                PipelinedTransformTask.Result transformResult = (PipelinedTransformTask.Result) result;
                                transformTasksFinished++;
                                nodeStateEntriesExtracted += transformResult.getEntryCount();
                                LOG.info("Transform task {} finished. Entries processed: {}",
                                        transformResult.getThreadId(), transformResult.getEntryCount());
                                if (transformTasksFinished == numberOfTransformThreads) {
                                    LOG.info("All transform tasks finished. Total entries processed: {}", nodeStateEntriesExtracted);
                                    // No need to keep monitoring the queues, the download and transform threads are done.
                                    monitorQueues = false;
                                    // Terminate the sort thread with the poison-pill sentinel; the queue was
                                    // sized nseBuffersCount + 1 so this put() can never block.
                                    nonEmptyBatchesQueue.put(SENTINEL_NSE_BUFFER);
                                    transformStageStatistics.publishStatistics(statisticsProvider, indexingReporter);
                                    transformFutures.clear();
                                }

                            } else if (result instanceof PipelinedSortBatchTask.Result) {
                                PipelinedSortBatchTask.Result sortTaskResult = (PipelinedSortBatchTask.Result) result;
                                LOG.info("Sort batch task finished. Entries processed: {}", sortTaskResult.getTotalEntries());
                                sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
                                // The buffers used between the transform and sort-and-save tasks are no longer
                                // needed, so remove them from the queues to make them eligible for garbage
                                // collection. These buffers can be very large, so freeing them is important to
                                // avoid running out of memory during the merge-sort phase.
                                if (!nonEmptyBatchesQueue.isEmpty()) {
                                    LOG.warn("emptyBatchesQueue is not empty. Size: {}", emptyBatchesQueue.size());
                                }
                                emptyBatchesQueue.clear();
                                printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
                                sortBatchFuture = null;

                            } else if (result instanceof PipelinedMergeSortTask.Result) {
                                PipelinedMergeSortTask.Result mergeSortedFilesTask = (PipelinedMergeSortTask.Result) result;
                                Path ffs = mergeSortedFilesTask.getFlatFileStoreFile();
                                LOG.info("Merge-sort task finished. FFS: {}, Size: {}", ffs, humanReadableByteCountBin(Files.size(ffs)));
                                flatFileStore = ffs;
                                mergeSortFuture = null;

                            } else {
                                throw new RuntimeException("Unknown result type: " + result);
                            }
                            tasksFinished++;
                        } catch (ExecutionException ex) {
                            throw new RuntimeException(ex.getCause());
                        } catch (Throwable ex) {
                            throw new RuntimeException(ex);
                        }
                    }
                }
                long elapsedSeconds = start.elapsed(TimeUnit.SECONDS);
                INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:END] Metrics: {}", MetricsFormatter.newBuilder()
                        .add("duration", FormattingUtils.formatToSeconds(elapsedSeconds))
                        .add("durationSeconds", elapsedSeconds)
                        .add("nodeStateEntriesExtracted", nodeStateEntriesExtracted)
                        .build());
                indexingReporter.addTiming("Build FFS (Dump+Merge)", FormattingUtils.formatToSeconds(elapsedSeconds));
                // Unique heading to make it easier to find in the logs
                threadMonitor.printStatistics("Final Thread/Memory report");

                LOG.info("Documents filtered: docsFiltered: {}, longPathsFiltered: {}, filteredRenditionsTotal (top 10): {}",
                        documentFilter.getSkippedFields(), documentFilter.getLongPathSkipped(), documentFilter.formatTopK(10));

                LOG.info("[INDEXING_REPORT:BUILD_FFS]\n{}", indexingReporter.generateReport());
            } catch (Throwable e) {
                INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:FAIL] Metrics: {}, Error: {}",
                        MetricsFormatter.createMetricsWithDurationOnly(start), e.toString()
                );
                LOG.warn("Error dumping from MongoDB. Cancelling all tasks. Error: {}", e.toString());
                // Cancel in order
                cancelFuture(downloadFuture);
                for (Future<?> transformTask : transformFutures) {
                    cancelFuture(transformTask);
                }
                cancelFuture(sortBatchFuture);
                cancelFuture(mergeSortFuture);
                throw new RuntimeException(e);
            }
            return flatFileStore.toFile();
        } finally {
            LOG.info("Shutting down build FFS thread pool");
            new ExecutorCloser(threadPool).close();
        }
    }
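
The comment above the ExecutorCompletionService describes the wait-and-abort strategy used by the main loop. The following is a minimal, self-contained sketch of that pattern (class name and task bodies are hypothetical, not part of Oak): all tasks are submitted through a single completion service, completed tasks are consumed as they finish, and the first failure cancels the remaining futures.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompletionServiceSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        // A single completion service fronts all tasks, so take() returns whichever finishes first.
        ExecutorCompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        try {
            List<Future<String>> futures = new ArrayList<>();
            futures.add(ecs.submit(() -> "download done"));
            futures.add(ecs.submit(() -> "transform done"));
            futures.add(ecs.submit(() -> { throw new IllegalStateException("merge failed"); }));
            for (int finished = 0; finished < futures.size(); finished++) {
                try {
                    // Blocks until *any* task completes; waiting on a single Future.get() could not
                    // observe a failure in one of the other tasks.
                    System.out.println(ecs.take().get());
                } catch (ExecutionException e) {
                    futures.forEach(f -> f.cancel(true)); // abort the rest of the pipeline
                    throw new RuntimeException(e.getCause());
                }
            }
        } finally {
            pool.shutdownNow();
        }
    }
}

The two buffer queues together with SENTINEL_NSE_BUFFER and SENTINEL_SORTED_FILES_QUEUE implement a buffer-recycling, poison-pill scheme. Below is a minimal sketch under simplified assumptions (byte[] buffers instead of NodeStateEntryBatch, hypothetical names): a fixed pool of buffers shuttles between an "empty" and a "non-empty" queue so memory stays bounded, and the extra queue slot guarantees that enqueueing the sentinel never blocks.

import java.util.concurrent.ArrayBlockingQueue;

public class BufferRecyclingSketch {
    // Distinguished instance compared by identity, like SENTINEL_NSE_BUFFER above.
    static final byte[] SENTINEL = new byte[0];

    public static void main(String[] args) throws InterruptedException {
        int poolSize = 2;
        ArrayBlockingQueue<byte[]> empty = new ArrayBlockingQueue<>(poolSize);
        ArrayBlockingQueue<byte[]> nonEmpty = new ArrayBlockingQueue<>(poolSize + 1); // +1 for the sentinel
        for (int i = 0; i < poolSize; i++) {
            empty.add(new byte[1024]); // the whole pool is allocated up front
        }

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    byte[] buffer = empty.take(); // blocks until the consumer recycles a buffer
                    buffer[0] = (byte) i;         // "fill" the buffer
                    nonEmpty.put(buffer);
                }
                // Never blocks: at most poolSize buffers circulate and capacity is poolSize + 1.
                nonEmpty.put(SENTINEL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        while (true) {
            byte[] buffer = nonEmpty.take();
            if (buffer == SENTINEL) {
                break; // producer finished; no more batches will arrive
            }
            System.out.println("consumed batch " + buffer[0]);
            empty.put(buffer); // recycle the buffer, unblocking the producer
        }
        producer.join();
    }
}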