in indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java [174:290]
private IndexDataReadResult readIndexMT(IndexWriter w, IndexingContext context) throws IOException {
LOGGER.debug("Reading MT index...");
Instant start = Instant.now();
long timestamp = readHeader();
int n = 0;
final Document theEnd = new Document();
Set<String> rootGroups = ConcurrentHashMap.newKeySet();
Set<String> allGroups = ConcurrentHashMap.newKeySet();
ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>(10000);
ExecutorService executorService = Executors.newFixedThreadPool(threads);
List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
List<FSDirectory> siloDirectories = new ArrayList<>(threads);
List<IndexWriter> siloWriters = new ArrayList<>(threads);
AtomicBoolean stopEarly = new AtomicBoolean(false);
LOGGER.debug("Creating {} silo writer threads...", threads);
for (int i = 0; i < threads; i++) {
final int silo = i;
FSDirectory siloDirectory = tempDirectory("silo" + i);
siloDirectories.add(siloDirectory);
siloWriters.add(tempWriter(siloDirectory));
executorService.execute(() -> {
LOGGER.debug("Starting thread {}", Thread.currentThread().getName());
try {
while (true) {
try {
Document doc = queue.take();
if (doc == theEnd) {
break;
}
addToIndex(doc, context, siloWriters.get(silo), rootGroups, allGroups);
} catch (Throwable e) {
errors.add(e);
if (stopEarly.compareAndSet(false, true)) {
queue.clear(); // unblock producer
executorService.shutdownNow(); // unblock consumers
}
break;
}
}
} finally {
LOGGER.debug("Done thread {}", Thread.currentThread().getName());
}
});
}
LOGGER.debug("Loading up documents into silos");
try {
Document doc;
while (!stopEarly.get() && (doc = readDocument()) != null) {
queue.put(doc);
n++;
}
LOGGER.debug("Signalling END");
for (int i = 0; i < threads; i++) {
queue.put(theEnd);
}
LOGGER.debug("Shutting down threads");
executorService.shutdown();
executorService.awaitTermination(5L, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new IOException("Interrupted", e);
}
if (!errors.isEmpty()) {
if (errors.stream().allMatch(ex -> ex instanceof IOException || ex instanceof InterruptedException)) {
IOException exception = new IOException("Error during load of index");
errors.forEach(exception::addSuppressed);
throw exception;
} else {
RuntimeException exception = new RuntimeException("Error during load of index");
errors.forEach(exception::addSuppressed);
throw exception;
}
}
LOGGER.debug("Silos loaded...");
Date date = null;
if (timestamp != -1) {
date = new Date(timestamp);
IndexUtils.updateTimestamp(w.getDirectory(), date);
}
LOGGER.debug("Closing silo writers...");
for (IndexWriter siloWriter : siloWriters) {
siloWriter.commit();
siloWriter.close();
}
LOGGER.debug("Merging silo directories...");
w.addIndexes(siloDirectories.toArray(new Directory[0]));
LOGGER.debug("Cleanup of silo directories...");
for (FSDirectory siloDirectory : siloDirectories) {
File dir = siloDirectory.getDirectory().toFile();
siloDirectory.close();
IndexUtils.delete(dir);
}
LOGGER.debug("Finalizing...");
w.commit();
IndexDataReadResult result = new IndexDataReadResult();
result.setDocumentCount(n);
result.setTimestamp(date);
result.setRootGroups(rootGroups);
result.setAllGroups(allGroups);
LOGGER.debug(
"Reading MT index done in {} sec",
Duration.between(start, Instant.now()).getSeconds());
return result;
}