private void ensureStreamDecodedTo()

in protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiverMessage.java [672:741]


    /**
     * Decodes sections from the incoming delivery stream, one per loop pass, until the
     * current stream state is no longer ordered before the requested state.
     * <p>
     * Header, delivery annotations, message annotations, properties, application
     * properties and footer sections are read in full and stored on this message. When a
     * body section (AmqpSequence, AmqpValue or Data) is encountered its contents are not
     * read here; the state moves to {@code BODY_READABLE} and, if no body stream exists
     * yet, one matching the section type is created over the delivery stream.
     * <p>
     * Reaching the end of the delivery stream while looking for the next section marks
     * the message as {@code FOOTER_READ} and stops decoding.
     *
     * @param desiredState
     *      the state the stream should be decoded up to.
     *
     * @throws ClientException if the message is rejected by {@code checkClosedOrAborted()}
     *                         (presumably because it was closed or aborted - confirm against
     *                         that method), if a section of unknown type is found, or if
     *                         decoding fails. In the latter two cases the state becomes
     *                         {@code DECODE_ERROR} and the delivery stream is closed first.
     */
    private void ensureStreamDecodedTo(StreamState desiredState) throws ClientException {
        checkClosedOrAborted();

        while (currentState.compareTo(desiredState) < 0) {
            try {
                final StreamTypeDecoder<?> sectionDecoder;
                try {
                    sectionDecoder = protonDecoder.readNextTypeDecoder(deliveryStream, decoderState);
                } catch (DecodeEOFException endOfStream) {
                    // Nothing further to decode, the message counts as read through its footer.
                    currentState = StreamState.FOOTER_READ;
                    return;
                }

                final Class<?> sectionType = sectionDecoder.getTypeClass();

                if (sectionType == Header.class) {
                    header = (Header) sectionDecoder.readValue(deliveryStream, decoderState);
                    currentState = StreamState.HEADER_READ;
                } else if (sectionType == DeliveryAnnotations.class) {
                    deliveryAnnotations = (DeliveryAnnotations) sectionDecoder.readValue(deliveryStream, decoderState);
                    currentState = StreamState.DELIVERY_ANNOTATIONS_READ;
                } else if (sectionType == MessageAnnotations.class) {
                    annotations = (MessageAnnotations) sectionDecoder.readValue(deliveryStream, decoderState);
                    currentState = StreamState.MESSAGE_ANNOTATIONS_READ;
                } else if (sectionType == Properties.class) {
                    properties = (Properties) sectionDecoder.readValue(deliveryStream, decoderState);
                    currentState = StreamState.PROPERTIES_READ;
                } else if (sectionType == ApplicationProperties.class) {
                    applicationProperties = (ApplicationProperties) sectionDecoder.readValue(deliveryStream, decoderState);
                    currentState = StreamState.APPLICATION_PROPERTIES_READ;
                } else if (sectionType == AmqpSequence.class || sectionType == AmqpValue.class || sectionType == Data.class) {
                    // Body sections are not decoded here, they are exposed through a body
                    // stream which is only created if one has not been assigned already.
                    currentState = StreamState.BODY_READABLE;
                    if (bodyStream == null) {
                        if (sectionType == AmqpSequence.class) {
                            bodyStream = new AmqpSequenceInputStream(deliveryStream);
                        } else if (sectionType == AmqpValue.class) {
                            bodyStream = new AmqpValueInputStream(deliveryStream);
                        } else {
                            bodyStream = new DataSectionInputStream(deliveryStream);
                        }
                    }
                } else if (sectionType == Footer.class) {
                    footer = (Footer) sectionDecoder.readValue(deliveryStream, decoderState);
                    currentState = StreamState.FOOTER_READ;
                } else {
                    throw new ClientMessageFormatViolationException("Incoming message carries unknown Section");
                }
            } catch (ClientMessageFormatViolationException | DecodeException cause) {
                currentState = StreamState.DECODE_ERROR;

                if (deliveryStream != null) {
                    try {
                        deliveryStream.close();
                    } catch (IOException ignored) {
                        // Best effort close, the decode failure thrown below is what gets reported.
                    }
                }

                // TODO: No disposition (reject, release etc) is applied to the delivery
                //       automatically at this point.  It is left to the user to apply the
                //       disposition that produces the outcome they want in response to
                //       this error.  Automatic settlement with a configurable outcome is
                //       something that could be added later.

                throw ClientExceptionSupport.createNonFatalOrPassthrough(cause);
            }
        }
    }