in src/main/java/org/apache/sling/commons/threads/impl/DefaultThreadPool.java [62:167]
/**
 * Creates and starts the executor backing this thread pool.
 * <p>
 * The supplied configuration is copied into a {@link ModifiableThreadPoolConfig}
 * and then normalised before the executor is built:
 * <ul>
 * <li>a negative min pool size is replaced by 1;</li>
 * <li>a negative max pool size is replaced by {@link Integer#MAX_VALUE};</li>
 * <li>a max pool size that is 0 or smaller than the min pool size is raised to
 *     {@code max(1, minPoolSize)}, because {@link ThreadPoolExecutor} rejects
 *     such values with an {@link IllegalArgumentException};</li>
 * <li>a negative keep-alive time is replaced by 1000 ms;</li>
 * <li>a queue size of 0 selects a {@link SynchronousQueue}, a positive size a
 *     bounded {@code ArrayBlockingQueue}, and a negative size an unbounded
 *     {@link LinkedBlockingQueue} (in which case the min pool size is raised
 *     to the max pool size, see SLING-7407).</li>
 * </ul>
 * A {@code ThreadPoolExecutorCleaningThreadLocals} is tried first; if creating
 * it fails with a {@link RuntimeException} or {@link Error}, a plain
 * {@link ThreadPoolExecutor} with the same settings is used instead.
 *
 * @param name       the pool name; when {@code null} the value of
 *                   {@code DefaultThreadPoolManager.DEFAULT_THREADPOOL_NAME} is used
 * @param origConfig the configuration to copy; the copy, not this object, is normalised
 */
public DefaultThreadPool(final String name,
                         final ThreadPoolConfig origConfig) {
    // name
    if (name != null) {
        this.name = name;
    } else {
        this.name = DefaultThreadPoolManager.DEFAULT_THREADPOOL_NAME;
    }
    this.logger.info("Initializing thread pool [{}] ...", this.name);
    this.configuration = new ModifiableThreadPoolConfig(origConfig);

    // factory
    final ThreadFactory delegateThreadFactory;
    if (this.configuration.getFactory() == null) {
        delegateThreadFactory = Executors.defaultThreadFactory();
        // Log the factory that is really used as delegate, not the wrapper class.
        logger.debug("Thread pool [{}] ; No ThreadFactory is configured. Will use JVM default thread factory: {}",
                this.name, delegateThreadFactory.getClass().getName());
    } else {
        delegateThreadFactory = this.configuration.getFactory();
    }

    // Min pool size
    if (this.configuration.getMinPoolSize() < 0) {
        this.configuration.setMinPoolSize(1);
        this.logger.warn("min-pool-size < 0 for pool \"{}\". Set to 1", this.name);
    }

    // Max pool size: a negative value means "no limit"
    if (this.configuration.getMaxPoolSize() < 0) {
        this.configuration.setMaxPoolSize(Integer.MAX_VALUE);
    }
    // ThreadPoolExecutor throws IllegalArgumentException for max == 0 and for
    // max < core. Without this guard that exception would be misreported below
    // as an unsupported JRE and then thrown again by the fallback executor.
    final int smallestUsableMax = Math.max(1, this.configuration.getMinPoolSize());
    if (this.configuration.getMaxPoolSize() < smallestUsableMax) {
        this.logger.warn("max-pool-size ({}) is too small for pool \"{}\" with min-pool-size ({}). Set to {}",
                this.configuration.getMaxPoolSize(), this.name,
                this.configuration.getMinPoolSize(), smallestUsableMax);
        this.configuration.setMaxPoolSize(smallestUsableMax);
    }

    // Set priority and daemon flag
    final ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
            delegateThreadFactory,
            this.name,
            this.configuration.getPriority(),
            this.configuration.isDaemon()
    );

    // Keep alive time
    if (this.configuration.getKeepAliveTime() < 0) {
        this.configuration.setKeepAliveTime(1000);
        this.logger.warn("keep-alive-time-ms < 0 for pool \"{}\". Set to 1000", this.name);
    }

    // Queue
    final BlockingQueue<Runnable> queue;
    if (this.configuration.getQueueSize() != 0) {
        if (this.configuration.getQueueSize() > 0) {
            queue = new java.util.concurrent.ArrayBlockingQueue<Runnable>(this.configuration.getQueueSize());
        } else {
            // SLING-7407 : queue size is -1 (or negative) == unbounded
            // In this case the max pool size wouldn't have any effect, since the
            // pool is only increased (i.e. threads are only created) when the queue
            // is full, but with an unbounded queue that never happens, so the pool
            // would never grow beyond the min pool size.
            // To fix this somewhat odd behaviour, the min is automatically set to
            // the max for this case:
            if (this.configuration.getMinPoolSize() < this.configuration.getMaxPoolSize()) {
                this.logger.info("min-pool-size ({}) < max-pool-size ({}) for pool \"{}\""
                        + " which has unbounded queue (queue size -1). Set to {}",
                        this.configuration.getMinPoolSize(), this.configuration.getMaxPoolSize(),
                        this.name, this.configuration.getMaxPoolSize());
                this.configuration.setMinPoolSize(this.configuration.getMaxPoolSize());
            }
            queue = new LinkedBlockingQueue<Runnable>();
        }
    } else {
        // Queue size 0: hand tasks directly to a thread without buffering.
        queue = new SynchronousQueue<Runnable>();
    }

    // Map the configured block policy to the matching JDK rejection handler.
    RejectedExecutionHandler handler = null;
    switch (this.configuration.getBlockPolicy()) {
        case ABORT :
            handler = new ThreadPoolExecutor.AbortPolicy();
            break;
        case DISCARD :
            handler = new ThreadPoolExecutor.DiscardPolicy();
            break;
        case DISCARDOLDEST :
            handler = new ThreadPoolExecutor.DiscardOldestPolicy();
            break;
        case RUN :
            handler = new ThreadPoolExecutor.CallerRunsPolicy();
            break;
    }

    try {
        this.executor = new ThreadPoolExecutorCleaningThreadLocals(this.configuration.getMinPoolSize(),
                this.configuration.getMaxPoolSize(),
                this.configuration.getKeepAliveTime(),
                TimeUnit.MILLISECONDS,
                queue,
                threadFactory,
                handler,
                new LoggingThreadLocalChangeListener());
    } catch (RuntimeException | Error e) {
        // Fall back to the plain JDK executor when the thread-local cleaning variant cannot be created.
        logger.warn("Unsupported JRE, cannot register ThreadPoolExecutorCleaningThreadLocals due to '{}', fall back to regular ThreadPoolExecutor", e.getMessage(), e);
        this.executor = new ThreadPoolExecutor(this.configuration.getMinPoolSize(),
                this.configuration.getMaxPoolSize(),
                this.configuration.getKeepAliveTime(),
                TimeUnit.MILLISECONDS,
                queue,
                threadFactory,
                handler);
    }
    // Use the resolved pool name: the constructor argument may be null.
    this.logger.info("Thread pool [{}] initialized.", this.name);
}