private void reassignStaleJobs()

in src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java [104:189]


    private void reassignStaleJobs() {
        if ( caps.isActive() ) {
            this.logger.debug("Checking for stale jobs...");
            final ResourceResolver resolver = this.configuration.createResourceResolver();
            if ( resolver != null ) {
                try {
                    final Resource jobsRoot = resolver.getResource(this.configuration.getLocalJobsPath());

                    // this resource should exist, but we check anyway
                    if ( jobsRoot != null ) {
                        final Iterator<Resource> topicIter = jobsRoot.listChildren();
                        while ( caps.isActive() && topicIter.hasNext() ) {
                            final Resource topicResource = topicIter.next();

                            final String topicName = topicResource.getName().replace('.', '/');
                            this.logger.debug("Checking topic {}..." , topicName);
                            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName);
                            boolean reassign = true;
                            for(final InstanceDescription desc : potentialTargets) {
                                if ( desc.isLocal() ) {
                                    reassign = false;
                                    break;
                                }
                            }
                            if ( reassign ) {
                                final QueueConfigurationManager qcm = this.configuration.getQueueConfigurationManager();
                                if ( qcm == null ) {
                                    break;
                                }
                                final QueueInfo info = qcm.getQueueInfo(topicName);
				logger.info ("Start reassigning stale jobs");
                                JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {

                                    @Override
                                    public boolean handle(final Resource rsrc) {
                                        try {
                                            final ValueMap vm = ResourceHelper.getValueMap(rsrc);
                                            final String targetId = caps.detectTarget(topicName, vm, info);

                                            final Map<String, Object> props = new HashMap<>(vm);
                                            props.remove(Job.PROPERTY_JOB_STARTED_TIME);

                                            final String newPath;
                                            if ( targetId != null ) {
                                                newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
                                                props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
                                                props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
                                            } else {
                                                newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
                                                props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
                                                props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
                                            }
                                            try {
                                                ResourceHelper.getOrCreateResource(resolver, newPath, props);
                                                resolver.delete(rsrc);
                                                resolver.commit();
                                                final String jobId = vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class);
                                                if ( targetId != null ) {
                                                    configuration.getAuditLogger().debug("REASSIGN OK {} : {}", targetId, jobId);
                                                } else {
                                                    configuration.getAuditLogger().debug("REUNASSIGN OK : {}", jobId);
                                                }
                                            } catch ( final PersistenceException pe ) {
                                                logger.warn("Unable to move stale job from " + rsrc.getPath() + " to " + newPath, pe);
                                                resolver.refresh();
                                                resolver.revert();
                                            }
                                        } catch (final InstantiationException ie) {
                                            // something happened with the resource in the meantime
                                            logger.warn("Unable to move stale job from " + rsrc.getPath(), ie);
                                            resolver.refresh();
                                            resolver.revert();
                                        }
                                        return caps.isActive();
                                    }
                                });

                            }
                        }
                    }
                } finally {
                    resolver.close();
                }
            }
        }
    }