private JobQueueImpl()

in src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java [172:224]


    /**
     * Creates a job queue, optionally taking over the semaphores of a
     * previously outdated queue.
     * <p>
     * If the configuration reports an own thread pool size greater than zero,
     * a dedicated {@code EventingThreadPool} of that size is created;
     * otherwise the pool provided by {@code services} is used.
     * <p>
     * If {@code outdatedQueue} is {@code null}, fresh fair semaphores are
     * created: {@code available} with {@code maxParallel} permits and
     * {@code drainage} with none. Otherwise both semaphores are taken from
     * {@code outdatedQueue} and, if {@code maxParallel} differs from the old
     * value, their permits are rebalanced to reflect the new limit.
     *
     * @param name          the queue name; also used as suffix of the logger name
     * @param config        the queue configuration (thread pool size, max parallel)
     * @param services      the services holder providing the thread pool(s)
     * @param cache         the job cache for this queue
     * @param outdatedQueue info of a previously outdated queue to reuse,
     *                      or {@code null} if the queue is created the first time
     */
    private JobQueueImpl(final String name,
                        final InternalQueueConfiguration config,
                        final QueueServices services,
                        final QueueJobCache cache,
                        final OutdatedJobQueueInfo outdatedQueue) {
        if ( config.getOwnThreadPoolSize() > 0 ) {
            this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
        } else {
            this.threadPool = services.eventingThreadPool;
        }
        this.queueName = name;
        this.configuration = config;
        this.services = services;
        this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
        this.running = true;
        this.cache = cache;
        this.maxParallel = config.getMaxParallel();
        if (outdatedQueue == null) {
            // queue is created the first time
            this.available = new Semaphore(this.maxParallel, true);
            this.drainage = new Semaphore(0, true);
        } else {
            // queue was previously outdated - let's reuse available and drainage
            this.available = outdatedQueue.getAvailable();
            this.drainage = outdatedQueue.getDrainage();
            final int oldMaxParallel = outdatedQueue.getMaxParallel();
            final int maxParallelDiff = this.maxParallel - oldMaxParallel;
            final int oldDrainage;
            final int oldAvailable;
            if (maxParallelDiff != 0) {
                // config change: take all permits out of both semaphores
                // (drainage first, then available) and redistribute the net amount
                oldDrainage = this.drainage.drainPermits();
                oldAvailable = this.available.drainPermits();
                final int netNewPermits = oldAvailable - oldDrainage + maxParallelDiff;
                if (netNewPermits > 0) {
                    this.available.release(netNewPermits);
                } else if (netNewPermits < 0) {
                    // special case : maxparallel got reduced since last outdating,
                    // resulting in effectively negative number of currently available permits.
                    // to account for that, jobs try to drain first before re-adding to available
                    // to trigger this behaviour, releasing the permit-diff to drainage
                    this.drainage.release(-netNewPermits);
                }
                // netNewPermits == 0: both semaphores intentionally stay empty
            } else {
                // no config change: semaphores are reused untouched; only take a
                // non-destructive snapshot so the log below reports the real
                // permit counts instead of a misleading 0
                oldDrainage = this.drainage.availablePermits();
                oldAvailable = this.available.availablePermits();
            }
            logger.info("<init> reused outdated queue info: queueName : {}"
                    + ", old available : {}, old drainage : {}, old maxParallel : {}"
                    + ", new available : {}, new drainage : {}, new maxParallel : {}",
                    queueName, oldAvailable, oldDrainage, oldMaxParallel,
                    available.availablePermits(), drainage.availablePermits(), this.maxParallel);
        }
        logger.info("Starting job queue {}", queueName);
        logger.debug("Configuration for job queue={}", configuration);
    }