in tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java [215:325]
/**
 * Shuts down the batch run and builds its summary result.
 *
 * <p>Sequence:
 * <ol>
 *   <li>Flag the reporter (if any) as shutting down.</li>
 *   <li>Stop the executor from accepting new tasks.</li>
 *   <li>Ask every consumer and the crawler to stop.</li>
 *   <li>Give running work a chance to finish via {@code politelyAwaitTermination}.</li>
 *   <li>Force the executor down with {@link ExecutorService#shutdownNow()}.</li>
 *   <li>Drain whatever task results have completed.</li>
 * </ol>
 * It then decides whether a restart is warranted, computes the exit status, logs files
 * that timed out, and totals the consumer counters.
 *
 * <p>If this thread is interrupted while results are being drained, the interrupt is
 * recorded and the thread's interrupt status is restored just before returning, so
 * callers can still observe it.
 *
 * @param ex executor whose tasks are being shut down (presumably the one running the
 *           consumers and the crawler — confirm against the caller)
 * @param completionService completion service the finished tasks' results are polled from
 * @param timeoutChecker re-run once before reporting so that timed-out consumers are marked
 * @param state run state supplying the termination cause, task counts and start time
 * @return summary holding considered/added/processed counts, the number of handled
 *         exceptions, elapsed seconds, the exit status and the termination cause
 */
private ParallelFileProcessingResult shutdown(ExecutorService ex,
        CompletionService<IFileProcessorFutureResult> completionService,
        TimeoutChecker timeoutChecker, State state) {
    if (reporter != null) {
        reporter.setIsShuttingDown(true);
    }
    int added = fileResourceCrawler.getAdded();
    int considered = fileResourceCrawler.getConsidered();
    // Set when an InterruptedException is caught below. The interrupt status is restored
    // only at the very end so the remaining shutdown bookkeeping runs unchanged.
    boolean interrupted = false;

    //TODO: figure out safe way to shutdown resource crawler
    //if it isn't. Does it need to add poison at this point?
    //fileResourceCrawler.pleaseShutdown();
    LOG.trace("about to shutdown");
    //Step 1: stop the executor from accepting new tasks.
    //Tasks that were already submitted may still run.
    ex.shutdown();
    //Step 2: ask consumers to shutdown politely.
    //Under normal circumstances, they should all have completed by now.
    for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
        consumer.pleaseShutdown();
    }
    //The resourceCrawler should shutdown now. No need for poison.
    fileResourceCrawler.shutDownNoPoison();
    //If any consumers are still active or were just asked to shut down,
    //wait a bit for their parsers to finish.
    //This can happen if the parent process dies,
    //or if the crawler stops early, or ...
    politelyAwaitTermination(state.causeForTermination);
    //Step 3: Gloves come off. We've tried to ask kindly before.
    //Now it is time to shut down. This will corrupt
    //nio channels via thread interrupts! Hopefully, everything
    //has shut down by now.
    LOG.trace("About to shutdownNow()");
    List<Runnable> neverCalled = ex.shutdownNow();
    LOG.trace("TERMINATED {} : {} : {}", ex.isTerminated(), state.consumersRemoved, state.crawlersRemoved);

    //Drain completed results. The bound is all tasks, minus state.removed (presumably
    //results already collected by the main loop), minus tasks that never started.
    //Each poll waits at most 10ms; the first empty poll ends the drain.
    int end = state.numConsumers + state.numNonConsumers - state.removed - neverCalled.size();
    for (int t = 0; t < end; t++) {
        Future<IFileProcessorFutureResult> future;
        try {
            future = completionService.poll(10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.warn("thread interrupt while polling in final shutdown loop");
            interrupted = true;
            break;
        }
        LOG.trace("In while future==null loop in final shutdown loop");
        if (future == null) {
            break;
        }
        try {
            IFileProcessorFutureResult result = future.get();
            if (result instanceof FileConsumerFutureResult) {
                FileConsumerFutureResult consumerResult = (FileConsumerFutureResult) result;
                FileStarted fileStarted = consumerResult.getFileStarted();
                LOG.trace("file started {}", fileStarted);
                //A file still "started" past the timeout threshold likely hung its processor.
                if (fileStarted != null && fileStarted.getElapsedMillis() > timeoutThresholdMillis) {
                    LOG.warn("{} caused a file processor to hang or crash. You may need to remove "
                            + "this file from your input set and rerun.", fileStarted.getResourceId());
                }
            } else if (result instanceof FileResourceCrawlerFutureResult) {
                FileResourceCrawlerFutureResult crawlerResult = (FileResourceCrawlerFutureResult) result;
                considered += crawlerResult.getConsidered();
                added += crawlerResult.getAdded();
            } //else ...we don't care about anything else stopping at this point
        } catch (ExecutionException e) {
            LOG.error("Execution exception trying to shutdown after shutdownNow", e);
        } catch (InterruptedException e) {
            LOG.error("Interrupted exception trying to shutdown after shutdownNow", e);
            interrupted = true;
        }
    }

    //Decide whether to restart: restartMsg stays null when no restart should be requested.
    String restartMsg = null;
    if (state.causeForTermination == CAUSE_FOR_TERMINATION.PARENT_SHUTDOWN
            || state.causeForTermination == CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART) {
        //do not restart!!!
    } else if (state.causeForTermination == CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION) {
        restartMsg = "Uncaught consumer throwable";
    } else if (state.causeForTermination == CAUSE_FOR_TERMINATION.TIMED_OUT_CONSUMER) {
        if (areResourcesPotentiallyRemaining()) {
            restartMsg = "Consumer timed out with resources remaining";
        }
    } else if (state.causeForTermination == CAUSE_FOR_TERMINATION.BATCH_PROCESS_ALIVE_TOO_LONG) {
        restartMsg = BATCH_CONSTANTS.BATCH_PROCESS_EXCEEDED_MAX_ALIVE_TIME.toString();
    } else if (state.causeForTermination == CAUSE_FOR_TERMINATION.CRAWLER_TIMED_OUT) {
        restartMsg = "Crawler timed out.";
    } else if (fileResourceCrawler.wasTimedOut()) {
        restartMsg = "Crawler was timed out.";
    } else if (fileResourceCrawler.isActive()) {
        restartMsg = "Crawler is still active.";
    } else if (!fileResourceCrawler.isQueueEmpty()) {
        restartMsg = "Resources still exist for processing";
    }
    LOG.trace("restart msg: {}", restartMsg);
    int exitStatus = getExitStatus(state.causeForTermination, restartMsg);

    //need to re-check, report, mark timed out consumers
    timeoutChecker.checkForTimedOutConsumers();
    for (FileStarted fs : timedOuts) {
        LOG.warn("A parser was still working on >{}< for {} milliseconds after it started. "
                + "This exceeds the maxTimeoutMillis parameter", fs.getResourceId(),
                fs.getElapsedMillis());
    }

    double elapsed = ((double) System.currentTimeMillis() - (double) state.start) / 1000.0;
    int processed = 0;
    int numExceptions = 0;
    for (FileResourceConsumer c : consumersManager.getConsumers()) {
        processed += c.getNumResourcesConsumed();
        numExceptions += c.getNumHandledExceptions();
    }
    LOG.trace("returning {}", state.causeForTermination);
    if (interrupted) {
        //Restore the interrupt status consumed by the catch blocks above so callers can see it.
        Thread.currentThread().interrupt();
    }
    return new ParallelFileProcessingResult(considered, added, processed, numExceptions, elapsed,
            exitStatus, state.causeForTermination.toString());
}