public ConsumersManager build()

in tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java [60:179]


    /**
     * Builds the {@link ConsumersManager} and its consumers from the configuration
     * node and the runtime attributes.
     * <p>
     * For every setting, a value present in {@code runtimeAttributes} wins over the
     * corresponding XML attribute on {@code node}:
     * <ul>
     *   <li>{@code recursiveParserWrapper} (boolean, default {@code false}): wrap the
     *       parser in a {@link RecursiveParserWrapper}</li>
     *   <li>{@code streamOut} (boolean, default {@code false}; XML attribute
     *       {@code streamout} or {@code streamOut}): when the recursive parser wrapper
     *       is used, build {@link StreamOutRPWFSConsumer}s instead of
     *       {@link RecursiveParserWrapperFSConsumer}s</li>
     *   <li>{@code consumersManagerMaxMillis} (long, optional): passed to the manager
     *       only when a value could be read</li>
     *   <li>{@code c} at runtime / {@code tikaConfig} in XML: path of the Tika config
     *       file; the default config is used when neither is given</li>
     * </ul>
     * The node must have {@code parser}, {@code contenthandler} and
     * {@code outputstream} child elements.
     *
     * @param node              configuration node for the consumers
     * @param runtimeAttributes attributes supplied at runtime; take precedence over
     *                          the XML attributes
     * @param queue             queue handed to every consumer that is created
     * @return a manager holding one consumer per configured consumer count
     * @throws RuntimeException if the Tika config file cannot be loaded, or if any of
     *                          the three required child elements is missing
     */
    public ConsumersManager build(Node node, Map<String, String> runtimeAttributes, ArrayBlockingQueue<FileResource> queue) {

        //figure out if we're building a recursiveParserWrapper
        boolean recursiveParserWrapper = false;
        String recursiveParserWrapperString = runtimeAttributes.get("recursiveParserWrapper");
        if (recursiveParserWrapperString == null) {
            recursiveParserWrapperString = getAttributeValueOrNull(node, "recursiveParserWrapper");
        }
        if (recursiveParserWrapperString != null) {
            recursiveParserWrapper = PropsUtil.getBoolean(recursiveParserWrapperString, recursiveParserWrapper);
        }

        boolean streamOut = false;
        String streamOutString = runtimeAttributes.get("streamOut");
        if (streamOutString == null) {
            //historical, all-lowercase spelling of the XML attribute; checked first
            //so that existing configuration files keep their current behavior
            streamOutString = getAttributeValueOrNull(node, "streamout");
        }
        if (streamOutString == null) {
            //also accept the camelCase spelling that matches the runtime key
            streamOutString = getAttributeValueOrNull(node, "streamOut");
        }
        if (streamOutString != null) {
            streamOut = PropsUtil.getBoolean(streamOutString, streamOut);
        }

        //how long to let the consumersManager run on init() and shutdown()
        Long consumersManagerMaxMillis = null;
        String consumersManagerMaxMillisString = runtimeAttributes.get("consumersManagerMaxMillis");
        if (consumersManagerMaxMillisString == null) {
            consumersManagerMaxMillisString = getAttributeValueOrNull(node, "consumersManagerMaxMillis");
        }
        if (consumersManagerMaxMillisString != null) {
            consumersManagerMaxMillis = PropsUtil.getLong(consumersManagerMaxMillisString, null);
        }

        //runtime key is "c"; the XML attribute is "tikaConfig"
        String tikaConfigPath = runtimeAttributes.get("c");
        if (tikaConfigPath == null) {
            String tikaConfigAttribute = getAttributeValueOrNull(node, "tikaConfig");
            if (tikaConfigAttribute != null) {
                tikaConfigPath = PropsUtil.getString(tikaConfigAttribute, null);
            }
        }

        TikaConfig config;
        if (tikaConfigPath != null) {
            try (InputStream is = Files.newInputStream(Paths.get(tikaConfigPath))) {
                config = new TikaConfig(is);
            } catch (Exception e) {
                //name the offending path; the original cause is preserved
                throw new RuntimeException("Unable to load TikaConfig from: " + tikaConfigPath, e);
            }
        } else {
            config = TikaConfig.getDefaultConfig();
        }

        List<FileResourceConsumer> consumers = new LinkedList<>();
        int numConsumers = BatchProcessBuilder.getNumConsumers(runtimeAttributes);

        //locate the three factory child elements; if an element name occurs more
        //than once, the last occurrence wins
        NodeList nodeList = node.getChildNodes();
        Node contentHandlerFactoryNode = null;
        Node parserFactoryNode = null;
        Node outputStreamFactoryNode = null;

        for (int i = 0; i < nodeList.getLength(); i++) {
            Node child = nodeList.item(i);
            String cn = child.getNodeName();
            switch (cn) {
                case "parser":
                    parserFactoryNode = child;
                    break;
                case "contenthandler":
                    contentHandlerFactoryNode = child;
                    break;
                case "outputstream":
                    outputStreamFactoryNode = child;
                    break;
                default:
                    //text, comment and unrelated nodes are ignored
                    break;
            }
        }

        if (contentHandlerFactoryNode == null || parserFactoryNode == null || outputStreamFactoryNode == null) {
            throw new RuntimeException("You must specify a ContentHandlerFactory, a ParserFactory and an OutputStreamFactory");
        }
        ContentHandlerFactory contentHandlerFactory = getContentHandlerFactory(contentHandlerFactoryNode, runtimeAttributes);
        ParserFactory parserFactory = getParserFactory(parserFactoryNode, runtimeAttributes);
        OutputStreamFactory outputStreamFactory = getOutputStreamFactory(outputStreamFactoryNode, runtimeAttributes, contentHandlerFactory, recursiveParserWrapper);
        Parser parser = parserFactory.getParser(config);
        if (recursiveParserWrapper) {
            MetadataFilter metadataFilter = config.getMetadataFilter();
            //all consumers share the same wrapped parser instance
            parser = new RecursiveParserWrapper(parser);

            for (int i = 0; i < numConsumers; i++) {
                FileResourceConsumer c;
                if (streamOut) {
                    c = new StreamOutRPWFSConsumer(queue, parser, contentHandlerFactory, outputStreamFactory, metadataFilter);
                } else {
                    c = new RecursiveParserWrapperFSConsumer(queue, parser, contentHandlerFactory, outputStreamFactory, metadataFilter);
                }
                consumers.add(c);
            }
        } else {
            //streamOut is not consulted when the recursive parser wrapper is off
            for (int i = 0; i < numConsumers; i++) {
                consumers.add(new BasicTikaFSConsumer(queue, parser, contentHandlerFactory, outputStreamFactory));
            }
        }
        ConsumersManager manager = new FSConsumersManager(consumers);
        if (consumersManagerMaxMillis != null) {
            manager.setConsumersManagerMaxMillis(consumersManagerMaxMillis);
        }
        return manager;
    }

    /**
     * Returns the value of the named XML attribute on {@code node}, or {@code null}
     * if the node carries no attribute map or has no attribute with that name.
     */
    private static String getAttributeValueOrNull(Node node, String attributeName) {
        //getAttributes() is null for anything that is not an Element
        if (node.getAttributes() == null) {
            return null;
        }
        Node attribute = node.getAttributes().getNamedItem(attributeName);
        return attribute == null ? null : attribute.getNodeValue();
    }