public void run()

in framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java [69:871]


  public void run()
  {
    // Register this thread in the worker reset manager
    resetManager.registerMe();

    try
    {
      // Create a thread context object.
      IThreadContext threadContext = ThreadContextFactory.make();
      IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
      IJobManager jobManager = JobManagerFactory.make(threadContext);
      IBinManager binManager = BinManagerFactory.make(threadContext);
      IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
      ITransformationConnectionManager transformationConnectionManager = TransformationConnectionManagerFactory.make(threadContext);
      IOutputConnectionManager outputConnectionManager = OutputConnectionManagerFactory.make(threadContext);
      IReprioritizationTracker rt = ReprioritizationTrackerFactory.make(threadContext);

      IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
      
      // This is the set of documents that we will either be marking as complete, or requeued, depending on the kind of crawl.
      List<QueuedDocument> finishList = new ArrayList<QueuedDocument>();

      // This is where we accumulate the document QueuedDocuments to be deleted from the job queue.
      List<QueuedDocument> deleteList = new ArrayList<QueuedDocument>();

      // This is where we accumulate documents that need to be placed in the HOPCOUNTREMOVED
      // state
      List<QueuedDocument> hopcountremoveList = new ArrayList<QueuedDocument>();
      
      // This is where we accumulate documents that need to be rescanned
      List<QueuedDocument> rescanList = new ArrayList<QueuedDocument>();
      
      // This is where we store document ID strings of documents that need to be noted as having
      // been checked.
      List<String> ingesterCheckList = new ArrayList<String>();

      // Service interruption thrown with "abort on fail".
      ManifoldCFException abortOnFail = null;
      
      // Loop
      while (true)
      {
        // Do another try/catch around everything in the loop
        try
        {
          if (Thread.currentThread().isInterrupted())
            throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);

          // Before we begin, conditionally reset
          resetManager.waitForReset(threadContext);

          // Once we pull something off the queue, we MUST make sure that
          // we update its status, even if there is an exception!!!

          // See if there is anything on the queue for me
          QueuedDocumentSet qds = documentQueue.getDocument(queueTracker);
          if (qds == null)
            // It's a reset, so recycle
            continue;

          try
          {
            // System.out.println("Got a document set");

            if (Thread.currentThread().isInterrupted())
              throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);

            // First of all: find out if the job for these documents has been aborted, paused, etc.
            // If so, we requeue the documents immediately.
            IJobDescription job = qds.getJobDescription();
            Long jobID = job.getID();
            if (jobManager.checkJobActive(jobID) == false)
              // Recycle; let these documents be requeued and go get the next set.
              continue;

            if (Logging.threads.isDebugEnabled())
            {
              StringBuilder sb = new StringBuilder();
              for (int z = 0; z < qds.getCount(); z++)
              {
                sb.append(qds.getDocument(z).getDocumentDescription().getID()).append(" ");
              }
              Logging.threads.debug("Worker thread processing documents: "+sb);
            }

            // Build a basic pipeline specification right off; we need it whenever
            // we interact with Incremental Ingester.
            IPipelineConnections pipelineConnections = new PipelineConnections(new PipelineSpecificationBasic(job),transformationConnectionManager,outputConnectionManager);
            
            String lastIndexedOutputConnectionName = ingester.getLastIndexedOutputConnectionName(pipelineConnections);

            // Universal job data we'll need later
            String connectionName = job.getConnectionName();
            Specification spec = job.getSpecification();
            int jobType = job.getType();

            IRepositoryConnection connection = qds.getConnection();
            
            OutputActivity ingestLogger = new OutputActivity(connectionName,connMgr);

            // The flow through this section of the code is as follows.
            // (1) We start with a list of documents
            // (2) We attempt to do various things to these documents
            // (3) Based on what happens, and what errors we get, we progressively move documents out of the main list
            //     and into secondary lists that will be all treated in the same way
            
            // First, initialize the active document set to contain everything.
            List<QueuedDocument> activeDocuments = new ArrayList<QueuedDocument>(qds.getCount());
            
            for (int i = 0; i < qds.getCount(); i++)
            {
              QueuedDocument qd = qds.getDocument(i);
              activeDocuments.add(qd);
            }

            // Clear out all of our disposition lists
            finishList.clear();
            deleteList.clear();
            ingesterCheckList.clear();
            hopcountremoveList.clear();
            rescanList.clear(); //                  jobManager.resetDocument(dd,0L,IJobManager.ACTION_RESCAN,-1L,-1);
            abortOnFail = null;

            // Keep track of the starting processing time, for statistics calculation
            long processingStartTime = System.currentTimeMillis();
            // Log these documents in the overlap calculator
            qds.beginProcessing(queueTracker);
            try
            {
              long currentTime = System.currentTimeMillis();

              if (Logging.threads.isDebugEnabled())
                Logging.threads.debug("Worker thread starting document count is "+Integer.toString(activeDocuments.size()));

              // Get the legal link types.  This is needed for later hopcount checking.
              String[] legalLinkTypes = null;
              if (activeDocuments.size() > 0)
              {
                legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
                // If this came back null, it means that there is no underlying implementation available, so treat this like a kind of service interruption.
                if (legalLinkTypes == null)
                {
                  // Failure here puts all remaining documents into rescan list
                  if (Logging.threads.isDebugEnabled())
                    Logging.threads.debug(" Moving "+makeListString(activeDocuments)+" to rescanList");
                  moveList(activeDocuments,rescanList);
                }
              }

              if (Logging.threads.isDebugEnabled())
                Logging.threads.debug("Post-relationship document count is "+Integer.toString(activeDocuments.size()));

              // Do the hopcount checks, if any.  This will iteratively reduce the viable list of
              // document identifiers in need of having their versions fetched.
              if (legalLinkTypes != null && activeDocuments.size() > 0)
              {
                // Set up the current ID array
                String[] currentDocIDHashArray = new String[activeDocuments.size()];
                for (int i = 0; i < currentDocIDHashArray.length; i++)
                {
                  currentDocIDHashArray[i] = activeDocuments.get(i).getDocumentDescription().getDocumentIdentifierHash();
                }
                Map filterMap = job.getHopCountFilters();
                Iterator filterIter = filterMap.keySet().iterator();
                // Array to accumulate hopcount results for all link types
                boolean[] overallResults = new boolean[currentDocIDHashArray.length];
                for (int i = 0; i < overallResults.length; i++)
                {
                  overallResults[i] = true;
                }
                // Calculate the hopcount result for each link type, and fold it in.
                while (filterIter.hasNext())
                {
                  String linkType = (String)filterIter.next();
                  int maxHop = (int)((Long)filterMap.get(linkType)).longValue();
                  boolean[] results = jobManager.findHopCounts(job.getID(),legalLinkTypes,currentDocIDHashArray,linkType,
                    maxHop,job.getHopcountMode());
                  for (int i = 0; i < results.length; i++)
                  {
                    overallResults[i] = overallResults[i] && results[i];
                  }
                }
                // Move all documents to the appropriate list
                List<QueuedDocument> newActiveSet = new ArrayList<QueuedDocument>(activeDocuments.size());
                for (int i = 0; i < overallResults.length; i++)
                {
                  if (overallResults[i] == false)
                  {
                    if (Logging.threads.isDebugEnabled())
                      Logging.threads.debug(" Adding "+activeDocuments.get(i).getDocumentDescription().getID()+" to hopcountremovelist");
                    hopcountremoveList.add(activeDocuments.get(i));
                  }
                  else
                  {
                    newActiveSet.add(activeDocuments.get(i));
                  }
                }
                activeDocuments = newActiveSet;
              }

              if (Logging.threads.isDebugEnabled())
                Logging.threads.debug(" Post-hopcount pruned document count is "+Integer.toString(activeDocuments.size()));
              
              // From here on down we need a connector instance, so get one.
              IRepositoryConnector connector = null;
              if (activeDocuments.size() > 0 || hopcountremoveList.size() > 0)
              {
                connector = repositoryConnectorPool.grab(connection);

                // If we wind up with a null here, it means that a document got queued for a connector which is now gone.
                // Basically, what we want to do in that case is to treat this kind of like a service interruption - the document
                // must be requeued for immediate reprocessing.  When the rest of the world figures out that the job that owns this
                // document is in fact unable to function, we'll stop getting such documents handed to us, because the state of the
                // job will be changed.

                if (connector == null)
                {
                  // Failure here puts all remaining documents into rescan list
                  if (Logging.threads.isDebugEnabled())
                    Logging.threads.debug(" Moving "+makeListString(activeDocuments)+" to rescanList");
                  moveList(activeDocuments,rescanList);
                  if (Logging.threads.isDebugEnabled())
                    Logging.threads.debug(" Moving "+makeListString(hopcountremoveList)+" to rescanList");
                  moveList(hopcountremoveList,rescanList);
                }
              }
              
              if (connector != null)
              {
                // Open try/finally block to free the connector instance no matter what
                try
                {
                  // Check for interruption before we start fetching
                  if (Thread.currentThread().isInterrupted())
                    throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
                  
                  // We need first to assemble an IPipelineSpecificationWithVersions object for each document we're going to process.
                  // We put this in a map so it can be looked up by document identifier.
                  // Create a full PipelineSpecification, including description strings.  (This is per-job still, but can throw ServiceInterruptions, so we do it in here.)
                  IPipelineSpecification pipelineSpecification;
                  try
                  {
                    pipelineSpecification = new PipelineSpecification(pipelineConnections,job,ingester);
                  }
                  catch (ServiceInterruption e)
                  {
                    // Handle service interruption from pipeline
                    if (!e.jobInactiveAbort())
                      Logging.jobs.warn("Service interruption reported for job "+
                      job.getID()+" connection '"+job.getConnectionName()+"': "+
                      e.getMessage());

                    // All documents get requeued, because we never got far enough to make distinctions.  All we have to decide
                    // is whether to requeue or abort.
                    List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();

                    for (QueuedDocument qd : activeDocuments)
                    {
                      DocumentDescription dd = qd.getDocumentDescription();
                      // Check for hard failure.  But no hard failure possible of it's a job inactive abort.
                      if (!e.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
                        dd.getFailRetryCount() == 0))
                      {
                        // Treat this as a hard failure.
                        if (e.isAbortOnFail())
                        {
                          rescanList.add(qd);
                          abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
                        }
                        else
                        {
                          if (Logging.threads.isDebugEnabled())
                            Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to requeueList because of output service interruption");
                          requeueList.add(qd);
                        }
                      }
                      else
                      {
                        if (Logging.threads.isDebugEnabled())
                          Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to requeueList because of output service interruption");
                        requeueList.add(qd);
                      }
                    }
                      
                    requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
                      e.getFailRetryCount());
                    
                    Logging.threads.debug(" Clearing active documents list due to output service interruption");
                    activeDocuments.clear();
                    pipelineSpecification = null;
                  }

                  if (activeDocuments.size() > 0)
                  {
                    
                    // **** New worker thread code starts here!!! ****
                    
                    IExistingVersions existingVersions = new ExistingVersions(lastIndexedOutputConnectionName,activeDocuments);
                    String aclAuthority = connection.getACLAuthority();
                    if (aclAuthority == null)
                      aclAuthority = "";
                    boolean isDefaultAuthority = (aclAuthority.length() == 0);

                    // Build the processActivity object
                    Map<String,QueuedDocument> previousDocuments = new HashMap<String,QueuedDocument>();
                    
                    String[] documentIDs = new String[activeDocuments.size()];
                    int k = 0;
                    for (QueuedDocument qd : activeDocuments)
                    {
                      previousDocuments.put(qd.getDocumentDescription().getDocumentIdentifierHash(),qd);
                      documentIDs[k++] = qd.getDocumentDescription().getDocumentIdentifier();
                    }
                    
                    ProcessActivity activity = new ProcessActivity(job.getID(),processID,
                      rt,jobManager,ingester,
                      connectionName,pipelineSpecification,
                      previousDocuments,
                      currentTime,
                      job.getExpiration(),
                      job.getInterval(),
                      job.getMaxInterval(),
                      job.getHopcountMode(),
                      connection,connector,connMgr,legalLinkTypes,ingestLogger);
                    try
                    {
                      if (Logging.threads.isDebugEnabled())
                        Logging.threads.debug("Worker thread about to process "+makeListString(activeDocuments));

                      // Now, process in bulk -- catching and handling ServiceInterruptions
                      ServiceInterruption serviceInterruption = null;
                      try
                      {
                        connector.processDocuments(documentIDs,existingVersions,job.getSpecification(),activity,jobType,isDefaultAuthority);
                        
                        // Now do everything that the connector might have done if we were not doing it for it.

                        // Right now, that's just getting rid of untouched components.
                        for (QueuedDocument qd : activeDocuments)
                        {
                          String documentIdentifier = qd.getDocumentDescription().getDocumentIdentifier();
                          if (!activity.wasDocumentAborted(documentIdentifier) && !activity.wasDocumentDeleted(documentIdentifier))
                          {
                            String documentIdentifierHash = qd.getDocumentDescription().getDocumentIdentifierHash();
                            // In order to be able to loop over all the components that the incremental ingester knows about, we need to know
                            // what the FIRST output is.
                            DocumentIngestStatusSet set = qd.getLastIngestedStatus(ingester.getFirstIndexedOutputConnectionName(pipelineConnections));
                            if (set != null)
                            {
                              Iterator<String> componentHashes = set.componentIterator();
                              while (componentHashes.hasNext())
                              {
                                String componentHash = componentHashes.next();
                                // Check whether we've indexed or not
                                if (!activity.wasDocumentComponentTouched(documentIdentifier,
                                  componentHash))
                                {
                                  // This component must be removed.
                                  ingester.documentRemove(
                                    pipelineConnections,
                                    connectionName,documentIdentifierHash,componentHash,
                                    ingestLogger);
                                }
                              }
                            }
                          }
                        }

                        // Done with connector functionality!
                      }
                      catch (ServiceInterruption e)
                      {
                        serviceInterruption = e;
                        if (!e.jobInactiveAbort())
                          Logging.jobs.warn("Service interruption reported for job "+
                          job.getID()+" connection '"+job.getConnectionName()+"': "+
                          e.getMessage());
                      }

                      // Flush remaining references into the database!
                      activity.flush();

                      if (Logging.threads.isDebugEnabled())
                        Logging.threads.debug("Worker thread done processing "+Integer.toString(documentIDs.length)+" documents");

                      // Either way, handle the documents we were supposed to process.  But if there was a service interruption,
                      // and the disposition of the document was unclear, then the document will need to be requeued instead of handled normally.
                      List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();

                      for (QueuedDocument qd : activeDocuments)
                      {
                        // If this document was aborted, then treat it specially.
                        if (activity.wasDocumentAborted(qd.getDocumentDescription().getDocumentIdentifier()))
                        {
                          // Special treatment for aborted documents.
                          // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
                          // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
                          // Add to the finish list, so it gets requeued.  Because the document is already marked as aborted, this should be enough to cause an
                          // unconditional requeue.
                          if (Logging.threads.isDebugEnabled())
                            Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to finishList");
                          finishList.add(qd);
                        }
                        else if (activity.wasDocumentDeleted(qd.getDocumentDescription().getDocumentIdentifier()))
                        {
                          if (Logging.threads.isDebugEnabled())
                            Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to deleteList");
                          deleteList.add(qd);
                        }
                        else if (serviceInterruption != null)
                        {

                          // Service interruption has precedence over unchanged, because we might have been interrupted while scanning the document
                          // for references
                          DocumentDescription dd = qd.getDocumentDescription();
                          // Check for hard failure.  But no hard failure possible of it's a job inactive abort.
                          if (!serviceInterruption.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < serviceInterruption.getRetryTime() ||
                            dd.getFailRetryCount() == 0))
                          {
                            // Treat this as a hard failure.
                            if (serviceInterruption.isAbortOnFail())
                            {
                              // Make sure that the job aborts.
                              abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((serviceInterruption.getCause()!=null)?": "+serviceInterruption.getCause().getMessage():""),serviceInterruption.getCause());
                              if (Logging.threads.isDebugEnabled())
                                Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to rescanList due to service interruption");
                              rescanList.add(qd);
                            }
                            else
                            {
                              // Skip the document, rather than failing.
                              // We want this particular document to be not included in the
                              // reprocessing.  Therefore, we do the same thing as we would
                              // if we got back a null version.
                              if (Logging.threads.isDebugEnabled())
                                Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to deleteList due to service interruption");
                              deleteList.add(qd);
                            }
                          }
                          else
                          {
                            // Not a hard failure.  Requeue.
                            if (Logging.threads.isDebugEnabled())
                              Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to requeueList");
                            requeueList.add(qd);
                          }
                        }
                        else
                        {
                          if (Logging.threads.isDebugEnabled())
                            Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to finishList");
                          finishList.add(qd);
                        }
                        
                        // Note whether the document was untouched; if so, update it
                        if (!activity.wasDocumentTouched(qd.getDocumentDescription().getDocumentIdentifier()))
                        {
                          if (Logging.threads.isDebugEnabled())
                            Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to ingesterCheckList");
                          ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
                        }
                      }
                      

                      if (serviceInterruption != null)
                      {
                        // Requeue the documents we've identified as needing to be repeated
                        if (Logging.threads.isDebugEnabled())
                          Logging.threads.debug("Requeuing documents "+makeListString(requeueList));
                        requeueDocuments(jobManager,requeueList,serviceInterruption.getRetryTime(),serviceInterruption.getFailTime(),
                          serviceInterruption.getFailRetryCount());
                      }
                      
                      // Note the documents that have been checked but not reingested.  This should happen BEFORE we need
                      // the statistics (which are calculated during the finishlist step below)
                      if (ingesterCheckList.size() > 0)
                      {
                        String[] checkClasses = new String[ingesterCheckList.size()];
                        String[] checkIDs = new String[ingesterCheckList.size()];
                        for (int i = 0; i < checkIDs.length; i++)
                        {
                          checkClasses[i] = connectionName;
                          checkIDs[i] = ingesterCheckList.get(i);
                        }
                        // This method should exercise reasonable intelligence.  If the document has never been indexed, it should detect that
                        // and stop.  Otherwise, it should update the statistics accordingly.
                        ingester.documentCheckMultiple(pipelineConnections,checkClasses,checkIDs,currentTime);
                      }

                      // Process the finish list!
                      if (finishList.size() > 0)
                      {
                        if (Logging.threads.isDebugEnabled())
                          Logging.threads.debug("Finishing documents "+makeListString(finishList));

                        // "Finish" the documents (removing unneeded carrydown info, and compute hopcounts).
                        // This can ONLY be done on fully-completed documents; everything else should be left in a dangling
                        // state (which we know is OK because it will be fixed the next time the document is attempted).
                        String[] documentIDHashes = new String[finishList.size()];
                        k = 0;
                        for (QueuedDocument qd : finishList)
                        {
                          documentIDHashes[k++] = qd.getDocumentDescription().getDocumentIdentifierHash();
                        }
                        DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,documentIDHashes,job.getHopcountMode());
                        if (Logging.threads.isDebugEnabled())
                          Logging.threads.debug(" Requeueing documents due to carrydown "+makeListString(requeueCandidates));
                        ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,rt,currentTime);
                        
                        // In both job types, we have to go through the finishList to figure out what to do with the documents.
                        // In the case of a document that was aborted, we must requeue it for immediate reprocessing in BOTH job types.
                        switch (job.getType())
                        {
                        case IJobDescription.TYPE_CONTINUOUS:
                          {
                            // We need to populate timeArray
                            String[] timeIDClasses = new String[finishList.size()];
                            String[] timeIDHashes = new String[finishList.size()];
                            for (int i = 0; i < timeIDHashes.length; i++)
                            {
                              QueuedDocument qd = (QueuedDocument)finishList.get(i);
                              DocumentDescription dd = qd.getDocumentDescription();
                              String documentIDHash = dd.getDocumentIdentifierHash();
                              timeIDClasses[i] = connectionName;
                              timeIDHashes[i] = documentIDHash;
                            }
                            long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineConnections,timeIDClasses,timeIDHashes);
                            Long[] recheckTimeArray = new Long[timeArray.length];
                            int[] actionArray = new int[timeArray.length];
                            DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
                            for (int i = 0; i < finishList.size(); i++)
                            {
                              QueuedDocument qd = finishList.get(i);
                              recrawlDocs[i] = qd.getDocumentDescription();
                              String documentID = recrawlDocs[i].getDocumentIdentifier();

                              // If aborted due to sequencing issue, then requeue for reprocessing immediately, ignoring everything else.
                              boolean wasAborted = activity.wasDocumentAborted(documentID);
                              if (wasAborted)
                              {
                                // Requeue for immediate reprocessing
                                if (Logging.scheduling.isDebugEnabled())
                                  Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED as soon as prerequisites are met");

                                actionArray[i] = IJobManager.ACTION_RESCAN;
                                recheckTimeArray[i] = new Long(0L);     // Must not use null; that means 'never'.
                              }
                              else
                              {
                                // Calculate the next time to run, or time to expire.

                                // For run time, the formula is to calculate the running avg interval between changes,
                                // add an additional interval (which comes from the job description),
                                // and add that to the current time.
                                // One caveat: we really want to calculate the interval from the last
                                // time change was detected, but this is not implemented yet.
                                long timeAmt = timeArray[i];
                                // null value indicates never to schedule

                                Long recrawlTime = activity.calculateDocumentRescheduleTime(currentTime,timeAmt,documentID);
                                Long expireTime = activity.calculateDocumentExpireTime(currentTime,documentID);


                                // Merge the two times together.  We decide on the action based on the action with the lowest time.
                                if (expireTime == null || (recrawlTime != null && recrawlTime.longValue() < expireTime.longValue()))
                                {
                                  if (Logging.scheduling.isDebugEnabled())
                                    Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
                                  recheckTimeArray[i] = recrawlTime;
                                  actionArray[i] = IJobManager.ACTION_RESCAN;
                                }
                                else if (recrawlTime == null || (expireTime != null && recrawlTime.longValue() > expireTime.longValue()))
                                {
                                  if (Logging.scheduling.isDebugEnabled())
                                    Logging.scheduling.debug("Document '"+documentID+"' will be REMOVED at "+expireTime.toString());
                                  recheckTimeArray[i] = expireTime;
                                  actionArray[i] = IJobManager.ACTION_REMOVE;
                                }
                                else
                                {
                                  // Default activity if conflict will be rescan
                                  if (Logging.scheduling.isDebugEnabled() && recrawlTime != null)
                                    Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
                                  recheckTimeArray[i] = recrawlTime;
                                  actionArray[i] = IJobManager.ACTION_RESCAN;
                                }
                              }
                            }

                            if (Logging.threads.isDebugEnabled())
                              Logging.threads.debug(" Requeuing "+makeListString(recrawlDocs));
                            jobManager.requeueDocumentMultiple(recrawlDocs,recheckTimeArray,actionArray);

                          }
                          break;
                        case IJobDescription.TYPE_SPECIFIED:
                          {
                            // Separate the ones we actually finished from the ones we need to requeue because they were aborted
                            List<DocumentDescription> completedList = new ArrayList<DocumentDescription>();
                            List<DocumentDescription> abortedList = new ArrayList<DocumentDescription>();
                            for (QueuedDocument qd : finishList)
                            {
                              DocumentDescription dd = qd.getDocumentDescription();
                              if (activity.wasDocumentAborted(dd.getDocumentIdentifier()))
                              {
                                // The document was aborted, so put it into the abortedList
                                abortedList.add(dd);
                              }
                              else
                              {
                                // The document was completed.
                                completedList.add(dd);
                              }
                            }

                            // Requeue the ones that must be repeated
                            if (abortedList.size() > 0)
                            {
                              DocumentDescription[] docDescriptions = new DocumentDescription[abortedList.size()];
                              Long[] recheckTimeArray = new Long[docDescriptions.length];
                              int[] actionArray = new int[docDescriptions.length];
                              for (int i = 0; i < docDescriptions.length; i++)
                              {
                                docDescriptions[i] = abortedList.get(i);
                                recheckTimeArray[i] = new Long(0L);
                                actionArray[i] = IJobManager.ACTION_RESCAN;
                              }

                              if (Logging.threads.isDebugEnabled())
                                Logging.threads.debug(" Requeuing "+makeListString(docDescriptions));
                              jobManager.requeueDocumentMultiple(docDescriptions,recheckTimeArray,actionArray);
                            }

                            // Mark the ones completed that were actually completed.
                            if (completedList.size() > 0)
                            {
                              DocumentDescription[] docDescriptions = new DocumentDescription[completedList.size()];
                              for (int i = 0; i < docDescriptions.length; i++)
                              {
                                docDescriptions[i] = completedList.get(i);
                              }

                              if (Logging.threads.isDebugEnabled())
                                Logging.threads.debug(" Marking completed "+makeListString(docDescriptions));
                              jobManager.markDocumentCompletedMultiple(docDescriptions);
                            }
                          }
                          break;
                        default:
                          throw new ManifoldCFException("Unexpected value for job type: '"+Integer.toString(job.getType())+"'");
                        }

                        // Finally, if we're still alive, mark everything we finished as "processed".
                        for (QueuedDocument qd : finishList)
                        {
                          qd.setProcessed();
                        }
                      }
                    }
                    finally
                    {
                      // Make sure we don't leave any dangling carrydown files
                      activity.discard();
                    }
                    
                    // Successful processing of the set
                    // We count 'get version' time in the average, so even if we decide not to process a doc
                    // it still counts.
                    long elapsedTime = System.currentTimeMillis() - processingStartTime;
                    if (Logging.scheduling.isDebugEnabled())
                      Logging.scheduling.debug("Worker thread for connection "+connectionName+" took "+new Long(elapsedTime).toString()+"ms to handle "+Integer.toString(qds.getCount())+" documents");
                    queueTracker.noteConnectionPerformance(qds.getCount(),connectionName,elapsedTime);

                  }
                  
                  // Now, handle the delete list
                  if (Logging.threads.isDebugEnabled())
                    Logging.threads.debug("Deleting "+makeListString(deleteList));
                  processDeleteLists(pipelineConnections,connector,connection,jobManager,
                    deleteList,ingester,
                    job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);

                  // Handle hopcount removal
                  if (Logging.threads.isDebugEnabled())
                    Logging.threads.debug("Hopcount removal "+makeListString(hopcountremoveList));
                  processHopcountRemovalLists(pipelineConnections,connector,connection,jobManager,
                    hopcountremoveList,ingester,
                    job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);

                }
                finally
                {
                  repositoryConnectorPool.release(connection,connector);
                }
              
              }
              
              // Handle rescanning
              if (Logging.threads.isDebugEnabled())
                Logging.threads.debug("Rescanning documents "+makeListString(rescanList));
              for (QueuedDocument qd : rescanList)
              {
                jobManager.resetDocument(qd.getDocumentDescription(),0L,IJobManager.ACTION_RESCAN,-1L,-1);
                qd.setProcessed();
              }
                
            }
            finally
            {
              // Note termination of processing of these documents in the overlap calculator
              qds.endProcessing(queueTracker);
            }
            
            if (abortOnFail != null)
              throw abortOnFail;
            
          }
          catch (ManifoldCFException e)
          {
            if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
              break;

            if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
              throw e;

            if (jobManager.errorAbort(qds.getJobDescription().getID(),e.getMessage()))
            {
              // We eat the exception if there was already one recorded.

              // An exception occurred in the processing of a set of documents.
              // Shut the corresponding job down, with an appropriate error
              Logging.threads.error("Exception tossed: "+e.getMessage(),e);
            }
          }
          finally
          {
            // Go through qds and requeue any that aren't closed out in one way or another.  This allows the job
            // to be aborted; no dangling entries are left around.
            for (int i = 0; i < qds.getCount(); i++)
            {
              QueuedDocument qd = qds.getDocument(i);
              if (!qd.wasProcessed())
              {
                jobManager.resetDocument(qd.getDocumentDescription(),0L,IJobManager.ACTION_RESCAN,-1L,-1);
              }
            }
          }
        }
        catch (ManifoldCFException e)
        {
          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
            break;

          if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
          {
            // Note the failure, which will cause a reset to occur
            resetManager.noteEvent();

            Logging.threads.error("Worker thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
            try
            {
              // Give the database a chance to catch up/wake up
              ManifoldCF.sleep(10000L);
            }
            catch (InterruptedException se)
            {
              break;
            }
            continue;
          }

          // An exception occurred in the cleanup from another error.
          // Log the error (but that's all we can do)
          Logging.threads.error("Exception tossed: "+e.getMessage(),e);

        }
        catch (InterruptedException e)
        {
          // We're supposed to quit
          break;
        }
        catch (OutOfMemoryError e)
        {
          System.err.println(sdf.format(new Date()) +  " agents process ran out of memory - shutting down");
          e.printStackTrace(System.err);
          ManifoldCF.systemExit(-200);
        }
        catch (Throwable e)
        {
          // A more severe error - but stay alive
          Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
        }
      }
    }
    catch (Throwable e)
    {
      // Severe error on initialization
      System.err.println("agents process could not start - shutting down");
      Logging.threads.fatal("WorkerThread "+id+" initialization error tossed: "+e.getMessage(),e);
      ManifoldCF.systemExit(-300);
    }

  }