in src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java [108:152]
private void upgradeBridgedJobs(final Resource topicResource) {
final String topicName = topicResource.getName().replace('.', '/');
final QueueConfigurationManager qcm = configuration.getQueueConfigurationManager();
if ( qcm == null ) {
return;
}
final QueueInfo info = qcm.getQueueInfo(topicName);
JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.ResourceCallback() {
@Override
public boolean handle(final Resource rsrc) {
try {
final ValueMap vm = ResourceHelper.getValueMap(rsrc);
final String targetId = caps.detectTarget(topicName, vm, info);
final Map<String, Object> props = new HashMap<>(vm);
final String newPath;
if ( targetId != null ) {
newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
} else {
newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
}
props.remove(Job.PROPERTY_JOB_STARTED_TIME);
try {
ResourceHelper.getOrCreateResource(topicResource.getResourceResolver(), newPath, props);
topicResource.getResourceResolver().delete(rsrc);
topicResource.getResourceResolver().commit();
} catch ( final PersistenceException pe ) {
logger.warn("Unable to move job from previous version " + rsrc.getPath(), pe);
topicResource.getResourceResolver().refresh();
topicResource.getResourceResolver().revert();
}
} catch (final InstantiationException ie) {
logger.warn("Unable to move job from previous version " + rsrc.getPath(), ie);
topicResource.getResourceResolver().refresh();
topicResource.getResourceResolver().revert();
}
return caps.isActive();
}
});
}