in client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java [325:424]
public void closed(Connection conn)
{
final ConnectionException exc = exception;
exception = null;
if (exc == null)
{
return;
}
final ConnectionClose close = exc.getClose();
if (close == null || close.getReplyCode() == ConnectionCloseCode.CONNECTION_FORCED)
{
final CountDownLatch failoverLatch = new CountDownLatch(1);
_conn.getProtocolHandler().setFailoverLatch(failoverLatch);
final AtomicBoolean failoverDone = new AtomicBoolean();
try
{
_qpidConnection.notifyFailoverRequired();
_conn.doWithAllLocks(new Runnable()
{
@Override
public void run()
{
try
{
boolean preFailover = _conn.firePreFailover(false);
if (preFailover)
{
boolean reconnected;
if (exc instanceof RedirectConnectionException)
{
RedirectConnectionException redirect = (RedirectConnectionException) exc;
reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts());
}
else
{
reconnected = _conn.attemptReconnection();
}
if (reconnected)
{
failoverPrep();
_conn.resubscribeSessions();
_conn.fireFailoverComplete();
failoverDone.set(true);
}
}
}
catch (Exception e)
{
_logger.error("error during failover", e);
}
}
});
}
finally
{
failoverLatch.countDown();
_conn.getProtocolHandler().setFailoverLatch(null);
}
if (failoverDone.get())
{
return;
}
}
for(AMQSession<?,?> session : _conn.getSessions().values())
{
session.markClosed();
}
_conn.setClosed();
final ExceptionListener listener = _conn.getExceptionListenerNoCheck();
if (listener == null)
{
_logger.error("connection exception: " + conn, exc);
}
else
{
_conn.performConnectionTask(new Runnable()
{
@Override
public void run()
{
String code = null;
if (close != null)
{
code = close.getReplyCode().toString();
}
listener.onException(JMSExceptionHelper.chainJMSException(new JMSException(exc.getMessage(), code),
exc));
}
});
}
}