in activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java [53:166]
public void handle(IOException exception) {
if (!broker.isStarted() || ignoreAllErrors) {
allowIOResumption();
LOG.info("Ignoring IO exception, " + exception, exception);
return;
}
if (ignoreNoSpaceErrors) {
Throwable cause = exception;
while (cause != null && cause instanceof IOException) {
String message = cause.getMessage();
if (message != null && message.contains(noSpaceMessage)) {
LOG.info("Ignoring no space left exception, " + exception, exception);
allowIOResumption();
return;
}
cause = cause.getCause();
}
}
if (ignoreSQLExceptions) {
Throwable cause = exception;
while (cause != null) {
if (cause instanceof SQLException) {
String message = cause.getMessage();
if (message == null) {
message = "";
}
if (message.contains(sqlExceptionMessage)) {
LOG.info("Ignoring SQLException, " + exception, cause);
return;
}
}
cause = cause.getCause();
}
}
if (stopStartConnectors) {
if (handlingException.compareAndSet(false, true)) {
LOG.info("Initiating stop/restart of transports on " + broker + " due to IO exception, " + exception, exception);
new Thread("IOExceptionHandler: stop transports") {
@Override
public void run() {
try {
ServiceStopper stopper = new ServiceStopper();
broker.stopAllConnectors(stopper);
LOG.info("Successfully stopped transports on " + broker);
} catch (Exception e) {
LOG.warn("Failure occurred while stopping broker connectors", e);
} finally {
// resume again
new Thread("IOExceptionHandler: restart transports") {
@Override
public void run() {
try {
allowIOResumption();
while (hasLockOwnership() && isPersistenceAdapterDown()) {
LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
}
if (hasLockOwnership()) {
Map<ActiveMQDestination, Destination> destinations = ((RegionBroker)broker.getRegionBroker()).getDestinationMap();
for (Destination destination : destinations.values()) {
if (destination instanceof Queue) {
Queue queue = (Queue)destination;
if (queue.isResetNeeded()) {
queue.clearPendingMessages(0);
}
}
}
broker.startAllConnectors();
LOG.info("Successfully restarted transports on " + broker);
}
} catch (Exception e) {
LOG.warn("Stopping " + broker + " due to failure restarting transports", e);
stopBroker(e);
} finally {
handlingException.compareAndSet(true, false);
}
}
private boolean isPersistenceAdapterDown() {
boolean checkpointSuccess = false;
try {
broker.getPersistenceAdapter().checkpoint(true);
checkpointSuccess = true;
} catch (Throwable ignored) {
}
return !checkpointSuccess;
}
}.start();
}
}
}.start();
}
throw new SuppressReplyException("Stop/RestartTransportsInitiated", exception);
}
if (handlingException.compareAndSet(false, true)) {
stopBroker(exception);
}
// we don't want to propagate the exception back to the client
// They will see a delay till they see a disconnect via socket.close
// at which point failover: can kick in.
throw new SuppressReplyException("ShutdownBrokerInitiated", exception);
}