public void processDocuments()

in connectors/jdbc/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/jdbc/JDBCConnector.java [303:953]


  /** Process a set of documents: work out a version for each identifier, then fetch and ingest (or
  * remove) the documents that need it.
  *
  * The method proceeds in phases:
  * (1) run the optional version query, or assign every identifier the empty version when none is configured;
  * (2) delete every identifier for which no version was obtained;
  * (3) when security is on, gather access tokens per document, either from the acl query or from the
  *     tokens configured in the specification;
  * (4) build the full version string for each document and ask the framework whether it needs reindexing;
  * (5) run each attribute query found in the specification and collect the values per document;
  * (6) run the data query, and for each returned row either ingest the document or report it as "no document";
  * (7) reconcile the identifiers against what the data query actually returned.
  * Every query execution and every fetch outcome is reported through activities.recordActivity(),
  * except when the failure is an interruption.
  *
  *@param documentIdentifiers is the set of document identifiers to process.
  *@param statuses is not referenced by this implementation.
  *@param spec is the document specification; it supplies the queries, the security settings and the
  * attribute query nodes.
  *@param activities is the interface used to ingest, delete, and record activity for documents.
  *@param jobMode is not referenced by this implementation.
  *@param usesDefaultAuthority is not referenced by this implementation.
  *@throws ManifoldCFException when a query fails or returns no id column; the exception is rethrown
  * after the failure has been recorded.
  *@throws ServiceInterruption as declared; not thrown directly here, so presumably propagated from
  * the helpers and activity calls used below.
  */
  public void processDocuments(String[] documentIdentifiers, IExistingVersions statuses, Specification spec,
    IProcessActivity activities, int jobMode, boolean usesDefaultAuthority)
    throws ManifoldCFException, ServiceInterruption
  {
    TableSpec ts = new TableSpec(spec);
    
    Set<String> acls = ts.getAcls();

    // If there is no version query, then always return empty string for all documents.
    // This will mean that processDocuments will be called
    // for all.  ProcessDocuments will then be responsible for doing document deletes itself,
    // based on the query results.

    // Document identifier -> version value ("" when the version is null or there is no version query)
    Map<String,String> documentVersions = new HashMap<String,String>();
    if (ts.versionQuery != null && ts.versionQuery.length() > 0)
    {
      // If there IS a versions query, do it.  First set up the variables, then do the substitution.
      VariableMap vm = new VariableMap();
      addConstant(vm,JDBCConstants.idReturnVariable,JDBCConstants.idReturnColumnName);
      addConstant(vm,JDBCConstants.versionReturnVariable,JDBCConstants.versionReturnColumnName);
      if (addIDList(vm,JDBCConstants.idListVariable,documentIdentifiers,null))
      {
        // Do the substitution
        ArrayList paramList = new ArrayList();
        StringBuilder sb = new StringBuilder();
        substituteQuery(ts.versionQuery,vm,sb,paramList);

        // Now, build a result return, and a hash table so we can correlate the returned values with the place to put them.
        // We presume that if the row is missing, the document is gone.
        // Fire off the query!
        getSession();
        IDynamicResultSet result;
        String queryText = sb.toString();
        long startTime = System.currentTimeMillis();
        // Get a dynamic resultset.  Contract for dynamic resultset is that if
        // one is returned, it MUST be closed, or a connection will leak.
        try
        {
          result = connection.executeUncachedQuery(queryText,paramList,-1);
        }
        catch (ManifoldCFException e)
        {
          // If failure, record the failure.
          if (e.getErrorCode() != ManifoldCFException.INTERRUPTED)
            activities.recordActivity(Long.valueOf(startTime), ACTIVITY_EXTERNAL_QUERY, null,
              createQueryString(queryText,paramList), "ERROR", e.getMessage(), null);
          throw e;
        }
        try
        {
          // If success, record that too.
          activities.recordActivity(Long.valueOf(startTime), ACTIVITY_EXTERNAL_QUERY, null,
            createQueryString(queryText,paramList), "OK", null, null);
          // Now, go through resultset
          while (true)
          {
            IDynamicResultRow row = result.getNextRow();
            if (row == null)
              break;
            try
            {
              Object o = row.getValue(JDBCConstants.idReturnColumnName);
              if (o == null)
                throw new ManifoldCFException("Bad version query; doesn't return $(IDCOLUMN) column.  Try using quotes around $(IDCOLUMN) variable, e.g. \"$(IDCOLUMN)\", or, for MySQL, select \"by label\" in your repository connection.");
              String idValue = JDBCConnection.readAsString(o);
              o = row.getValue(JDBCConstants.versionReturnColumnName);
              String versionValue;
              // Null version is OK; make it a ""
              if (o == null)
                versionValue = "";
              else
                versionValue = JDBCConnection.readAsString(o);
              documentVersions.put(idValue,versionValue);
            }
            finally
            {
              row.close();
            }
          }
        }
        finally
        {
          result.close();
        }
      }
    }
    else
    {
      for (String documentIdentifier : documentIdentifiers)
      {
        documentVersions.put(documentIdentifier,"");
      }
    }

    // Delete the documents that had no version, and work only on ones that did.
    // fetchDocuments is a live view of the keys of documentVersions, which is not modified after this point.
    final Set<String> fetchDocuments = documentVersions.keySet();
    for (String documentIdentifier : documentIdentifiers)
    {
      if (documentVersions.get(documentIdentifier) == null)
        activities.deleteDocument(documentIdentifier);
    }

    // Pick up document acls
    Map<String,Set<String>> documentAcls = new HashMap<String,Set<String>>();
    if (ts.securityOn)
    {
      if (acls.size() == 0 && ts.aclQuery != null && ts.aclQuery.length() > 0)
      {
        // If there IS an acls query, do it.  First set up the variables, then do the substitution.
        VariableMap vm = new VariableMap();
        addConstant(vm,JDBCConstants.idReturnVariable,JDBCConstants.idReturnColumnName);
        addConstant(vm,JDBCConstants.tokenReturnVariable,JDBCConstants.tokenReturnColumnName);
        if (addIDList(vm,JDBCConstants.idListVariable,documentIdentifiers,fetchDocuments))
        {
          // Do the substitution
          ArrayList paramList = new ArrayList();
          StringBuilder sb = new StringBuilder();
          substituteQuery(ts.aclQuery,vm,sb,paramList);

          // Fire off the query!
          getSession();
          IDynamicResultSet result;
          String queryText = sb.toString();
          long startTime = System.currentTimeMillis();
          // Get a dynamic resultset.  Contract for dynamic resultset is that if
          // one is returned, it MUST be closed, or a connection will leak.
          try
          {
            result = connection.executeUncachedQuery(queryText,paramList,-1);
          }
          catch (ManifoldCFException e)
          {
            // If failure, record the failure.
            if (e.getErrorCode() != ManifoldCFException.INTERRUPTED)
              activities.recordActivity(Long.valueOf(startTime), ACTIVITY_EXTERNAL_QUERY, null,
                createQueryString(queryText,paramList), "ERROR", e.getMessage(), null);
            throw e;
          }
          try
          {
            // If success, record that too.
            activities.recordActivity(Long.valueOf(startTime), ACTIVITY_EXTERNAL_QUERY, null,
              createQueryString(queryText,paramList), "OK", null, null);
            // Now, go through resultset
            while (true)
            {
              IDynamicResultRow row = result.getNextRow();
              if (row == null)
                break;
              try
              {
                Object o = row.getValue(JDBCConstants.idReturnColumnName);
                if (o == null)
                  throw new ManifoldCFException("Bad acl query; doesn't return $(IDCOLUMN) column.  Try using quotes around $(IDCOLUMN) variable, e.g. \"$(IDCOLUMN)\", or, for MySQL, select \"by label\" in your repository connection.");
                String idValue = JDBCConnection.readAsString(o);
                o = row.getValue(JDBCConstants.tokenReturnColumnName);
                String tokenValue;
                if (o == null)
                  tokenValue = "";
                else
                  tokenValue = JDBCConnection.readAsString(o);
                // Versions that are "", when processed, will have their acls fetched at that time...
                // One row per (document, token); accumulate the tokens into a set per document.
                Set<String> dcs = documentAcls.get(idValue);
                if (dcs == null)
                {
                  dcs = new HashSet<String>();
                  documentAcls.put(idValue,dcs);
                }
                dcs.add(tokenValue);
              }
              finally
              {
                row.close();
              }
            }
          }
          finally
          {
            result.close();
          }
        }
      }
      else
      {
        // No acl query applies: every document shares the same token set taken from the specification.
        for (String documentIdentifier : fetchDocuments)
        {
          documentAcls.put(documentIdentifier,acls);
        }
      }
    }
    
    // Map from identifier to version string, holding only the documents that still have to be processed.
    // Entries are removed as data rows are handled.
    final Map<String,String> map = new HashMap<>();
    // This is the set of documents actually seen
    final Set<String> seenDocuments = new HashSet<>();
    
    for (String documentIdentifier : fetchDocuments)
    {
      String documentVersion = documentVersions.get(documentIdentifier);
      if (documentVersion.length() == 0)
      {
        // Empty version: always process, without asking whether reindexing is needed.
        map.put(documentIdentifier,documentVersion);
      }
      else
      {
        // Compute a full version string: acl marker and sorted tokens, then the version value,
        // then "=" and the data query text.
        StringBuilder sb = new StringBuilder();
        Set<String> dAcls = documentAcls.get(documentIdentifier);
        if (dAcls == null)
          sb.append('-');
        else
        {
          sb.append('+');
          String[] aclValues = new String[dAcls.size()];
          int k = 0;
          for (String acl : dAcls)
          {
            aclValues[k++] = acl;
          }
          // Sort so that the version string does not depend on set iteration order.
          java.util.Arrays.sort(aclValues);
          packList(sb,aclValues,'+');
        }

        sb.append(documentVersion).append("=").append(ts.dataQuery);
        String versionValue = sb.toString();

        if (activities.checkDocumentNeedsReindexing(documentIdentifier,versionValue))
        {
          map.put(documentIdentifier,versionValue);
        }
      }
    }
    
    // We have a primary query and a number of attribute queries to execute.
    // We execute the attribute queries first because those do not include binary data.
    // Structure: document identifier -> attribute name -> set of values.
    final Map<String, Map<String, Set<String>>> attributeValues = new HashMap<String, Map<String, Set<String>>>();
    int index = 0;
    while (index < spec.getChildCount())
    {
      SpecificationNode sn = spec.getChild(index++);
      if (sn.getType().equals(JDBCConstants.attributeQueryNode))
      {
        final String attributeName = sn.getAttributeValue(JDBCConstants.attributeName);
        final String attributeQuery = sn.getValue();
        // Fire off attribute query
        VariableMap attrVm = new VariableMap();
        addConstant(attrVm,JDBCConstants.idReturnVariable,JDBCConstants.idReturnColumnName);
        addConstant(attrVm,JDBCConstants.dataReturnVariable,JDBCConstants.dataReturnColumnName);
        if (!addIDList(attrVm,JDBCConstants.idListVariable,documentIdentifiers,map.keySet()))
          continue;
        
        // Do the substitution
        ArrayList paramList = new ArrayList();
        StringBuilder sb = new StringBuilder();
        substituteQuery(attributeQuery,attrVm,sb,paramList);

        // Fire off the query!
        getSession();
        IDynamicResultSet result;
        String queryText = sb.toString();
        long startTime = System.currentTimeMillis();
        // Get a dynamic resultset.  Contract for dynamic resultset is that if
        // one is returned, it MUST be closed, or a connection will leak.
        try
        {
          result = connection.executeUncachedQuery(queryText,paramList,-1);
        }
        catch (ManifoldCFException e)
        {
          // If failure, record the failure.
          if (e.getErrorCode() != ManifoldCFException.INTERRUPTED)
            activities.recordActivity(Long.valueOf(startTime), ACTIVITY_EXTERNAL_QUERY, null,
              createQueryString(queryText,paramList), "ERROR", e.getMessage(), null);
          throw e;
        }
        try
        {
          // If success, record that too.
          activities.recordActivity(Long.valueOf(startTime), ACTIVITY_EXTERNAL_QUERY, null,
            createQueryString(queryText,paramList), "OK", null, null);
          // Now, go through resultset
          while (true)
          {
            IDynamicResultRow row = result.getNextRow();
            if (row == null)
              break;
            try
            {
              Object o = row.getValue(JDBCConstants.idReturnColumnName);
              if (o == null)
                throw new ManifoldCFException("Bad attribute query; doesn't return $(IDCOLUMN) column.  Try using quotes around $(IDCOLUMN) variable, e.g. \"$(IDCOLUMN)\", or, for MySQL, select \"by label\" in your repository connection.");
              String idValue = JDBCConnection.readAsString(o);
              o = row.getValue(JDBCConstants.dataReturnColumnName);
              String dataValue;
              if (o == null)
                dataValue = "";
              else
                dataValue = JDBCConnection.readAsString(o);
              Map<String, Set<String>> avs = attributeValues.get(idValue);
              if (avs == null)
              {
                avs = new HashMap<String, Set<String>>();
                attributeValues.put(idValue,avs);
              }
              Set<String> dataValues = avs.get(attributeName);
              if (dataValues == null)
              {
                dataValues = new HashSet<String>();
                avs.put(attributeName, dataValues);
              }
              dataValues.add(dataValue);
            }
            finally
            {
              row.close();
            }
          }
        }
        finally
        {
          result.close();
        }
      }
    }
    
    // For all the documents not marked "scan only", form a query and pick up the contents.
    // If the contents is not found, then explicitly call the delete action method.
    VariableMap vm = new VariableMap();
    addConstant(vm,JDBCConstants.idReturnVariable,JDBCConstants.idReturnColumnName);
    addConstant(vm,JDBCConstants.urlReturnVariable,JDBCConstants.urlReturnColumnName);
    addConstant(vm,JDBCConstants.dataReturnVariable,JDBCConstants.dataReturnColumnName);
    addConstant(vm,JDBCConstants.contentTypeReturnVariable,JDBCConstants.contentTypeReturnColumnName);
    // Nothing to fetch: leave without running the data query or the final reconciliation loop.
    if (!addIDList(vm,JDBCConstants.idListVariable,documentIdentifiers,map.keySet()))
      return;

    // Do the substitution
    ArrayList paramList = new ArrayList();
    StringBuilder sb = new StringBuilder();
    substituteQuery(ts.dataQuery,vm,sb,paramList);

    // Execute the query
    getSession();
    IDynamicResultSet result;
    String queryText = sb.toString();
    long startTime = System.currentTimeMillis();
    // Get a dynamic resultset.  Contract for dynamic resultset is that if
    // one is returned, it MUST be closed, or a connection will leak.
    try
    {
      result = connection.executeUncachedQuery(queryText,paramList,-1);
    }
    catch (ManifoldCFException e)
    {
      // If failure, record the failure.  An interruption is not a query error and is not recorded,
      // which is how the version, acl and attribute queries above treat it as well.
      if (e.getErrorCode() != ManifoldCFException.INTERRUPTED)
        activities.recordActivity(Long.valueOf(startTime), ACTIVITY_EXTERNAL_QUERY, null,
          createQueryString(queryText,paramList), "ERROR", e.getMessage(), null);
      throw e;
    }
    try
    {
      // If success, record that too.
      activities.recordActivity(Long.valueOf(startTime), ACTIVITY_EXTERNAL_QUERY, null,
        createQueryString(queryText,paramList), "OK", null, null);

      while (true)
      {
        IDynamicResultRow row = result.getNextRow();
        if (row == null)
          break;
        try
        {
          Object o = row.getValue(JDBCConstants.idReturnColumnName);
          if (o == null)
            throw new ManifoldCFException("Bad document query; doesn't return $(IDCOLUMN) column.  Try using quotes around $(IDCOLUMN) variable, e.g. \"$(IDCOLUMN)\", or, for MySQL, select \"by label\" in your repository connection.");
          final String id = JDBCConnection.readAsString(o);
          seenDocuments.add(id);
          
          // Outcome of this row's fetch.  The finally block below records a fetch activity
          // whenever errorCode has been set by the time the row is finished.
          String errorCode = null;
          String errorDesc = null;
          Long fileLengthLong = null;
          long fetchStartTime = System.currentTimeMillis();
          
          try
          {
            String version = map.get(id);
            if (version == null)
              // Does not need refetching
              continue;

            // This document was marked as "not scan only", so we expect to find it.
            if (Logging.connectors.isDebugEnabled())
              Logging.connectors.debug("JDBC: Document data result found for '"+id+"'");
            
            // We will determine the disposition of the document here, so remove this id from the map in order that we know what we still
            // need to delete when all done.
            map.remove(id);

            o = row.getValue(JDBCConstants.urlReturnColumnName);
            if (o == null)
            {
              Logging.connectors.debug("JDBC: Document '"+id+"' has a null url - skipping");
              errorCode = activities.NULL_URL;
              errorDesc = "Excluded because document had a null URL";
              activities.noDocument(id,version);
              continue;
            }
            
            // This is not right - url can apparently be a BinaryInput
            String url = JDBCConnection.readAsString(o);
            boolean validURL;
            try
            {
              // Check to be sure url is valid
              new java.net.URI(url);
              validURL = true;
            }
            catch (java.net.URISyntaxException e)
            {
              validURL = false;
            }

            if (!validURL)
            {
              Logging.connectors.debug("JDBC: Document '"+id+"' has an illegal url: '"+url+"' - skipping");
              errorCode = activities.BAD_URL;
              errorDesc = "Excluded because document had illegal URL ('"+url+"')";
              activities.noDocument(id,version);
              continue;
            }
            
            // Process the document itself
            Object contents = row.getValue(JDBCConstants.dataReturnColumnName);
            // Null data is allowed; we just ignore these
            if (contents == null)
            {
              Logging.connectors.debug("JDBC: Document '"+id+"' seems to have null data - skipping");
              errorCode = "NULLDATA";
              errorDesc = "Excluded because document had null data";
              activities.noDocument(id,version);
              continue;
            }
            
            // Use the content type column when present; otherwise pick a default from the kind of data returned.
            String contentType;
            o = row.getValue(JDBCConstants.contentTypeReturnColumnName);
            if (o != null)
              contentType = JDBCConnection.readAsString(o);
            else
            {
              if (contents instanceof BinaryInput)
                contentType = "application/octet-stream";
              else if (contents instanceof CharacterInput)
                contentType = "text/plain; charset=utf-8";
              else
                contentType = "text/plain";
            }
                    
            if (!activities.checkMimeTypeIndexable(contentType))
            {
              Logging.connectors.debug("JDBC: Document '"+id+"' excluded because of mime type - skipping");
              errorCode = activities.EXCLUDED_MIMETYPE;
              errorDesc = "Excluded because of mime type ("+contentType+")";
              activities.noDocument(id,version);
              continue;
            }
                    
            if (!activities.checkURLIndexable(url))
            {
              Logging.connectors.debug("JDBC: Document '"+id+"' excluded because of url - skipping");
              errorCode = activities.EXCLUDED_URL;
              errorDesc = "Excluded because of URL ('"+url+"')";
              activities.noDocument(id,version);
              continue;
            }

            // An ingestion will take place for this document.
            RepositoryDocument rd = new RepositoryDocument();
            rd.setMimeType(contentType);

            applyMultiAttributeValues(rd,attributeValues.get(id));
            applyAccessTokens(rd,documentAcls.get(id));
            applyMetadata(rd,row);

            // The three branches below differ only in how the byte stream and its length are obtained.
            if (contents instanceof BinaryInput)
            {

              BinaryInput bi = (BinaryInput)contents;
              long fileLength = bi.getLength();
                      
              if (!activities.checkLengthIndexable(fileLength))
              {
                Logging.connectors.debug("JDBC: Document '"+id+"' excluded because of length - skipping");
                errorCode = activities.EXCLUDED_LENGTH;
                errorDesc = "Excluded because of length ("+fileLength+")";
                activities.noDocument(id, version);
                continue;
              }

              try
              {
                // Read the stream
                InputStream is = bi.getStream();
                try
                {
                  rd.setBinary(is,fileLength);
                  activities.ingestDocumentWithException(id, version, url, rd);
                  errorCode = "OK";
                  fileLengthLong = Long.valueOf(fileLength);
                }
                finally
                {
                  is.close();
                }
              }
              catch (IOException e)
              {
                errorCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
                errorDesc = e.getMessage();
                handleIOException(id,e);
              }
            }
            else if (contents instanceof CharacterInput)
            {
              CharacterInput ci = (CharacterInput)contents;
              long fileLength = ci.getUtf8StreamLength();
              
              if (!activities.checkLengthIndexable(fileLength))
              {
                Logging.connectors.debug("JDBC: Document '"+id+"' excluded because of length - skipping");
                errorCode = activities.EXCLUDED_LENGTH;
                errorDesc = "Excluded because of length ("+fileLength+")";
                activities.noDocument(id, version);
                continue;
              }
                      
              try
              {
                // Read the stream
                InputStream is = ci.getUtf8Stream();
                try
                {
                  rd.setBinary(is,fileLength);
                  activities.ingestDocumentWithException(id, version, url, rd);
                  errorCode = "OK";
                  fileLengthLong = Long.valueOf(fileLength);
                }
                finally
                {
                  is.close();
                }
              }
              catch (IOException e)
              {
                errorCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
                errorDesc = e.getMessage();
                handleIOException(id,e);
              }
            }
            else
            {
              // Turn it into a string, and then into a stream
              String value = contents.toString();
              byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
              long fileLength = bytes.length;
              
              if (!activities.checkLengthIndexable(fileLength))
              {
                Logging.connectors.debug("JDBC: Document '"+id+"' excluded because of length - skipping");
                errorCode = activities.EXCLUDED_LENGTH;
                errorDesc = "Excluded because of length ("+fileLength+")";
                activities.noDocument(id, version);
                continue;
              }

              try
              {
                InputStream is = new ByteArrayInputStream(bytes);
                try
                {
                  rd.setBinary(is,fileLength);
                  activities.ingestDocumentWithException(id, version, url, rd);
                  errorCode = "OK";
                  fileLengthLong = Long.valueOf(fileLength);
                }
                finally
                {
                  is.close();
                }
              }
              catch (IOException e)
              {
                errorCode = e.getClass().getSimpleName().toUpperCase(Locale.ROOT);
                errorDesc = e.getMessage();
                handleIOException(id,e);
              }
            }
          }
          catch (ManifoldCFException e)
          {
            // Clear the code on interruption so that the finally block records nothing for it.
            if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
              errorCode = null;
            throw e;
          }
          finally
          {
            if (errorCode != null)
              activities.recordActivity(Long.valueOf(fetchStartTime), ACTIVITY_FETCH,
                fileLengthLong, id, errorCode, errorDesc, null);
          }
        }
        finally
        {
          row.close();
        }
      }

    }
    finally
    {
      result.close();
    }
    
    // Now, go through the original id's, and see which ones are still in the map.  These
    // did not appear in the result and are presumed to be gone from the database, and thus must be deleted.
    // NOTE(review): the data query above is given map.keySet() as its identifier filter, while this loop
    // walks all of fetchDocuments.  If addIDList limits the query to that filter (its body is not visible
    // here), identifiers that did not need reindexing never reach seenDocuments and are deleted below --
    // confirm that this is the intended behavior.
    for (final String documentIdentifier : fetchDocuments)
    {
      if (!seenDocuments.contains(documentIdentifier))
      {
        // Never saw it in the fetch attempt
        activities.deleteDocument(documentIdentifier);
      }
      else
      {
        // Saw it in the fetch attempt, and we might have fetched it.
        // Row processing above removes every seen identifier from the map, so this lookup is a safety net.
        final String documentVersion = map.get(documentIdentifier);
        if (documentVersion != null)
        {
          // This means we did not see it (or data for it) in the result set.  Delete it!
          activities.noDocument(documentIdentifier,documentVersion);
          activities.recordActivity(null, ACTIVITY_FETCH,
            null, documentIdentifier, "NOTFETCHED", "Document was not seen by processing query", null);
        }
      }
    }

  }