public String detectTarget()

in src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java [246:315]


    /**
     * Picks the instance that should process a job for the given topic.
     * <p>
     * The candidates come from {@code getPotentialTargets(jobTopic)} and are narrowed in
     * this order:
     * <ol>
     *   <li>If the creating instance is known in {@code instanceMap}, candidates in the same
     *       cluster view are preferred. When distribution is disabled, only the leader of
     *       that cluster view qualifies. If nothing qualifies, the candidates are left
     *       untouched.</li>
     *   <li>If the queue prefers to run on the creation instance and that instance is among
     *       the candidates, it becomes the only candidate.</li>
     *   <li>For ordered queues the first candidate is taken; otherwise a per-topic
     *       round-robin counter kept in {@code roundRobinMap} selects the candidate.</li>
     * </ol>
     * Note that the list returned by {@code getPotentialTargets} is modified in place while
     * narrowing.
     *
     * @param jobTopic      the topic of the job
     * @param jobProperties the job properties, may be {@code null}; the creation instance is
     *                      read from {@code Job.PROPERTY_JOB_CREATED_INSTANCE} and defaults to
     *                      {@code Environment.APPLICATION_ID} when absent
     * @param queueInfo     the queue information; dereferenced whenever at least one
     *                      candidate exists
     * @return the Sling ID of the selected instance, or {@code null} if there is no
     *         potential target
     */
    public String detectTarget(final String jobTopic, final Map<String, Object> jobProperties,
            final QueueInfo queueInfo) {
        final List<InstanceDescription> candidates = this.getPotentialTargets(jobTopic);
        logger.debug("Potential targets for {} : {}", jobTopic, candidates);

        // find out where the job was created, falling back to this application's id
        final String recordedOrigin = jobProperties == null
                ? null
                : (String) jobProperties.get(org.apache.sling.event.jobs.Job.PROPERTY_JOB_CREATED_INSTANCE);
        final String createdOn = recordedOrigin != null ? recordedOrigin : Environment.APPLICATION_ID;
        final InstanceDescription originInstance = this.instanceMap.get(createdOn);

        // nothing to choose from
        if ( candidates == null || candidates.isEmpty() ) {
            return null;
        }

        if ( originInstance != null ) {
            // collect the candidates living in the cluster view of the creating instance
            final List<InstanceDescription> sameCluster = new ArrayList<InstanceDescription>();
            for(final InstanceDescription candidate : candidates) {
                if ( !candidate.getClusterView().getId().equals(originInstance.getClusterView().getId()) ) {
                    continue;
                }
                // with distribution disabled only the leader may take the job
                if ( this.jobManagerConfiguration.disableDistribution() && !candidate.isLeader() ) {
                    continue;
                }
                sameCluster.add(candidate);
            }
            // only narrow down if at least one local candidate is left
            if ( !sameCluster.isEmpty() ) {
                candidates.clear();
                candidates.addAll(sameCluster);
                logger.debug("Potential targets filtered for {} : {}", jobTopic, candidates);
            }
        }

        // optionally pin the job to the instance it was created on
        if ( queueInfo.queueConfiguration.isPreferRunOnCreationInstance() ) {
            InstanceDescription creator = null;
            for(final InstanceDescription candidate : candidates) {
                if ( candidate.getSlingId().equals(createdOn) ) {
                    creator = candidate;
                    break;
                }
            }
            if ( creator != null ) {
                candidates.clear();
                candidates.add(creator);
                logger.debug("Potential targets reduced to creation instance for {} : {}", jobTopic, candidates);
            }
        }

        final int slot;
        if ( queueInfo.queueConfiguration.getType() == QueueConfiguration.Type.ORDERED ) {
            // ordered queues must resolve to the same target on every instance of a
            // cluster view, therefore the first candidate is always used
            // (TODO - we could try to do some round robin of the whole queue)
            slot = 0;
        } else {
            // TODO - this is a simple round robin which is not based on the actual load
            //        of the instances
            final Integer stored = this.roundRobinMap.get(jobTopic);
            slot = ( stored == null || stored >= candidates.size() ) ? 0 : stored;
            this.roundRobinMap.put(jobTopic, slot + 1);
        }

        final String target = candidates.get(slot).getSlingId();
        logger.debug("Target for {} : {}", jobTopic, target);
        return target;
    }