protected void activate()

in src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java [191:247]


    /**
     * Initializes this queue configuration from the supplied component properties
     * and typed configuration, then records whether the result is valid.
     * <p>
     * Handling of individual settings:
     * <ul>
     *   <li>An unparsable priority is replaced by
     *       {@code ConfigurationConstants.DEFAULT_PRIORITY} (logged as a warning).</li>
     *   <li>An unparsable queue type leaves {@code type} as {@code null}
     *       (logged as an error).</li>
     *   <li>Max parallel: an integral value is taken as an absolute count, where any
     *       value {@code <= -1} means "number of processors". A fractional value
     *       strictly between 0 and 1 is taken as a fraction of the processor count,
     *       rounded, and never less than one. Any other value falls back to the
     *       processor count.</li>
     *   <li>For {@code Type.ORDERED} queues the max parallel value is always forced to 1.</li>
     * </ul>
     *
     * @param props  component properties; only {@code Constants.SERVICE_PID} is read here
     * @param config typed configuration supplying the queue settings
     */
    protected void activate(final Map<String, Object> props, final Config config) {
        this.name = config.queue_name();
        try {
            this.priority = ThreadPriority.valueOf(config.queue_priority());
        } catch ( final IllegalArgumentException iae) {
            logger.warn("Invalid value for queue priority. Using default instead of : {}", config.queue_priority());
            this.priority = ThreadPriority.valueOf(ConfigurationConstants.DEFAULT_PRIORITY);
        }
        try {
            this.type = Type.valueOf(config.queue_type());
        } catch ( final IllegalArgumentException iae) {
            logger.error("Invalid value for queue type configuration: {}", config.queue_type());
            // leave the type unset; NOTE(review): presumably checkIsValid() rejects a null type - confirm
            this.type = null;
        }
        this.retries = config.queue_retries();
        this.retryDelay = config.queue_retrydelay();

        // Float values are treated as percentage.  int values are treated as number of cores, -1 == all available
        // Note: the value is based on the core count at startup.  It will not change dynamically if core count changes.
        final int cores = ConfigurationConstants.NUMBER_OF_PROCESSORS;
        final double inMaxParallel = config.queue_maxparallel();
        logger.debug("Max parallel for queue {} is {}", this.name, inMaxParallel);
        if ((inMaxParallel == Math.floor(inMaxParallel)) && !Double.isInfinite(inMaxParallel)) {
            // integral type
            if ((int) inMaxParallel == 0) {
                // an explicit zero is kept as configured, only reported
                logger.warn("Max threads property for {} set to zero.", this.name);
            }
            this.maxParallelProcesses = (inMaxParallel <= -1 ? cores : (int) inMaxParallel);
        } else {
            // percentage (rounded); NaN and infinite values end up in the fallback branch below
            if ((inMaxParallel > 0.0) && (inMaxParallel < 1.0)) {
                // A positive fraction must never round down to zero parallel processes
                // (e.g. 0.25 on a single core host), so at least one process is used.
                this.maxParallelProcesses = Math.max(1, (int) Math.round(cores * inMaxParallel));
            } else {
                logger.warn("Invalid queue max parallel value for queue {}. Using {}", this.name, cores);
                this.maxParallelProcesses =  cores;
            }
        }
        logger.debug("Thread pool size for {} was set to {}", this.name, this.maxParallelProcesses);

        // ignore parallel setting for ordered queues
        if ( this.type == Type.ORDERED ) {
            this.maxParallelProcesses = 1;
        }
        final String[] topicsParam = config.queue_topics();
        this.matchers = TopicMatcherHelper.buildMatchers(topicsParam);
        // topics are only kept when matchers could be built from them
        if ( this.matchers == null ) {
            this.topics = null;
        } else {
            this.topics = topicsParam;
        }
        this.keepJobs = config.queue_keepJobs();
        this.serviceRanking = config.service_ranking();
        this.ownThreadPoolSize = config.queue_threadPoolSize();
        this.preferCreationInstance = config.queue_preferRunOnCreationInstance();
        this.pid = (String)props.get(Constants.SERVICE_PID);
        // validity is evaluated last, after all fields above have been assigned
        this.valid = this.checkIsValid();
    }