protected ActiveMQEndpointWorker()

in activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java [83:226]


    protected ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
        this.endpointActivationKey = key;
        this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
        this.workManager = adapter.getBootstrapContext().getWorkManager();
        try {
            this.transacted = endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
        } catch (NoSuchMethodException e) {
            throw new ResourceException("Endpoint does not implement the onMessage method.");
        }

        connectWork = new Work() {
            long currentReconnectDelay = INITIAL_RECONNECT_DELAY;

            public void release() {
                //
            }

            public void run() {
                currentReconnectDelay = INITIAL_RECONNECT_DELAY;
                MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
                if (LOG.isInfoEnabled()) {
                    LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]");
                }

                while (connecting.get() && running) {
                    try {
                        connection = adapter.makeConnection(activationSpec);
                        connection.setExceptionListener(new ExceptionListener() {
                            public void onException(JMSException error) {
                                if (!serverSessionPool.isClosing()) {
                                    // initiate reconnection only once, i.e. on initial exception
                                    // and only if not already trying to connect
                                    LOG.error("Connection to broker failed: " + error.getMessage(), error);
                                    if (connecting.compareAndSet(false, true)) {
                                        synchronized (connectWork) {
                                            disconnect();
                                            serverSessionPool.closeSessions();
                                            connect();
                                        }
                                    } else {
                                        // connection attempt has already been initiated
                                        LOG.info("Connection attempt already in progress, ignoring connection exception");
                                    }
                                }
                            }
                        });
                        connection.start();

                        if (activationSpec.isDurableSubscription()) {
                            consumer = (ActiveMQConnectionConsumer) connection.createDurableConnectionConsumer(
                                    (Topic) dest,
                                    activationSpec.getSubscriptionName(),
                                    emptyToNull(activationSpec.getMessageSelector()),
                                    serverSessionPool,
                                    connection.getPrefetchPolicy().getDurableTopicPrefetch(),
                                    activationSpec.getNoLocalBooleanValue());
                        } else {
                            consumer = (ActiveMQConnectionConsumer) connection.createConnectionConsumer(
                                    dest,
                                    emptyToNull(activationSpec.getMessageSelector()),
                                    serverSessionPool,
                                    getPrefetch(activationSpec, connection, dest),
                                    activationSpec.getNoLocalBooleanValue());
                        }


                        if (connecting.compareAndSet(true, false)) {
                            if (LOG.isInfoEnabled()) {
                                LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]");
                            }
                        } else {
                            LOG.error("Could not release connection lock");
                        }

                        if (consumer.getConsumerInfo().getCurrentPrefetchSize() == 0) {
                            LOG.error("Endpoint " + endpointActivationKey.getActivationSpec() + " will not receive any messages due to broker 'zero prefetch' configuration for: " + dest);
                        }

                    } catch (JMSException error) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Failed to connect: " + error.getMessage(), error);
                        }
                        disconnect();
                        pause(error);
                    }
                }
            }

            private int getPrefetch(MessageActivationSpec activationSpec, ActiveMQConnection connection, ActiveMQDestination destination) {
                if (destination.isTopic()) {
                    return connection.getPrefetchPolicy().getTopicPrefetch();
                } else if (destination.isQueue()) {
                    return connection.getPrefetchPolicy().getQueuePrefetch();
                } else {
                    return activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
                }
            }
            
            private void pause(JMSException error) {
                if (currentReconnectDelay == MAX_RECONNECT_DELAY) {
                    LOG.error("Failed to connect to broker [" + adapter.getInfo().getServerUrl() + "]: " 
                            + error.getMessage(), error);
                    LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
                }
                try {
                    synchronized ( shutdownMutex ) {
                        // shutdownMutex will be notified by stop() method in
                        // order to accelerate shutdown of endpoint
                        shutdownMutex.wait(currentReconnectDelay);
                    }
                } catch ( InterruptedException e ) {
                    Thread.interrupted();
                }
                currentReconnectDelay *= 2;
                if (currentReconnectDelay > MAX_RECONNECT_DELAY) {
                    currentReconnectDelay = MAX_RECONNECT_DELAY;
                }                
            }
        };

        MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
        if (activationSpec.isUseJndi()) {
            try {
                InitialContext initialContext = new InitialContext();
                dest = (ActiveMQDestination) initialContext.lookup(activationSpec.getDestination());
            }
            catch (NamingException exc) {
                throw new ResourceException("JNDI lookup failed for "
                    + activationSpec.getDestination());
            }
        }
        else {
            if ("jakarta.jms.Queue".equals(activationSpec.getDestinationType())) {
                dest = new ActiveMQQueue(activationSpec.getDestination());
            }
            else if ("jakarta.jms.Topic".equals(activationSpec.getDestinationType())) {
                dest = new ActiveMQTopic(activationSpec.getDestination());
            }
            else {
                throw new ResourceException("Unknown destination type: "
                    + activationSpec.getDestinationType());
            }
        }
    }