in connectors/tikaservice-rmeta/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tikaservice/rmeta/TikaExtractor.java [643:1072]
public int addOrReplaceDocumentWithException(final String documentURI, final VersionContext pipelineDescription, final RepositoryDocument document, final String authorityNameString,
final IOutputAddActivity activities) throws ManifoldCFException, ServiceInterruption, IOException {
// First, make sure the downstream pipeline will accept
// text/plain;charset=utf-8
if (!activities.checkMimeTypeIndexable("text/plain;charset=utf-8")) {
activities.noDocument();
activities.recordActivity(null, ACTIVITY_EXTRACT, null, documentURI, activities.EXCLUDED_MIMETYPE, "Downstream pipeline rejected mime type 'text/plain;charset=utf-8'");
return DOCUMENTSTATUS_REJECTED;
}
final SpecPacker sp = new SpecPacker(pipelineDescription.getSpecification());
getSession();
// Tika server variables
CloseableHttpResponse response = null;
// Tika's API reads from an input stream and writes to an output Writer.
// Since a RepositoryDocument exposes only readers and input streams,
// AND all downstream processing needs to occur in a ManifoldCF thread,
// we have some constraints on the architecture we need to get this done:
// (1) The principal worker thread must call the downstream pipeline send() method.
// (2) The callee of the send() method must call a reader in the RepositoryDocument.
// (3) The Reader, if its data buffer is empty, must pull more data from the
// original input stream and hand it to Tika, which populates the Reader's data buffer.
// So all this can be done in one thread, with some work, and the creation
// of a special InputStream or Reader implementation. Where it fails, though, is the
// requirement that Tika-extracted metadata be included in the RepositoryDocument
// right from the beginning. Effectively this means that the entire document
// must be parsed before it is handed downstream -- so basically a temporary
// file (or in-memory buffer if small enough) must be created.
// Instead of the elegant flow above, we have the following:
// (1) Create a temporary file (or in-memory buffer if the file is small enough)
// (2) Run Tika to completion, streaming content output to the temporary file
// (3) Modify the RepositoryDocument to read from the temporary file, and include
// the Tika-extracted metadata
// (4) Call downstream document processing
// Prepare the destination storage
DestinationStorage ds;
if (document.getBinaryLength() <= inMemoryMaximumFile) {
ds = new MemoryDestinationStorage((int) document.getBinaryLength());
} else {
ds = new FileDestinationStorage();
}
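// Small documents are buffered in memory; anything larger than inMemoryMaximumFile is
// spilled to a temporary file. Either way the storage is released by ds.close() in the
// outer finally block.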
try {
final Map<String, List<String>> metadata = new HashMap<>();
final List<String> embeddedResourcesNames = new ArrayList<>();
if (document.getFileName() != null) {
metadata.put(TikaMetadataKeys.RESOURCE_NAME_KEY, new ArrayList<>());
metadata.put("stream_name", new ArrayList<>());
metadata.get(TikaMetadataKeys.RESOURCE_NAME_KEY).add(document.getFileName());
metadata.get("stream_name").add(document.getFileName());
}
metadata.put("stream_size", new ArrayList<>());
metadata.get("stream_size").add(String.valueOf(document.getBinaryLength()));
// We only log the extraction step (recorded via recordActivity in the finally block below)
final long startTime = System.currentTimeMillis();
String resultCode = "OK";
String description = "";
Long length = 0L;
boolean truncated = false;
boolean resources_limit = false;
int tikaServerResultCode = 0;
try {
try {
// Process the document only if it is not an archive, or if it is an archive and the
// extract archives option is set to true
final boolean isArchiveDocument = isArchive(document.getFileName(), document.getMimeType());
if (!isArchiveDocument || sp.extractArchives) {
// Send document to the Tika Server
final HttpPut httpPut = new HttpPut(rmetaURI);
if (sp.writeLimit != -1) {
httpPut.addHeader("writeLimit", String.valueOf(sp.writeLimit));
}
if (sp.maxEmbeddedResources != -1) {
httpPut.addHeader("maxEmbeddedResources", String.valueOf(sp.maxEmbeddedResources));
}
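// writeLimit caps how many characters Tika will extract and maxEmbeddedResources caps how
// many embedded documents it will parse; -1 in the specification means "no limit", in
// which case the corresponding request header is simply not sent.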
final HttpEntity entity = new InputStreamEntity(document.getBinaryStream());
httpPut.setEntity(entity);
try {
response = this.httpClient.execute(tikaHost, httpPut);
} catch (final SocketTimeoutException e) { // The document is probably too big, so we do not retry it
resultCode = "TIKASERVERRESPONSETIMEOUT";
description = "Socket timeout while processing document " + documentURI + " : " + e.getMessage();
tikaServerResultCode = handleTikaServerError(description);
} catch (final SocketException e) {
// If the exception occurred after the connection was established, the Tika server is
// probably not down, so retry {retryNumber} times without aborting the job in case of
// failure
if (!(e instanceof ConnectException) && !(e instanceof BindException) && !(e instanceof NoRouteToHostException) && !(e instanceof PortUnreachableException)) {
resultCode = "TIKASERVERSOCKETEXCEPTION";
description = "Socket exception while processing document " + documentURI + " : " + e.getMessage();
tikaServerResultCode = handleTikaServerError(description);
retryWithoutAbort(e);
} else {
// The Tika server seems to be down: retry {retryNumber} times and abort the job if it
// fails on each retry
resultCode = "TIKASERVEREXCEPTION";
description = "Tika seemed to be down when requested to process document " + documentURI + " : " + e.getMessage();
tikaServerResultCode = handleTikaServerError(description);
triggerServiceInterruption(documentURI, e);
}
} catch (final NoHttpResponseException e) {
// Tika probably did not manage to process the document in time (task timeout)
resultCode = "TIKASERVERNORESPONSEEXCEPTION";
description = "Tika failed to process " + documentURI + " in time (potential task timeout): " + e.getMessage();
tikaServerResultCode = handleTikaServerError(description);
} catch (final IOException e) {
// Unknown problem with the Tika server: retry {retryNumber} times and abort the job if it
// fails on each retry
resultCode = "TIKASERVEREXCEPTION";
description = "Unknown Tika problem when processing document " + documentURI + " : " + e.getMessage();
tikaServerResultCode = handleTikaServerError(description);
triggerServiceInterruption(documentURI, e);
}
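// If one of the handled transport exceptions above was thrown, response is still null,
// so the parsing below is skipped and only the recorded result code/description remain.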
if (response != null) {
final int responseCode = response.getStatusLine().getStatusCode();
if (responseCode == 200 || responseCode == 204) {
try (final OutputStream os = ds.getOutputStream(); Writer w = new OutputStreamWriter(os, StandardCharsets.UTF_8.name()); InputStream is = response.getEntity().getContent();) {
final JsonFactory jfactory = new JsonFactory();
final JsonParser jParser = jfactory.createParser(is);
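// The rmeta endpoint answers with a JSON array of metadata objects: the first object
// describes the main document, any following objects describe embedded resources, and
// the extracted text of each object is carried in its "X-TIKA:content" field.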
JsonToken token = null;
// Go to beginning of metadata
boolean inMetadata = false;
while (!inMetadata && (token = jParser.nextToken()) != null) {
if (token == JsonToken.START_OBJECT) {
inMetadata = true;
}
}
int totalMetadataLength = 0;
boolean maxMetadataReached = false;
boolean metadataSkipped = false;
boolean metadataTruncated = false;
if (token != null) {
while ((token = jParser.nextToken()) != null && token != JsonToken.END_OBJECT) {
final int fieldNameLength = jParser.getTextLength();
if (fieldNameLength <= maxMetadataNameLength) {
final String fieldName = jParser.getCurrentName();
if (fieldName != null) {
if (fieldName.startsWith("X-Parsed-By")) {
skipMetadata(jParser);
} else if (fieldName.contentEquals("X-TIKA:content")) {
// Consume content
jParser.nextToken();
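// getText(Writer) streams the current JSON string value straight into the destination
// writer and returns the number of characters written, so the extracted content never
// has to be materialized as a single in-memory String.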
length += jParser.getText(w);
} else if (!fieldName.startsWith("X-TIKA")) {
token = jParser.nextToken();
if (!metadata.containsKey(fieldName)) {
totalMetadataLength += fieldName.length();
metadata.put(fieldName, new ArrayList<>());
}
if (token == JsonToken.START_ARRAY) {
while (jParser.nextToken() != JsonToken.END_ARRAY) {
if (jParser.getTextLength() <= sp.maxMetadataValueLength) {
final int totalMetadataLengthPreview = totalMetadataLength + jParser.getTextLength();
if (totalMetadataLengthPreview <= sp.totalMetadataLimit) {
metadata.get(fieldName).add(jParser.getText());
totalMetadataLength = totalMetadataLengthPreview;
} else {
maxMetadataReached = true;
}
} else {
metadataSkipped = true;
if (Logging.ingest.isDebugEnabled()) {
Logging.ingest
.debug("Skip value of metadata " + fieldName + " of document " + documentURI + " because it exceeds the max value limit of " + sp.maxMetadataValueLength);
}
}
}
} else {
if (jParser.getTextLength() <= sp.maxMetadataValueLength) {
final int totalMetadataLengthPreview = totalMetadataLength + jParser.getTextLength();
if (totalMetadataLengthPreview <= sp.totalMetadataLimit) {
metadata.get(fieldName).add(jParser.getText());
// Keep the running total in sync with the array-valued branch above
totalMetadataLength = totalMetadataLengthPreview;
} else {
maxMetadataReached = true;
}
} else {
metadataSkipped = true;
if (Logging.ingest.isDebugEnabled()) {
Logging.ingest
.debug("Skip value of metadata " + fieldName + " of document " + documentURI + " because it exceeds the max value limit of " + sp.maxMetadataValueLength);
}
}
}
// Remove metadata if no data has been gathered
if (metadata.get(fieldName).isEmpty()) {
totalMetadataLength -= fieldName.length();
metadata.remove(fieldName);
}
} else if (fieldName.startsWith("X-TIKA:EXCEPTION:")) { // deal with Tika exceptions
boolean unknownException = false;
if (fieldName.contentEquals("X-TIKA:EXCEPTION:write_limit_reached")) {
resultCode = "TRUNCATEDOK";
truncated = true;
} else if (fieldName.contentEquals("X-TIKA:EXCEPTION:embedded_resource_limit_reached")) {
resources_limit = true;
} else if (!fieldName.contentEquals("X-TIKA:EXCEPTION:warn")) { // If the exception is other than a warning message
unknownException = true;
resultCode = "TIKAEXCEPTION";
description += getTikaExceptionDesc(jParser) + System.lineSeparator();
}
if (!unknownException) {
skipMetadata(jParser);
}
} else if (fieldName.startsWith("X-TIKA:WARN:truncated_metadata")) {
metadataTruncated = true;
skipMetadata(jParser);
} else {
skipMetadata(jParser);
}
}
} else {
metadataSkipped = true;
if (Logging.ingest.isDebugEnabled()) {
Logging.ingest.debug("Skip a metadata of document " + documentURI + " because its name exceeds the max allowed length of " + maxMetadataNameLength);
}
skipMetadata(jParser);
}
}
// If the token is not null, there are embedded resources; process them if the extractArchives option is enabled
if (token != null && token == JsonToken.END_OBJECT && sp.extractArchives) {
// For embedded resources we only gather resource names and content; skip the rest
while ((token = jParser.nextToken()) != null) {
final String fieldName = jParser.getCurrentName();
if (fieldName != null && fieldName.contentEquals("resourceName")) {
token = jParser.nextToken();
if (jParser.getTextLength() <= sp.maxMetadataValueLength) {
embeddedResourcesNames.add(jParser.getText());
} else {
metadataSkipped = true;
}
} else if (fieldName != null && fieldName.contentEquals("X-TIKA:content")) {
// Add embedded resource content to main document content
jParser.nextToken();
length += jParser.getText(w);
}
}
}
jParser.close();
}
// If there are embedded resources, add their names, if possible, to the metadata
for (final String embeddedResourceName : embeddedResourcesNames) {
final int resourceNameBytesLength = embeddedResourceName.getBytes(StandardCharsets.UTF_8).length;
final int totalMetadataLengthPreview = totalMetadataLength + resourceNameBytesLength;
if (totalMetadataLengthPreview <= sp.totalMetadataLimit) {
if (!metadata.containsKey("embeddedResourcesNames")) {
totalMetadataLength += "embeddedResourcesNames".getBytes(StandardCharsets.UTF_8).length;
metadata.put("embeddedResourcesNames", new ArrayList<>());
}
metadata.get("embeddedResourcesNames").add(embeddedResourceName);
totalMetadataLength += resourceNameBytesLength;
} else {
maxMetadataReached = true;
}
}
if (maxMetadataReached) {
description += "Some metadata have been skipped because the total metadata limit of " + sp.totalMetadataLimit + " has been reached" + System.lineSeparator();
} else if (metadataSkipped) {
description += "Some metadata have been skipped because their names or values exceeded the limits" + System.lineSeparator();
}
if (metadataTruncated) {
description += "Some metadata have been truncated by Tika because they exceeded the limits specified in the Tika conf" + System.lineSeparator();
}
}
} else if (responseCode == 503) {
// Service interruption; Tika trying to come up.
// Retry unlimited times, retryInterval ms between retries
resultCode = "TIKASERVERUNAVAILABLE";
description = "Tika Server was unavailable: 503 " + response.getStatusLine().getReasonPhrase();
tikaServerResultCode = handleTikaServerError(description);
Logging.ingest.warn("Tika Server unavailable, retrying...");
final long currentTime = System.currentTimeMillis();
throw new ServiceInterruption("Tika Server unavailable, retrying...", new Exception(description), currentTime + retryInterval, -1L, -1, false);
} else {
if (responseCode == 500) {
resultCode = "TIKASERVERERROR";
description = "Tika Server failed to parse document with the following error: " + response.getStatusLine().getReasonPhrase();
tikaServerResultCode = handleTikaServerError(description);
} else {
resultCode = "TIKASERVERREJECTS";
description = "Tika Server rejected document " + documentURI + " with the following reason: " + response.getStatusLine().getReasonPhrase();
tikaServerResultCode = handleTikaServerRejects(description);
}
}
}
} else {
resultCode = "EXCLUDED";
description = "Detected as an archive file and the extract archives option is set to false";
}
} catch (final IOException e) {
resultCode = "TIKASERVERRESPONSEISSUE";
if (e.getMessage() != null) {
description = e.getMessage();
}
tikaServerResultCode = handleTikaServerException(e);
} finally {
if (response != null) {
response.close();
}
}
if (!activities.checkLengthIndexable(ds.getBinaryLength())) {
activities.noDocument();
resultCode = activities.EXCLUDED_LENGTH;
description = "Downstream pipeline rejected document with length " + ds.getBinaryLength();
return DOCUMENTSTATUS_REJECTED;
}
} finally {
// Before recording the activity, clean the description as it can contain non-ASCII chars that can cause errors during SQL insertion
description = description.replaceAll("[^\\x20-\\x7e]", "");
// Log the extraction processing
activities.recordActivity(startTime, ACTIVITY_EXTRACT, length, documentURI, resultCode, description);
}
// Parsing complete!
// Create a copy of Repository Document
final RepositoryDocument docCopy = document.duplicate();
// Open new input stream
final InputStream is = ds.getInputStream();
// Get new stream length
final long newBinaryLength = ds.getBinaryLength();
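// The rewritten document now carries the UTF-8 plain text produced by Tika (written through
// the OutputStreamWriter above) instead of the original binary, which is why the method
// first checked that the downstream pipeline accepts "text/plain;charset=utf-8".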
try {
docCopy.setBinary(is, newBinaryLength);
// Set up all metadata from Tika. We may want to run this through a
// mapper eventually...
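// Field names are optionally lowercased with non-alphanumerics replaced by '_' (lowerNames),
// then routed through the explicit mapping table; unmapped fields are kept only when
// keepAllMetadata is set, and new values are appended to any values the field already
// carries on the document.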
for (String mName : metadata.keySet()) {
String[] values = metadata.get(mName).toArray(new String[0]);
// Only keep metadata whose name does not exceed maxMetadataNameLength, to avoid HTTP header errors downstream
if (mName.length() < maxMetadataNameLength) {
if (sp.lowerNames()) {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < mName.length(); i++) {
char ch = mName.charAt(i);
if (!Character.isLetterOrDigit(ch)) {
ch = '_';
} else {
ch = Character.toLowerCase(ch);
}
sb.append(ch);
}
mName = sb.toString();
}
final String target = sp.getMapping(mName);
if (target != null) {
if (docCopy.getField(target) != null) {
final String[] persistentValues = docCopy.getFieldAsStrings(target);
values = ArrayUtils.addAll(persistentValues, values);
}
docCopy.addField(target, values);
} else {
if (sp.keepAllMetadata()) {
if (docCopy.getField(mName) != null) {
final String[] persistentValues = docCopy.getFieldAsStrings(mName);
values = ArrayUtils.addAll(persistentValues, values);
}
docCopy.addField(mName, values);
}
}
}
}
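// Expose the write-limit and embedded-resource-limit flags as plain document fields,
// overwriting any pre-existing values of the same name.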
removeField(docCopy, "truncated");
docCopy.addField("truncated", truncated ? "true" : "false");
removeField(docCopy, "resources_limit");
docCopy.addField("resources_limit", resources_limit ? "true" : "false");
// Send new document downstream
final int sendDocumentResultCode = activities.sendDocument(documentURI, docCopy);
if (sendDocumentResultCode == 0) {
return tikaServerResultCode;
} else {
return sendDocumentResultCode;
}
} finally {
// It is really important to close the input stream in a finally block, as close() will wait until the stream has been fully read (or closed) by the downstream pipeline
is.close();
}
} finally {
if (ds != null) {
ds.close();
}
}
}