in src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java [223:317]
private void assignJobs(final Resource jobsRoot,
final boolean unassign) {
final ResourceResolver resolver = jobsRoot.getResourceResolver();
final Iterator<Resource> topicIter = jobsRoot.listChildren();
while ( caps.isActive() && topicIter.hasNext() ) {
final Resource topicResource = topicIter.next();
final String topicName = topicResource.getName().replace('.', '/');
logger.debug("Found topic {}", topicName);
// first check if there is an instance for these topics
final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName);
if ( potentialTargets != null && potentialTargets.size() > 0 ) {
final QueueConfigurationManager qcm = this.configuration.getQueueConfigurationManager();
if ( qcm == null ) {
break;
}
final QueueInfo info = qcm.getQueueInfo(topicName);
logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);
JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {
@Override
public boolean handle(final Resource rsrc) {
try {
final ValueMap vm = ResourceHelper.getValueMap(rsrc);
final String targetId = caps.detectTarget(topicName, vm, info);
if ( targetId != null ) {
final String newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
final Map<String, Object> props = new HashMap<>(vm);
props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
props.remove(Job.PROPERTY_JOB_STARTED_TIME);
try {
ResourceHelper.getOrCreateResource(resolver, newPath, props);
resolver.delete(rsrc);
resolver.commit();
final String jobId = vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class);
configuration.getAuditLogger().debug("REASSIGN OK {} : {}", targetId, jobId);
} catch ( final PersistenceException pe ) {
logger.warn("Unable to move unassigned job from " + rsrc.getPath() + " to " + newPath, pe);
resolver.refresh();
resolver.revert();
}
}
} catch (final InstantiationException ie) {
// something happened with the resource in the meantime
logger.warn("Unable to move unassigned job from " + rsrc.getPath(), ie);
resolver.refresh();
resolver.revert();
}
return caps.isActive();
}
});
}
// now unassign if there are still jobs
if ( caps.isActive() && unassign ) {
// we have to move everything to the unassigned area
JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {
@Override
public boolean handle(final Resource rsrc) {
try {
final ValueMap vm = ResourceHelper.getValueMap(rsrc);
final String newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
final Map<String, Object> props = new HashMap<>(vm);
props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
props.remove(Job.PROPERTY_JOB_STARTED_TIME);
try {
ResourceHelper.getOrCreateResource(resolver, newPath, props);
resolver.delete(rsrc);
resolver.commit();
final String jobId = vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class);
configuration.getAuditLogger().debug("REUNASSIGN OK : {}", jobId);
} catch ( final PersistenceException pe ) {
logger.warn("Unable to unassigned job from " + rsrc.getPath() + " to " + newPath, pe);
resolver.refresh();
resolver.revert();
}
} catch (final InstantiationException ie) {
// something happened with the resource in the meantime
logger.warn("Unable to unassigned job from " + rsrc.getPath(), ie);
resolver.refresh();
resolver.revert();
}
return caps.isActive();
}
});
}
}
}