in oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTreeStoreStrategy.java [343:509]
public File createSortedStoreFile() throws IOException {
    int numberOfThreads = 1 + numberOfTransformThreads + 1; // dump, transform, sort threads
    ExecutorService threadPool = Executors.newFixedThreadPool(numberOfThreads,
            new ThreadFactoryBuilder().setDaemon(true).build()
    );
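    // Filter and codec used to deserialize the BSON documents downloaded from MongoDB into NodeDocuments; the filter
    // makes it possible to skip documents under filteredPath that match the configured suffixes.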
    MongoDocumentFilter documentFilter = new MongoDocumentFilter(filteredPath, suffixesToSkip);
    NodeDocumentCodec nodeDocumentCodec = new NodeDocumentCodec(docStore, Collection.NODES, documentFilter, MongoClientSettings.getDefaultCodecRegistry());
    // This executor can wait for several tasks at the same time. We use it below to wait for all the tasks at once,
    // so that if one of them fails we can abort the whole pipeline. If we waited on the individual Future instances
    // instead, we could only block on one of them at a time and would have no easy way to detect a failure in any of
    // the others.
    @SuppressWarnings("rawtypes")
    ExecutorCompletionService ecs = new ExecutorCompletionService<>(threadPool);
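    // The TreeStore, created in the result directory, receives the node state entries written by the sort-and-save
    // task below.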
    File resultDir = getStoreDir();
    TreeStore treeStore = new TreeStore("dump", resultDir, null, 1);
    treeStore.getSession().init();
    try {
        // download -> transform thread.
        ArrayBlockingQueue<RawBsonDocument[]> mongoDocQueue = new ArrayBlockingQueue<>(mongoDocQueueSize);
        // transform <-> sort and save threads
        // Queue with empty buffers, used by the transform task
        ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue = new ArrayBlockingQueue<>(nseBuffersCount);
        // Queue with buffers filled by the transform task, used by the sort and save task. +1 for the SENTINEL
        ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue = new ArrayBlockingQueue<>(nseBuffersCount + 1);
        TransformStageStatistics transformStageStatistics = new TransformStageStatistics();
        // Create empty buffers
        for (int i = 0; i < nseBuffersCount; i++) {
            // No limits on the number of entries, only on their total size. This might be revised later.
            emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(nseBuffersSizeBytes, Integer.MAX_VALUE));
        }
        INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:START] Starting to build TreeStore with minModified {}", minModified);
        Stopwatch start = Stopwatch.createStarted();
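        // Stage 1: the download task reads node documents from MongoDB and puts them on mongoDocQueue in batches of
        // raw BSON documents.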
        @SuppressWarnings("unchecked")
        Future<PipelinedMongoDownloadTask.Result> downloadFuture = ecs.submit(new PipelinedMongoDownloadTask(
                mongoClientURI,
                docStore,
                (int) (mongoDocBatchMaxSizeMB * FileUtils.ONE_MB),
                mongoDocBatchMaxNumberOfDocuments,
                mongoDocQueue,
                pathFilters,
                statisticsProvider,
                indexingReporter,
                new ThreadFactoryBuilder().setDaemon(true).build(),
                minModified
        ));
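        // Stage 2: the transform tasks decode the downloaded documents into node state entries, taking empty buffers
        // from emptyBatchesQueue, filling them and handing them over on nonEmptyBatchesQueue.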
        ArrayList<Future<PipelinedTransformTask.Result>> transformFutures = new ArrayList<>(numberOfTransformThreads);
        for (int i = 0; i < numberOfTransformThreads; i++) {
            NodeStateEntryWriter entryWriter = new NodeStateEntryWriter(blobStore);
            @SuppressWarnings("unchecked")
            Future<PipelinedTransformTask.Result> future = ecs.submit(new PipelinedTransformTask(
                    docStore,
                    documentNodeStore,
                    nodeDocumentCodec,
                    rootRevision,
                    this.getPathPredicate(),
                    entryWriter,
                    mongoDocQueue,
                    emptyBatchesQueue,
                    nonEmptyBatchesQueue,
                    transformStageStatistics
            ));
            transformFutures.add(future);
        }
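        // Stage 3: the sort-and-save task consumes the filled buffers, writes their entries to the TreeStore and
        // returns the buffers to emptyBatchesQueue for reuse.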
        @SuppressWarnings("unchecked")
        Future<PipelinedSortBatchTask.Result> sortBatchFuture = ecs.submit(new PipelinedTreeStoreTask(
                treeStore,
                emptyBatchesQueue,
                nonEmptyBatchesQueue,
                statisticsProvider,
                indexingReporter
        ));
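        // Wait for all tasks to finish. Polling the completion service (rather than blocking on individual futures)
        // detects a failure in any task as soon as it happens and allows statistics to be printed periodically.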
        try {
            int tasksFinished = 0;
            int transformTasksFinished = 0;
            boolean monitorQueues = true;
            while (tasksFinished < numberOfThreads) {
                // Wait with a timeout to print statistics periodically
                Future<?> completedTask = ecs.poll(30, TimeUnit.SECONDS);
                if (completedTask == null) {
                    // Timeout waiting for a task to complete
                    if (monitorQueues) {
                        try {
                            printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, transformStageStatistics, false);
                        } catch (Exception e) {
                            LOG.warn("Error while logging queue sizes", e);
                        }
                    }
                } else {
                    try {
                        Object result = completedTask.get();
                        if (result instanceof PipelinedMongoDownloadTask.Result) {
                            PipelinedMongoDownloadTask.Result downloadResult = (PipelinedMongoDownloadTask.Result) result;
                            downloadFuture = null;
                        } else if (result instanceof PipelinedTransformTask.Result) {
                            PipelinedTransformTask.Result transformResult = (PipelinedTransformTask.Result) result;
                            transformTasksFinished++;
                            nodeStateEntriesExtracted += transformResult.getEntryCount();
                            if (transformTasksFinished == numberOfTransformThreads) {
                                // No need to keep monitoring the queues, the download and transform threads are done.
                                monitorQueues = false;
                                // Terminate the sort thread.
                                nonEmptyBatchesQueue.put(PipelinedStrategy.SENTINEL_NSE_BUFFER);
                                transformStageStatistics.publishStatistics(statisticsProvider, indexingReporter);
                                transformFutures.clear();
                            }
                        } else if (result instanceof PipelinedSortBatchTask.Result) {
                            PipelinedSortBatchTask.Result sortTaskResult = (PipelinedSortBatchTask.Result) result;
                            // The buffers between the transform and merge-sort tasks are no longer needed, so remove
                            // them from the queues so they can be garbage collected. These buffers can be very large,
                            // so this is important to avoid running out of memory in the merge-sort phase.
                            if (!nonEmptyBatchesQueue.isEmpty()) {
                                LOG.warn("nonEmptyBatchesQueue is not empty. Size: {}", nonEmptyBatchesQueue.size());
                            }
                            emptyBatchesQueue.clear();
                            printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, transformStageStatistics, true);
                            sortBatchFuture = null;
                        } else {
                            throw new RuntimeException("Unknown result type: " + result);
                        }
                        tasksFinished++;
                    } catch (ExecutionException ex) {
                        throw new RuntimeException(ex.getCause());
                    } catch (Throwable ex) {
                        throw new RuntimeException(ex);
                    }
                }
            }
            long elapsedSeconds = start.elapsed(TimeUnit.SECONDS);
            INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:END] Metrics: {}", MetricsFormatter.newBuilder()
                    .add("duration", FormattingUtils.formatToSeconds(elapsedSeconds))
                    .add("durationSeconds", elapsedSeconds)
                    .add("nodeStateEntriesExtracted", nodeStateEntriesExtracted)
                    .build());
            indexingReporter.addTiming("Build TreeStore (Dump+Merge)", FormattingUtils.formatToSeconds(elapsedSeconds));
            LOG.info("[INDEXING_REPORT:BUILD_TREE_STORE]\n{}", indexingReporter.generateReport());
        } catch (Throwable e) {
            INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:FAIL] Metrics: {}, Error: {}",
                    MetricsFormatter.createMetricsWithDurationOnly(start), e.toString()
            );
            LOG.warn("Error dumping from MongoDB. Cancelling all tasks. Error: {}", e.toString());
            // Cancel in order
            cancelFuture(downloadFuture);
            for (Future<?> transformTask : transformFutures) {
                cancelFuture(transformTask);
            }
            cancelFuture(sortBatchFuture);
            throw new RuntimeException(e);
        }
        treeStore.close();
        return resultDir;
    } finally {
        new ExecutorCloser(threadPool).close();
    }
}
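
// Note: cancelFuture(...), used in the catch block above, is defined elsewhere in this class. A minimal sketch of the
// assumed contract (a null-safe wrapper that interrupts the task if it is still running):
//
//     private static void cancelFuture(Future<?> future) {
//         if (future != null) {
//             future.cancel(true); // interrupt the task if it has not completed yet
//         }
//     }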