in protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiverMessage.java [672:741]
private void ensureStreamDecodedTo(StreamState desiredState) throws ClientException {
checkClosedOrAborted();
while (currentState.ordinal() < desiredState.ordinal()) {
try {
final StreamTypeDecoder<?> decoder;
try {
decoder = protonDecoder.readNextTypeDecoder(deliveryStream, decoderState);
} catch (DecodeEOFException eof) {
currentState = StreamState.FOOTER_READ;
break;
}
final Class<?> typeClass = decoder.getTypeClass();
if (typeClass == Header.class) {
header = (Header) decoder.readValue(deliveryStream, decoderState);
currentState = StreamState.HEADER_READ;
} else if (typeClass == DeliveryAnnotations.class) {
deliveryAnnotations = (DeliveryAnnotations) decoder.readValue(deliveryStream, decoderState);
currentState = StreamState.DELIVERY_ANNOTATIONS_READ;
} else if (typeClass == MessageAnnotations.class) {
annotations = (MessageAnnotations) decoder.readValue(deliveryStream, decoderState);
currentState = StreamState.MESSAGE_ANNOTATIONS_READ;
} else if (typeClass == Properties.class) {
properties = (Properties) decoder.readValue(deliveryStream, decoderState);
currentState = StreamState.PROPERTIES_READ;
} else if (typeClass == ApplicationProperties.class) {
applicationProperties = (ApplicationProperties) decoder.readValue(deliveryStream, decoderState);
currentState = StreamState.APPLICATION_PROPERTIES_READ;
} else if (typeClass == AmqpSequence.class) {
currentState = StreamState.BODY_READABLE;
if (bodyStream == null) {
bodyStream = new AmqpSequenceInputStream(deliveryStream);
}
} else if (typeClass == AmqpValue.class) {
currentState = StreamState.BODY_READABLE;
if (bodyStream == null) {
bodyStream = new AmqpValueInputStream(deliveryStream);
}
} else if (typeClass == Data.class) {
currentState = StreamState.BODY_READABLE;
if (bodyStream == null) {
bodyStream = new DataSectionInputStream(deliveryStream);
}
} else if (typeClass == Footer.class) {
footer = (Footer) decoder.readValue(deliveryStream, decoderState);
currentState = StreamState.FOOTER_READ;
} else {
throw new ClientMessageFormatViolationException("Incoming message carries unknown Section");
}
} catch (ClientMessageFormatViolationException | DecodeException ex) {
currentState = StreamState.DECODE_ERROR;
if (deliveryStream != null) {
try {
deliveryStream.close();
} catch (IOException e) {
}
}
// TODO: At the moment there is no automatic rejection or release etc
// of the delivery. The user is expected to apply a disposition in
// response to this error that initiates the desired outcome. We
// could look to add auto settlement with a configured outcome in
// the future.
throw ClientExceptionSupport.createNonFatalOrPassthrough(ex);
}
}
}