public File createSortedStoreFile()

in oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java [236:422]


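    /**
     * Builds the flat file store (FFS) using a pipeline of tasks connected by bounded queues:
     * one thread downloads documents from MongoDB, several transform threads convert them into
     * node state entries, one thread sorts batches of entries and saves them as intermediate
     * files, and one thread merges the intermediate files into the final sorted store file.
     * Each stage signals completion to the next by putting a sentinel value on its output queue.
     */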
    public File createSortedStoreFile() throws IOException {
        int mongoDocBlockQueueSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_SIZE, DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_SIZE);
        Preconditions.checkArgument(mongoDocBlockQueueSize > 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_SIZE + ": " + mongoDocBlockQueueSize + ". Must be > 0");

        int mongoBatchSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE, DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE);
        Preconditions.checkArgument(mongoBatchSize > 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE + ": " + mongoBatchSize + ". Must be > 0");

        int transformThreads = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_TRANSFORM_THREADS, DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS);
        Preconditions.checkArgument(transformThreads > 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_TRANSFORM_THREADS + ": " + transformThreads + ". Must be > 0");

        int workingMemoryMB = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB, DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB);
        Preconditions.checkArgument(workingMemoryMB >= 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB + ": " + workingMemoryMB + ". Must be >= 0");
        if (workingMemoryMB == 0) {
            workingMemoryMB = autodetectWorkingMemoryMB();
        }

        int numberOfThreads = 1 + transformThreads + 1 + 1; // 1 download thread + transform threads + 1 sort-and-save thread + 1 merge thread
        ExecutorService threadPool = Executors.newFixedThreadPool(numberOfThreads,
                new ThreadFactoryBuilder().setNameFormat("mongo-dump").setDaemon(true).build()
        );
        // This completion service lets us wait for all tasks at once: take() returns whichever task
        // finishes next, so the first failure can abort the whole pipeline immediately. If we waited
        // on individual Future instances instead, we could only block on one of them at a time, and a
        // failure in any of the others would go undetected (see the standalone sketch after this method).
        ExecutorCompletionService ecs = new ExecutorCompletionService(threadPool);
        ScheduledExecutorService monitorThreadPool = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder().setNameFormat("monitor").setDaemon(true).build()
        );
        try {
            int numberOfBuffers = 1 + transformThreads;
            // The extra memory arena is to account for the memory taken by the SortKey entries that are part of the
            // entry batches.
            int memoryArenas = numberOfBuffers + 1;
            int memoryForEntriesMB = workingMemoryMB / memoryArenas;
            int maxNumberOfEntries = memoryForEntriesMB * 1024 * 4; // MB to KB, assuming an average of 4 entries per KB.
            int maxNumberOfEntriesPerBuffer = maxNumberOfEntries / numberOfBuffers;
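            // Worked example: with workingMemoryMB = 4096 and transformThreads = 2, there are
            // 3 buffers and 4 memory arenas, so memoryForEntriesMB = 1024, maxNumberOfEntries =
            // 1024 * 1024 * 4 = 4194304 and maxNumberOfEntriesPerBuffer = 1398101.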

            // A ByteBuffer can be at most Integer.MAX_VALUE bytes
            int bufferSizeBytes = limitToIntegerRange((workingMemoryMB * FileUtils.ONE_MB) / memoryArenas);
            if (bufferSizeBytes < MIN_ENTRY_BATCH_BUFFER_SIZE_MB * FileUtils.ONE_MB) {
                throw new IllegalArgumentException("Entry batch buffer size too small: " + bufferSizeBytes +
                        " bytes. Must be at least " + MIN_ENTRY_BATCH_BUFFER_SIZE_MB + " MB. " +
                        "To increase the size of the buffers, either increase the size of the working memory region " +
                        "(system property " + OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB + ") or decrease the number of transform " +
                        "threads (" + OAK_INDEXER_PIPELINED_TRANSFORM_THREADS + ")");
            }
            LOG.info("Working memory: {} MB, number of buffers: {}, size of each buffer: {} MB, number of entries per buffer: {}",
                    workingMemoryMB, numberOfBuffers, bufferSizeBytes / FileUtils.ONE_MB, maxNumberOfEntriesPerBuffer);

            // Queue between the download thread and the transform threads.
            ArrayBlockingQueue<BasicDBObject[]> mongoDocQueue = new ArrayBlockingQueue<>(mongoDocBlockQueueSize);

            // transform <-> sort and save threads
            // Queue with empty buffers, used by the transform task
            ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue = new ArrayBlockingQueue<>(numberOfBuffers);
            // Queue with buffers filled by the transform task, used by the sort and save task. +1 for the SENTINEL
            ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue = new ArrayBlockingQueue<>(numberOfBuffers + 1);
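            // Buffers circulate between these two queues: a transform thread takes an empty buffer,
            // fills it and puts it on nonEmptyBatchesQueue; the sort-and-save task drains the buffer
            // and returns it to emptyBatchesQueue.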

            // Queue between sort-and-save thread and the merge-sorted-files thread
            ArrayBlockingQueue<File> sortedFilesQueue = new ArrayBlockingQueue<>(64);

            TransformStageStatistics transformStageStatistics = new TransformStageStatistics();

            ScheduledFuture<?> monitorFuture = monitorThreadPool.scheduleWithFixedDelay(
                    new MonitorTask(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics),
                    10, 30, TimeUnit.SECONDS);

            // Create empty buffers
            for (int i = 0; i < numberOfBuffers; i++) {
                emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(bufferSizeBytes, maxNumberOfEntriesPerBuffer));
            }

            Stopwatch start = Stopwatch.createStarted();
            MongoCollection<BasicDBObject> dbCollection = MongoDocumentStoreHelper.getDBCollection(docStore, Collection.NODES);
            PipelinedMongoDownloadTask downloadTask = new PipelinedMongoDownloadTask(dbCollection, mongoBatchSize, mongoDocQueue);
            ecs.submit(downloadTask);

            File flatFileStore = null;

            for (int i = 0; i < transformThreads; i++) {
                NodeStateEntryWriter entryWriter = new NodeStateEntryWriter(blobStore);
                PipelinedTransformTask transformTask = new PipelinedTransformTask(
                        docStore,
                        documentNodeStore,
                        Collection.NODES,
                        rootRevision,
                        pathPredicate,
                        entryWriter,
                        mongoDocQueue,
                        emptyBatchesQueue,
                        nonEmptyBatchesQueue,
                        transformStageStatistics
                );
                ecs.submit(transformTask);
            }

            PipelinedSortBatchTask sortTask = new PipelinedSortBatchTask(
                    storeDir, pathComparator, algorithm, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue
            );
            ecs.submit(sortTask);

            PipelinedMergeSortTask mergeSortTask = new PipelinedMergeSortTask(storeDir, pathComparator, algorithm, sortedFilesQueue);
            ecs.submit(mergeSortTask);


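            // Orderly shutdown protocol, implemented by the loop below: when the download task
            // finishes, one sentinel per transform thread is put on mongoDocQueue; when the last
            // transform task finishes, a sentinel is put on nonEmptyBatchesQueue to stop the sort
            // task; when the sort task finishes, a sentinel is put on sortedFilesQueue to stop the
            // merge task.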
            try {
                LOG.info("Waiting for tasks to complete.");
                int tasksFinished = 0;
                int transformTasksFinished = 0;
                while (tasksFinished < numberOfThreads) {
                    Future<?> completedTask = ecs.take();
                    try {
                        Object result = completedTask.get();
                        if (result instanceof PipelinedMongoDownloadTask.Result) {
                            PipelinedMongoDownloadTask.Result downloadResult = (PipelinedMongoDownloadTask.Result) result;
                            LOG.info("Download task finished. Documents downloaded: {}", downloadResult.getDocumentsDownloaded());
                            // Signal the end of documents to the transform threads.
                            for (int i = 0; i < transformThreads; i++) {
                                mongoDocQueue.put(SENTINEL_MONGO_DOCUMENT);
                            }

                        } else if (result instanceof PipelinedTransformTask.Result) {
                            PipelinedTransformTask.Result transformResult = (PipelinedTransformTask.Result) result;
                            transformTasksFinished++;
                            entryCount += transformResult.getEntryCount();
                            LOG.info("Transform thread {} finished. Entries processed: {}",
                                    transformResult.getThreadId(), transformResult.getEntryCount());
                            if (transformTasksFinished == transformThreads) {
                                LOG.info("All transform tasks finished. Node states retrieved: {}", entryCount);
                                // No need to keep monitoring the queues, the download and transform threads are done.
                                monitorFuture.cancel(false);
                                // Terminate the sort thread.
                                nonEmptyBatchesQueue.put(SENTINEL_NSE_BUFFER);
                            }

                        } else if (result instanceof PipelinedSortBatchTask.Result) {
                            PipelinedSortBatchTask.Result sortTaskResult = (PipelinedSortBatchTask.Result) result;
                            LOG.info("Sort task finished. Entries processed: {}", sortTaskResult.getTotalEntries());
                            printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
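                            // Signal the merge task that no more intermediate sorted files will be produced.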
                            sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);

                        } else if (result instanceof PipelinedMergeSortTask.Result) {
                            PipelinedMergeSortTask.Result mergeSortResult = (PipelinedMergeSortTask.Result) result;
                            File ffs = mergeSortResult.getFlatFileStoreFile();
                            LOG.info("Merge sort task finished. FFS: {}, Size: {}", ffs, humanReadableByteCountBin(ffs.length()));
                            flatFileStore = ffs;

                        } else {
                            throw new RuntimeException("Unknown result type: " + result);
                        }
                        tasksFinished++;
                    } catch (ExecutionException ex) {
                        LOG.warn("Execution error in pipeline task: " + ex + ". Shutting down all threads.");
                        threadPool.shutdownNow();
                        boolean terminated = threadPool.awaitTermination(5, TimeUnit.SECONDS);
                        if (!terminated) {
                            LOG.warn("Thread pool failed to terminate");
                        }
                        throw new RuntimeException(ex.getCause());
                    } catch (Throwable ex) {
                        LOG.warn("Error in pipeline: " + ex + ". Shutting down all threads.");
                        threadPool.shutdownNow();
                        boolean terminated = threadPool.awaitTermination(5, TimeUnit.SECONDS);
                        if (!terminated) {
                            LOG.warn("Thread pool failed to terminate");
                        }
                        throw new RuntimeException(ex);
                    }
                }
                LOG.info("Dumped {} node states in JSON format in {}", entryCount, start);
                printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                // No longer need to monitor the size of the queues.
                monitorFuture.cancel(true);
            }
            return flatFileStore;
        } finally {
            threadPool.shutdown();
            monitorThreadPool.shutdown();
        }
    }
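
The comment on the ExecutorCompletionService above refers to a fail-fast wait pattern that is easier to see in isolation. Below is a minimal, self-contained sketch (not Oak code; the class and the tasks are invented for illustration) showing how take() surfaces the first failure among several tasks, whereas blocking on a single arbitrary Future could leave failures in the other tasks undetected:

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FailFastWaitDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        ExecutorCompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        List<Callable<String>> tasks = List.of(
                () -> { Thread.sleep(5_000); return "slow"; },
                () -> { throw new IllegalStateException("boom"); }, // fails almost immediately
                () -> "fast"
        );
        tasks.forEach(ecs::submit);
        try {
            for (int finished = 0; finished < tasks.size(); finished++) {
                // take() blocks until the *next* task completes, in completion order, so the
                // failing task is observed without waiting for the slow one to finish.
                System.out.println("Completed: " + ecs.take().get());
            }
        } catch (ExecutionException e) {
            // Reached as soon as the failing task completes; abort everything else.
            System.err.println("Aborting: " + e.getCause());
            pool.shutdownNow();
        } finally {
            pool.shutdown();
        }
    }
}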