private ClientStreamReceiverMessage EnsureStreamDecodedTo()

in src/Proton.Client/Client/Implementation/ClientStreamReceiverMessage.cs [488:592]


      private ClientStreamReceiverMessage EnsureStreamDecodedTo(StreamState desiredState)
      {
         CheckClosedOrAborted();

         while (currentState < desiredState)
         {
            try
            {
               IStreamTypeDecoder decoder;
               try
               {
                  decoder = protonDecoder.ReadNextTypeDecoder(deliveryStream, decoderState);
               }
               catch (DecodeEOFException)
               {
                  currentState = StreamState.FOOTER_READ;
                  break;
               }

               Type typeClass = decoder.DecodesType;
               if (typeClass == typeof(Header))
               {
                  header = (Header)decoder.ReadValue(deliveryStream, decoderState);
                  currentState = StreamState.HEADER_READ;
               }
               else if (typeClass == typeof(DeliveryAnnotations))
               {
                  deliveryAnnotations = (DeliveryAnnotations)decoder.ReadValue(deliveryStream, decoderState);
                  currentState = StreamState.DELIVERY_ANNOTATIONS_READ;
               }
               else if (typeClass == typeof(MessageAnnotations))
               {
                  annotations = (MessageAnnotations)decoder.ReadValue(deliveryStream, decoderState);
                  currentState = StreamState.MESSAGE_ANNOTATIONS_READ;
               }
               else if (typeClass == typeof(Properties))
               {
                  properties = (Properties)decoder.ReadValue(deliveryStream, decoderState);
                  currentState = StreamState.PROPERTIES_READ;
               }
               else if (typeClass == typeof(ApplicationProperties))
               {
                  applicationProperties = (ApplicationProperties)decoder.ReadValue(deliveryStream, decoderState);
                  currentState = StreamState.APPLICATION_PROPERTIES_READ;
               }
               else if (typeClass == typeof(AmqpSequence))
               {
                  currentState = StreamState.BODY_READABLE;
                  if (bodyStream == null)
                  {
                     bodyStream = new AmqpSequenceInputStream(this);
                  }
               }
               else if (typeClass == typeof(AmqpValue))
               {
                  currentState = StreamState.BODY_READABLE;
                  if (bodyStream == null)
                  {
                     bodyStream = new AmqpValueInputStream(this);
                  }
               }
               else if (typeClass == typeof(Data))
               {
                  currentState = StreamState.BODY_READABLE;
                  if (bodyStream == null)
                  {
                     bodyStream = new DataSectionInputStream(this);
                  }
               }
               else if (typeClass == typeof(Footer))
               {
                  footer = (Footer)decoder.ReadValue(deliveryStream, decoderState);
                  currentState = StreamState.FOOTER_READ;
               }
               else
               {
                  throw new ClientMessageFormatViolationException("Incoming message carries unknown Section");
               }
            }
            catch (Exception ex) when (ex is ClientMessageFormatViolationException or DecodeException)
            {
               currentState = StreamState.DECODE_ERROR;
               if (deliveryStream != null)
               {
                  try
                  {
                     deliveryStream.Close();
                  }
                  catch (IOException)
                  {
                  }
               }

               // TODO: At the moment there is no automatic rejection or release etc
               //       of the delivery.  The user is expected to apply a disposition in
               //       response to this error that initiates the desired outcome.  We
               //       could look to add auto settlement with a configured outcome in
               //       the future.

               throw ClientExceptionSupport.CreateNonFatalOrPassthrough(ex);
            }
         }

         return this;
      }