private void assignJobs()

in src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java [223:317]


    /**
     * Walk all topic resources below the given root and move the jobs found there.
     * <p>
     * For every topic for which the current capabilities report at least one
     * potential target, each job is moved below the assigned jobs path of the
     * target detected for it. If {@code unassign} is set, all jobs still found
     * below the topic afterwards are moved to the unassigned jobs area.
     * Processing stops as soon as the capabilities are no longer active.
     *
     * @param jobsRoot The resource whose children are the topic resources
     * @param unassign Whether jobs that could not be assigned should be moved
     *                 to the unassigned area
     */
    private void assignJobs(final Resource jobsRoot,
            final boolean unassign) {
        final ResourceResolver resolver = jobsRoot.getResourceResolver();

        final Iterator<Resource> topicIter = jobsRoot.listChildren();
        while ( caps.isActive() && topicIter.hasNext() ) {
            final Resource topicResource = topicIter.next();

            // the resource name uses dots where the topic uses slashes
            final String topicName = topicResource.getName().replace('.', '/');
            logger.debug("Found topic {}", topicName);

            // first check if there is an instance for these topics
            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName);
            if ( potentialTargets != null && !potentialTargets.isEmpty() ) {
                final QueueConfigurationManager qcm = this.configuration.getQueueConfigurationManager();
                if ( qcm == null ) {
                    // without a queue configuration manager no further topic is processed
                    break;
                }
                final QueueInfo info = qcm.getQueueInfo(topicName);
                logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);

                JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {

                    @Override
                    public boolean handle(final Resource rsrc) {
                        reassignJobResource(resolver, topicResource, topicName, info, rsrc);
                        return caps.isActive();
                    }
                });
            }
            // now unassign if there are still jobs
            if ( caps.isActive() && unassign ) {
                // we have to move everything to the unassigned area
                JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {

                    @Override
                    public boolean handle(final Resource rsrc) {
                        unassignJobResource(resolver, topicResource, rsrc);
                        return caps.isActive();
                    }
                });
            }
        }
    }

    /**
     * Move a single job below the assigned jobs path of the target detected for it.
     * Nothing is changed if no target is detected.
     *
     * @param resolver The resource resolver used for the move
     * @param topicResource The topic resource the job belongs to
     * @param topicName The topic name (with slashes)
     * @param info The queue info for the topic
     * @param rsrc The job resource
     */
    private void reassignJobResource(final ResourceResolver resolver,
            final Resource topicResource,
            final String topicName,
            final QueueInfo info,
            final Resource rsrc) {
        try {
            final ValueMap vm = ResourceHelper.getValueMap(rsrc);
            final String targetId = caps.detectTarget(topicName, vm, info);
            if ( targetId == null ) {
                return;
            }
            // path of the job relative to the topic resource, starting with a slash
            final String relativePath = rsrc.getPath().substring(topicResource.getPath().length());
            final String newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + relativePath;

            final Map<String, Object> props = new HashMap<>(vm);
            props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
            props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
            props.remove(Job.PROPERTY_JOB_STARTED_TIME);

            // read the id up front so the audit entry does not depend on the
            // value map of a resource that has already been deleted
            final String jobId = vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class);
            if ( moveJobResource(resolver, rsrc, newPath, props, "Unable to move unassigned job from ") ) {
                configuration.getAuditLogger().debug("REASSIGN OK {} : {}", targetId, jobId);
            }
        } catch (final InstantiationException ie) {
            // something happened with the resource in the meantime
            logger.warn("Unable to move unassigned job from " + rsrc.getPath(), ie);
            discardPendingChanges(resolver);
        }
    }

    /**
     * Move a single job to the unassigned jobs area, removing the queue name,
     * target instance and started time properties.
     *
     * @param resolver The resource resolver used for the move
     * @param topicResource The topic resource the job belongs to
     * @param rsrc The job resource
     */
    private void unassignJobResource(final ResourceResolver resolver,
            final Resource topicResource,
            final Resource rsrc) {
        try {
            final ValueMap vm = ResourceHelper.getValueMap(rsrc);
            // path of the job relative to the topic resource, starting with a slash
            final String relativePath = rsrc.getPath().substring(topicResource.getPath().length());
            final String newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + relativePath;

            final Map<String, Object> props = new HashMap<>(vm);
            props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
            props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
            props.remove(Job.PROPERTY_JOB_STARTED_TIME);

            // read the id up front so the audit entry does not depend on the
            // value map of a resource that has already been deleted
            final String jobId = vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class);
            if ( moveJobResource(resolver, rsrc, newPath, props, "Unable to unassigned job from ") ) {
                configuration.getAuditLogger().debug("REUNASSIGN OK : {}", jobId);
            }
        } catch (final InstantiationException ie) {
            // something happened with the resource in the meantime
            logger.warn("Unable to unassigned job from " + rsrc.getPath(), ie);
            discardPendingChanges(resolver);
        }
    }

    /**
     * Create the resource at the new path with the given properties, delete
     * the old resource and commit. If persisting fails, a warning is logged
     * and the pending changes of the resolver are discarded.
     *
     * @param resolver The resource resolver used for the move
     * @param rsrc The job resource to move
     * @param newPath The path of the new job resource
     * @param props The properties for the new job resource
     * @param failurePrefix Start of the warning message; the old path, " to "
     *                      and the new path are appended
     * @return {@code true} if the move has been committed, {@code false} otherwise
     */
    private boolean moveJobResource(final ResourceResolver resolver,
            final Resource rsrc,
            final String newPath,
            final Map<String, Object> props,
            final String failurePrefix) {
        try {
            ResourceHelper.getOrCreateResource(resolver, newPath, props);
            resolver.delete(rsrc);
            resolver.commit();
            return true;
        } catch ( final PersistenceException pe ) {
            logger.warn(failurePrefix + rsrc.getPath() + " to " + newPath, pe);
            discardPendingChanges(resolver);
            return false;
        }
    }

    /**
     * Refresh the resolver and revert its pending changes so that a failed
     * move does not leak into the handling of the next job.
     *
     * @param resolver The resource resolver to clean up
     */
    private void discardPendingChanges(final ResourceResolver resolver) {
        resolver.refresh();
        resolver.revert();
    }