public void handle()

in activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java [53:166]


    /**
     * Handles an {@link IOException} reported to this handler.
     * <p>
     * The exception is logged and otherwise ignored when any of the following holds:
     * <ul>
     *   <li>the broker is not started, or {@code ignoreAllErrors} is set;</li>
     *   <li>{@code ignoreNoSpaceErrors} is set and an {@code IOException} in the leading part
     *       of the cause chain has a message containing {@code noSpaceMessage};</li>
     *   <li>{@code ignoreSQLExceptions} is set and a {@link SQLException} anywhere in the cause
     *       chain has a message containing {@code sqlExceptionMessage}.</li>
     * </ul>
     * Otherwise, if {@code stopStartConnectors} is set, the connectors are stopped on a
     * background thread and restarted once a persistence adapter checkpoint succeeds; if it
     * is not set, the broker is stopped. Both actions are guarded by a compare-and-set on
     * {@code handlingException} so that concurrent reports do not start the same action twice.
     *
     * @param exception the I/O failure to handle
     * @throws SuppressReplyException whenever the exception is not ignored, so that the
     *         failure is not propagated back to the client as a reply
     */
    public void handle(IOException exception) {
        if (!broker.isStarted() || ignoreAllErrors) {
            allowIOResumption();
            LOG.info("Ignoring IO exception, " + exception, exception);
            return;
        }

        if (ignoreNoSpaceErrors && hasNoSpaceCause(exception)) {
            LOG.info("Ignoring no space left exception, " + exception, exception);
            allowIOResumption();
            return;
        }

        if (ignoreSQLExceptions) {
            Throwable sqlCause = findIgnorableSqlCause(exception);
            if (sqlCause != null) {
                // NOTE(review): unlike the two ignore paths above, this one returns without
                // calling allowIOResumption() - confirm that this is intentional.
                LOG.info("Ignoring SQLException, " + exception, sqlCause);
                return;
            }
        }

        if (stopStartConnectors) {
            // Only the caller that wins the compare-and-set starts the stop/restart cycle;
            // every caller still gets the SuppressReplyException below.
            if (handlingException.compareAndSet(false, true)) {
                LOG.info("Initiating stop/restart of transports on " + broker + " due to IO exception, " + exception, exception);

                new Thread("IOExceptionHandler: stop transports") {
                    @Override
                    public void run() {
                        try {
                            ServiceStopper stopper = new ServiceStopper();
                            broker.stopAllConnectors(stopper);
                            LOG.info("Successfully stopped transports on " + broker);
                        } catch (Exception e) {
                            LOG.warn("Failure occurred while stopping broker connectors", e);
                        } finally {
                            // resume again, whether or not the stop succeeded
                            startTransportRestartThread();
                        }
                    }
                }.start();
            }

            throw new SuppressReplyException("Stop/RestartTransportsInitiated", exception);
        }

        if (handlingException.compareAndSet(false, true)) {
            stopBroker(exception);
        }

        // we don't want to propagate the exception back to the client
        // They will see a delay till they see a disconnect via socket.close
        // at which point failover: can kick in.
        throw new SuppressReplyException("ShutdownBrokerInitiated", exception);
    }

    /**
     * Reports whether an {@code IOException} at the head of the cause chain mentions
     * {@code noSpaceMessage}. The walk stops at the first cause that is not an
     * {@code IOException} (or at the end of the chain).
     */
    private boolean hasNoSpaceCause(IOException exception) {
        for (Throwable cause = exception; cause instanceof IOException; cause = cause.getCause()) {
            String message = cause.getMessage();
            if (message != null && message.contains(noSpaceMessage)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the first {@link SQLException} in the cause chain whose message contains
     * {@code sqlExceptionMessage}, or {@code null} if there is none. A {@code null} message
     * is treated as the empty string.
     */
    private Throwable findIgnorableSqlCause(IOException exception) {
        for (Throwable cause = exception; cause != null; cause = cause.getCause()) {
            if (cause instanceof SQLException) {
                String message = cause.getMessage();
                if (message == null) {
                    message = "";
                }
                if (message.contains(sqlExceptionMessage)) {
                    return cause;
                }
            }
        }
        return null;
    }

    /**
     * Starts the background thread that waits for a persistence adapter checkpoint to succeed
     * and then restarts the connectors; it always clears {@code handlingException} when done.
     */
    private void startTransportRestartThread() {
        new Thread("IOExceptionHandler: restart transports") {
            @Override
            public void run() {
                try {
                    allowIOResumption();
                    // Poll until a checkpoint succeeds, giving up early if lock ownership is lost.
                    while (hasLockOwnership() && isPersistenceAdapterDown()) {
                        LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
                        TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
                    }
                    if (hasLockOwnership()) {
                        Map<ActiveMQDestination, Destination> destinations = ((RegionBroker)broker.getRegionBroker()).getDestinationMap();
                        for (Destination destination : destinations.values()) {
                            if (destination instanceof Queue) {
                                Queue queue = (Queue)destination;
                                if (queue.isResetNeeded()) {
                                    queue.clearPendingMessages(0);
                                }
                            }
                        }
                        broker.startAllConnectors();
                        LOG.info("Successfully restarted transports on " + broker);
                    }
                } catch (Exception e) {
                    // Includes an interrupt during the sleep above: any failure here stops the broker.
                    LOG.warn("Stopping " + broker + " due to failure restarting transports", e);
                    stopBroker(e);
                } finally {
                    handlingException.compareAndSet(true, false);
                }
            }

            /**
             * Probes the persistence adapter with a checkpoint; any failure means "still down".
             */
            private boolean isPersistenceAdapterDown() {
                boolean checkpointSuccess = false;
                try {
                    broker.getPersistenceAdapter().checkpoint(true);
                    checkpointSuccess = true;
                } catch (Throwable e) {
                    // Best effort: a failed probe only means we keep waiting, but record the
                    // reason so a store that never recovers can be diagnosed.
                    LOG.debug("Persistence adapter checkpoint failed, store is still unavailable", e);
                }
                return !checkpointSuccess;
            }
        }.start();
    }