in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java [955:1049]
/**
 * Drains the Proton event collector, handing each pending event to the
 * {@link AmqpEventSink} stored as the context of the connection, session or
 * link the event refers to. Endpoints without a context are skipped.
 *
 * A {@code TRANSPORT_ERROR} event, when no authenticator is present or the
 * authenticator completed successfully, closes the local connection with the
 * transport's error condition and raises a {@link ProviderFailedException}.
 *
 * Anything thrown while processing is logged and then reported through
 * {@code fireProviderException}; nothing propagates to the caller.
 */
private void processUpdates() {
    try {
        // The head event is only popped once it has been handled, so an exception
        // thrown by a handler leaves that event at the head of the collector.
        for (Event event = protonCollector.peek(); event != null; event = protonCollector.peek()) {
            final Type eventType = event.getType();
            if (!eventType.equals(Type.TRANSPORT)) {
                LOG.trace("New Proton Event: {}", eventType);
            }

            switch (eventType) {
                case CONNECTION_REMOTE_CLOSE: {
                    final AmqpEventSink sink = (AmqpEventSink) event.getConnection().getContext();
                    if (sink != null) {
                        sink.processRemoteClose(this);
                    }
                    break;
                }
                case CONNECTION_REMOTE_OPEN: {
                    final AmqpEventSink sink = (AmqpEventSink) event.getConnection().getContext();
                    if (sink != null) {
                        sink.processRemoteOpen(this);
                    }
                    break;
                }
                case SESSION_REMOTE_CLOSE: {
                    final AmqpEventSink sink = (AmqpEventSink) event.getSession().getContext();
                    if (sink != null) {
                        sink.processRemoteClose(this);
                    }
                    break;
                }
                case SESSION_REMOTE_OPEN: {
                    final AmqpEventSink sink = (AmqpEventSink) event.getSession().getContext();
                    if (sink != null) {
                        sink.processRemoteOpen(this);
                    }
                    break;
                }
                case LINK_REMOTE_CLOSE: {
                    final AmqpEventSink sink = (AmqpEventSink) event.getLink().getContext();
                    if (sink != null) {
                        sink.processRemoteClose(this);
                    }
                    break;
                }
                case LINK_REMOTE_DETACH: {
                    final AmqpEventSink sink = (AmqpEventSink) event.getLink().getContext();
                    if (sink != null) {
                        sink.processRemoteDetach(this);
                    }
                    break;
                }
                case LINK_REMOTE_OPEN: {
                    final AmqpEventSink sink = (AmqpEventSink) event.getLink().getContext();
                    if (sink != null) {
                        sink.processRemoteOpen(this);
                    }
                    break;
                }
                case LINK_FLOW: {
                    final AmqpEventSink sink = (AmqpEventSink) event.getLink().getContext();
                    if (sink != null) {
                        sink.processFlowUpdates(this);
                    }
                    break;
                }
                case DELIVERY: {
                    final AmqpEventSink sink = (AmqpEventSink) event.getLink().getContext();
                    if (sink != null) {
                        sink.processDeliveryUpdates(this, (Delivery) event.getContext());
                    }
                    break;
                }
                case TRANSPORT_ERROR: {
                    // Authentication failures are dealt with elsewhere, and that handling closes
                    // the transport head, which also lands us here. Only react when there was
                    // no authenticator or authentication actually succeeded.
                    final boolean authenticationPassed =
                        authenticator == null || (authenticator.isComplete() && authenticator.wasSuccessful());
                    if (authenticationPassed) {
                        protonTransportErrorHandled = true;

                        final ErrorCondition failureCondition = protonTransport.getCondition();
                        final String failureMessage = extractTransportErrorMessage(failureCondition);

                        // The transport is gone: mark the local end of the connection as closed so
                        // later shutdown processing does not wait for a close from the remote.
                        protonConnection.setCondition(failureCondition);
                        protonConnection.close();

                        throw new ProviderFailedException(failureMessage);
                    }
                    break;
                }
                default:
                    // Every other event type is consumed without any action.
                    break;
            }

            protonCollector.pop();
        }
    } catch (Throwable failure) {
        // Log first, but make sure the failure is reported even if logging itself throws.
        try {
            LOG.warn("Caught problem during update processing: {}", failure.getMessage(), failure);
        } finally {
            fireProviderException(ProviderExceptionSupport.createOrPassthroughFatal(failure));
        }
    }
}