in framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java [69:871]
public void run()
{
// Register this thread in the worker reset manager
resetManager.registerMe();
try
{
// Create a thread context object.
IThreadContext threadContext = ThreadContextFactory.make();
IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
IJobManager jobManager = JobManagerFactory.make(threadContext);
IBinManager binManager = BinManagerFactory.make(threadContext);
IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
ITransformationConnectionManager transformationConnectionManager = TransformationConnectionManagerFactory.make(threadContext);
IOutputConnectionManager outputConnectionManager = OutputConnectionManagerFactory.make(threadContext);
IReprioritizationTracker rt = ReprioritizationTrackerFactory.make(threadContext);
IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
// This is the set of documents that we will either be marking as complete, or requeued, depending on the kind of crawl.
List<QueuedDocument> finishList = new ArrayList<QueuedDocument>();
// This is where we accumulate the document QueuedDocuments to be deleted from the job queue.
List<QueuedDocument> deleteList = new ArrayList<QueuedDocument>();
// This is where we accumulate documents that need to be placed in the HOPCOUNTREMOVED
// state
List<QueuedDocument> hopcountremoveList = new ArrayList<QueuedDocument>();
// This is where we accumulate documents that need to be rescanned
List<QueuedDocument> rescanList = new ArrayList<QueuedDocument>();
// This is where we store document ID strings of documents that need to be noted as having
// been checked.
List<String> ingesterCheckList = new ArrayList<String>();
// Exception to throw at the end of this pass if a service interruption occurred with "abort on fail" set.
ManifoldCFException abortOnFail = null;
// Loop
while (true)
{
// Do another try/catch around everything in the loop
try
{
if (Thread.currentThread().isInterrupted())
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
// Before we begin, conditionally reset
resetManager.waitForReset(threadContext);
// Once we pull something off the queue, we MUST make sure that
// we update its status, even if there is an exception!!!
// See if there is anything on the queue for me
QueuedDocumentSet qds = documentQueue.getDocument(queueTracker);
if (qds == null)
// It's a reset, so recycle
continue;
try
{
// System.out.println("Got a document set");
if (Thread.currentThread().isInterrupted())
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
// First of all: find out if the job for these documents has been aborted, paused, etc.
// If so, we requeue the documents immediately.
IJobDescription job = qds.getJobDescription();
Long jobID = job.getID();
if (jobManager.checkJobActive(jobID) == false)
// Recycle; let these documents be requeued and go get the next set.
continue;
if (Logging.threads.isDebugEnabled())
{
StringBuilder sb = new StringBuilder();
for (int z = 0; z < qds.getCount(); z++)
{
sb.append(qds.getDocument(z).getDocumentDescription().getID()).append(" ");
}
Logging.threads.debug("Worker thread processing documents: "+sb);
}
// Build a basic pipeline specification right off; we need it whenever
// we interact with Incremental Ingester.
IPipelineConnections pipelineConnections = new PipelineConnections(new PipelineSpecificationBasic(job),transformationConnectionManager,outputConnectionManager);
String lastIndexedOutputConnectionName = ingester.getLastIndexedOutputConnectionName(pipelineConnections);
// Universal job data we'll need later
String connectionName = job.getConnectionName();
Specification spec = job.getSpecification();
int jobType = job.getType();
IRepositoryConnection connection = qds.getConnection();
OutputActivity ingestLogger = new OutputActivity(connectionName,connMgr);
// The flow through this section of the code is as follows.
// (1) We start with a list of documents
// (2) We attempt to do various things to these documents
// (3) Based on what happens, and what errors we get, we progressively move documents out of the main list
// and into secondary lists that will be all treated in the same way
// First, initialize the active document set to contain everything.
List<QueuedDocument> activeDocuments = new ArrayList<QueuedDocument>(qds.getCount());
for (int i = 0; i < qds.getCount(); i++)
{
QueuedDocument qd = qds.getDocument(i);
activeDocuments.add(qd);
}
// Clear out all of our disposition lists
finishList.clear();
deleteList.clear();
ingesterCheckList.clear();
hopcountremoveList.clear();
rescanList.clear(); // Documents placed here are later reset via jobManager.resetDocument(...,IJobManager.ACTION_RESCAN,...)
abortOnFail = null;
// Keep track of the starting processing time, for statistics calculation
long processingStartTime = System.currentTimeMillis();
// Log these documents in the overlap calculator
qds.beginProcessing(queueTracker);
try
{
long currentTime = System.currentTimeMillis();
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Worker thread starting document count is "+Integer.toString(activeDocuments.size()));
// Get the legal link types. This is needed for later hopcount checking.
String[] legalLinkTypes = null;
if (activeDocuments.size() > 0)
{
legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
// If this came back null, it means that there is no underlying implementation available, so treat this like a kind of service interruption.
if (legalLinkTypes == null)
{
// Failure here puts all remaining documents into rescan list
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Moving "+makeListString(activeDocuments)+" to rescanList");
moveList(activeDocuments,rescanList);
}
}
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Post-relationship document count is "+Integer.toString(activeDocuments.size()));
// Do the hopcount checks, if any. This will iteratively reduce the viable list of
// document identifiers in need of having their versions fetched.
if (legalLinkTypes != null && activeDocuments.size() > 0)
{
// Set up the current ID array
String[] currentDocIDHashArray = new String[activeDocuments.size()];
for (int i = 0; i < currentDocIDHashArray.length; i++)
{
currentDocIDHashArray[i] = activeDocuments.get(i).getDocumentDescription().getDocumentIdentifierHash();
}
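// The hopcount filter map associates each filtered link type with the maximum number of hops allowed for this job.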
Map filterMap = job.getHopCountFilters();
Iterator filterIter = filterMap.keySet().iterator();
// Array to accumulate hopcount results for all link types
boolean[] overallResults = new boolean[currentDocIDHashArray.length];
for (int i = 0; i < overallResults.length; i++)
{
overallResults[i] = true;
}
// Calculate the hopcount result for each link type, and fold it in.
while (filterIter.hasNext())
{
String linkType = (String)filterIter.next();
int maxHop = (int)((Long)filterMap.get(linkType)).longValue();
boolean[] results = jobManager.findHopCounts(job.getID(),legalLinkTypes,currentDocIDHashArray,linkType,
maxHop,job.getHopcountMode());
for (int i = 0; i < results.length; i++)
{
overallResults[i] = overallResults[i] && results[i];
}
}
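// A document survives only if it passed the hopcount check for every filtered link type.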
// Move all documents to the appropriate list
List<QueuedDocument> newActiveSet = new ArrayList<QueuedDocument>(activeDocuments.size());
for (int i = 0; i < overallResults.length; i++)
{
if (overallResults[i] == false)
{
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Adding "+activeDocuments.get(i).getDocumentDescription().getID()+" to hopcountremovelist");
hopcountremoveList.add(activeDocuments.get(i));
}
else
{
newActiveSet.add(activeDocuments.get(i));
}
}
activeDocuments = newActiveSet;
}
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Post-hopcount pruned document count is "+Integer.toString(activeDocuments.size()));
// From here on down we need a connector instance, so get one.
IRepositoryConnector connector = null;
if (activeDocuments.size() > 0 || hopcountremoveList.size() > 0)
{
connector = repositoryConnectorPool.grab(connection);
// If we wind up with a null here, it means that a document got queued for a connector which is now gone.
// Basically, what we want to do in that case is to treat this kind of like a service interruption - the document
// must be requeued for immediate reprocessing. When the rest of the world figures out that the job that owns this
// document is in fact unable to function, we'll stop getting such documents handed to us, because the state of the
// job will be changed.
if (connector == null)
{
// Failure here puts all remaining documents into rescan list
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Moving "+makeListString(activeDocuments)+" to rescanList");
moveList(activeDocuments,rescanList);
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Moving "+makeListString(hopcountremoveList)+" to rescanList");
moveList(hopcountremoveList,rescanList);
}
}
if (connector != null)
{
// Open try/finally block to free the connector instance no matter what
try
{
// Check for interruption before we start fetching
if (Thread.currentThread().isInterrupted())
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
// We need first to assemble an IPipelineSpecificationWithVersions object for each document we're going to process.
// We put this in a map so it can be looked up by document identifier.
// Create a full PipelineSpecification, including description strings. (This is per-job still, but can throw ServiceInterruptions, so we do it in here.)
IPipelineSpecification pipelineSpecification;
try
{
pipelineSpecification = new PipelineSpecification(pipelineConnections,job,ingester);
}
catch (ServiceInterruption e)
{
// Handle service interruption from pipeline
if (!e.jobInactiveAbort())
Logging.jobs.warn("Service interruption reported for job "+
job.getID()+" connection '"+job.getConnectionName()+"': "+
e.getMessage());
// All documents get requeued, because we never got far enough to make distinctions. All we have to decide
// is whether to requeue or abort.
List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
for (QueuedDocument qd : activeDocuments)
{
DocumentDescription dd = qd.getDocumentDescription();
// Check for hard failure. But no hard failure is possible if it's a job inactive abort.
if (!e.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
dd.getFailRetryCount() == 0))
{
// Treat this as a hard failure.
if (e.isAbortOnFail())
{
rescanList.add(qd);
abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
}
else
{
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to requeueList because of output service interruption");
requeueList.add(qd);
}
}
else
{
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to requeueList because of output service interruption");
requeueList.add(qd);
}
}
requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
e.getFailRetryCount());
Logging.threads.debug(" Clearing active documents list due to output service interruption");
activeDocuments.clear();
pipelineSpecification = null;
}
if (activeDocuments.size() > 0)
{
// **** New worker thread code starts here!!! ****
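// The existing-versions accessor exposes the version strings last indexed (for the last output connection), so the connector can recognize unchanged documents.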
IExistingVersions existingVersions = new ExistingVersions(lastIndexedOutputConnectionName,activeDocuments);
String aclAuthority = connection.getACLAuthority();
if (aclAuthority == null)
aclAuthority = "";
boolean isDefaultAuthority = (aclAuthority.length() == 0);
// Build the map of previously-queued documents (keyed by identifier hash), the array of document identifiers, and then the ProcessActivity object.
Map<String,QueuedDocument> previousDocuments = new HashMap<String,QueuedDocument>();
String[] documentIDs = new String[activeDocuments.size()];
int k = 0;
for (QueuedDocument qd : activeDocuments)
{
previousDocuments.put(qd.getDocumentDescription().getDocumentIdentifierHash(),qd);
documentIDs[k++] = qd.getDocumentDescription().getDocumentIdentifier();
}
ProcessActivity activity = new ProcessActivity(job.getID(),processID,
rt,jobManager,ingester,
connectionName,pipelineSpecification,
previousDocuments,
currentTime,
job.getExpiration(),
job.getInterval(),
job.getMaxInterval(),
job.getHopcountMode(),
connection,connector,connMgr,legalLinkTypes,ingestLogger);
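// The activity object is the connector's callback interface: it records ingestions, deletions, discovered references, and aborts, and is consulted below to decide each document's disposition.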
try
{
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Worker thread about to process "+makeListString(activeDocuments));
// Now, process in bulk -- catching and handling ServiceInterruptions
ServiceInterruption serviceInterruption = null;
try
{
connector.processDocuments(documentIDs,existingVersions,job.getSpecification(),activity,jobType,isDefaultAuthority);
// Now perform, on the connector's behalf, the cleanup it would otherwise have had to do itself.
// Right now, that's just getting rid of untouched components.
for (QueuedDocument qd : activeDocuments)
{
String documentIdentifier = qd.getDocumentDescription().getDocumentIdentifier();
if (!activity.wasDocumentAborted(documentIdentifier) && !activity.wasDocumentDeleted(documentIdentifier))
{
String documentIdentifierHash = qd.getDocumentDescription().getDocumentIdentifierHash();
// In order to be able to loop over all the components that the incremental ingester knows about, we need to know
// what the FIRST output is.
DocumentIngestStatusSet set = qd.getLastIngestedStatus(ingester.getFirstIndexedOutputConnectionName(pipelineConnections));
if (set != null)
{
Iterator<String> componentHashes = set.componentIterator();
while (componentHashes.hasNext())
{
String componentHash = componentHashes.next();
// Check whether we've indexed or not
if (!activity.wasDocumentComponentTouched(documentIdentifier,
componentHash))
{
// This component must be removed.
ingester.documentRemove(
pipelineConnections,
connectionName,documentIdentifierHash,componentHash,
ingestLogger);
}
}
}
}
}
// Done with connector functionality!
}
catch (ServiceInterruption e)
{
serviceInterruption = e;
if (!e.jobInactiveAbort())
Logging.jobs.warn("Service interruption reported for job "+
job.getID()+" connection '"+job.getConnectionName()+"': "+
e.getMessage());
}
// Flush remaining references into the database!
activity.flush();
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Worker thread done processing "+Integer.toString(documentIDs.length)+" documents");
// Either way, handle the documents we were supposed to process. But if there was a service interruption,
// and the disposition of the document was unclear, then the document will need to be requeued instead of handled normally.
List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
for (QueuedDocument qd : activeDocuments)
{
// If this document was aborted, then treat it specially.
if (activity.wasDocumentAborted(qd.getDocumentDescription().getDocumentIdentifier()))
{
// Special treatment for aborted documents.
// We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
// We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
// Add to the finish list, so it gets requeued. Because the document is already marked as aborted, this should be enough to cause an
// unconditional requeue.
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to finishList");
finishList.add(qd);
}
else if (activity.wasDocumentDeleted(qd.getDocumentDescription().getDocumentIdentifier()))
{
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to deleteList");
deleteList.add(qd);
}
else if (serviceInterruption != null)
{
// Service interruption has precedence over unchanged, because we might have been interrupted while scanning the document
// for references
DocumentDescription dd = qd.getDocumentDescription();
// Check for hard failure. But no hard failure is possible if it's a job inactive abort.
if (!serviceInterruption.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < serviceInterruption.getRetryTime() ||
dd.getFailRetryCount() == 0))
{
// Treat this as a hard failure.
if (serviceInterruption.isAbortOnFail())
{
// Make sure that the job aborts.
abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((serviceInterruption.getCause()!=null)?": "+serviceInterruption.getCause().getMessage():""),serviceInterruption.getCause());
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to rescanList due to service interruption");
rescanList.add(qd);
}
else
{
// Skip the document, rather than failing.
// We want this particular document to be not included in the
// reprocessing. Therefore, we do the same thing as we would
// if we got back a null version.
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to deleteList due to service interruption");
deleteList.add(qd);
}
}
else
{
// Not a hard failure. Requeue.
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to requeueList");
requeueList.add(qd);
}
}
else
{
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to finishList");
finishList.add(qd);
}
// Note whether the document was untouched; if so, schedule it for an ingester status check.
if (!activity.wasDocumentTouched(qd.getDocumentDescription().getDocumentIdentifier()))
{
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to ingesterCheckList");
ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
}
}
if (serviceInterruption != null)
{
// Requeue the documents we've identified as needing to be repeated
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Requeuing documents "+makeListString(requeueList));
requeueDocuments(jobManager,requeueList,serviceInterruption.getRetryTime(),serviceInterruption.getFailTime(),
serviceInterruption.getFailRetryCount());
}
// Note the documents that have been checked but not reingested. This must happen BEFORE we need
// the statistics (which are calculated during the finishList step below).
if (ingesterCheckList.size() > 0)
{
String[] checkClasses = new String[ingesterCheckList.size()];
String[] checkIDs = new String[ingesterCheckList.size()];
for (int i = 0; i < checkIDs.length; i++)
{
checkClasses[i] = connectionName;
checkIDs[i] = ingesterCheckList.get(i);
}
// This method should exercise reasonable intelligence. If the document has never been indexed, it should detect that
// and stop. Otherwise, it should update the statistics accordingly.
ingester.documentCheckMultiple(pipelineConnections,checkClasses,checkIDs,currentTime);
}
// Process the finish list!
if (finishList.size() > 0)
{
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Finishing documents "+makeListString(finishList));
// "Finish" the documents (removing unneeded carrydown info, and compute hopcounts).
// This can ONLY be done on fully-completed documents; everything else should be left in a dangling
// state (which we know is OK because it will be fixed the next time the document is attempted).
String[] documentIDHashes = new String[finishList.size()];
k = 0;
for (QueuedDocument qd : finishList)
{
documentIDHashes[k++] = qd.getDocumentDescription().getDocumentIdentifierHash();
}
DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,documentIDHashes,job.getHopcountMode());
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Requeueing documents due to carrydown "+makeListString(requeueCandidates));
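// Documents whose carrydown data changed as a result of this pass must themselves be revisited, so requeue them here.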
ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,rt,currentTime);
// In both job types, we have to go through the finishList to figure out what to do with the documents.
// In the case of a document that was aborted, we must requeue it for immediate reprocessing in BOTH job types.
switch (job.getType())
{
case IJobDescription.TYPE_CONTINUOUS:
{
// Build the connection-class and hash arrays needed to look up each document's update interval (timeArray).
String[] timeIDClasses = new String[finishList.size()];
String[] timeIDHashes = new String[finishList.size()];
for (int i = 0; i < timeIDHashes.length; i++)
{
QueuedDocument qd = (QueuedDocument)finishList.get(i);
DocumentDescription dd = qd.getDocumentDescription();
String documentIDHash = dd.getDocumentIdentifierHash();
timeIDClasses[i] = connectionName;
timeIDHashes[i] = documentIDHash;
}
long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineConnections,timeIDClasses,timeIDHashes);
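// timeArray holds, per document, the ingester's estimate of the interval between content changes; this drives the adaptive rescheduling below.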
Long[] recheckTimeArray = new Long[timeArray.length];
int[] actionArray = new int[timeArray.length];
DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
for (int i = 0; i < finishList.size(); i++)
{
QueuedDocument qd = finishList.get(i);
recrawlDocs[i] = qd.getDocumentDescription();
String documentID = recrawlDocs[i].getDocumentIdentifier();
// If aborted due to sequencing issue, then requeue for reprocessing immediately, ignoring everything else.
boolean wasAborted = activity.wasDocumentAborted(documentID);
if (wasAborted)
{
// Requeue for immediate reprocessing
if (Logging.scheduling.isDebugEnabled())
Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED as soon as prerequisites are met");
actionArray[i] = IJobManager.ACTION_RESCAN;
recheckTimeArray[i] = new Long(0L); // Must not use null; that means 'never'.
}
else
{
// Calculate the next time to run, or time to expire.
// For run time, the formula is to calculate the running avg interval between changes,
// add an additional interval (which comes from the job description),
// and add that to the current time.
// One caveat: we really want to calculate the interval from the last
// time a change was detected, but this is not implemented yet.
long timeAmt = timeArray[i];
// null value indicates never to schedule
Long recrawlTime = activity.calculateDocumentRescheduleTime(currentTime,timeAmt,documentID);
Long expireTime = activity.calculateDocumentExpireTime(currentTime,documentID);
// Merge the two times together; whichever action comes due first wins.
if (expireTime == null || (recrawlTime != null && recrawlTime.longValue() < expireTime.longValue()))
{
if (Logging.scheduling.isDebugEnabled() && recrawlTime != null)
Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
recheckTimeArray[i] = recrawlTime;
actionArray[i] = IJobManager.ACTION_RESCAN;
}
else if (recrawlTime == null || (expireTime != null && recrawlTime.longValue() > expireTime.longValue()))
{
if (Logging.scheduling.isDebugEnabled())
Logging.scheduling.debug("Document '"+documentID+"' will be REMOVED at "+expireTime.toString());
recheckTimeArray[i] = expireTime;
actionArray[i] = IJobManager.ACTION_REMOVE;
}
else
{
// If the two times are equal, default to a rescan.
if (Logging.scheduling.isDebugEnabled() && recrawlTime != null)
Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
recheckTimeArray[i] = recrawlTime;
actionArray[i] = IJobManager.ACTION_RESCAN;
}
}
}
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Requeuing "+makeListString(recrawlDocs));
jobManager.requeueDocumentMultiple(recrawlDocs,recheckTimeArray,actionArray);
}
break;
case IJobDescription.TYPE_SPECIFIED:
{
// Separate the ones we actually finished from the ones we need to requeue because they were aborted
List<DocumentDescription> completedList = new ArrayList<DocumentDescription>();
List<DocumentDescription> abortedList = new ArrayList<DocumentDescription>();
for (QueuedDocument qd : finishList)
{
DocumentDescription dd = qd.getDocumentDescription();
if (activity.wasDocumentAborted(dd.getDocumentIdentifier()))
{
// The document was aborted, so put it into the abortedList
abortedList.add(dd);
}
else
{
// The document was completed.
completedList.add(dd);
}
}
// Requeue the ones that must be repeated
if (abortedList.size() > 0)
{
DocumentDescription[] docDescriptions = new DocumentDescription[abortedList.size()];
Long[] recheckTimeArray = new Long[docDescriptions.length];
int[] actionArray = new int[docDescriptions.length];
for (int i = 0; i < docDescriptions.length; i++)
{
docDescriptions[i] = abortedList.get(i);
recheckTimeArray[i] = new Long(0L);
actionArray[i] = IJobManager.ACTION_RESCAN;
}
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Requeuing "+makeListString(docDescriptions));
jobManager.requeueDocumentMultiple(docDescriptions,recheckTimeArray,actionArray);
}
// Mark as completed the ones that actually finished.
if (completedList.size() > 0)
{
DocumentDescription[] docDescriptions = new DocumentDescription[completedList.size()];
for (int i = 0; i < docDescriptions.length; i++)
{
docDescriptions[i] = completedList.get(i);
}
if (Logging.threads.isDebugEnabled())
Logging.threads.debug(" Marking completed "+makeListString(docDescriptions));
jobManager.markDocumentCompletedMultiple(docDescriptions);
}
}
break;
default:
throw new ManifoldCFException("Unexpected value for job type: '"+Integer.toString(job.getType())+"'");
}
// Finally, if we're still alive, mark everything we finished as "processed".
for (QueuedDocument qd : finishList)
{
qd.setProcessed();
}
}
}
finally
{
// Make sure we don't leave any dangling carrydown files
activity.discard();
}
// Successful processing of the set
// We count 'get version' time in the average, so even if we decide not to process a doc
// it still counts.
long elapsedTime = System.currentTimeMillis() - processingStartTime;
if (Logging.scheduling.isDebugEnabled())
Logging.scheduling.debug("Worker thread for connection "+connectionName+" took "+new Long(elapsedTime).toString()+"ms to handle "+Integer.toString(qds.getCount())+" documents");
queueTracker.noteConnectionPerformance(qds.getCount(),connectionName,elapsedTime);
}
// Now, handle the delete list
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Deleting "+makeListString(deleteList));
processDeleteLists(pipelineConnections,connector,connection,jobManager,
deleteList,ingester,
job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
// Handle hopcount removal
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Hopcount removal "+makeListString(hopcountremoveList));
processHopcountRemovalLists(pipelineConnections,connector,connection,jobManager,
hopcountremoveList,ingester,
job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
}
finally
{
repositoryConnectorPool.release(connection,connector);
}
}
// Handle rescanning
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Rescanning documents "+makeListString(rescanList));
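// Reset each of these documents for an immediate rescan (execute time 0L), and mark it processed so the cleanup in the finally block below does not reset it again.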
for (QueuedDocument qd : rescanList)
{
jobManager.resetDocument(qd.getDocumentDescription(),0L,IJobManager.ACTION_RESCAN,-1L,-1);
qd.setProcessed();
}
}
finally
{
// Note termination of processing of these documents in the overlap calculator
qds.endProcessing(queueTracker);
}
if (abortOnFail != null)
throw abortOnFail;
}
catch (ManifoldCFException e)
{
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
break;
if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
throw e;
if (jobManager.errorAbort(qds.getJobDescription().getID(),e.getMessage()))
{
// An exception occurred in the processing of a set of documents.
// Shut the corresponding job down, with an appropriate error.
// (If an error was already recorded for this job, errorAbort() returns false and we eat the exception instead.)
Logging.threads.error("Exception tossed: "+e.getMessage(),e);
}
}
finally
{
// Go through qds and requeue any that aren't closed out in one way or another. This allows the job
// to be aborted; no dangling entries are left around.
for (int i = 0; i < qds.getCount(); i++)
{
QueuedDocument qd = qds.getDocument(i);
if (!qd.wasProcessed())
{
jobManager.resetDocument(qd.getDocumentDescription(),0L,IJobManager.ACTION_RESCAN,-1L,-1);
}
}
}
}
catch (ManifoldCFException e)
{
if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
break;
if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
{
// Note the failure, which will cause a reset to occur
resetManager.noteEvent();
Logging.threads.error("Worker thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
try
{
// Give the database a chance to catch up/wake up
ManifoldCF.sleep(10000L);
}
catch (InterruptedException se)
{
break;
}
continue;
}
// An exception occurred outside of document processing proper (for example, during cleanup from another error).
// Log the error (but that's all we can do).
Logging.threads.error("Exception tossed: "+e.getMessage(),e);
}
catch (InterruptedException e)
{
// We're supposed to quit
break;
}
catch (OutOfMemoryError e)
{
System.err.println(sdf.format(new Date()) + " agents process ran out of memory - shutting down");
e.printStackTrace(System.err);
ManifoldCF.systemExit(-200);
}
catch (Throwable e)
{
// A more severe error - but stay alive
Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
}
}
}
catch (Throwable e)
{
// Severe error on initialization
System.err.println("agents process could not start - shutting down");
Logging.threads.fatal("WorkerThread "+id+" initialization error tossed: "+e.getMessage(),e);
ManifoldCF.systemExit(-300);
}
}