public static AMQPTransportPollingTask createPollingTaskForService()

in modules/transports/optional/amqp/src/main/java/org/apache/synapse/transport/amqp/pollingtask/AMQPTransportPollingTaskFactory.java [36:239]


    public static AMQPTransportPollingTask createPollingTaskForService(
            AxisService service,
            ScheduledExecutorService pool,
            AMQPTransportEndpoint endpoint,
            AMQPTransportConnectionFactory connectionFactory,
            AMQPTransportReconnectHandler haHandler) throws AxisFault {

        Map<String, String> svcParam =
                AMQPTransportUtils.getServiceStringParameters(service.getParameters());
        Map<String, String> conFacParam = connectionFactory.getParameters();


        AMQPTransportPollingTask pt = new AMQPTransportPollingTask();

        pt.setServiceName(service.getName());
        pt.setEndpoint(endpoint);
        pt.setPollingTaskScheduler(pool);
        pt.setHaHandler(haHandler);

        // set buffers to hold request/response messages for this task
        pt.setBuffers(new AMQPTransportBuffers());

        String exchangeName = AMQPTransportUtils.getOptionalStringParameter(
                AMQPTransportConstant.PARAMETER_EXCHANGE_NAME, svcParam, conFacParam);
        pt.setExchangeName(exchangeName);

        Boolean isDurable = AMQPTransportUtils.getOptionalBooleanParameter(
                AMQPTransportConstant.PARAMETER_EXCHANGE_IS_DURABLE, svcParam, conFacParam);
        if (isDurable != null) {
            pt.setExchangeDurable(isDurable);
        }

        Boolean isAutoDelete = AMQPTransportUtils.getOptionalBooleanParameter(
                AMQPTransportConstant.PARAMETER_EXCHANGE_IS_AUTO_DELETE, svcParam, conFacParam);
        if (isAutoDelete != null) {
            pt.setExchangeAutoDelete(isAutoDelete);
        }

        String exchangeType = AMQPTransportUtils.getOptionalStringParameter(
                AMQPTransportConstant.PARAMETER_EXCHANGE_TYPE, svcParam, conFacParam);
        if (exchangeType != null) {
            if (exchangeName == null) {
                throw new AxisFault("Possible configuration error. No exchange name provided but " +
                        "exchange type is set to '" + exchangeType + "'");
            }

            pt.setExchangeType(exchangeType);
        }

        Boolean isInternalExchange = AMQPTransportUtils.getOptionalBooleanParameter(
                AMQPTransportConstant.PARAMETER_EXCHANGE_INTERNAL, svcParam, conFacParam);
        if (isInternalExchange != null) {
            if (exchangeName == null) {
                throw new AxisFault("Possible configuration error. No exchange name provided but " +
                        "exchange restricted as " + (isInternalExchange ? "internal." : "external."));
            }
            pt.setInternalExchange(isInternalExchange);
        }

        pt.setChannel(connectionFactory.getChannel());
        pt.setConnectionFactoryName(connectionFactory.getName());

        String responseConFac = AMQPTransportUtils.getOptionalStringParameter(
                AMQPTransportConstant.PARAMETER_RESPONSE_CONNECTION_FACTORY_NAME,
                svcParam, conFacParam);
        if (responseConFac != null) {
            pt.setResponseConnectionFactory(responseConFac);
        }

        String consumerExchange = AMQPTransportUtils.getOptionalStringParameter(
                AMQPTransportConstant.PARAMETER_BIND_EXCHANGE, svcParam, conFacParam);
        if (consumerExchange != null) {
            if (exchangeName != null && !consumerExchange.equals(exchangeName)) {
                log.warn("Possible configuration error? Exchange name is set to '" +
                        exchangeName + "' and consumer's exchange name is set to '" +
                        consumerExchange + "'");
            }
            pt.setConsumerExchangeName(consumerExchange);
        }

        String bindingKeyString = AMQPTransportUtils.getOptionalStringParameter(
                AMQPTransportConstant.PARAMETER_BINDING_KEYS, svcParam, conFacParam);

        if (bindingKeyString != null) {
            pt.setBindingsKeys(AMQPTransportUtils.split(
                    bindingKeyString, AMQPTransportConstant.ROUTING_KEY_DELIMITER));
        }

        String queueName = AMQPTransportUtils.getOptionalStringParameter(
                AMQPTransportConstant.PARAMETER_QUEUE_NAME, svcParam, conFacParam);
        if (queueName == null) {
            queueName = service.getName(); // set the service name as the queue name for default.
        }
        pt.setQueueName(queueName);

        String configuredContentType = AMQPTransportUtils.getOptionalStringParameter(
                AMQPTransportConstant.PARAMETER_CONFIGURED_CONTENT_TYPE, svcParam, conFacParam);
        if (configuredContentType != null) {
            pt.setConfiguredContentType(configuredContentType);
        }

        Boolean isQueueDurable = AMQPTransportUtils.getOptionalBooleanParameter(
                AMQPTransportConstant.PARAMETER_QUEUE_DURABLE, svcParam, conFacParam);
        if (isDurable != null) {
            pt.setQueueDurable(isQueueDurable);
        }

        Boolean isQueueRestricted = AMQPTransportUtils.getOptionalBooleanParameter(
                AMQPTransportConstant.PARAMETER_QUEUE_RESTRICTED, svcParam, conFacParam);
        if (isQueueRestricted != null) {
            pt.setQueueRestricted(isQueueRestricted);
        }

        Boolean isQueueAutoDelete = AMQPTransportUtils.getOptionalBooleanParameter(
                AMQPTransportConstant.PARAMETER_QUEUE_AUTO_DELETE, svcParam, conFacParam);
        if (isQueueAutoDelete != null) {
            pt.setQueueAutoDelete(isQueueAutoDelete);
        }

        Boolean isBlockingMode = AMQPTransportUtils.getOptionalBooleanParameter(
                AMQPTransportConstant.PARAMETER_OPERATE_ON_BLOCKING_MODE, svcParam, conFacParam);
        if (isBlockingMode != null) {
            pt.setBlockingMode(isBlockingMode);
        }

        try {
            Integer noOfConsumers = AMQPTransportUtils.getOptionalIntParameter(
                    AMQPTransportConstant.PARAMETER_NO_OF_CONCURRENT_CONSUMERS,
                    svcParam, conFacParam);
            if (noOfConsumers != null) {
                pt.setNoOfConcurrentConsumers(noOfConsumers);
            }
        } catch (AMQPTransportException e) {
            throw new AxisFault("Could not assign the number of concurrent consumers", e);
        }

        try {
            Integer dispatchingTask = AMQPTransportUtils.getOptionalIntParameter(
                    AMQPTransportConstant.PARAMETER_DISPATCHING_TASK_SIZE,
                    svcParam, conFacParam);
            if (dispatchingTask != null) {
                pt.setNoOfDispatchingTask(dispatchingTask);
            }
        } catch (AMQPTransportException e) {
            throw new AxisFault("Could not assign number of dispatching task value", e);
        }

        Boolean isUseTx = AMQPTransportUtils.getOptionalBooleanParameter(
                AMQPTransportConstant.PARAMETER_CONSUMER_TX, svcParam, conFacParam);
        if (isUseTx != null) {
            pt.setUseTx(isUseTx);
        }

        try {
            Integer initialDelay = AMQPTransportUtils.getOptionalIntParameter(
                    AMQPTransportConstant.PARAMETER_SCHEDULED_TASK_INITIAL_DELAY,
                    svcParam, conFacParam);
            if (initialDelay != null) {
                pt.setScheduledTaskInitialDelay(initialDelay.intValue());
            }
        } catch (AMQPTransportException e) {
            throw new AxisFault("Could not assign the scheduled task initial delay value", e);
        }

        try {
            Integer delay = AMQPTransportUtils.getOptionalIntParameter(
                    AMQPTransportConstant.PARAMETER_SCHEDULED_TASK_INITIAL_DELAY,
                    svcParam, conFacParam);
            if (delay != null) {
                pt.setScheduledTaskDelay(delay.intValue());
            }
        } catch (AMQPTransportException e) {
            throw new AxisFault("Could not assign the scheduled task delay value", e);
        }

        String timeUnit = AMQPTransportUtils.getOptionalStringParameter(
                AMQPTransportConstant.PARAMETER_SCHEDULED_TASK_TIME_UNIT,
                svcParam, conFacParam);

        if (timeUnit != null) {
            pt.setScheduledTaskTimeUnit(getTimeUnit(timeUnit));
        }

        if (log.isDebugEnabled()) {
            log.debug("A polling task for the service '" + service.getName() + "' was produced with " +
                    "following parameters.\n" +
                    "Exchange Name: '" + pt.getExchangeName() + "'\n" +
                    "Exchange Type: '" + pt.getExchangeType() + "'\n" +
                    "Exchange Durable?: '" + pt.isExchangeDurable() + "'\n" +
                    "Exchange AutoDelete?: '" + pt.isExchangeAutoDelete() + "\n" +
                    "Is internal exchange: '" + pt.isInternalExchange() + "'\n" +
                    "Consumer Exchange: " + pt.getConsumerExchangeName() + "'\n" +
                    "Routing Keys: '" + bindingKeyString + "'\n" +
                    "QueueName: '" + pt.getQueueName() + "'\n" +
                    "Is queue durable: '" + pt.isQueueDurable() + "'\n" +
                    "Is queue restricted: '" + pt.isQueueRestricted() + "'\n" +
                    "Is queue auto deleted: '" + pt.isQueueAutoDelete() + "'\n" +
                    "Is blocking mode: '" + pt.isBlockingMode() + "'\n" +
                    "Number of concurrent consumers: '" + pt.getNoOfConcurrentConsumers() + "'\n" +
                    "Number of dispatching task: '" + pt.getNoOfDispatchingTask() + "'");
        }

        return pt;
    }