in client/src/main/java/org/apache/qpid/client/AMQSession.java [347:463]
protected AMQSession(AMQConnection con,
                     int channelId,
                     boolean transacted,
                     int acknowledgeMode,
                     int defaultPrefetchHighMark,
                     int defaultPrefetchLowMark)
{
    /*
     * Initialises the session for the given connection and channel: message
     * encoding flags, system-property driven settings, the effective
     * acknowledge mode, and the prefetch marks together with the delivery queue.
     *
     * con                     - stored as the owning connection. When null, AMQP-encoded
     *                           map messages are enabled and AMQP-encoded stream messages
     *                           are disabled.
     * channelId               - stored as this session's channel id.
     * transacted              - when true the acknowledge mode is forced to SESSION_TRANSACTED.
     * acknowledgeMode         - used only when transacted is false.
     * defaultPrefetchHighMark - stored as the prefetch high mark.
     * defaultPrefetchLowMark  - stored as the prefetch low mark; in NO_ACKNOWLEDGE mode it is
     *                           replaced by max(high / 2, 1) when it equals a positive high mark.
     */

    // Legacy-format switches come from the connection; with no connection the map
    // encoding defaults to AMQP and the stream encoding defaults to legacy.
    final boolean connectionPresent = con != null;
    _useAMQPEncodedMapMessage = !(connectionPresent && con.isUseLegacyMapMessageFormat());
    _useAMQPEncodedStreamMessage = connectionPresent && !con.isUseLegacyStreamMessageFormat();

    // Settings taken from JVM system properties, each with its declared default.
    final String strictAmqpSetting = System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT);
    _strictAMQP = Boolean.parseBoolean(strictAmqpSetting);

    final String strictAmqpFatalSetting =
            System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT);
    _strictAMQPFATAL = Boolean.parseBoolean(strictAmqpFatalSetting);

    // Strict AMQP implies immediate prefetch; the dedicated property is consulted only otherwise.
    if (_strictAMQP)
    {
        _immediatePrefetch = true;
    }
    else
    {
        _immediatePrefetch = Boolean.parseBoolean(
                System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
    }

    final String shutdownTimeoutSetting =
            System.getProperty(DISPATCHER_SHUTDOWN_TIMEOUT_MS, DISPATCHER_SHUTDOWN_TIMEOUT_MS_DEFAULT);
    _dispatcherShutdownTimeoutMs = Integer.parseInt(shutdownTimeoutSetting);

    _connection = con;
    _transacted = transacted;
    // A transacted session ignores the requested acknowledge mode.
    _acknowledgeMode = transacted ? javax.jms.Session.SESSION_TRANSACTED : acknowledgeMode;

    _messageEncryptionHelper = new MessageEncryptionHelper(this);
    _channelId = channelId;
    _messageFactoryRegistry = MessageFactoryRegistry.newDefaultRegistry(this);

    // The high mark is the same whichever acknowledge mode is in effect.
    _prefetchHighMark = defaultPrefetchHighMark;

    if (_acknowledgeMode == NO_ACKNOWLEDGE)
    {
        // Identical positive marks are pulled apart: the low mark becomes half the
        // high mark, but never less than one.
        final boolean marksCoincide = defaultPrefetchLowMark == defaultPrefetchHighMark;
        if (marksCoincide && defaultPrefetchHighMark > 0)
        {
            _prefetchLowMark = Math.max(defaultPrefetchHighMark / 2, 1);
        }
        else
        {
            _prefetchLowMark = defaultPrefetchLowMark;
        }

        // Worker threads are daemons named after the connection number and channel id.
        // NOTE(review): the name dereferences _connection, so a null connection combined
        // with NO_ACKNOWLEDGE looks unsafe once a thread is created - confirm with callers.
        final ThreadFactory suspenderThreadFactory = new ThreadFactory()
        {
            @Override
            public Thread newThread(final Runnable task)
            {
                final String threadName =
                        "Connection_" + _connection.getConnectionNumber() + "_session_" + _channelId;
                final Thread worker = new Thread(task, threadName);
                if (!worker.isDaemon())
                {
                    worker.setDaemon(true);
                }
                return worker;
            }
        };

        // Suspend jobs are coalesced: a single worker thread, a work queue holding at
        // most one pending job, and a policy that discards anything beyond that.
        _flowControlNoAckTaskPool = new ThreadPoolExecutor(1,
                                                           1,
                                                           0L,
                                                           TimeUnit.MILLISECONDS,
                                                           new LinkedBlockingQueue<Runnable>(1),
                                                           suspenderThreadFactory,
                                                           new ThreadPoolExecutor.DiscardPolicy());

        final FlowControllingBlockingQueue.ThresholdListener listener =
                new FlowControllingBlockingQueue.ThresholdListener()
                {
                    // true once a suspend has been requested and not yet reversed
                    private final AtomicBoolean _suspended = new AtomicBoolean();

                    @Override
                    public void aboveThreshold(int currentValue)
                    {
                        if (isSessionClosedOrClosing())
                        {
                            return;
                        }
                        // Act only on a false -> true transition.
                        if (_suspended.getAndSet(true))
                        {
                            return;
                        }
                        _logger.debug("Above threshold ({}) so suspending channel. Current value is {}",
                                      _prefetchHighMark,
                                      currentValue);
                        submitSuspender();
                    }

                    @Override
                    public void underThreshold(int currentValue)
                    {
                        if (isSessionClosedOrClosing())
                        {
                            return;
                        }
                        // Act only on a true -> false transition.
                        if (!_suspended.getAndSet(false))
                        {
                            return;
                        }
                        _logger.debug("Below threshold ({}) so unsuspending channel. Current value is {}",
                                      _prefetchLowMark,
                                      currentValue);
                        submitSuspender();
                    }

                    // Hands a SuspenderRunner carrying the shared state flag to the task pool.
                    private void submitSuspender()
                    {
                        _flowControlNoAckTaskPool.execute(new SuspenderRunner(_suspended));
                    }

                    // No flow-control work is scheduled for a session that is closed or closing.
                    private boolean isSessionClosedOrClosing()
                    {
                        return AMQSession.this.isClosed() || AMQSession.this.isClosing();
                    }
                };

        _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, _prefetchLowMark, listener);
    }
    else
    {
        // Other acknowledge modes get no task pool and a queue without a threshold listener.
        _prefetchLowMark = defaultPrefetchLowMark;
        _flowControlNoAckTaskPool = null;
        _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, null);
    }

    // Creation is logged so it can be paired with the existing close logging.
    if (_logger.isDebugEnabled())
    {
        _logger.debug("Created session:" + this);
    }
}