in tika-batch/src/main/java/org/apache/tika/batch/fs/builders/BasicTikaFSConsumersBuilder.java [60:179]
// Builds the ConsumersManager for a file-system batch run.
//
// Settings are resolved with the runtime attribute taking precedence over the
// XML attribute of the same purpose on the supplied node:
//   - recursiveParserWrapper    (boolean, defaults to false)
//   - streamOut                 (boolean, defaults to false; the XML attribute may be
//                                spelled "streamout" (legacy) or "streamOut")
//   - consumersManagerMaxMillis (long, optional)
//   - TikaConfig path           (runtime key "c", XML attribute "tikaConfig")
//
// The node must have "parser", "contenthandler" and "outputstream" children;
// a RuntimeException is thrown if any of them is missing, or if the configured
// TikaConfig file cannot be loaded.
//
// node              - the configuration node for the consumers
// runtimeAttributes - runtime overrides, keyed by attribute name
// queue             - the queue handed to every consumer that is created
// returns           - an FSConsumersManager wrapping the created consumers
public ConsumersManager build(Node node, Map<String, String> runtimeAttributes, ArrayBlockingQueue<FileResource> queue) {
    //figure out if we're building a recursiveParserWrapper
    boolean recursiveParserWrapper = false;
    String recursiveParserWrapperString = runtimeAttributes.get("recursiveParserWrapper");
    if (recursiveParserWrapperString != null) {
        recursiveParserWrapper = PropsUtil.getBoolean(recursiveParserWrapperString, recursiveParserWrapper);
    } else {
        Node recursiveParserWrapperNode = findFirstAttribute(node, "recursiveParserWrapper");
        if (recursiveParserWrapperNode != null) {
            recursiveParserWrapper = PropsUtil.getBoolean(recursiveParserWrapperNode.getNodeValue(), recursiveParserWrapper);
        }
    }

    boolean streamOut = false;
    String streamOutString = runtimeAttributes.get("streamOut");
    if (streamOutString != null) {
        streamOut = PropsUtil.getBoolean(streamOutString, streamOut);
    } else {
        // XML attribute names are case-sensitive. The legacy lower-case spelling is
        // checked first so existing configs behave exactly as before; the camelCase
        // spelling (matching the runtime key) is accepted as a fallback.
        Node streamOutNode = findFirstAttribute(node, "streamout", "streamOut");
        if (streamOutNode != null) {
            streamOut = PropsUtil.getBoolean(streamOutNode.getNodeValue(), streamOut);
        }
    }

    //how long to let the consumersManager run on init() and shutdown()
    Long consumersManagerMaxMillis = null;
    String consumersManagerMaxMillisString = runtimeAttributes.get("consumersManagerMaxMillis");
    if (consumersManagerMaxMillisString != null) {
        consumersManagerMaxMillis = PropsUtil.getLong(consumersManagerMaxMillisString, null);
    } else {
        Node consumersManagerMaxMillisNode = findFirstAttribute(node, "consumersManagerMaxMillis");
        if (consumersManagerMaxMillisNode != null) {
            consumersManagerMaxMillis = PropsUtil.getLong(consumersManagerMaxMillisNode.getNodeValue(), null);
        }
    }

    // Resolve the TikaConfig: an explicit path wins, otherwise use the default config.
    String tikaConfigPath = runtimeAttributes.get("c");
    if (tikaConfigPath == null) {
        Node tikaConfigNode = findFirstAttribute(node, "tikaConfig");
        if (tikaConfigNode != null) {
            tikaConfigPath = PropsUtil.getString(tikaConfigNode.getNodeValue(), null);
        }
    }
    TikaConfig config;
    if (tikaConfigPath != null) {
        try (InputStream is = Files.newInputStream(Paths.get(tikaConfigPath))) {
            config = new TikaConfig(is);
        } catch (Exception e) {
            // keep the unchecked type callers already see, but say which file failed
            throw new RuntimeException("Unable to load TikaConfig from " + tikaConfigPath, e);
        }
    } else {
        config = TikaConfig.getDefaultConfig();
    }

    List<FileResourceConsumer> consumers = new LinkedList<>();
    int numConsumers = BatchProcessBuilder.getNumConsumers(runtimeAttributes);

    // Pick out the three factory child nodes; if a name occurs more than once the last one wins.
    NodeList nodeList = node.getChildNodes();
    Node contentHandlerFactoryNode = null;
    Node parserFactoryNode = null;
    Node outputStreamFactoryNode = null;
    for (int i = 0; i < nodeList.getLength(); i++) {
        Node child = nodeList.item(i);
        String cn = child.getNodeName();
        switch (cn) {
            case "parser":
                parserFactoryNode = child;
                break;
            case "contenthandler":
                contentHandlerFactoryNode = child;
                break;
            case "outputstream":
                outputStreamFactoryNode = child;
                break;
        }
    }
    if (contentHandlerFactoryNode == null || parserFactoryNode == null || outputStreamFactoryNode == null) {
        throw new RuntimeException("You must specify a ContentHandlerFactory, " + "a ParserFactory and an OutputStreamFactory");
    }

    ContentHandlerFactory contentHandlerFactory = getContentHandlerFactory(contentHandlerFactoryNode, runtimeAttributes);
    ParserFactory parserFactory = getParserFactory(parserFactoryNode, runtimeAttributes);
    OutputStreamFactory outputStreamFactory = getOutputStreamFactory(outputStreamFactoryNode, runtimeAttributes, contentHandlerFactory, recursiveParserWrapper);
    Parser parser = parserFactory.getParser(config);

    if (recursiveParserWrapper) {
        MetadataFilter metadataFilter = config.getMetadataFilter();
        // every consumer is handed the same wrapped parser instance
        parser = new RecursiveParserWrapper(parser);
        for (int i = 0; i < numConsumers; i++) {
            if (streamOut) {
                consumers.add(new StreamOutRPWFSConsumer(queue, parser, contentHandlerFactory, outputStreamFactory, metadataFilter));
            } else {
                consumers.add(new RecursiveParserWrapperFSConsumer(queue, parser, contentHandlerFactory, outputStreamFactory, metadataFilter));
            }
        }
    } else {
        for (int i = 0; i < numConsumers; i++) {
            consumers.add(new BasicTikaFSConsumer(queue, parser, contentHandlerFactory, outputStreamFactory));
        }
    }

    ConsumersManager manager = new FSConsumersManager(consumers);
    if (consumersManagerMaxMillis != null) {
        manager.setConsumersManagerMaxMillis(consumersManagerMaxMillis);
    }
    return manager;
}

/**
 * Returns the first attribute of {@code node} whose name matches one of
 * {@code names}, trying the names in the order given.
 *
 * @param node  the node whose attributes are searched
 * @param names candidate attribute names, in priority order
 * @return the matching attribute node, or {@code null} if none of the names is present
 */
private static Node findFirstAttribute(Node node, String... names) {
    for (String name : names) {
        Node attribute = node.getAttributes().getNamedItem(name);
        if (attribute != null) {
            return attribute;
        }
    }
    return null;
}