protected SimpleDistributionAgent createAgent()

in src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java [163:254]


    /**
     * Assembles a {@link SimpleDistributionAgent} from the supplied configuration.
     * <p>
     * The agent is wired with a {@code LocalDistributionPackageExporter}, a
     * {@code RemoteDistributionPackageImporter} targeting the configured importer
     * endpoints, and a queue provider wrapped in a
     * {@code MonitoringDistributionQueueProvider}. The export dispatching strategy is
     * selected as follows:
     * <ul>
     *   <li>priority queues configured: {@code PriorityQueueDispatchingStrategy}, with the
     *       endpoint map expanded by the queue aliases that strategy reports;</li>
     *   <li>otherwise, async delivery enabled: {@code AsyncDeliveryDispatchingStrategy}
     *       with one {@code "delivery-"}-prefixed queue per endpoint;</li>
     *   <li>otherwise: {@code MultipleQueueDispatchingStrategy} over the endpoint names.</li>
     * </ul>
     * An {@code ErrorQueueDispatchingStrategy} is added only when the retry strategy is
     * {@code "errorQueue"}.
     *
     * @param agentName       name handed to the queue provider and to the created agent
     * @param context         bundle context handed to the queue providers
     * @param config          configuration properties the agent settings are read from
     * @param distributionLog log handed to the remote importer and to the created agent
     * @return the configured agent
     */
    protected SimpleDistributionAgent createAgent(String agentName, BundleContext context, Map<String, Object> config, DefaultDistributionLog distributionLog) {
        // Plain settings; blank values are stripped by SettingsUtils.
        String agentServiceName = SettingsUtils.removeEmptyEntry(PropertiesUtil.toString(config.get(SERVICE_NAME), null));
        String[] roots = SettingsUtils.removeEmptyEntries(PropertiesUtil.toStringArray(config.get(ALLOWED_ROOTS), null));
        boolean processQueues = PropertiesUtil.toBoolean(config.get(QUEUE_PROCESSING_ENABLED), true);
        String[] passiveQueueNames = SettingsUtils.removeEmptyEntries(
                PropertiesUtil.toStringArray(config.get(PASSIVE_QUEUES), new String[0]), new String[0]);
        Map<String, String> priorityQueueConfig = SettingsUtils.removeEmptyEntries(
                PropertiesUtil.toMap(config.get(PRIORITY_QUEUES), new String[0]));

        // Configured value (default 10) scaled by 1000 - presumably seconds to milliseconds.
        Integer timeoutValue = PropertiesUtil.toInteger(config.get(HTTP), 10) * 1000;
        HttpConfiguration httpSettings = new HttpConfiguration(timeoutValue);

        DistributionPackageExporter exporter = new LocalDistributionPackageExporter(packageBuilder);

        // Pick the queue provider by its configured type; job handling is the default.
        String providerType = PropertiesUtil.toString(config.get(QUEUE_PROVIDER), JobHandlingDistributionQueueProvider.TYPE);
        boolean plainSimpleProvider = SimpleDistributionQueueProvider.TYPE.equals(providerType);
        DistributionQueueProvider baseProvider;
        if (JobHandlingDistributionQueueProvider.TYPE.equals(providerType)) {
            baseProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context, configAdmin);
        } else if (plainSimpleProvider || !ResourceQueueProvider.TYPE.equals(providerType)) {
            // The plain simple type gets 'false' as last argument; every other unmatched type
            // (e.g. SimpleDistributionQueueProvider.TYPE_CHECKPOINT) gets 'true'.
            baseProvider = new SimpleDistributionQueueProvider(scheduler, agentName, !plainSimpleProvider);
        } else {
            baseProvider = new ResourceQueueProvider(context, resourceResolverFactory,
                    SimpleDistributionAgent.DEFAULT_AGENT_SERVICE, agentName, scheduler, true);
        }
        DistributionQueueProvider monitoredProvider = new MonitoringDistributionQueueProvider(baseProvider, context);

        Map<String, String> endpoints = SettingsUtils.toUriMap(config.get(IMPORTER_ENDPOINTS));
        Set<String> activeQueues = new HashSet<String>();
        Set<String> endpointKeys = endpoints.keySet();

        // Sorted union of endpoint names and passive queue names: the names of all the queues.
        Set<String> everyQueue = new TreeSet<String>(endpointKeys);
        everyQueue.addAll(Arrays.asList(passiveQueueNames));
        String[] everyQueueName = everyQueue.toArray(new String[0]);

        DistributionQueueDispatchingStrategy exportStrategy;
        if (priorityQueueConfig == null) {
            if (PropertiesUtil.toBoolean(config.get(ASYNC_DELIVERY), false)) {
                // One dedicated delivery queue per endpoint.
                Map<String, String> deliveryByEndpoint = new HashMap<String, String>();
                for (String endpoint : endpointKeys) {
                    deliveryByEndpoint.put(endpoint, "delivery-" + endpoint);
                }

                activeQueues.addAll(deliveryByEndpoint.values());
                exportStrategy = new AsyncDeliveryDispatchingStrategy(deliveryByEndpoint);
            } else {
                exportStrategy = new MultipleQueueDispatchingStrategy(endpointKeys.toArray(new String[0]));
            }
        } else {
            PriorityQueueDispatchingStrategy priorityStrategy =
                    new PriorityQueueDispatchingStrategy(priorityQueueConfig, everyQueueName);
            Map<String, String> aliases = priorityStrategy.getMatchingQueues(null);
            // The expanded map replaces the original one, so the endpoint names are re-read from it.
            endpoints = SettingsUtils.expandUriMap(endpoints, aliases);
            endpointKeys = endpoints.keySet();
            exportStrategy = priorityStrategy;
        }

        // Every endpoint queue is processed unless it is declared passive.
        activeQueues.addAll(endpointKeys);
        activeQueues.removeAll(Arrays.asList(passiveQueueNames));

        DistributionPackageImporter importer = new RemoteDistributionPackageImporter(distributionLog,
                transportSecretProvider, endpoints, httpSettings);

        DistributionRequestType[] supportedRequests = new DistributionRequestType[]{
                DistributionRequestType.ADD, DistributionRequestType.DELETE};

        String retryMode = SettingsUtils.removeEmptyEntry(PropertiesUtil.toString(config.get(RETRY_STRATEGY), null));
        int maxRetries = PropertiesUtil.toInteger(config.get(RETRY_ATTEMPTS), 100);

        // An error queue strategy exists only for the "errorQueue" retry strategy.
        DistributionQueueDispatchingStrategy errorStrategy = "errorQueue".equals(retryMode)
                ? new ErrorQueueDispatchingStrategy(activeQueues.toArray(new String[0]))
                : null;

        return new SimpleDistributionAgent(agentName, processQueues, activeQueues,
                agentServiceName, importer, exporter, requestAuthorizationStrategy,
                monitoredProvider, exportStrategy, errorStrategy, distributionEventFactory,
                resourceResolverFactory, slingRepository,
                distributionLog, supportedRequests, roots, maxRetries);
    }