in axis-rt-transport-jms/src/main/java/org/apache/axis/transport/jms/JMSConnector.java [765:964]
protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory,
javax.jms.Connection connection,
String threadName,
String clientID,
String username,
String password)
throws JMSException;
protected abstract class AsyncConnection extends Connection
{
HashMap m_subscriptions;
Object m_subscriptionLock;
protected AsyncConnection(ConnectionFactory connectionFactory,
javax.jms.Connection connection,
String threadName,
String clientID,
String username,
String password)
throws JMSException
{
super(connectionFactory, connection, threadName,
clientID, username, password);
m_subscriptions = new HashMap();
m_subscriptionLock = new Object();
}
protected abstract ListenerSession createListenerSession(
javax.jms.Connection connection,
Subscription subscription)
throws Exception;
protected void onShutdown()
{
synchronized(m_subscriptionLock)
{
Iterator subscriptions = m_subscriptions.keySet().iterator();
while(subscriptions.hasNext())
{
Subscription subscription = (Subscription)subscriptions.next();
ListenerSession session = (ListenerSession)
m_subscriptions.get(subscription);
if(session != null)
{
session.cleanup();
}
}
m_subscriptions.clear();
}
}
/**
* @todo add in security exception propagation
* @param subscription
*/
void subscribe(Subscription subscription)
throws Exception
{
long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
synchronized(m_subscriptionLock)
{
if(m_subscriptions.containsKey(subscription))
return;
while(true)
{
if(System.currentTimeMillis() > timeoutTime)
{
throw new InvokeTimeoutException("Cannot subscribe listener");
}
try
{
ListenerSession session = createListenerSession(m_connection,
subscription);
m_subscriptions.put(subscription, session);
break;
}
catch(JMSException jmse)
{
if(!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION))
{
throw jmse;
}
try{m_subscriptionLock.wait(m_interactRetryInterval);}
catch(InterruptedException ignore){}
//give reconnect a chance
Thread.yield();
continue;
}
catch(NullPointerException jmse)
{
//we ARE reconnecting
try{m_subscriptionLock.wait(m_interactRetryInterval);}
catch(InterruptedException ignore){}
//give reconnect a chance
Thread.yield();
continue;
}
}
}
}
void unsubscribe(Subscription subscription)
{
long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
synchronized(m_subscriptionLock)
{
if(!m_subscriptions.containsKey(subscription))
return;
while(true)
{
if(System.currentTimeMillis() > timeoutTime)
{
throw new InvokeTimeoutException("Cannot unsubscribe listener");
}
//give reconnect a chance
Thread.yield();
try
{
ListenerSession session = (ListenerSession)
m_subscriptions.get(subscription);
session.cleanup();
m_subscriptions.remove(subscription);
break;
}
catch(NullPointerException jmse)
{
//we are reconnecting
try{m_subscriptionLock.wait(m_interactRetryInterval);}
catch(InterruptedException ignore){}
continue;
}
}
}
}
protected void onConnect()
throws Exception
{
synchronized(m_subscriptionLock)
{
Iterator subscriptions = m_subscriptions.keySet().iterator();
while(subscriptions.hasNext())
{
Subscription subscription = (Subscription)subscriptions.next();
if(m_subscriptions.get(subscription) == null)
{
m_subscriptions.put(subscription,
createListenerSession(m_connection, subscription));
}
}
m_subscriptionLock.notifyAll();
}
}
protected void onException()
{
synchronized(m_subscriptionLock)
{
Iterator subscriptions = m_subscriptions.keySet().iterator();
while(subscriptions.hasNext())
{
Subscription subscription = (Subscription)subscriptions.next();
m_subscriptions.put(subscription, null);
}
}
}
protected class ListenerSession extends ConnectorSession
{
protected MessageConsumer m_consumer;
protected Subscription m_subscription;
ListenerSession(Session session,
MessageConsumer consumer,
Subscription subscription)
throws Exception
{
super(session);
m_subscription = subscription;
m_consumer = consumer;
Destination destination = subscription.m_endpoint.getDestination(m_session);
m_consumer.setMessageListener(subscription.m_listener);
}
void cleanup()
{
try{m_consumer.close();}catch(Exception ignore){}
try{m_session.close();}catch(Exception ignore){}
}
}
}