in oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java [355:554]
public File createSortedStoreFile() throws IOException {
int numberOfThreads = 1 + numberOfTransformThreads + 1 + 1; // 1 download (dump) + numberOfTransformThreads transform + 1 sort-and-save + 1 sorted-files merge
ThreadMonitor threadMonitor = new ThreadMonitor();
var threadFactory = new ThreadMonitor.AutoRegisteringThreadFactory(threadMonitor, new ThreadFactoryBuilder().setDaemon(true).build());
ExecutorService threadPool = Executors.newFixedThreadPool(numberOfThreads, threadFactory);
MongoDocumentFilter documentFilter = new MongoDocumentFilter(filteredPath, suffixesToSkip);
NodeDocumentCodec nodeDocumentCodec = new NodeDocumentCodec(docStore, Collection.NODES, documentFilter, MongoClientSettings.getDefaultCodecRegistry());
// An ExecutorCompletionService lets us wait for whichever of several tasks completes (or fails)
// first. We use it below to wait on all the tasks at once, so that if any of them fails we can
// abort the whole pipeline immediately. If we blocked on the individual Future instances instead,
// we could only wait on one at a time, and a failure in any of the others would go undetected.
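// Note: intentionally used as a raw type. The tasks submitted below return different result
// types, and submit() on the raw type returns a raw Future, which can be assigned (with an
// unchecked warning) to the task-specific Future<...> variables.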
ExecutorCompletionService ecs = new ExecutorCompletionService<>(threadPool);
try {
// download -> transform thread.
ArrayBlockingQueue<RawBsonDocument[]> mongoDocQueue = new ArrayBlockingQueue<>(mongoDocQueueSize);
// transform <-> sort and save threads
// Queue with empty buffers, used by the transform task
ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue = new ArrayBlockingQueue<>(nseBuffersCount);
// Queue with buffers filled by the transform task, used by the sort and save task. +1 for the SENTINEL
ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue = new ArrayBlockingQueue<>(nseBuffersCount + 1);
// Queue between sort-and-save thread and the merge-sorted-files thread
ArrayBlockingQueue<Path> sortedFilesQueue = new ArrayBlockingQueue<>(64);
TransformStageStatistics transformStageStatistics = new TransformStageStatistics();
// Create empty buffers
for (int i = 0; i < nseBuffersCount; i++) {
// No limits on the number of entries, only on their total size. This might be revised later.
emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(nseBuffersSizeBytes, Integer.MAX_VALUE));
}
INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:START] Starting to build FFS");
Stopwatch start = Stopwatch.createStarted();
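// Download task: streams the NODES collection from MongoDB and enqueues the downloaded
// documents in batches on mongoDocQueue.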
Future<PipelinedMongoDownloadTask.Result> downloadFuture = ecs.submit(new PipelinedMongoDownloadTask(
mongoClientURI,
docStore,
(int) (mongoDocBatchMaxSizeMB * FileUtils.ONE_MB),
mongoDocBatchMaxNumberOfDocuments,
mongoDocQueue,
pathFilters,
statisticsProvider,
indexingReporter,
threadFactory
));
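// Transform tasks: convert the downloaded Mongo documents into node state entries, filling
// the buffers taken from emptyBatchesQueue and handing full buffers to the sort-and-save
// task via nonEmptyBatchesQueue.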
ArrayList<Future<PipelinedTransformTask.Result>> transformFutures = new ArrayList<>(numberOfTransformThreads);
for (int i = 0; i < numberOfTransformThreads; i++) {
NodeStateEntryWriter entryWriter = new NodeStateEntryWriter(blobStore);
transformFutures.add(ecs.submit(new PipelinedTransformTask(
docStore,
documentNodeStore,
nodeDocumentCodec,
rootRevision,
this.getPathPredicate(),
entryWriter,
mongoDocQueue,
emptyBatchesQueue,
nonEmptyBatchesQueue,
transformStageStatistics
)));
}
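// Sort-and-save task: sorts each batch of node state entries with pathComparator, writes it
// to an intermediate sorted file, and returns the emptied buffers to emptyBatchesQueue.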
Future<PipelinedSortBatchTask.Result> sortBatchFuture = ecs.submit(new PipelinedSortBatchTask(
this.getStoreDir().toPath(),
pathComparator,
this.getAlgorithm(),
emptyBatchesQueue,
nonEmptyBatchesQueue,
sortedFilesQueue,
statisticsProvider,
indexingReporter
));
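// Merge-sort task: merges the intermediate sorted files from sortedFilesQueue into the final
// flat file store. Kept in a local variable so that eager merging can be stopped once the
// download finishes (see below).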
PipelinedMergeSortTask mergeSortTask = new PipelinedMergeSortTask(
this.getStoreDir().toPath(),
pathComparator,
this.getAlgorithm(),
sortedFilesQueue,
statisticsProvider,
indexingReporter);
Future<PipelinedMergeSortTask.Result> mergeSortFuture = ecs.submit(mergeSortTask);
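// Path of the final flat file store, set once the merge-sort task completes.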
Path flatFileStore = null;
try {
LOG.info("Waiting for tasks to complete");
int tasksFinished = 0;
int transformTasksFinished = 0;
boolean monitorQueues = true;
threadMonitor.start();
while (tasksFinished < numberOfThreads) {
// Wait with a timeout to print statistics periodically
Future<?> completedTask = ecs.poll(60, TimeUnit.SECONDS);
if (completedTask == null) {
// Timeout waiting for a task to complete
if (monitorQueues) {
try {
threadMonitor.printStatistics();
printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, false);
} catch (Exception e) {
LOG.warn("Error while logging queue sizes", e);
}
LOG.info("Documents filtered: docsFiltered: {}, longPathsFiltered: {}, filteredRenditionsTotal (top 10): {}",
documentFilter.getSkippedFields(), documentFilter.getLongPathSkipped(), documentFilter.formatTopK(10));
}
} else {
try {
Object result = completedTask.get();
if (result instanceof PipelinedMongoDownloadTask.Result) {
PipelinedMongoDownloadTask.Result downloadResult = (PipelinedMongoDownloadTask.Result) result;
LOG.info("Download finished. Documents downloaded: {}", downloadResult.getDocumentsDownloaded());
mergeSortTask.stopEagerMerging();
downloadFuture = null;
} else if (result instanceof PipelinedTransformTask.Result) {
PipelinedTransformTask.Result transformResult = (PipelinedTransformTask.Result) result;
transformTasksFinished++;
nodeStateEntriesExtracted += transformResult.getEntryCount();
LOG.info("Transform task {} finished. Entries processed: {}",
transformResult.getThreadId(), transformResult.getEntryCount());
if (transformTasksFinished == numberOfTransformThreads) {
LOG.info("All transform tasks finished. Total entries processed: {}", nodeStateEntriesExtracted);
// No need to keep monitoring the queues, the download and transform threads are done.
monitorQueues = false;
// Signal the sort-and-save task to terminate by enqueuing the sentinel batch.
nonEmptyBatchesQueue.put(SENTINEL_NSE_BUFFER);
transformStageStatistics.publishStatistics(statisticsProvider, indexingReporter);
transformFutures.clear();
}
} else if (result instanceof PipelinedSortBatchTask.Result) {
PipelinedSortBatchTask.Result sortTaskResult = (PipelinedSortBatchTask.Result) result;
LOG.info("Sort batch task finished. Entries processed: {}", sortTaskResult.getTotalEntries());
sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
// The buffers exchanged between the transform and sort-and-save tasks are no longer needed,
// so remove them from the queues so they can be garbage collected. These buffers can be very
// large, so this is important to avoid running out of memory in the merge-sort phase.
if (!nonEmptyBatchesQueue.isEmpty()) {
LOG.warn("nonEmptyBatchesQueue is not empty. Size: {}", nonEmptyBatchesQueue.size());
}
emptyBatchesQueue.clear();
printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
sortBatchFuture = null;
} else if (result instanceof PipelinedMergeSortTask.Result) {
PipelinedMergeSortTask.Result mergeSortResult = (PipelinedMergeSortTask.Result) result;
flatFileStore = mergeSortResult.getFlatFileStoreFile();
LOG.info("Merge-sort task finished. FFS: {}, Size: {}", flatFileStore, humanReadableByteCountBin(Files.size(flatFileStore)));
mergeSortFuture = null;
} else {
throw new RuntimeException("Unknown result type: " + result);
}
tasksFinished++;
} catch (ExecutionException ex) {
throw new RuntimeException(ex.getCause());
} catch (Throwable ex) {
throw new RuntimeException(ex);
}
}
}
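// All tasks completed successfully. Log the final metrics and reports.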
long elapsedSeconds = start.elapsed(TimeUnit.SECONDS);
INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:END] Metrics: {}", MetricsFormatter.newBuilder()
.add("duration", FormattingUtils.formatToSeconds(elapsedSeconds))
.add("durationSeconds", elapsedSeconds)
.add("nodeStateEntriesExtracted", nodeStateEntriesExtracted)
.build());
indexingReporter.addTiming("Build FFS (Dump+Merge)", FormattingUtils.formatToSeconds(elapsedSeconds));
// Unique heading to make it easier to find in the logs
threadMonitor.printStatistics("Final Thread/Memory report");
LOG.info("Documents filtered: docsFiltered: {}, longPathsFiltered: {}, filteredRenditionsTotal (top 10): {}",
documentFilter.getSkippedFields(), documentFilter.getLongPathSkipped(), documentFilter.formatTopK(10));
LOG.info("[INDEXING_REPORT:BUILD_FFS]\n{}", indexingReporter.generateReport());
} catch (Throwable e) {
INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:FAIL] Metrics: {}, Error: {}",
MetricsFormatter.createMetricsWithDurationOnly(start), e.toString()
);
LOG.warn("Error dumping from MongoDB. Cancelling all tasks. Error: {}", e.toString());
// Cancel the tasks in pipeline order: download, transform, sort-and-save, merge-sort.
cancelFuture(downloadFuture);
for (Future<?> transformTask : transformFutures) {
cancelFuture(transformTask);
}
cancelFuture(sortBatchFuture);
cancelFuture(mergeSortFuture);
throw new RuntimeException(e);
}
return flatFileStore.toFile();
} finally {
LOG.info("Shutting down build FFS thread pool");
new ExecutorCloser(threadPool).close();
}
}