protected AMQSession()

in client/src/main/java/org/apache/qpid/client/AMQSession.java [347:463]


    protected AMQSession(AMQConnection con,
                         int channelId,
                         boolean transacted,
                         int acknowledgeMode,
                         int defaultPrefetchHighMark,
                         int defaultPrefetchLowMark)
    {
        _useAMQPEncodedMapMessage = con == null || !con.isUseLegacyMapMessageFormat();
        _useAMQPEncodedStreamMessage = con != null && !con.isUseLegacyStreamMessageFormat();
        _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
        _strictAMQPFATAL =
                Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
        _immediatePrefetch =
                _strictAMQP
                || Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
        _dispatcherShutdownTimeoutMs = Integer.parseInt(System.getProperty(DISPATCHER_SHUTDOWN_TIMEOUT_MS, DISPATCHER_SHUTDOWN_TIMEOUT_MS_DEFAULT));

        _connection = con;
        _transacted = transacted;
        if (transacted)
        {
            _acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED;
        }
        else
        {
            _acknowledgeMode = acknowledgeMode;
        }
        _messageEncryptionHelper = new MessageEncryptionHelper(this);
        _channelId = channelId;
        _messageFactoryRegistry = MessageFactoryRegistry.newDefaultRegistry(this);

        if (_acknowledgeMode == NO_ACKNOWLEDGE)
        {
            _prefetchHighMark = defaultPrefetchHighMark;
            _prefetchLowMark = defaultPrefetchLowMark == defaultPrefetchHighMark && defaultPrefetchHighMark > 0
                   ? Math.max(defaultPrefetchHighMark / 2, 1)
                    : defaultPrefetchLowMark;

            // we coalesce suspend jobs using single threaded pool executor with queue length of one
            // and discarding policy
            _flowControlNoAckTaskPool = new ThreadPoolExecutor(1, 1,
                                                               0L, TimeUnit.MILLISECONDS,
                                                               new LinkedBlockingQueue<Runnable>(1),
                                                               new ThreadFactory()
            {
                @Override
                public Thread newThread(final Runnable r)
                {
                    Thread thread = new Thread(r, "Connection_" + _connection.getConnectionNumber() + "_session_" + _channelId);
                    if (!thread.isDaemon())
                    {
                        thread.setDaemon(true);
                    }

                    return thread;
                }
            }, new ThreadPoolExecutor.DiscardPolicy());

            final FlowControllingBlockingQueue.ThresholdListener listener =
                    new FlowControllingBlockingQueue.ThresholdListener()
                    {
                        private final AtomicBoolean _suspendState = new AtomicBoolean();

                        public void aboveThreshold(int currentValue)
                        {
                            if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
                            {
                                // Only execute change if previous state was false
                                if (!_suspendState.getAndSet(true))
                                {
                                    _logger.debug(
                                            "Above threshold ({}) so suspending channel. Current value is {}",
                                            _prefetchHighMark,
                                            currentValue);

                                    doSuspend();
                                }
                            }
                        }

                        public void underThreshold(int currentValue)
                        {
                            if (!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
                            {
                                // Only execute change if previous state was true
                                if (_suspendState.getAndSet(false))
                                {
                                    _logger.debug(
                                            "Below threshold ({}) so unsuspending channel. Current value is {}",
                                            _prefetchLowMark,
                                            currentValue);
                                    doSuspend();
                                }
                            }
                        }

                        private void doSuspend()
                        {
                            _flowControlNoAckTaskPool.execute(new SuspenderRunner(_suspendState));
                        }
                    };
            _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, _prefetchLowMark, listener);
        }
        else
        {
            _prefetchHighMark = defaultPrefetchHighMark;
            _prefetchLowMark = defaultPrefetchLowMark;
            _flowControlNoAckTaskPool = null;
            _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, null);
        }

        // Add creation logging to tie in with the existing close logging
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Created session:" + this);
        }
    }