public int addOrReplaceDocumentWithException()

in connectors/tikaservice-rmeta/connector/src/main/java/org/apache/manifoldcf/agents/transformation/tikaservice/rmeta/TikaExtractor.java [643:1072]


  public int addOrReplaceDocumentWithException(final String documentURI, final VersionContext pipelineDescription, final RepositoryDocument document, final String authorityNameString,
      final IOutputAddActivity activities) throws ManifoldCFException, ServiceInterruption, IOException {
    // First, make sure the downstream pipeline will accept
    // text/plain;charset=utf-8
    if (!activities.checkMimeTypeIndexable("text/plain;charset=utf-8")) {
      activities.noDocument();
      activities.recordActivity(null, ACTIVITY_EXTRACT, null, documentURI, activities.EXCLUDED_MIMETYPE, "Downstream pipeline rejected mime type 'text/plain;charset=utf-8'");
      return DOCUMENTSTATUS_REJECTED;
    }

    final SpecPacker sp = new SpecPacker(pipelineDescription.getSpecification());

    getSession();

    // Tika server variables
    CloseableHttpResponse response = null;

    // Tika's API reads from an input stream and writes to an output Writer.
    // Since a RepositoryDocument exposes only readers and input streams, and
    // all downstream processing needs to occur in a ManifoldCF thread, the
    // architecture is constrained as follows:
    // (1) The principal worker thread must call the downstream pipeline's
    // send() method.
    // (2) The callee of the send() method must read from a Reader in the
    // RepositoryDocument.
    // (3) The Reader, when its data buffer is empty, must pull more data from
    // the original input stream and hand it to Tika, which refills the
    // Reader's buffer.
    // All of this could be done in one thread, with some work and a special
    // InputStream or Reader implementation. Where that approach fails, though,
    // is the requirement that Tika-extracted metadata be included in the
    // RepositoryDocument right from the beginning. Effectively this means the
    // entire document must be parsed before it is handed downstream -- so a
    // temporary file (or in-memory buffer, if small enough) must be created.
    // Instead of the elegant flow above, we have the following:
    // (1) Create a temporary file (or in-memory buffer if the document is
    // small enough).
    // (2) Run Tika to completion, streaming the content output to the
    // temporary file.
    // (3) Modify the RepositoryDocument to read from the temporary file, and
    // include the Tika-extracted metadata.
    // (4) Call downstream document processing.

    // Prepare the destination storage
    DestinationStorage ds;

    if (document.getBinaryLength() <= inMemoryMaximumFile) {
      ds = new MemoryDestinationStorage((int) document.getBinaryLength());
    } else {
      ds = new FileDestinationStorage();
    }
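    // Documents no larger than inMemoryMaximumFile are buffered in memory;
    // anything bigger spills to a temporary file on disk.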

    try {
      final Map<String, List<String>> metadata = new HashMap<>();
      final List<String> embeddedResourcesNames = new ArrayList<>();
      if (document.getFileName() != null) {
        metadata.put(TikaMetadataKeys.RESOURCE_NAME_KEY, new ArrayList<>());
        metadata.put("stream_name", new ArrayList<>());
        metadata.get(TikaMetadataKeys.RESOURCE_NAME_KEY).add(document.getFileName());
        metadata.get("stream_name").add(document.getFileName());
      }
      metadata.put("stream_size", new ArrayList<>());
      metadata.get("stream_size").add(String.valueOf(document.getBinaryLength()));

      // Bookkeeping for logging the extraction activity
      final long startTime = System.currentTimeMillis();
      String resultCode = "OK";
      String description = "";
      long length = 0L;
      boolean truncated = false;
      boolean resources_limit = false;

      int tikaServerResultCode = 0;

      try {
        try {

          // Process the document only if it is not an archive, or if it is an
          // archive and the extract-archives option is set to true
          final boolean archive = isArchive(document.getFileName(), document.getMimeType());
          if (!archive || sp.extractArchives) {

            // Send document to the Tika Server
            final HttpPut httpPut = new HttpPut(rmetaURI);
            if (sp.writeLimit != -1) {
              httpPut.addHeader("writeLimit", String.valueOf(sp.writeLimit));
            }
            if (sp.maxEmbeddedResources != -1) {
              httpPut.addHeader("maxEmbeddedResources", String.valueOf(sp.maxEmbeddedResources));
            }
            final HttpEntity entity = new InputStreamEntity(document.getBinaryStream());
            httpPut.setEntity(entity);
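            // The raw binary is streamed to the server unmodified; the
            // optional writeLimit and maxEmbeddedResources headers ask Tika
            // Server to cap the amount of extracted text and the number of
            // embedded resources it processes.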
            try {
              response = this.httpClient.execute(tikaHost, httpPut);
            } catch (final SocketTimeoutException e) { // The document is probably too big, so we don't retry it
              resultCode = "TIKASERVERRESPONSETIMEOUT";
              description = "Socket timeout while processing document " + documentURI + " : " + e.getMessage();
              tikaServerResultCode = handleTikaServerError(description);
            } catch (final SocketException e) {
              // If the exception occurred after the connection was
              // established, the Tika server is probably not down: retry
              // {retryNumber} times without aborting the job in case of
              // failure
              if (!(e instanceof ConnectException) && !(e instanceof BindException) && !(e instanceof NoRouteToHostException) && !(e instanceof PortUnreachableException)) {
                resultCode = "TIKASERVERSOCKETEXCEPTION";
                description = "Socket exception while processing document " + documentURI + " : " + e.getMessage();
                tikaServerResultCode = handleTikaServerError(description);
                retryWithoutAbort(e);
              } else {
                // The Tika server seems to be down: retry {retryNumber} times
                // and abort the job if every retry fails
                resultCode = "TIKASERVEREXCEPTION";
                description = "Tika seemed to be down when requested to process document " + documentURI + " : " + e.getMessage();
                tikaServerResultCode = handleTikaServerError(description);
                triggerServiceInterruption(documentURI, e);
              }
            } catch (final NoHttpResponseException e) {
              // Tika probably did not manage to process the document in time (task timeout)
              resultCode = "TIKASERVERNORESPONSEEXCEPTION";
              description = "Tika did not manage to process " + documentURI + " (potential task timeout): " + e.getMessage();
              tikaServerResultCode = handleTikaServerError(description);
            } catch (final IOException e) {
              // Unknown problem with the Tika server: retry {retryNumber}
              // times and abort the job if every retry fails
              resultCode = "TIKASERVEREXCEPTION";
              description = "Unknown Tika problem when processing document " + documentURI + " : " + e.getMessage();
              tikaServerResultCode = handleTikaServerError(description);
              triggerServiceInterruption(documentURI, e);
            }
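            // Summary of the error handling above: timeouts and missing
            // responses are recorded without retrying; socket errors on an
            // established connection are retried without aborting the job;
            // connection failures and unknown I/O errors are retried and
            // abort the job if every retry fails.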
            if (response != null) {
              final int responseCode = response.getStatusLine().getStatusCode();
              if (responseCode == 200 || responseCode == 204) {
                try (final OutputStream os = ds.getOutputStream(); Writer w = new OutputStreamWriter(os, StandardCharsets.UTF_8.name()); InputStream is = response.getEntity().getContent();) {

                  final JsonFactory jfactory = new JsonFactory();
                  final JsonParser jParser = jfactory.createParser(is);
                  JsonToken token = null;

                  // Go to beginning of metadata
                  boolean inMetadata = false;
                  while (!inMetadata && (token = jParser.nextToken()) != null) {
                    if (token == JsonToken.START_OBJECT) {
                      inMetadata = true;
                    }
                  }
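                  // A /rmeta response is a JSON array of metadata objects:
                  // the first describes the container document, and any
                  // following objects describe embedded resources.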

                  int totalMetadataLength = 0;
                  boolean maxMetadataReached = false;
                  boolean metadataSkipped = false;
                  boolean metadataTruncated = false;

                  if (token != null) {
                    while ((token = jParser.nextToken()) != null && token != JsonToken.END_OBJECT) {
                      final int fieldNameLength = jParser.getTextLength();
                      if (fieldNameLength <= maxMetadataNameLength) {
                        final String fieldName = jParser.getCurrentName();
                        if (fieldName != null) {
                          if (fieldName.startsWith("X-Parsed-By")) {
                            skipMetadata(jParser);
                          } else if (fieldName.contentEquals("X-TIKA:content")) {
                            // Consume content
                            jParser.nextToken();
                            length += jParser.getText(w);
                          } else if (!fieldName.startsWith("X-TIKA")) {
                            token = jParser.nextToken();
                            if (!metadata.containsKey(fieldName)) {
                              totalMetadataLength += fieldName.length();
                              metadata.put(fieldName, new ArrayList<>());
                            }
                            if (token == JsonToken.START_ARRAY) {
                              while (jParser.nextToken() != JsonToken.END_ARRAY) {
                                if (jParser.getTextLength() <= sp.maxMetadataValueLength) {
                                  final int totalMetadataLengthPreview = totalMetadataLength + jParser.getTextLength();
                                  if (totalMetadataLengthPreview <= sp.totalMetadataLimit) {
                                    metadata.get(fieldName).add(jParser.getText());
                                    totalMetadataLength = totalMetadataLengthPreview;
                                  } else {
                                    maxMetadataReached = true;
                                  }
                                } else {
                                  metadataSkipped = true;
                                  if (Logging.ingest.isDebugEnabled()) {
                                    Logging.ingest
                                        .debug("Skip value of metadata " + fieldName + " of document " + documentURI + " because it exceeds the max value limit of " + sp.maxMetadataValueLength);
                                  }
                                }
                              }
                            } else {
                              if (jParser.getTextLength() <= sp.maxMetadataValueLength) {
                                final int totalMetadataLengthPreview = totalMetadataLength + jParser.getTextLength();
                                if (totalMetadataLengthPreview <= sp.totalMetadataLimit) {
                                  metadata.get(fieldName).add(jParser.getText());
                                  // Keep the running total in sync, as in the array branch above
                                  totalMetadataLength = totalMetadataLengthPreview;
                                } else {
                                  maxMetadataReached = true;
                                }
                              } else {
                                metadataSkipped = true;
                                if (Logging.ingest.isDebugEnabled()) {
                                  Logging.ingest
                                      .debug("Skip value of metadata " + fieldName + " of document " + documentURI + " because it exceeds the max value limit of " + sp.maxMetadataValueLength);
                                }
                              }
                            }
                            // Remove metadata if no data has been gathered
                            if (metadata.get(fieldName).isEmpty()) {
                              totalMetadataLength -= fieldName.length();
                              metadata.remove(fieldName);
                            }
                          } else if (fieldName.startsWith("X-TIKA:EXCEPTION:")) { // deal with Tika exceptions
                            boolean unknownException = false;
                            if (fieldName.contentEquals("X-TIKA:EXCEPTION:write_limit_reached")) {
                              resultCode = "TRUNCATEDOK";
                              truncated = true;
                            } else if (fieldName.contentEquals("X-TIKA:EXCEPTION:embedded_resource_limit_reached")) {
                              resources_limit = true;
                            } else if (!fieldName.contentEquals("X-TIKA:EXCEPTION:warn")) { // If the exception is other than a warning message
                              unknownException = true;
                              resultCode = "TIKAEXCEPTION";
                              description += getTikaExceptionDesc(jParser) + System.lineSeparator();
                            }
                            if (!unknownException) {
                              skipMetadata(jParser);
                            }
                          } else if (fieldName.startsWith("X-TIKA:WARN:truncated_metadata")) {
                            metadataTruncated = true;
                            skipMetadata(jParser);
                          } else {
                            skipMetadata(jParser);
                          }
                        }
                      } else {
                        metadataSkipped = true;
                        if (Logging.ingest.isDebugEnabled()) {
                          Logging.ingest.debug("Skip a metadata of document " + documentURI + " because its name exceeds the max allowed length of " + maxMetadataNameLength);
                        }
                        skipMetadata(jParser);
                      }
                    }

                    // If the token is not null, there are embedded resources; process them if the extractArchives option is enabled
                    if (token == JsonToken.END_OBJECT && sp.extractArchives) {
                      // For embedded resources we only gather resource names and content, and skip the rest
                      while ((token = jParser.nextToken()) != null) {
                        final String fieldName = jParser.getCurrentName();
                        if (fieldName != null && fieldName.contentEquals("resourceName")) {
                          token = jParser.nextToken();
                          if (jParser.getTextLength() <= sp.maxMetadataValueLength) {
                            embeddedResourcesNames.add(jParser.getText());
                          } else {
                            metadataSkipped = true;
                          }
                        } else if (fieldName != null && fieldName.contentEquals("X-TIKA:content")) {
                          // Add embedded resource content to main document content
                          jParser.nextToken();
                          length += jParser.getText(w);
                        }
                      }
                    }

                    jParser.close();
                  }

                  // If there are embedded resources, add their names, if possible, to the metadata
                  for (final String embeddedResourceName : embeddedResourcesNames) {
                    final int resourceNameBytesLength = embeddedResourceName.getBytes(StandardCharsets.UTF_8).length;

                    final int totalMetadataLengthPreview = totalMetadataLength + resourceNameBytesLength;
                    if (totalMetadataLengthPreview <= sp.totalMetadataLimit) {
                      if (!metadata.containsKey("embeddedResourcesNames")) {
                        totalMetadataLength += "embeddedResourcesNames".getBytes(StandardCharsets.UTF_8).length;
                        metadata.put("embeddedResourcesNames", new ArrayList<>());
                      }
                      metadata.get("embeddedResourcesNames").add(embeddedResourceName);
                      totalMetadataLength += resourceNameBytesLength;
                    } else {
                      maxMetadataReached = true;
                    }

                  }
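                  // The collected names are folded into a single multi-valued
                  // "embeddedResourcesNames" field, charged against the same
                  // total-metadata budget as the regular metadata above.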

                  if (maxMetadataReached) {
                    description += "Some metadata have been skipped because the total metadata limit of " + sp.totalMetadataLimit + " has been reached" + System.lineSeparator();
                  } else if (metadataSkipped) {
                    description += "Some metadata have been skipped because their names or values exceeded the limits" + System.lineSeparator();
                  }

                  if (metadataTruncated) {
                    description += "Some metadata have been truncated by Tika because they exceeded the limits specified in the Tika conf" + System.lineSeparator();
                  }
                }
              } else if (responseCode == 503) {
                // Service interruption; Tika trying to come up.
                // Retry unlimited times, retryInterval ms between retries
                resultCode = "TIKASERVERUNAVAILABLE";
                description = "Tika Server was unavailable: 503 " + response.getStatusLine().getReasonPhrase();
                tikaServerResultCode = handleTikaServerError(description);
                Logging.ingest.warn("Tika Server unavailable, retrying...");
                final long currentTime = System.currentTimeMillis();
                throw new ServiceInterruption("Tika Server unavailable, retrying...", new Exception(description), currentTime + retryInterval, -1L, -1, false);
              } else {
                if (responseCode == 500) {
                  resultCode = "TIKASERVERERROR";
                  description = "Tika Server failed to parse document with the following error: " + response.getStatusLine().getReasonPhrase();
                  tikaServerResultCode = handleTikaServerError(description);
                } else {
                  resultCode = "TIKASERVERREJECTS";
                  description = "Tika Server rejected document " + documentURI + " with the following reason: " + response.getStatusLine().getReasonPhrase();
                  tikaServerResultCode = handleTikaServerRejects(description);
                }
              }
            }
          } else {
            resultCode = "EXCLUDED";
            description = "Detected as an archive file and the extract archives option is set to false";
          }

        } catch (final IOException e) {
          resultCode = "TIKASERVERRESPONSEISSUE";
          if (e.getMessage() != null) {
            description = e.getMessage();
          }
          tikaServerResultCode = handleTikaServerException(e);
        } finally {
          if (response != null) {
            response.close();
          }
        }

        if (!activities.checkLengthIndexable(ds.getBinaryLength())) {
          activities.noDocument();
          resultCode = activities.EXCLUDED_LENGTH;
          description = "Downstream pipeline rejected document with length " + ds.getBinaryLength();
          return DOCUMENTSTATUS_REJECTED;
        }
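        // Note that the length check above runs after extraction, so the
        // downstream limit applies to the extracted text, not to the
        // original binary.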

      } finally {
        // Before injecting the activity record, clean the description as it can contain non-ASCII chars that can cause errors during SQL insertion
        description = description.replaceAll("[^\\x20-\\x7e]", "");
        // Log the extraction processing
        activities.recordActivity(startTime, ACTIVITY_EXTRACT, length, documentURI, resultCode, description);
      }

      // Parsing complete!
      // Create a copy of Repository Document
      final RepositoryDocument docCopy = document.duplicate();

      // Open new input stream
      final InputStream is = ds.getInputStream();

      // Get new stream length
      final long newBinaryLength = ds.getBinaryLength();

      try {
        docCopy.setBinary(is, newBinaryLength);

        // Set up all metadata from Tika. We may want to run this through a
        // mapper eventually...
        for (String mName : metadata.keySet()) {
          String[] values = metadata.get(mName).toArray(new String[0]);

          // Only keep metadata whose name does not exceed maxMetadataNameLength chars, to avoid HTTP header errors
          if (mName.length() < maxMetadataNameLength) {
            if (sp.lowerNames()) {
              final StringBuilder sb = new StringBuilder();
              for (int i = 0; i < mName.length(); i++) {
                char ch = mName.charAt(i);
                if (!Character.isLetterOrDigit(ch)) {
                  ch = '_';
                } else {
                  ch = Character.toLowerCase(ch);
                }
                sb.append(ch);
              }
              mName = sb.toString();
            }
            final String target = sp.getMapping(mName);
            if (target != null) {
              if (docCopy.getField(target) != null) {
                final String[] persistentValues = docCopy.getFieldAsStrings(target);
                values = ArrayUtils.addAll(persistentValues, values);
              }
              docCopy.addField(target, values);
            } else {
              if (sp.keepAllMetadata()) {
                if (docCopy.getField(mName) != null) {
                  final String[] persistentValues = docCopy.getFieldAsStrings(mName);
                  values = ArrayUtils.addAll(persistentValues, values);
                }
                docCopy.addField(mName, values);
              }
            }
          }
        }
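        // In short, the loop above optionally normalizes each field name to
        // lowercase-with-underscores, routes it through the spec's mapping
        // table when a mapping exists, keeps unmapped fields only when
        // keepAllMetadata is set, and merges new values with any already
        // present on the target field.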

        removeField(docCopy, "truncated");
        docCopy.addField("truncated", truncated ? "true" : "false");

        removeField(docCopy, "resources_limit");
        docCopy.addField("resources_limit", resources_limit ? "true" : "false");
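        // Both flags are written unconditionally ("true" or "false") so that
        // downstream consumers always see a definite value.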

        // Send new document downstream
        final int sendDocumentResultCode = activities.sendDocument(documentURI, docCopy);
        if (sendDocumentResultCode == 0) {
          return tikaServerResultCode;
        } else {
          return sendDocumentResultCode;
        }
      } finally {
        // It is really important to close the input stream in a finally block, as it waits until the input stream has been fully read (or closed) by the downstream pipeline
        is.close();
      }
    } finally {
      ds.close();
    }

  }
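
For context, here is a minimal standalone sketch of the parsing technique the method above relies on: walking a Tika Server /rmeta response (a JSON array of metadata objects) with Jackson's streaming API, and spooling the extracted text through a Writer instead of materializing it as one String. The class name and sample payload are hypothetical; the only assumption is that jackson-core is on the classpath.

import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

public class RmetaParseSketch {
  public static void main(final String[] args) throws IOException {
    // Hypothetical /rmeta payload: the first object describes the container
    // document; "X-TIKA:content" holds the extracted text.
    final String json = "[{\"Content-Type\":\"application/pdf\",\"dc:title\":[\"Example\"],"
        + "\"X-TIKA:content\":\"Hello world\"}]";

    final Map<String, List<String>> metadata = new HashMap<>();
    final StringWriter content = new StringWriter();

    try (JsonParser jParser = new JsonFactory().createParser(json)) {
      // Advance past the top-level array to the first metadata object
      JsonToken token;
      while ((token = jParser.nextToken()) != null && token != JsonToken.START_OBJECT) {
        // skip the array start
      }
      while ((token = jParser.nextToken()) != null && token != JsonToken.END_OBJECT) {
        final String fieldName = jParser.getCurrentName();
        token = jParser.nextToken();
        if ("X-TIKA:content".equals(fieldName)) {
          // Stream the (potentially large) text body straight into a Writer,
          // as the connector does with its DestinationStorage output
          jParser.getText(content);
        } else if (token == JsonToken.START_ARRAY) {
          // Multi-valued metadata arrives as a JSON array of strings
          final List<String> values = new ArrayList<>();
          while (jParser.nextToken() != JsonToken.END_ARRAY) {
            values.add(jParser.getText());
          }
          metadata.put(fieldName, values);
        } else {
          metadata.computeIfAbsent(fieldName, k -> new ArrayList<>()).add(jParser.getText());
        }
      }
    }
    System.out.println(metadata + " | content: " + content);
  }
}

The real method layers the metadata size budgets, the X-TIKA:EXCEPTION keys, and the embedded-resource pass on top of this same token walk.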