in oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedStrategy.java [236:422]
    public File createSortedStoreFile() throws IOException {
        int mongoDocBlockQueueSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_SIZE, DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_SIZE);
        Preconditions.checkArgument(mongoDocBlockQueueSize > 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_MONGO_DOC_QUEUE_SIZE + ": " + mongoDocBlockQueueSize + ". Must be > 0");
        int mongoBatchSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE, DEFAULT_OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE);
        Preconditions.checkArgument(mongoBatchSize > 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_MONGO_DOC_BATCH_SIZE + ": " + mongoBatchSize + ". Must be > 0");
        int transformThreads = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_TRANSFORM_THREADS, DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS);
        Preconditions.checkArgument(transformThreads > 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_TRANSFORM_THREADS + ": " + transformThreads + ". Must be > 0");
        int workingMemoryMB = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB, DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB);
        Preconditions.checkArgument(workingMemoryMB >= 0,
                "Invalid value for property " + OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB + ": " + workingMemoryMB + ". Must be >= 0");
        if (workingMemoryMB == 0) {
            workingMemoryMB = autodetectWorkingMemoryMB();
        }
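        // Note: workingMemoryMB == 0 delegates sizing to autodetectWorkingMemoryMB(); the heuristic it uses
        // (presumably derived from the available JVM memory) is defined elsewhere in this class.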
        int numberOfThreads = 1 + transformThreads + 1 + 1; // dump, transform, sort threads, sorted files merge
        ExecutorService threadPool = Executors.newFixedThreadPool(numberOfThreads,
                new ThreadFactoryBuilder().setNameFormat("mongo-dump").setDaemon(true).build()
        );
        // This executor can wait for several tasks at the same time. We use it below to wait simultaneously on
        // all the tasks, so that if any of them fails we can abort the whole pipeline. If we waited on the
        // individual Future instances instead, we could only block on one of them at a time, and a failure in
        // any of the others would go unnoticed until we got around to waiting on it.
        ExecutorCompletionService ecs = new ExecutorCompletionService(threadPool);
        ScheduledExecutorService monitorThreadPool = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder().setNameFormat("monitor").setDaemon(true).build()
        );
        try {
            int numberOfBuffers = 1 + transformThreads;
            // The extra memory arena is to account for the memory taken by the SortKey entries that are part of the
            // entry batches.
            int memoryArenas = numberOfBuffers + 1;
            int memoryForEntriesMB = workingMemoryMB / memoryArenas;
            int maxNumberOfEntries = memoryForEntriesMB * 1024 * 4; // Assuming that 1KB is enough for 4 entries.
            int maxNumberOfEntriesPerBuffer = maxNumberOfEntries / numberOfBuffers;
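            // Worked example: with workingMemoryMB = 4096 and 2 transform threads there are 3 buffers and
            // 4 memory arenas, so each arena gets 1024 MB (bufferSizeBytes below), maxNumberOfEntries is
            // 1024 * 1024 * 4 = 4_194_304, and each buffer may hold up to ~1.4M entries.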
            // A ByteBuffer can be at most Integer.MAX_VALUE bytes
            int bufferSizeBytes = limitToIntegerRange((workingMemoryMB * FileUtils.ONE_MB) / memoryArenas);
            if (bufferSizeBytes < MIN_ENTRY_BATCH_BUFFER_SIZE_MB * FileUtils.ONE_MB) {
                throw new IllegalArgumentException("Entry batch buffer size too small: " + bufferSizeBytes +
                        " bytes. Must be at least " + MIN_ENTRY_BATCH_BUFFER_SIZE_MB + " MB. " +
                        "To increase the size of the buffers, either increase the size of the working memory region " +
                        "(system property " + OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB + ") or decrease the number of transform " +
                        "threads (" + OAK_INDEXER_PIPELINED_TRANSFORM_THREADS + ")");
            }
            LOG.info("Working memory: {} MB, number of buffers: {}, size of each buffer: {} MB, number of entries per buffer: {}",
                    workingMemoryMB, numberOfBuffers, bufferSizeBytes / FileUtils.ONE_MB, maxNumberOfEntriesPerBuffer);
            // download -> transform thread.
            ArrayBlockingQueue<BasicDBObject[]> mongoDocQueue = new ArrayBlockingQueue<>(mongoDocBlockQueueSize);
            // transform <-> sort and save threads
            // Queue with empty buffers, used by the transform task
            ArrayBlockingQueue<NodeStateEntryBatch> emptyBatchesQueue = new ArrayBlockingQueue<>(numberOfBuffers);
            // Queue with buffers filled by the transform task, used by the sort and save task. +1 for the SENTINEL
            ArrayBlockingQueue<NodeStateEntryBatch> nonEmptyBatchesQueue = new ArrayBlockingQueue<>(numberOfBuffers + 1);
            // Queue between sort-and-save thread and the merge-sorted-files thread
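            // The bounded capacity (64 intermediate files) applies back-pressure on the sort-and-save task
            // if the merge task falls behind.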
            ArrayBlockingQueue<File> sortedFilesQueue = new ArrayBlockingQueue<>(64);
            TransformStageStatistics transformStageStatistics = new TransformStageStatistics();
            ScheduledFuture<?> monitorFuture = monitorThreadPool.scheduleWithFixedDelay(
                    new MonitorTask(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics),
                    10, 30, TimeUnit.SECONDS);
            // Create empty buffers
            for (int i = 0; i < numberOfBuffers; i++) {
                emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(bufferSizeBytes, maxNumberOfEntriesPerBuffer));
            }
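            // The buffers circulate for the lifetime of the pipeline: transform tasks take a batch from
            // emptyBatchesQueue, fill it and move it to nonEmptyBatchesQueue; the sort-and-save task sorts
            // and writes a batch to disk, then returns it to emptyBatchesQueue for reuse.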
            Stopwatch start = Stopwatch.createStarted();
            MongoCollection<BasicDBObject> dbCollection = MongoDocumentStoreHelper.getDBCollection(docStore, Collection.NODES);
            PipelinedMongoDownloadTask downloadTask = new PipelinedMongoDownloadTask(dbCollection, mongoBatchSize, mongoDocQueue);
            ecs.submit(downloadTask);
            File flatFileStore = null;
            for (int i = 0; i < transformThreads; i++) {
                NodeStateEntryWriter entryWriter = new NodeStateEntryWriter(blobStore);
                PipelinedTransformTask transformTask = new PipelinedTransformTask(
                        docStore,
                        documentNodeStore,
                        Collection.NODES,
                        rootRevision,
                        pathPredicate,
                        entryWriter,
                        mongoDocQueue,
                        emptyBatchesQueue,
                        nonEmptyBatchesQueue,
                        transformStageStatistics
                );
                ecs.submit(transformTask);
            }
            PipelinedSortBatchTask sortTask = new PipelinedSortBatchTask(
                    storeDir, pathComparator, algorithm, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue
            );
            ecs.submit(sortTask);
            PipelinedMergeSortTask mergeSortTask = new PipelinedMergeSortTask(storeDir, pathComparator, algorithm, sortedFilesQueue);
            ecs.submit(mergeSortTask);
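            // All stages are now running concurrently:
            // download -> mongoDocQueue -> transform (xN) -> nonEmptyBatchesQueue -> sort-and-save -> sortedFilesQueue -> merge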
            try {
                LOG.info("Waiting for tasks to complete.");
                int tasksFinished = 0;
                int transformTasksFinished = 0;
                while (tasksFinished < numberOfThreads) {
                    Future<?> completedTask = ecs.take();
                    try {
                        Object result = completedTask.get();
                        if (result instanceof PipelinedMongoDownloadTask.Result) {
                            PipelinedMongoDownloadTask.Result downloadResult = (PipelinedMongoDownloadTask.Result) result;
                            LOG.info("Download task finished. Documents downloaded: {}", downloadResult.getDocumentsDownloaded());
                            // Signal the end of documents to the transform threads.
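                            // One sentinel per transform thread, as each transform task is expected to stop
                            // after consuming a single sentinel.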
                            for (int i = 0; i < transformThreads; i++) {
                                mongoDocQueue.put(SENTINEL_MONGO_DOCUMENT);
                            }
                        } else if (result instanceof PipelinedTransformTask.Result) {
                            PipelinedTransformTask.Result transformResult = (PipelinedTransformTask.Result) result;
                            transformTasksFinished++;
                            entryCount += transformResult.getEntryCount();
                            LOG.info("Transform thread {} finished. Entries processed: {}",
                                    transformResult.getThreadId(), transformResult.getEntryCount());
                            if (transformTasksFinished == transformThreads) {
                                LOG.info("All transform tasks finished. Node states retrieved: {}", entryCount);
                                // No need to keep monitoring the queues; the download and transform threads are done.
                                monitorFuture.cancel(false);
                                // Terminate the sort thread.
                                nonEmptyBatchesQueue.put(SENTINEL_NSE_BUFFER);
                            }
                        } else if (result instanceof PipelinedSortBatchTask.Result) {
                            PipelinedSortBatchTask.Result sortTaskResult = (PipelinedSortBatchTask.Result) result;
                            LOG.info("Sort task finished. Entries processed: {}", sortTaskResult.getTotalEntries());
                            printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
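                            // Signal the merge task that no more intermediate sorted files will be produced.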
                            sortedFilesQueue.put(SENTINEL_SORTED_FILES_QUEUE);
                        } else if (result instanceof PipelinedMergeSortTask.Result) {
                            PipelinedMergeSortTask.Result mergeSortedFilesTask = (PipelinedMergeSortTask.Result) result;
                            File ffs = mergeSortedFilesTask.getFlatFileStoreFile();
                            LOG.info("Merge sort task finished. FFS: {}, Size: {}", ffs, humanReadableByteCountBin(ffs.length()));
                            flatFileStore = ffs;
                        } else {
                            throw new RuntimeException("Unknown result type: " + result);
                        }
                        tasksFinished++;
                    } catch (ExecutionException ex) {
                        LOG.warn("Execution error dumping from MongoDB: " + ex + ". Shutting down all threads.");
                        threadPool.shutdownNow();
                        boolean terminated = threadPool.awaitTermination(5, TimeUnit.SECONDS);
                        if (!terminated) {
                            LOG.warn("Thread pool failed to terminate");
                        }
                        throw new RuntimeException(ex.getCause());
                    } catch (Throwable ex) {
                        LOG.warn("Error dumping from MongoDB: " + ex);
                        threadPool.shutdownNow();
                        boolean terminated = threadPool.awaitTermination(5, TimeUnit.SECONDS);
                        if (!terminated) {
                            LOG.warn("Thread pool failed to terminate");
                        }
                        throw new RuntimeException(ex);
                    }
                }
                LOG.info("Dumped {} node states in JSON format in {}", entryCount, start);
                printStatistics(mongoDocQueue, emptyBatchesQueue, nonEmptyBatchesQueue, sortedFilesQueue, transformStageStatistics, true);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                // No longer need to monitor the size of the queues.
                monitorFuture.cancel(true);
            }
            return flatFileStore;
        } finally {
            threadPool.shutdown();
            monitorThreadPool.shutdown();
        }
    }