in qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java [85:140]
protected JmsMessageConsumer(JmsConsumerId consumerId, JmsSession session, JmsDestination destination,
String name, String selector, boolean noLocal) throws JMSException {
this.session = session;
this.connection = session.getConnection();
this.tracer = connection.getTracer();
this.address = destination.getAddress();
this.acknowledgementMode = isBrowser() ? Session.AUTO_ACKNOWLEDGE : session.acknowledgementMode();
if (destination.isTemporary()) {
connection.checkConsumeFromTemporaryDestination((JmsTemporaryDestination) destination);
}
JmsPrefetchPolicy prefetchPolicy = session.getPrefetchPolicy();
JmsRedeliveryPolicy redeliveryPolicy = session.getRedeliveryPolicy().copy();
JmsDeserializationPolicy deserializationPolicy = session.getDeserializationPolicy().copy();
int configuredPrefetch = prefetchPolicy.getConfiguredPrefetch(session, destination, isDurableSubscription(), isBrowser());
if (connection.isLocalMessagePriority()) {
this.messageQueue = new PriorityMessageQueue();
} else {
this.messageQueue = new FifoMessageQueue(configuredPrefetch);
}
consumerInfo = new JmsConsumerInfo(consumerId, this);
consumerInfo.setExplicitClientID(connection.isExplicitClientID());
consumerInfo.setSelector(selector);
consumerInfo.setDurable(isDurableSubscription());
consumerInfo.setSubscriptionName(name);
consumerInfo.setShared(isSharedSubscription());
consumerInfo.setDestination(destination);
consumerInfo.setAcknowledgementMode(acknowledgementMode);
consumerInfo.setNoLocal(noLocal);
consumerInfo.setBrowser(isBrowser());
consumerInfo.setPrefetchSize(configuredPrefetch);
consumerInfo.setRedeliveryPolicy(redeliveryPolicy);
consumerInfo.setLocalMessageExpiry(connection.isLocalMessageExpiry());
consumerInfo.setPresettle(session.getPresettlePolicy().isConsumerPresttled(session, destination));
consumerInfo.setDeserializationPolicy(deserializationPolicy);
session.getConnection().createResource(consumerInfo, new ProviderSynchronization() {
@Override
public void onPendingSuccess() {
session.add(JmsMessageConsumer.this);
}
@Override
public void onPendingFailure(ProviderException cause) {
}
});
if (session.isStarted()) {
start();
}
}