in activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java [83:226]
/**
 * Creates the worker for the endpoint described by {@code key}.
 * <p>
 * The constructor takes the work manager from the adapter's bootstrap context,
 * asks the endpoint factory whether delivery to the onMessage method is
 * transacted, defines the {@code connectWork} task, and resolves the
 * destination either through JNDI or from the activation spec's destination
 * type. {@code connectWork} is only defined here; this constructor does not
 * run or schedule it.
 *
 * @param adapter the resource adapter used to obtain the work manager, to
 *                create broker connections and to report the server URL in
 *                log messages
 * @param key     the activation key supplying the message endpoint factory
 *                and the activation spec
 * @throws ResourceException if the endpoint factory reports that the
 *                           onMessage method does not exist, if the JNDI
 *                           lookup of the destination fails, or if the
 *                           destination type is neither
 *                           {@code jakarta.jms.Queue} nor
 *                           {@code jakarta.jms.Topic}
 */
protected ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
    this.endpointActivationKey = key;
    this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
    this.workManager = adapter.getBootstrapContext().getWorkManager();
    try {
        this.transacted = endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
    } catch (NoSuchMethodException e) {
        // Chain the original exception so its stack trace is not lost.
        throw new ResourceException("Endpoint does not implement the onMessage method.", e);
    }

    connectWork = new Work() {
        // Delay before the next connection attempt; doubled by pause() up to MAX_RECONNECT_DELAY.
        long currentReconnectDelay = INITIAL_RECONNECT_DELAY;

        @Override
        public void release() {
            // Nothing to release.
        }

        /**
         * Repeatedly tries to open a broker connection and create the
         * connection consumer, for as long as the worker is both
         * "connecting" and "running". Each failed attempt is followed by
         * a disconnect() and a pause().
         */
        @Override
        public void run() {
            // Every run starts again from the shortest delay.
            currentReconnectDelay = INITIAL_RECONNECT_DELAY;
            MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
            if (LOG.isInfoEnabled()) {
                LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]");
            }

            while (connecting.get() && running) {
                try {
                    connection = adapter.makeConnection(activationSpec);
                    connection.setExceptionListener(new ExceptionListener() {
                        @Override
                        public void onException(JMSException error) {
                            if (!serverSessionPool.isClosing()) {
                                // initiate reconnection only once, i.e. on initial exception
                                // and only if not already trying to connect
                                LOG.error("Connection to broker failed: " + error.getMessage(), error);
                                if (connecting.compareAndSet(false, true)) {
                                    synchronized (connectWork) {
                                        disconnect();
                                        serverSessionPool.closeSessions();
                                        connect();
                                    }
                                } else {
                                    // connection attempt has already been initiated
                                    LOG.info("Connection attempt already in progress, ignoring connection exception");
                                }
                            }
                        }
                    });
                    connection.start();

                    if (activationSpec.isDurableSubscription()) {
                        consumer = (ActiveMQConnectionConsumer) connection.createDurableConnectionConsumer(
                                (Topic) dest,
                                activationSpec.getSubscriptionName(),
                                emptyToNull(activationSpec.getMessageSelector()),
                                serverSessionPool,
                                connection.getPrefetchPolicy().getDurableTopicPrefetch(),
                                activationSpec.getNoLocalBooleanValue());
                    } else {
                        consumer = (ActiveMQConnectionConsumer) connection.createConnectionConsumer(
                                dest,
                                emptyToNull(activationSpec.getMessageSelector()),
                                serverSessionPool,
                                getPrefetch(activationSpec, connection, dest),
                                activationSpec.getNoLocalBooleanValue());
                    }

                    // The consumer exists, so clear the "connecting" flag; this also ends the loop.
                    // The swap fails only if the flag was no longer true at this point.
                    if (connecting.compareAndSet(true, false)) {
                        if (LOG.isInfoEnabled()) {
                            LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]");
                        }
                    } else {
                        LOG.error("Could not release connection lock");
                    }

                    if (consumer.getConsumerInfo().getCurrentPrefetchSize() == 0) {
                        LOG.error("Endpoint " + endpointActivationKey.getActivationSpec() + " will not receive any messages due to broker 'zero prefetch' configuration for: " + dest);
                    }
                } catch (JMSException error) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Failed to connect: " + error.getMessage(), error);
                    }
                    // Call disconnect() for the failed attempt, then back off before retrying.
                    disconnect();
                    pause(error);
                }
            }
        }

        /**
         * Chooses the prefetch value for a non-durable connection consumer:
         * the connection's topic prefetch for a topic, its queue prefetch
         * for a queue, and otherwise the activation spec's max messages per
         * session multiplied by its max sessions.
         */
        private int getPrefetch(MessageActivationSpec activationSpec, ActiveMQConnection connection, ActiveMQDestination destination) {
            if (destination.isTopic()) {
                return connection.getPrefetchPolicy().getTopicPrefetch();
            } else if (destination.isQueue()) {
                return connection.getPrefetchPolicy().getQueuePrefetch();
            } else {
                return activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
            }
        }

        /**
         * Waits on shutdownMutex for up to the current reconnect delay,
         * then doubles the delay, capped at MAX_RECONNECT_DELAY. The
         * failure is logged at error level only once the delay has
         * already reached the cap.
         *
         * @param error the connection failure that triggered this pause
         */
        private void pause(JMSException error) {
            if (currentReconnectDelay == MAX_RECONNECT_DELAY) {
                LOG.error("Failed to connect to broker [" + adapter.getInfo().getServerUrl() + "]: "
                        + error.getMessage(), error);
                LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
            }
            try {
                synchronized (shutdownMutex) {
                    // shutdownMutex will be notified by stop() method in
                    // order to accelerate shutdown of endpoint
                    shutdownMutex.wait(currentReconnectDelay);
                }
            } catch (InterruptedException e) {
                // wait() has already cleared the interrupt status when it throws; this call
                // leaves it cleared.
                // NOTE(review): the flag is not restored, presumably so that the next wait()
                // in the retry loop still blocks instead of failing immediately - confirm.
                Thread.interrupted();
            }
            currentReconnectDelay *= 2;
            if (currentReconnectDelay > MAX_RECONNECT_DELAY) {
                currentReconnectDelay = MAX_RECONNECT_DELAY;
            }
        }
    };

    // Resolve the destination: via JNDI when the activation spec asks for it,
    // otherwise build a queue or topic from the configured destination type.
    MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
    if (activationSpec.isUseJndi()) {
        try {
            InitialContext initialContext = new InitialContext();
            dest = (ActiveMQDestination) initialContext.lookup(activationSpec.getDestination());
        } catch (NamingException exc) {
            // Chain the naming failure so the reason for the failed lookup is visible to the caller.
            throw new ResourceException("JNDI lookup failed for "
                    + activationSpec.getDestination(), exc);
        }
    } else {
        if ("jakarta.jms.Queue".equals(activationSpec.getDestinationType())) {
            dest = new ActiveMQQueue(activationSpec.getDestination());
        } else if ("jakarta.jms.Topic".equals(activationSpec.getDestinationType())) {
            dest = new ActiveMQTopic(activationSpec.getDestination());
        } else {
            throw new ResourceException("Unknown destination type: "
                    + activationSpec.getDestinationType());
        }
    }
}