in framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/JobNotificationThread.java [52:490]
public void run()
{
resetManager.registerMe();
try
{
// Create a thread context object.
IThreadContext threadContext = ThreadContextFactory.make();
IJobManager jobManager = JobManagerFactory.make(threadContext);
IOutputConnectionManager connectionManager = OutputConnectionManagerFactory.make(threadContext);
IRepositoryConnectionManager repositoryConnectionManager = RepositoryConnectionManagerFactory.make(threadContext);
IOutputConnectorPool outputConnectorPool = OutputConnectorPoolFactory.make(threadContext);
// Loop
while (true)
{
// Do another try/catch around everything in the loop
try
{
// Before we begin, conditionally reset
resetManager.waitForReset(threadContext);
// Find the jobs ready for inactivity and notify them
JobNotifyRecord[] jobsNeedingNotification = jobManager.getJobsReadyForInactivity(processID);
try
{
Set<OutputAndRepositoryConnection> connectionNames = new HashSet<OutputAndRepositoryConnection>();
for (JobNotifyRecord jsr : jobsNeedingNotification)
{
Long jobID = jsr.getJobID();
IJobDescription job = jobManager.load(jobID,true);
if (job != null)
{
// Get the connection name
String repositoryConnectionName = job.getConnectionName();
IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
for (int i = 0; i < basicSpec.getOutputCount(); i++)
{
String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));
OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
connectionNames.add(c);
}
}
}
// Attempt to notify the specified connections
Map<OutputAndRepositoryConnection,Disposition> notifiedConnections = new HashMap<OutputAndRepositoryConnection,Disposition>();
for (OutputAndRepositoryConnection connections : connectionNames)
{
String outputConnectionName = connections.getOutputConnectionName();
String repositoryConnectionName = connections.getRepositoryConnectionName();
OutputNotifyActivity activity = new OutputNotifyActivity(repositoryConnectionName,repositoryConnectionManager,outputConnectionName);
IOutputConnection connection = connectionManager.load(outputConnectionName);
if (connection != null)
{
// Grab an appropriate connection instance
IOutputConnector connector = outputConnectorPool.grab(connection);
if (connector != null)
{
try
{
// Do the notification itself
try
{
connector.noteJobComplete(activity);
notifiedConnections.put(connections,new Disposition());
}
catch (ServiceInterruption e)
{
notifiedConnections.put(connections,new Disposition(e));
}
catch (ManifoldCFException e)
{
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
throw e;
if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
throw e;
if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
throw e;
// Nothing special; report the error and keep going.
Logging.threads.error(e.getMessage(),e);
}
}
finally
{
outputConnectorPool.release(connection,connector);
}
}
}
}
// Go through jobs again, and put the notified ones into the inactive state.
for (JobNotifyRecord jsr : jobsNeedingNotification)
{
Long jobID = jsr.getJobID();
IJobDescription job = jobManager.load(jobID,true);
if (job != null)
{
// Get the connection name
String repositoryConnectionName = job.getConnectionName();
IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
boolean allOK = true;
for (int i = 0; i < basicSpec.getOutputCount(); i++)
{
String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));
OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
Disposition d = notifiedConnections.get(c);
if (d != null)
{
ServiceInterruption e = d.getServiceInterruption();
if (e == null)
{
break;
}
else
{
if (!e.jobInactiveAbort())
{
Logging.jobs.warn("Notification service interruption reported for job "+
jobID+" output connection '"+outputConnectionName+"': "+
e.getMessage(),e);
}
// If either we are going to be requeuing beyond the fail time, OR
// the number of retries available has hit 0, THEN we treat this
// as either an "ignore" or a hard error.
if (!e.jobInactiveAbort() && (jsr.getFailTime() != -1L && jsr.getFailTime() < e.getRetryTime() ||
jsr.getFailRetryCount() == 0))
{
// Treat this as a hard failure.
if (e.isAbortOnFail())
{
// Note the error in the job, and transition to inactive state
String message = e.jobInactiveAbort()?"":"Repeated service interruptions during notification"+((e.getCause()!=null)?": "+e.getCause().getMessage():"");
if (jobManager.errorAbort(jobID,message) && message.length() > 0)
Logging.jobs.error(message,e.getCause());
jsr.noteStarted();
}
else
{
// Not sure this can happen -- but just transition silently to inactive state
jobManager.inactivateJob(jobID);
jsr.noteStarted();
}
}
else
{
// Reset the job to the READYFORNOTIFY state, updating the failtime and failcount fields
jobManager.retryNotification(jsr,e.getFailTime(),e.getFailRetryCount());
jsr.noteStarted();
}
allOK = false;
break;
}
}
}
if (allOK)
{
jobManager.inactivateJob(jobID);
jsr.noteStarted();
}
}
}
}
finally
{
// Clean up all jobs that did not start
ManifoldCFException exception = null;
int i = 0;
while (i < jobsNeedingNotification.length)
{
JobNotifyRecord jsr = jobsNeedingNotification[i++];
if (!jsr.wasStarted())
{
// Clean up from failed start.
try
{
jobManager.resetNotifyJob(jsr.getJobID());
}
catch (ManifoldCFException e)
{
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
throw e;
exception = e;
}
}
}
if (exception != null)
throw exception;
}
// We also need to do a notify for jobs that are about to be deleted
JobNotifyRecord[] jobsNeedingDeleteNotification = jobManager.getJobsReadyForDelete(processID);
try
{
Set<OutputAndRepositoryConnection> connectionNames = new HashSet<OutputAndRepositoryConnection>();
for (JobNotifyRecord jsr : jobsNeedingDeleteNotification)
{
Long jobID = jsr.getJobID();
IJobDescription job = jobManager.load(jobID,true);
if (job != null)
{
// Get the connection name
String repositoryConnectionName = job.getConnectionName();
IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
for (int i = 0; i < basicSpec.getOutputCount(); i++)
{
String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));
OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
connectionNames.add(c);
}
}
}
// Attempt to notify the specified connections
Map<OutputAndRepositoryConnection,Disposition> notifiedConnections = new HashMap<OutputAndRepositoryConnection,Disposition>();
for (OutputAndRepositoryConnection connections : connectionNames)
{
String outputConnectionName = connections.getOutputConnectionName();
String repositoryConnectionName = connections.getRepositoryConnectionName();
OutputNotifyActivity activity = new OutputNotifyActivity(repositoryConnectionName,repositoryConnectionManager,outputConnectionName);
IOutputConnection connection = connectionManager.load(outputConnectionName);
if (connection != null)
{
// Grab an appropriate connection instance
IOutputConnector connector = outputConnectorPool.grab(connection);
if (connector != null)
{
try
{
// Do the notification itself
try
{
connector.noteJobComplete(activity);
notifiedConnections.put(connections,new Disposition());
}
catch (ServiceInterruption e)
{
notifiedConnections.put(connections,new Disposition(e));
}
catch (ManifoldCFException e)
{
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
throw e;
if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
throw e;
if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
throw e;
// Nothing special; report the error and keep going.
Logging.threads.error(e.getMessage(),e);
}
}
finally
{
outputConnectorPool.release(connection,connector);
}
}
}
}
// Go through jobs again, and put the notified ones into the inactive state.
for (JobNotifyRecord jsr : jobsNeedingDeleteNotification)
{
Long jobID = jsr.getJobID();
IJobDescription job = jobManager.load(jobID,true);
if (job != null)
{
// Get the connection name
String repositoryConnectionName = job.getConnectionName();
IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
boolean allOK = true;
for (int i = 0; i < basicSpec.getOutputCount(); i++)
{
String outputConnectionName = basicSpec.getStageConnectionName(basicSpec.getOutputStage(i));
OutputAndRepositoryConnection c = new OutputAndRepositoryConnection(outputConnectionName, repositoryConnectionName);
Disposition d = notifiedConnections.get(c);
if (d != null)
{
ServiceInterruption e = d.getServiceInterruption();
if (e == null)
{
break;
}
else
{
if (!e.jobInactiveAbort())
{
Logging.jobs.warn("Delete notification service interruption reported for job "+
jobID+" output connection '"+outputConnectionName+"': "+
e.getMessage(),e);
}
// If either we are going to be requeuing beyond the fail time, OR
// the number of retries available has hit 0, THEN we treat this
// as either an "ignore" or a hard error.
///System.out.println("jsr.getFailTime()="+jsr.getFailTime()+"; e.getRetryTime()="+e.getRetryTime()+"; jsr.getFailRetryCount()="+jsr.getFailRetryCount());
if (!e.jobInactiveAbort() && (jsr.getFailTime() != -1L && jsr.getFailTime() < e.getRetryTime() ||
jsr.getFailRetryCount() == 0))
{
// Treat this as a hard failure.
if (e.isAbortOnFail())
{
// Note the error in the job, and transition to inactive state
String message = e.jobInactiveAbort()?"":"Repeated service interruptions during delete notification"+((e.getCause()!=null)?": "+e.getCause().getMessage():"");
if (message.length() > 0)
Logging.jobs.error(message,e.getCause());
// Can't abort a delete!!
jobManager.removeJob(jobID);
jsr.noteStarted();
}
else
{
// Not sure this can happen -- but just transition silently to inactive state
jobManager.removeJob(jobID);
jsr.noteStarted();
}
}
else
{
// Reset the job to the READYFORDELETENOTIFY state, updating the failtime and failcount fields
//System.out.println("Retrying... e.getFailTime()="+e.getFailTime()+"; e.getFailRetryCount()="+e.getFailRetryCount());
jobManager.retryDeleteNotification(jsr,e.getFailTime(),e.getFailRetryCount());
jsr.noteStarted();
}
allOK = false;
break;
}
}
}
if (allOK)
{
jobManager.removeJob(jobID);
jsr.noteStarted();
}
}
}
}
finally
{
// Clean up all jobs that did not start
ManifoldCFException exception = null;
int i = 0;
while (i < jobsNeedingDeleteNotification.length)
{
JobNotifyRecord jsr = jobsNeedingDeleteNotification[i++];
if (!jsr.wasStarted())
{
// Clean up from failed start.
try
{
jobManager.resetDeleteNotifyJob(jsr.getJobID());
}
catch (ManifoldCFException e)
{
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
throw e;
exception = e;
}
}
}
if (exception != null)
throw exception;
}
ManifoldCF.sleep(10000L);
}
catch (ManifoldCFException e)
{
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
break;
if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
{
resetManager.noteEvent();
Logging.threads.error("Job notification thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
try
{
// Give the database a chance to catch up/wake up
ManifoldCF.sleep(10000L);
}
catch (InterruptedException se)
{
break;
}
continue;
}
// Log it, but keep the thread alive
Logging.threads.error("Exception tossed: "+e.getMessage(),e);
if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
{
// Shut the whole system down!
ManifoldCF.systemExit(1);
}
}
catch (InterruptedException e)
{
// We're supposed to quit
break;
}
catch (OutOfMemoryError e)
{
System.err.println("agents process ran out of memory - shutting down");
e.printStackTrace(System.err);
ManifoldCF.systemExit(-200);
}
catch (Throwable e)
{
// A more severe error - but stay alive
Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
}
}
}
catch (Throwable e)
{
// Severe error on initialization
System.err.println("agents process could not start - shutting down");
Logging.threads.fatal("JobNotificationThread initialization error tossed: "+e.getMessage(),e);
ManifoldCF.systemExit(-300);
}
}