private void processUpdates()

in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java [955:1049]


    /**
     * Drains the Proton event collector, routing each pending event to the
     * {@link AmqpEventSink} stored as the context of the connection, session
     * or link the event refers to. Events whose endpoint carries no context,
     * and event types not listed below, are consumed without further action.
     * <p>
     * An event is popped from the collector only after its handling returned
     * normally; if handling throws, the loop is abandoned with that event
     * still at the head of the collector. Any {@link Throwable} raised is
     * logged at warn level and then reported through
     * {@code fireProviderException}, even if the logging call itself fails.
     */
    private void processUpdates() {
        try {
            for (Event event = protonCollector.peek(); event != null; event = protonCollector.peek()) {
                final Type eventType = event.getType();
                if (!eventType.equals(Type.TRANSPORT)) {
                    LOG.trace("New Proton Event: {}", eventType);
                }

                // Phase one: find the sink attached to the endpoint this event
                // concerns. Transport errors have no sink and are dealt with here.
                AmqpEventSink sink = null;
                switch (eventType) {
                    case CONNECTION_REMOTE_CLOSE:
                    case CONNECTION_REMOTE_OPEN:
                        sink = (AmqpEventSink) event.getConnection().getContext();
                        break;
                    case SESSION_REMOTE_CLOSE:
                    case SESSION_REMOTE_OPEN:
                        sink = (AmqpEventSink) event.getSession().getContext();
                        break;
                    case LINK_REMOTE_CLOSE:
                    case LINK_REMOTE_DETACH:
                    case LINK_REMOTE_OPEN:
                    case LINK_FLOW:
                    case DELIVERY:
                        sink = (AmqpEventSink) event.getLink().getContext();
                        break;
                    case TRANSPORT_ERROR:
                        // Authentication failures are handled elsewhere, and that handling closes
                        // the transport head which also lands us here; so skip this event unless
                        // there is no authenticator or authentication completed successfully.
                        if (authenticator != null && !(authenticator.isComplete() && authenticator.wasSuccessful())) {
                            break;
                        }

                        protonTransportErrorHandled = true;
                        ErrorCondition condition = protonTransport.getCondition();
                        String failureMessage = extractTransportErrorMessage(condition);

                        // The transport is gone: mark the local end of the connection closed so
                        // later shutdown processing does not wait for a close from the remote.
                        protonConnection.setCondition(condition);
                        protonConnection.close();

                        throw new ProviderFailedException(failureMessage);
                    default:
                        break;
                }

                // Phase two: hand the event to the sink, if the endpoint had one.
                if (sink != null) {
                    switch (eventType) {
                        case CONNECTION_REMOTE_CLOSE:
                        case SESSION_REMOTE_CLOSE:
                        case LINK_REMOTE_CLOSE:
                            sink.processRemoteClose(this);
                            break;
                        case CONNECTION_REMOTE_OPEN:
                        case SESSION_REMOTE_OPEN:
                        case LINK_REMOTE_OPEN:
                            sink.processRemoteOpen(this);
                            break;
                        case LINK_REMOTE_DETACH:
                            sink.processRemoteDetach(this);
                            break;
                        case LINK_FLOW:
                            sink.processFlowUpdates(this);
                            break;
                        case DELIVERY:
                            sink.processDeliveryUpdates(this, (Delivery) event.getContext());
                            break;
                        default:
                            break;
                    }
                }

                // Only reached when handling completed without throwing.
                protonCollector.pop();
            }
        } catch (Throwable failure) {
            try {
                LOG.warn("Caught problem during update processing: {}", failure.getMessage(), failure);
            } finally {
                // Runs even if the logging call above throws.
                fireProviderException(ProviderExceptionSupport.createOrPassthroughFatal(failure));
            }
        }
    }