in deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/ThreadPoolManager.java [89:237]
public ExecutorService find(final String name)
{
if (closed)
{
throw new IllegalStateException("Container is shutting down");
}
ExecutorService pool = pools.get(name);
if (pool == null)
{
synchronized (this)
{
pool = pools.get(name);
if (pool == null)
{
// the instantiation does the following:
// 1. check if there is a named bean matching this name using @Default qualifier
// 2. check if there is a JNDI entry (ManagedExecutorService case) matching this name
// 3. create a new executor service based on the DS-config
// 1.
final Set<Bean<?>> beans = beanManager.getBeans(name);
if (beans != null && !beans.isEmpty())
{
final Bean<?> bean = beanManager.resolve(beans);
if (bean.getTypes().contains(ExecutorService.class))
{
final CreationalContext<Object> creationalContext =
beanManager.createCreationalContext(null);
if (!beanManager.isNormalScope(bean.getScope()))
{
contexts.add(creationalContext);
}
pool = ExecutorService.class.cast(beanManager.getReference(
bean, ExecutorService.class, creationalContext));
}
}
if (pool == null) // 2.
{
for (final String prefix : asList(
"", "java:app/", "java:global/", "java:global/threads/",
"java:global/deltaspike/", "java:"))
{
try
{
final Object instance = new InitialContext().lookup(prefix + name);
if (ExecutorService.class.isInstance(instance))
{
pool = ExecutorService.class.cast(instance);
break;
}
}
catch (final NamingException e)
{
// no-op
}
}
}
if (pool == null) // 3.
{
final String configPrefix = "futureable.pool." + name + ".";
final int coreSize = ConfigResolver.resolve(configPrefix + "coreSize")
.as(Integer.class)
.withDefault(Math.max(2, Runtime.getRuntime().availableProcessors()))
.getValue();
final int maxSize = ConfigResolver.resolve(configPrefix + "maxSize")
.as(Integer.class)
.withDefault(coreSize)
.getValue();
final long keepAlive = ConfigResolver.resolve(configPrefix + "keepAlive.value")
.as(Long.class)
.withDefault(0L)
.getValue();
final String keepAliveUnit = ConfigResolver.resolve(configPrefix + "keepAlive.unit")
.as(String.class)
.withDefault("MILLISECONDS")
.getValue();
final String queueType = ConfigResolver.resolve(configPrefix + "queue.type")
.as(String.class)
.withDefault("LINKED")
.getValue();
final BlockingQueue<Runnable> queue;
if ("ARRAY".equalsIgnoreCase(queueType))
{
final int size = ConfigResolver.resolve(configPrefix + "queue.size")
.as(Integer.class)
.withDefault(1024)
.getValue();
final boolean fair = ConfigResolver.resolve(configPrefix + "queue.fair")
.as(Boolean.class)
.withDefault(false)
.getValue();
queue = new ArrayBlockingQueue<Runnable>(size, fair);
}
else if ("SYNCHRONOUS".equalsIgnoreCase(queueType))
{
final boolean fair = ConfigResolver.resolve(configPrefix + "queue.fair")
.as(Boolean.class)
.withDefault(false)
.getValue();
queue = new SynchronousQueue<Runnable>(fair);
}
else
{
final int capacity = ConfigResolver.resolve(configPrefix + "queue.capacity")
.as(Integer.class)
.withDefault(Integer.MAX_VALUE)
.getValue();
queue = new LinkedBlockingQueue<Runnable>(capacity);
}
final String threadFactoryName = ConfigResolver.getPropertyValue(
configPrefix + "threadFactory.name");
final ThreadFactory threadFactory;
if (threadFactoryName != null)
{
threadFactory = lookupByName(threadFactoryName, ThreadFactory.class);
}
else
{
threadFactory = Executors.defaultThreadFactory();
}
final String rejectedHandlerName = ConfigResolver.getPropertyValue(
configPrefix + "rejectedExecutionHandler.name");
final RejectedExecutionHandler rejectedHandler;
if (rejectedHandlerName != null)
{
rejectedHandler = lookupByName(rejectedHandlerName, RejectedExecutionHandler.class);
}
else
{
rejectedHandler = new ThreadPoolExecutor.AbortPolicy();
}
pool = new ThreadPoolExecutor(
coreSize, maxSize,
keepAlive, TimeUnit.valueOf(keepAliveUnit),
queue, threadFactory, rejectedHandler);
}
pools.put(name, pool);
}
}
}
return pool;
}