in src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java [172:224]
private JobQueueImpl(final String name,
final InternalQueueConfiguration config,
final QueueServices services,
final QueueJobCache cache,
final OutdatedJobQueueInfo outdatedQueue) {
if ( config.getOwnThreadPoolSize() > 0 ) {
this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
} else {
this.threadPool = services.eventingThreadPool;
}
this.queueName = name;
this.configuration = config;
this.services = services;
this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
this.running = true;
this.cache = cache;
this.maxParallel = config.getMaxParallel();
if (outdatedQueue == null) {
// queue is created the first time
this.available = new Semaphore(this.maxParallel, true);
this.drainage = new Semaphore(0, true);
} else {
// queue was previously outdated - let's reuse available and drainage
this.available = outdatedQueue.getAvailable();
this.drainage = outdatedQueue.getDrainage();
int oldMaxParallel = outdatedQueue.getMaxParallel();
int maxParallelDiff = this.maxParallel - oldMaxParallel;
int drainedOldDrainage = 0;
int drainedOldAvailable = 0;
if (maxParallelDiff != 0) {
// config change
drainedOldDrainage = this.drainage.drainPermits();
drainedOldAvailable = this.available.drainPermits();
int netNewPermits = drainedOldAvailable - drainedOldDrainage + maxParallelDiff;
if (netNewPermits > 0) {
this.available.release(netNewPermits);
} else if (netNewPermits < 0) {
// special case : maxparallel got reduced since last outdating,
// resulting in effectively negative number of currently available permits.
// to account for that, jobs try to drain first before re-adding to available
// to trigger this behaviour, releasing the permit-diff to drainage
this.drainage.release(-netNewPermits);
}
}
logger.info("<init> reused outdated queue info: queueName : {}"
+ ", old available : {}, old drainage : {}, old maxParallel : {}"
+ ", new available : {}, new drainage : {}, new maxParallel : {}",
queueName, drainedOldAvailable, drainedOldDrainage, oldMaxParallel,
available.availablePermits(), drainage.availablePermits(), this.maxParallel);
}
logger.info("Starting job queue {}", queueName);
logger.debug("Configuration for job queue={}", configuration);
}