private ParallelFileProcessingResult shutdown()

in tika-batch/src/main/java/org/apache/tika/batch/BatchProcess.java [215:325]


    /**
     * Shuts the batch process down in three escalating steps and builds the
     * final result.
     * <ol>
     *     <li>{@code ex.shutdown()} so that no not-yet-started tasks are run.</li>
     *     <li>Every consumer is asked to shut down, the crawler is shut down
     *     without poison, and {@code politelyAwaitTermination} is called.</li>
     *     <li>{@code ex.shutdownNow()} is called and the completion service is
     *     polled for the futures that are still expected.</li>
     * </ol>
     * Afterwards a restart message (possibly {@code null}) is derived from the
     * cause for termination and the crawler's status, timed out consumers are
     * re-checked and logged, and the per-consumer counters are summed.
     * <p>
     * If this thread is interrupted while draining the futures, the remaining
     * steps still run, and the thread's interrupt status is restored just
     * before this method returns so that the caller can observe it.
     *
     * @param ex                executor service to shut down
     * @param completionService completion service polled for finished futures
     *                          after {@code shutdownNow()}
     * @param timeoutChecker    used for a final check for timed out consumers
     * @param state             supplies the cause for termination, the start time
     *                          and the task counts used to decide how many futures
     *                          to poll for
     * @return the counts of considered/added/processed resources and handled
     * exceptions, the elapsed time in seconds, the exit status and the cause for
     * termination
     */
    private ParallelFileProcessingResult shutdown(ExecutorService ex, CompletionService<IFileProcessorFutureResult> completionService, TimeoutChecker timeoutChecker, State state) {

        if (reporter != null) {
            reporter.setIsShuttingDown(true);
        }
        int added = fileResourceCrawler.getAdded();
        int considered = fileResourceCrawler.getConsidered();

        //TODO: figure out safe way to shutdown resource crawler
        //if it isn't.  Does it need to add poison at this point?
        //fileResourceCrawler.pleaseShutdown();
        LOG.trace("about to shutdown");
        //Step 1: prevent uncalled threads from being started
        ex.shutdown();

        //Step 2: ask consumers to shutdown politely.
        //Under normal circumstances, they should all have completed by now.
        for (FileResourceConsumer consumer : consumersManager.getConsumers()) {
            consumer.pleaseShutdown();
        }
        //The resourceCrawler should shutdown now.  No need for poison.
        fileResourceCrawler.shutDownNoPoison();
        //if there are any active/asked-to-shutdown consumers, wait
        //a bit for those parsers to finish.
        //This can happen if the parent process dies
        //or if the crawler stops early, or ...
        politelyAwaitTermination(state.causeForTermination);

        //Step 3: Gloves come off.  We've tried to ask kindly before.
        //Now it is time to shut down. This will corrupt
        //nio channels via thread interrupts!  Hopefully, everything
        //has shut down by now.
        LOG.trace("About to shutdownNow()");
        List<Runnable> neverCalled = ex.shutdownNow();
        LOG.trace("TERMINATED {} : {} : {}", ex.isTerminated(), state.consumersRemoved, state.crawlersRemoved);

        //number of futures still expected: everything that was submitted,
        //minus what was already removed, minus what never started
        int end = state.numConsumers + state.numNonConsumers - state.removed - neverCalled.size();

        //Remember an interrupt instead of swallowing it; the flag is restored
        //at the end so that the rest of the shutdown sequence is not disturbed.
        boolean interrupted = false;

        for (int t = 0; t < end; t++) {
            Future<IFileProcessorFutureResult> future = null;
            try {
                future = completionService.poll(10, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.warn("thread interrupt while polling in final shutdown loop");
                break;
            }
            LOG.trace("In while future==null loop in final shutdown loop");
            if (future == null) {
                //nothing completed within the poll window; stop waiting
                break;
            }
            try {
                IFileProcessorFutureResult result = future.get();
                if (result instanceof FileConsumerFutureResult) {
                    FileConsumerFutureResult consumerResult = (FileConsumerFutureResult) result;
                    FileStarted fileStarted = consumerResult.getFileStarted();
                    LOG.trace("file started {}", fileStarted);
                    if (fileStarted != null && fileStarted.getElapsedMillis() > timeoutThresholdMillis) {
                        LOG.warn("{} caused a file processor to hang or crash. You may need to remove " + "this file from your input set and rerun.", fileStarted.getResourceId());
                    }
                } else if (result instanceof FileResourceCrawlerFutureResult) {
                    FileResourceCrawlerFutureResult crawlerResult = (FileResourceCrawlerFutureResult) result;
                    considered += crawlerResult.getConsidered();
                    added += crawlerResult.getAdded();
                } //else ...we don't care about anything else stopping at this point
            } catch (ExecutionException e) {
                LOG.error("Execution exception trying to shutdown after shutdownNow", e);
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.error("Interrupted exception trying to shutdown after shutdownNow", e);
            }
        }
        //do we need to restart?
        String restartMsg = null;
        if (state.causeForTermination == CAUSE_FOR_TERMINATION.PARENT_SHUTDOWN || state.causeForTermination == CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION_NO_RESTART) {
            //do not restart!!!
        } else if (state.causeForTermination == CAUSE_FOR_TERMINATION.MAIN_LOOP_EXCEPTION) {
            restartMsg = "Uncaught consumer throwable";
        } else if (state.causeForTermination == CAUSE_FOR_TERMINATION.TIMED_OUT_CONSUMER) {
            if (areResourcesPotentiallyRemaining()) {
                restartMsg = "Consumer timed out with resources remaining";
            }
        } else if (state.causeForTermination == CAUSE_FOR_TERMINATION.BATCH_PROCESS_ALIVE_TOO_LONG) {
            restartMsg = BATCH_CONSTANTS.BATCH_PROCESS_EXCEEDED_MAX_ALIVE_TIME.toString();
        } else if (state.causeForTermination == CAUSE_FOR_TERMINATION.CRAWLER_TIMED_OUT) {
            restartMsg = "Crawler timed out.";
        } else if (fileResourceCrawler.wasTimedOut()) {
            restartMsg = "Crawler was timed out.";
        } else if (fileResourceCrawler.isActive()) {
            restartMsg = "Crawler is still active.";
        } else if (!fileResourceCrawler.isQueueEmpty()) {
            restartMsg = "Resources still exist for processing";
        }
        LOG.trace("restart msg: {}", restartMsg);
        int exitStatus = getExitStatus(state.causeForTermination, restartMsg);

        //need to re-check, report, mark timed out consumers
        timeoutChecker.checkForTimedOutConsumers();

        for (FileStarted fs : timedOuts) {
            LOG.warn("A parser was still working on >{}< for {} milliseconds after it started. " + "This exceeds the maxTimeoutMillis parameter", fs.getResourceId(),
                    fs.getElapsedMillis());
        }
        //elapsed wall-clock time in seconds
        double elapsed = ((double) System.currentTimeMillis() - (double) state.start) / 1000.0;
        int processed = 0;
        int numExceptions = 0;
        for (FileResourceConsumer c : consumersManager.getConsumers()) {
            processed += c.getNumResourcesConsumed();
            numExceptions += c.getNumHandledExceptions();
        }
        LOG.trace("returning {}", state.causeForTermination);
        ParallelFileProcessingResult processingResult =
                new ParallelFileProcessingResult(considered, added, processed, numExceptions, elapsed, exitStatus, state.causeForTermination.toString());
        if (interrupted) {
            //hand the interrupt back to the caller now that shutdown is complete
            Thread.currentThread().interrupt();
        }
        return processingResult;
    }