in src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java [104:189]
/**
 * Moves jobs out of the local jobs tree for every topic this instance is
 * not a potential target of.
 * <p>
 * For each topic resource below the local jobs path the potential targets
 * are looked up; if none of them is the local instance, every job below
 * that topic is re-created either below the assigned jobs path of a newly
 * detected target or, if no target is detected, below the unassigned jobs
 * path. The original job resource is deleted and the change committed per
 * job. Failures for a single job are logged and the pending changes are
 * reverted; processing then continues with the next job.
 * <p>
 * Nothing is done if the capabilities are not active or no resource
 * resolver can be created. The resolver is always closed before returning.
 */
private void reassignStaleJobs() {
    if ( !caps.isActive() ) {
        return;
    }
    this.logger.debug("Checking for stale jobs...");
    final ResourceResolver resolver = this.configuration.createResourceResolver();
    if ( resolver == null ) {
        return;
    }
    try {
        final Resource jobsRoot = resolver.getResource(this.configuration.getLocalJobsPath());
        // this resource should exist, but we check anyway
        if ( jobsRoot == null ) {
            return;
        }
        final Iterator<Resource> topicIter = jobsRoot.listChildren();
        // re-check the capabilities before every topic so a change stops the scan early
        while ( caps.isActive() && topicIter.hasNext() ) {
            final Resource topicResource = topicIter.next();
            // the resource name uses '.' where the topic name uses '/'
            final String topicName = topicResource.getName().replace('.', '/');
            this.logger.debug("Checking topic {}..." , topicName);

            // jobs only need to move if the local instance is not among the potential targets
            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName);
            boolean reassign = true;
            for(final InstanceDescription desc : potentialTargets) {
                if ( desc.isLocal() ) {
                    reassign = false;
                    break;
                }
            }
            if ( !reassign ) {
                continue;
            }

            final QueueConfigurationManager qcm = this.configuration.getQueueConfigurationManager();
            if ( qcm == null ) {
                // no queue info can be looked up; give up on the remaining topics as well
                break;
            }
            final QueueInfo info = qcm.getQueueInfo(topicName);
            logger.info ("Start reassigning stale jobs");
            JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {

                @Override
                public boolean handle(final Resource rsrc) {
                    try {
                        final ValueMap vm = ResourceHelper.getValueMap(rsrc);
                        final String targetId = caps.detectTarget(topicName, vm, info);

                        // Read the job id up front: once the resource has been deleted and
                        // the deletion committed, the value map belongs to a resource that
                        // no longer exists and should not be read from anymore.
                        final String jobId = vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class);

                        final Map<String, Object> props = new HashMap<>(vm);
                        props.remove(Job.PROPERTY_JOB_STARTED_TIME);

                        // topic folder name plus the job's path below the topic resource
                        final String relativePath = topicResource.getName()
                                + rsrc.getPath().substring(topicResource.getPath().length());

                        final String newPath;
                        if ( targetId != null ) {
                            newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + relativePath;
                            props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
                            props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
                        } else {
                            newPath = configuration.getUnassignedJobsPath() + '/' + relativePath;
                            props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
                            props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
                        }
                        try {
                            // create the copy, remove the original, persist both in one commit
                            ResourceHelper.getOrCreateResource(resolver, newPath, props);
                            resolver.delete(rsrc);
                            resolver.commit();
                            if ( targetId != null ) {
                                configuration.getAuditLogger().debug("REASSIGN OK {} : {}", targetId, jobId);
                            } else {
                                configuration.getAuditLogger().debug("REUNASSIGN OK : {}", jobId);
                            }
                        } catch ( final PersistenceException pe ) {
                            logger.warn("Unable to move stale job from " + rsrc.getPath() + " to " + newPath, pe);
                            // drop whatever part of the move is still pending before the next job
                            resolver.refresh();
                            resolver.revert();
                        }
                    } catch (final InstantiationException ie) {
                        // something happened with the resource in the meantime
                        logger.warn("Unable to move stale job from " + rsrc.getPath(), ie);
                        resolver.refresh();
                        resolver.revert();
                    }
                    // NOTE(review): presumably a false return value stops the traversal - confirm in JobTopicTraverser
                    return caps.isActive();
                }
            });
        }
    } finally {
        resolver.close();
    }
}