protected abstract AsyncConnection createAsyncConnection()

in axis-rt-transport-jms/src/main/java/org/apache/axis/transport/jms/JMSConnector.java [765:964]


    /**
     * Factory method returning the {@link AsyncConnection} for this connector.
     * Concrete subclasses supply the implementation. The parameter list
     * matches that of the {@link AsyncConnection} constructor.
     *
     * @param factory    the JMS connection factory
     * @param connection the underlying JMS connection
     * @param threadName thread name passed through to the connection
     *                   (how it is used is decided by the base class, which
     *                   is not visible here)
     * @param clientID   JMS client identifier (presumably may be null when
     *                   none is configured; confirm against callers)
     * @param username   user name for the connection
     * @param password   password for the connection
     * @return the asynchronous connection
     * @throws JMSException if the implementation fails to create the connection
     */
    protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory,
                                                             javax.jms.Connection connection,
                                                             String threadName,
                                                             String clientID,
                                                             String username,
                                                             String password)

        throws JMSException;

    protected abstract class AsyncConnection extends Connection
    {
        HashMap m_subscriptions;
        Object m_subscriptionLock;

        /**
         * Builds an asynchronous connection that starts with an empty
         * subscription table. Every argument is forwarded unchanged to the
         * superclass constructor.
         *
         * @param connectionFactory the JMS connection factory
         * @param connection        the underlying JMS connection
         * @param threadName        thread name passed to the superclass
         * @param clientID          client identifier passed to the superclass
         * @param username          user name passed to the superclass
         * @param password          password passed to the superclass
         * @throws JMSException if the superclass constructor fails
         */
        protected AsyncConnection(ConnectionFactory connectionFactory,
                                  javax.jms.Connection connection,
                                  String threadName,
                                  String clientID,
                                  String username,
                                  String password)
            throws JMSException
        {
            super(connectionFactory,
                  connection,
                  threadName,
                  clientID,
                  username,
                  password);

            // Subscription -> ListenerSession table; a null value marks a
            // subscription whose session still has to be (re)created.
            this.m_subscriptions = new HashMap();

            // Dedicated monitor guarding the table; also used for wait/notify.
            this.m_subscriptionLock = new Object();
        }

        /**
         * Creates the listener session that serves one subscription.
         * Called while <code>m_subscriptionLock</code> is held, both when a
         * subscription is first added and when sessions are rebuilt in
         * <code>onConnect()</code>.
         *
         * @param connection   the JMS connection to create the session on
         * @param subscription the subscription the session will serve
         * @return the new listener session
         * @throws Exception if the session cannot be created
         */
        protected abstract ListenerSession createListenerSession(
                                                javax.jms.Connection connection,
                                                Subscription subscription)
            throws Exception;

        /**
         * Cleans up every live listener session and then empties the
         * subscription table. Subscriptions whose session is currently
         * <code>null</code> are skipped. Runs under
         * <code>m_subscriptionLock</code>.
         */
        protected void onShutdown()
        {
            synchronized(m_subscriptionLock)
            {
                for(Iterator keys = m_subscriptions.keySet().iterator(); keys.hasNext();)
                {
                    Subscription key = (Subscription)keys.next();
                    ListenerSession live = (ListenerSession)m_subscriptions.get(key);
                    if(live == null)
                    {
                        // nothing to release for this subscription
                        continue;
                    }
                    live.cleanup();
                }
                m_subscriptions.clear();
            }
        }

        /**
         * Creates and registers a listener session for the given
         * subscription, retrying while the connection is unavailable.
         * <p>
         * Does nothing if the subscription is already registered. Otherwise
         * session creation is attempted repeatedly; after a recoverable
         * <code>JMSException</code> or a <code>NullPointerException</code>
         * the method waits on <code>m_subscriptionLock</code> for up to
         * <code>m_interactRetryInterval</code> ms (<code>onConnect()</code>
         * notifies that lock) and tries again, until
         * <code>m_timeoutTime</code> ms have elapsed since entry.
         * <p>
         * If the calling thread is interrupted while waiting, retrying
         * continues but the interrupt status is restored before the method
         * exits, so callers can still observe the interruption.
         *
         * @todo add in security exception propagation
         * @param subscription the subscription to register
         * @throws InvokeTimeoutException if no session could be created
         *         before the timeout elapsed
         * @throws Exception a non-recoverable <code>JMSException</code>, or
         *         any other exception raised by session creation
         */
        void subscribe(Subscription subscription)
            throws Exception
        {
            long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
            // Set when a retry wait was interrupted; replayed in the finally
            // block instead of being silently discarded.
            boolean interrupted = false;
            try
            {
                synchronized(m_subscriptionLock)
                {
                    if(m_subscriptions.containsKey(subscription))
                        return;
                    while(true)
                    {
                        if(System.currentTimeMillis() > timeoutTime)
                        {
                            throw new InvokeTimeoutException("Cannot subscribe listener");
                        }

                        try
                        {
                            ListenerSession session = createListenerSession(m_connection,
                                                                            subscription);
                            m_subscriptions.put(subscription, session);
                            return;
                        }
                        catch(JMSException jmse)
                        {
                            if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION))
                            {
                                throw jmse;
                            }
                            // recoverable: fall through to the retry wait
                        }
                        catch(NullPointerException npe)
                        {
                            // we ARE reconnecting: fall through to the retry wait.
                            // The NPE may originate inside createListenerSession,
                            // so it cannot be replaced by a null check here.
                        }

                        try
                        {
                            m_subscriptionLock.wait(m_interactRetryInterval);
                        }
                        catch(InterruptedException ie)
                        {
                            interrupted = true;
                        }
                        //give reconnect a chance
                        Thread.yield();
                    }
                }
            }
            finally
            {
                if(interrupted)
                {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Cleans up and removes the listener session registered for the
         * given subscription.
         * <p>
         * Does nothing if the subscription is not registered. If the
         * subscription is registered but its session is currently
         * <code>null</code> (<code>onException()</code> nulls every session
         * and <code>onConnect()</code> recreates them), the method waits on
         * <code>m_subscriptionLock</code> for up to
         * <code>m_interactRetryInterval</code> ms and looks again, until
         * <code>m_timeoutTime</code> ms have elapsed since entry.
         * <p>
         * The mapping is re-checked after every wait: if it was removed in
         * the meantime (for example by <code>onShutdown()</code>), the
         * method returns instead of spinning until the timeout. If the
         * calling thread is interrupted while waiting, retrying continues
         * but the interrupt status is restored before the method exits.
         *
         * @param subscription the subscription to remove
         * @throws InvokeTimeoutException if the session did not become
         *         available before the timeout elapsed
         */
        void unsubscribe(Subscription subscription)
        {
            long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
            // Set when a retry wait was interrupted; replayed in the finally
            // block instead of being silently discarded.
            boolean interrupted = false;
            try
            {
                synchronized(m_subscriptionLock)
                {
                    // The lock is released during wait(), so the mapping may
                    // disappear between iterations; re-test it every time.
                    while(m_subscriptions.containsKey(subscription))
                    {
                        if(System.currentTimeMillis() > timeoutTime)
                        {
                            throw new InvokeTimeoutException("Cannot unsubscribe listener");
                        }

                        //give reconnect a chance
                        Thread.yield();

                        ListenerSession session = (ListenerSession)
                                                m_subscriptions.get(subscription);
                        if(session != null)
                        {
                            session.cleanup();
                            m_subscriptions.remove(subscription);
                            return;
                        }

                        //we are reconnecting: the session has been nulled and
                        //not yet recreated, so wait for onConnect() to notify
                        try
                        {
                            m_subscriptionLock.wait(m_interactRetryInterval);
                        }
                        catch(InterruptedException ie)
                        {
                            interrupted = true;
                        }
                    }
                }
            }
            finally
            {
                if(interrupted)
                {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Recreates the listener session of every subscription whose table
         * entry is currently <code>null</code>, then wakes all threads
         * waiting on <code>m_subscriptionLock</code>. Runs under that lock.
         *
         * @throws Exception if a listener session cannot be created; entries
         *         processed before the failure keep their new sessions and
         *         waiting threads are not notified
         */
        protected void onConnect()
            throws Exception
        {
            synchronized(m_subscriptionLock)
            {
                for(Iterator keys = m_subscriptions.keySet().iterator(); keys.hasNext();)
                {
                    Subscription pending = (Subscription)keys.next();
                    if(m_subscriptions.get(pending) != null)
                    {
                        // already has a live session
                        continue;
                    }

                    // Replacing the value of an existing key is not a
                    // structural change, so the iterator stays valid.
                    m_subscriptions.put(pending,
                                        createListenerSession(m_connection, pending));
                }
                m_subscriptionLock.notifyAll();
            }
        }

        /**
         * Replaces the session of every registered subscription with
         * <code>null</code> while keeping the subscription keys, so that
         * <code>onConnect()</code> can create fresh sessions for them.
         * The old sessions are not cleaned up here. Runs under
         * <code>m_subscriptionLock</code>.
         */
        protected void onException()
        {
            synchronized(m_subscriptionLock)
            {
                for(Iterator keys = m_subscriptions.keySet().iterator(); keys.hasNext();)
                {
                    // Overwriting the value of an existing key is not a
                    // structural change, so the iterator stays valid.
                    Subscription stale = (Subscription)keys.next();
                    m_subscriptions.put(stale, null);
                }
            }
        }



        /**
         * Pairs a JMS session with the message consumer that serves one
         * subscription.
         */
        protected class ListenerSession extends ConnectorSession
        {
            /** Consumer on which the subscription's listener is installed. */
            protected MessageConsumer m_consumer;
            /** Subscription this session serves. */
            protected Subscription    m_subscription;

            /**
             * Stores the consumer and subscription and installs the
             * subscription's message listener on the consumer.
             *
             * @param session      the JMS session, passed to the superclass
             * @param consumer     the consumer to attach the listener to
             * @param subscription the subscription being served
             * @throws Exception if resolving the destination or installing
             *         the listener fails
             */
            ListenerSession(Session session,
                            MessageConsumer consumer,
                            Subscription subscription)
                throws Exception
            {
                super(session);
                this.m_subscription = subscription;
                this.m_consumer = consumer;

                // The resolved destination is not used afterwards; the call
                // is kept because it may throw or have side effects.
                subscription.m_endpoint.getDestination(m_session);

                this.m_consumer.setMessageListener(subscription.m_listener);
            }

            /**
             * Closes the consumer and then the session. Each close is
             * best-effort: any exception is ignored so that the other
             * resource is still closed.
             */
            void cleanup()
            {
                try
                {
                    m_consumer.close();
                }
                catch(Exception ignore)
                {
                    // best-effort close
                }

                try
                {
                    m_session.close();
                }
                catch(Exception ignore)
                {
                    // best-effort close
                }
            }

        }
    }