public void run()

in framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java [52:490]


  /**
  * Main loop of the job notification thread.
  *<p>
  * Each pass through the loop:
  *<ol>
  * <li>waits on the reset manager;</li>
  * <li>fetches the jobs ready for inactivity, calls noteJobComplete() once for every distinct
  *     (output connection, repository connection) pair those jobs use, and then, per job, either
  *     inactivates it, schedules a notification retry, or error-aborts it;</li>
  * <li>does the same for jobs ready for delete, except that the job is removed rather than
  *     inactivated or aborted;</li>
  * <li>sleeps for 10 seconds.</li>
  *</ol>
  * Any job record that was fetched but not marked as started is reset in a finally block.
  *<p>
  * The loop ends on an InterruptedException or a ManifoldCFException with the INTERRUPTED code.
  * A DATABASE_CONNECTION_ERROR notes a reset event, sleeps 10 seconds and restarts the pass.
  * A SETUP_ERROR or an OutOfMemoryError exits the process; any other throwable is logged and
  * the loop keeps going.  A failure while building the thread-level objects exits the process.
  */
  public void run()
  {
    resetManager.registerMe();

    try
    {
      // Create a thread context object.
      IThreadContext threadContext = ThreadContextFactory.make();
      IJobManager jobManager = JobManagerFactory.make(threadContext);
      IOutputConnectionManager connectionManager = OutputConnectionManagerFactory.make(threadContext);
      IRepositoryConnectionManager repositoryConnectionManager = RepositoryConnectionManagerFactory.make(threadContext);

      IOutputConnectorPool outputConnectorPool = OutputConnectorPoolFactory.make(threadContext);
      
      // Loop
      while (true)
      {
        // Do another try/catch around everything in the loop
        try
        {
          // Before we begin, conditionally reset
          resetManager.waitForReset(threadContext);

          // Find the jobs ready for inactivity and notify them
          JobNotifyRecord[] jobsNeedingNotification = jobManager.getJobsReadyForInactivity(processID);
          try
          {
            // Collect the distinct (output, repository) connection pairs so that each pair is
            // notified only once, no matter how many jobs share it.
            Set<OutputAndRepositoryConnection> connectionNames = new HashSet<OutputAndRepositoryConnection>();
            
            for (JobNotifyRecord jsr : jobsNeedingNotification)
            {
              Long jobID = jsr.getJobID();
              IJobDescription job = jobManager.load(jobID,true);
              if (job != null)
              {
                // Get the connection name
                String repositoryConnectionName = job.getConnectionName();
                IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
                for (int i = 0; i < basicSpec.getOutputCount(); i++)
                {
                  String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));
                  OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
                  connectionNames.add(c);
                }
              }
            }
            
            // Attempt to notify the specified connections.  A pair ends up in this map only if
            // noteJobComplete() returned normally or threw a ServiceInterruption; pairs whose
            // connection could not be loaded, whose connector could not be grabbed, or whose
            // notification failed with a non-fatal ManifoldCFException have no entry.
            Map<OutputAndRepositoryConnection,Disposition> notifiedConnections = new HashMap<OutputAndRepositoryConnection,Disposition>();
            
            for (OutputAndRepositoryConnection connections : connectionNames)
            {
              String outputConnectionName = connections.getOutputConnectionName();
              String repositoryConnectionName = connections.getRepositoryConnectionName();
              
              OutputNotifyActivity activity = new OutputNotifyActivity(repositoryConnectionName,repositoryConnectionManager,outputConnectionName);
              
              IOutputConnection connection = connectionManager.load(outputConnectionName);
              if (connection != null)
              {
                // Grab an appropriate connection instance
                IOutputConnector connector = outputConnectorPool.grab(connection);
                if (connector != null)
                {
                  try
                  {
                    // Do the notification itself
                    try
                    {
                      connector.noteJobComplete(activity);
                      notifiedConnections.put(connections,new Disposition());
                    }
                    catch (ServiceInterruption e)
                    {
                      // Remember the interruption; it is evaluated per job below.
                      notifiedConnections.put(connections,new Disposition(e));
                    }
                    catch (ManifoldCFException e)
                    {
                      // These three codes are handled by the outer loop; everything else is
                      // only logged.
                      if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
                        throw e;
                      if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
                        throw e;
                      if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
                        throw e;
                      // Nothing special; report the error and keep going.
                      Logging.threads.error(e.getMessage(),e);
                    }
                  }
                  finally
                  {
                    // Always hand the connector back, even when an exception is propagating.
                    outputConnectorPool.release(connection,connector);
                  }
                }
              }
            }
            
            // Go through jobs again, and put the notified ones into the inactive state.
            for (JobNotifyRecord jsr : jobsNeedingNotification)
            {
              Long jobID = jsr.getJobID();
              IJobDescription job = jobManager.load(jobID,true);
              if (job != null)
              {
                // Get the connection name
                String repositoryConnectionName = job.getConnectionName();
                IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
                boolean allOK = true;
                for (int i = 0; i < basicSpec.getOutputCount(); i++)
                {
                  String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));

                  OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
                  
                  Disposition d = notifiedConnections.get(c);
                  if (d != null)
                  {
                    ServiceInterruption e = d.getServiceInterruption();
                    if (e == null)
                    {
                      // This output was notified cleanly.  Keep examining the job's remaining
                      // outputs: stopping here would ignore a service interruption reported by
                      // a later output and make the result depend on output order.
                      continue;
                    }
                    else
                    {
                      if (!e.jobInactiveAbort())
                      {
                        Logging.jobs.warn("Notification service interruption reported for job "+
                          jobID+" output connection '"+outputConnectionName+"': "+
                          e.getMessage(),e);
                      }

                      // If either we are going to be requeuing beyond the fail time, OR
                      // the number of retries available has hit 0, THEN we treat this
                      // as either an "ignore" or a hard error.
                      if (!e.jobInactiveAbort() && (jsr.getFailTime() != -1L && jsr.getFailTime() < e.getRetryTime() ||
                        jsr.getFailRetryCount() == 0))
                      {
                        // Treat this as a hard failure.
                        if (e.isAbortOnFail())
                        {
                          // Note the error in the job, and transition to inactive state
                          String message = e.jobInactiveAbort()?"":"Repeated service interruptions during notification"+((e.getCause()!=null)?": "+e.getCause().getMessage():"");
                          if (jobManager.errorAbort(jobID,message) && message.length() > 0)
                            Logging.jobs.error(message,e.getCause());
                          jsr.noteStarted();
                        }
                        else
                        {
                          // Not sure this can happen -- but just transition silently to inactive state
                          jobManager.inactivateJob(jobID);
                          jsr.noteStarted();
                        }
                      }
                      else
                      {
                        // Reset the job to the READYFORNOTIFY state, updating the failtime and failcount fields
                        jobManager.retryNotification(jsr,e.getFailTime(),e.getFailRetryCount());
                        jsr.noteStarted();
                      }
                      // The first interrupted output decides the job's fate; stop looking.
                      allOK = false;
                      break;
                    }
                  }
                }
                if (allOK)
                {
                  // No output of this job reported a service interruption.
                  jobManager.inactivateJob(jobID);
                  jsr.noteStarted();
                }

              }
            }
          }
          finally
          {
            // Clean up all jobs that did not start
            ManifoldCFException exception = null;
            for (JobNotifyRecord jsr : jobsNeedingNotification)
            {
              if (!jsr.wasStarted())
              {
                // Clean up from failed start.
                try
                {
                  jobManager.resetNotifyJob(jsr.getJobID());
                }
                catch (ManifoldCFException e)
                {
                  if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
                    throw e;
                  // Keep resetting the remaining jobs; the last failure is rethrown afterwards.
                  exception = e;
                }
              }
            }
            if (exception != null)
              throw exception;
          }

          // We also need to do a notify for jobs that are about to be deleted
          JobNotifyRecord[] jobsNeedingDeleteNotification = jobManager.getJobsReadyForDelete(processID);
          try
          {
            // Collect the distinct (output, repository) connection pairs so that each pair is
            // notified only once, no matter how many jobs share it.
            Set<OutputAndRepositoryConnection> connectionNames = new HashSet<OutputAndRepositoryConnection>();
            
            for (JobNotifyRecord jsr : jobsNeedingDeleteNotification)
            {
              Long jobID = jsr.getJobID();
              IJobDescription job = jobManager.load(jobID,true);
              if (job != null)
              {
                // Get the connection name
                String repositoryConnectionName = job.getConnectionName();
                IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
                for (int i = 0; i < basicSpec.getOutputCount(); i++)
                {
                  String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));
                  OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
                  connectionNames.add(c);
                }
              }
            }
            
            // Attempt to notify the specified connections.  As above, only pairs whose
            // notification returned normally or threw a ServiceInterruption get an entry.
            Map<OutputAndRepositoryConnection,Disposition> notifiedConnections = new HashMap<OutputAndRepositoryConnection,Disposition>();
            
            for (OutputAndRepositoryConnection connections : connectionNames)
            {
              String outputConnectionName = connections.getOutputConnectionName();
              String repositoryConnectionName = connections.getRepositoryConnectionName();
              
              OutputNotifyActivity activity = new OutputNotifyActivity(repositoryConnectionName,repositoryConnectionManager,outputConnectionName);
              
              IOutputConnection connection = connectionManager.load(outputConnectionName);
              if (connection != null)
              {
                // Grab an appropriate connection instance
                IOutputConnector connector = outputConnectorPool.grab(connection);
                if (connector != null)
                {
                  try
                  {
                    // Do the notification itself
                    try
                    {
                      connector.noteJobComplete(activity);
                      notifiedConnections.put(connections,new Disposition());
                    }
                    catch (ServiceInterruption e)
                    {
                      // Remember the interruption; it is evaluated per job below.
                      notifiedConnections.put(connections,new Disposition(e));
                    }
                    catch (ManifoldCFException e)
                    {
                      // These three codes are handled by the outer loop; everything else is
                      // only logged.
                      if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
                        throw e;
                      if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
                        throw e;
                      if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
                        throw e;
                      // Nothing special; report the error and keep going.
                      Logging.threads.error(e.getMessage(),e);
                    }
                  }
                  finally
                  {
                    // Always hand the connector back, even when an exception is propagating.
                    outputConnectorPool.release(connection,connector);
                  }
                }
              }
            }
            
            // Go through jobs again, and remove the notified ones.
            for (JobNotifyRecord jsr : jobsNeedingDeleteNotification)
            {
              Long jobID = jsr.getJobID();
              IJobDescription job = jobManager.load(jobID,true);
              if (job != null)
              {
                // Get the connection name
                String repositoryConnectionName = job.getConnectionName();
                IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
                boolean allOK = true;
                for (int i = 0; i < basicSpec.getOutputCount(); i++)
                {
                  String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));

                  OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
                  
                  Disposition d = notifiedConnections.get(c);
                  if (d != null)
                  {
                    ServiceInterruption e = d.getServiceInterruption();
                    if (e == null)
                    {
                      // This output was notified cleanly.  Keep examining the job's remaining
                      // outputs: stopping here would ignore a service interruption reported by
                      // a later output and make the result depend on output order.
                      continue;
                    }
                    else
                    {
                      if (!e.jobInactiveAbort())
                      {
                        Logging.jobs.warn("Delete notification service interruption reported for job "+
                          jobID+" output connection '"+outputConnectionName+"': "+
                          e.getMessage(),e);
                      }

                      // If either we are going to be requeuing beyond the fail time, OR
                      // the number of retries available has hit 0, THEN we treat this
                      // as either an "ignore" or a hard error.
                      if (!e.jobInactiveAbort() && (jsr.getFailTime() != -1L && jsr.getFailTime() < e.getRetryTime() ||
                        jsr.getFailRetryCount() == 0))
                      {
                        // Treat this as a hard failure.
                        if (e.isAbortOnFail())
                        {
                          // Log the error, then remove the job anyway
                          String message = e.jobInactiveAbort()?"":"Repeated service interruptions during delete notification"+((e.getCause()!=null)?": "+e.getCause().getMessage():"");
                          if (message.length() > 0)
                            Logging.jobs.error(message,e.getCause());
                          // Can't abort a delete!!
                          jobManager.removeJob(jobID);
                          jsr.noteStarted();
                        }
                        else
                        {
                          // Not sure this can happen -- but just remove the job silently
                          jobManager.removeJob(jobID);
                          jsr.noteStarted();
                        }
                      }
                      else
                      {
                        // Reset the job to the READYFORDELETENOTIFY state, updating the failtime and failcount fields
                        jobManager.retryDeleteNotification(jsr,e.getFailTime(),e.getFailRetryCount());
                        jsr.noteStarted();
                      }
                      // The first interrupted output decides the job's fate; stop looking.
                      allOK = false;
                      break;
                    }
                  }
                }
                if (allOK)
                {
                  // No output of this job reported a service interruption.
                  jobManager.removeJob(jobID);
                  jsr.noteStarted();
                }

              }
            }
          }
          finally
          {
            // Clean up all jobs that did not start
            ManifoldCFException exception = null;
            for (JobNotifyRecord jsr : jobsNeedingDeleteNotification)
            {
              if (!jsr.wasStarted())
              {
                // Clean up from failed start.
                try
                {
                  jobManager.resetDeleteNotifyJob(jsr.getJobID());
                }
                catch (ManifoldCFException e)
                {
                  if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
                    throw e;
                  // Keep resetting the remaining jobs; the last failure is rethrown afterwards.
                  exception = e;
                }
              }
            }
            if (exception != null)
              throw exception;
          }

          ManifoldCF.sleep(10000L);
        }
        catch (ManifoldCFException e)
        {
          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
            break;

          if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
          {
            resetManager.noteEvent();
            
            Logging.threads.error("Job notification thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
            try
            {
              // Give the database a chance to catch up/wake up
              ManifoldCF.sleep(10000L);
            }
            catch (InterruptedException se)
            {
              break;
            }
            continue;
          }

          // Log it, but keep the thread alive
          Logging.threads.error("Exception tossed: "+e.getMessage(),e);

          if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
          {
            // Shut the whole system down!
            ManifoldCF.systemExit(1);
          }

        }
        catch (InterruptedException e)
        {
          // We're supposed to quit
          break;
        }
        catch (OutOfMemoryError e)
        {
          System.err.println("agents process ran out of memory - shutting down");
          e.printStackTrace(System.err);
          ManifoldCF.systemExit(-200);
        }
        catch (Throwable e)
        {
          // A more severe error - but stay alive
          Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
        }
      }
    }
    catch (Throwable e)
    {
      // Severe error on initialization
      System.err.println("agents process could not start - shutting down");
      Logging.threads.fatal("JobNotificationThread initialization error tossed: "+e.getMessage(),e);
      ManifoldCF.systemExit(-300);
    }
  }