in src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java [93:120]
/**
 * Returns a distribution queue backed by the job manager for the given name and type.
 * <p>
 * The job topic is built from {@code JobHandlingDistributionQueue.DISTRIBUTION_QUEUE_TOPIC},
 * the lower-cased type name, this provider's {@code prefix} and the queue name.
 * <p>
 * When a configuration admin is available, the job manager does not report a queue
 * named {@code queueName}, and no queue configuration for that name exists yet, a new
 * {@code QueueConfiguration} factory configuration is created for the topic.
 *
 * @param queueName the name of the queue
 * @param type      the queue type; {@code PARALLEL} is mapped to an unordered job queue,
 *                  every other type to an ordered one
 * @return the job handling queue wrapped in a {@code CachingDistributionQueue}
 * @throws RuntimeException if the queue configuration cannot be looked up or stored
 */
public DistributionQueue getQueue(@NotNull String queueName, @NotNull DistributionQueueType type) {
    // Locale.ROOT keeps the topic stable regardless of the JVM default locale.
    String topic = JobHandlingDistributionQueue.DISTRIBUTION_QUEUE_TOPIC + '/'
            + type.name().toLowerCase(java.util.Locale.ROOT) + '/' + prefix + "/" + queueName;
    boolean isActive = jobConsumer != null
            && (processingQueueNames == null || processingQueueNames.contains(queueName));
    try {
        if (configAdmin != null && jobManager.getQueue(queueName) == null
                && !hasQueueConfiguration(queueName)) {
            Configuration config = configAdmin.createFactoryConfiguration(
                    QueueConfiguration.class.getName(), null);
            Dictionary<String, Object> props = new Hashtable<String, Object>();
            props.put(ConfigurationConstants.PROP_NAME, queueName);
            props.put(ConfigurationConstants.PROP_TYPE, DistributionQueueType.PARALLEL.equals(type)
                    ? QueueConfiguration.Type.UNORDERED.name()
                    : QueueConfiguration.Type.ORDERED.name());
            props.put(ConfigurationConstants.PROP_TOPICS, new String[]{topic});
            // NOTE(review): -1 presumably means "retry indefinitely" - confirm against
            // the Sling ConfigurationConstants documentation.
            props.put(ConfigurationConstants.PROP_RETRIES, -1);
            props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
            props.put(ConfigurationConstants.PROP_KEEP_JOBS, true);
            props.put(ConfigurationConstants.PROP_PRIORITY, "MAX");
            props.put(ConfigurationConstants.PROP_MAX_PARALLEL, ConfigurationConstants.DEFAULT_MAX_PARALLEL);
            config.update(props);
        }
    } catch (IOException e) {
        throw new RuntimeException("could not create config for queue " + queueName, e);
    }
    DistributionQueue queue = new JobHandlingDistributionQueue(queueName, topic, jobManager, isActive, type);
    queue = new CachingDistributionQueue(topic, queue);
    return queue;
}

/**
 * Checks whether a {@code QueueConfiguration} factory configuration whose name property
 * equals {@code queueName} already exists.
 * <p>
 * {@code ConfigurationAdmin.getConfiguration(pid)} is not usable for this check: it
 * returns a new configuration object instead of {@code null} when nothing is stored,
 * and factory configurations get generated PIDs anyway.
 */
private boolean hasQueueConfiguration(String queueName) throws IOException {
    // "service.factoryPid" is the value of ConfigurationAdmin.SERVICE_FACTORYPID.
    String filter = "(&(service.factoryPid=" + QueueConfiguration.class.getName() + ")("
            + ConfigurationConstants.PROP_NAME + "=" + escapeFilterValue(queueName) + "))";
    try {
        // listConfigurations returns null (not an empty array) when nothing matches.
        Configuration[] existing = configAdmin.listConfigurations(filter);
        return existing != null && existing.length > 0;
    } catch (org.osgi.framework.InvalidSyntaxException e) {
        throw new RuntimeException("could not look up config for queue " + queueName, e);
    }
}

/** Escapes the characters that are special inside an OSGi/LDAP filter value. */
private static String escapeFilterValue(String value) {
    StringBuilder escaped = new StringBuilder(value.length() + 8);
    for (int i = 0; i < value.length(); i++) {
        char c = value.charAt(i);
        if (c == '\\' || c == '*' || c == '(' || c == ')') {
            escaped.append('\\');
        }
        escaped.append(c);
    }
    return escaped.toString();
}