in src/Proton.Client/Client/Implementation/ClientStreamReceiverMessage.cs [488:592]
/// <summary>
/// Decodes message sections from the delivery stream until the tracked decode
/// state is no longer below <paramref name="desiredState"/>.
/// </summary>
/// <remarks>
/// Each pass asks the decoder for the next section type and then:
/// <list type="bullet">
/// <item>for Header, DeliveryAnnotations, MessageAnnotations, Properties,
/// ApplicationProperties and Footer, reads the value, stores it and advances
/// the state to the matching "read" value;</item>
/// <item>for AmqpSequence, AmqpValue and Data, only marks the body as readable
/// and creates the body stream if none exists yet (the section value itself is
/// not read here);</item>
/// <item>for any other type, raises a format violation.</item>
/// </list>
/// Reaching the end of the stream while looking for the next section is
/// treated as the footer having been read. A format violation or decode
/// failure moves the state to DECODE_ERROR, closes the delivery stream
/// (ignoring IO failures from the close) and rethrows via
/// <c>ClientExceptionSupport.CreateNonFatalOrPassthrough</c>.
/// </remarks>
/// <param name="desiredState">The state the stream must have reached on return.</param>
/// <returns>This message instance.</returns>
private ClientStreamReceiverMessage EnsureStreamDecodedTo(StreamState desiredState)
{
    // Presumably throws when the message was already closed or aborted - confirm in CheckClosedOrAborted.
    CheckClosedOrAborted();

    while (currentState < desiredState)
    {
        try
        {
            IStreamTypeDecoder sectionDecoder;

            try
            {
                sectionDecoder = protonDecoder.ReadNextTypeDecoder(deliveryStream, decoderState);
            }
            catch (DecodeEOFException)
            {
                // No further section could be read: record the terminal state and stop looping.
                currentState = StreamState.FOOTER_READ;
                break;
            }

            Type sectionType = sectionDecoder.DecodesType;

            switch (sectionType)
            {
                case var _ when sectionType == typeof(Header):
                    header = (Header)sectionDecoder.ReadValue(deliveryStream, decoderState);
                    currentState = StreamState.HEADER_READ;
                    break;

                case var _ when sectionType == typeof(DeliveryAnnotations):
                    deliveryAnnotations = (DeliveryAnnotations)sectionDecoder.ReadValue(deliveryStream, decoderState);
                    currentState = StreamState.DELIVERY_ANNOTATIONS_READ;
                    break;

                case var _ when sectionType == typeof(MessageAnnotations):
                    annotations = (MessageAnnotations)sectionDecoder.ReadValue(deliveryStream, decoderState);
                    currentState = StreamState.MESSAGE_ANNOTATIONS_READ;
                    break;

                case var _ when sectionType == typeof(Properties):
                    properties = (Properties)sectionDecoder.ReadValue(deliveryStream, decoderState);
                    currentState = StreamState.PROPERTIES_READ;
                    break;

                case var _ when sectionType == typeof(ApplicationProperties):
                    applicationProperties = (ApplicationProperties)sectionDecoder.ReadValue(deliveryStream, decoderState);
                    currentState = StreamState.APPLICATION_PROPERTIES_READ;
                    break;

                // Body sections: the value is left unread here; only the body stream is set up.
                case var _ when sectionType == typeof(AmqpSequence):
                    currentState = StreamState.BODY_READABLE;
                    bodyStream ??= new AmqpSequenceInputStream(this);
                    break;

                case var _ when sectionType == typeof(AmqpValue):
                    currentState = StreamState.BODY_READABLE;
                    bodyStream ??= new AmqpValueInputStream(this);
                    break;

                case var _ when sectionType == typeof(Data):
                    currentState = StreamState.BODY_READABLE;
                    bodyStream ??= new DataSectionInputStream(this);
                    break;

                case var _ when sectionType == typeof(Footer):
                    footer = (Footer)sectionDecoder.ReadValue(deliveryStream, decoderState);
                    currentState = StreamState.FOOTER_READ;
                    break;

                default:
                    throw new ClientMessageFormatViolationException("Incoming message carries unknown Section");
            }
        }
        catch (Exception failure) when (failure is ClientMessageFormatViolationException || failure is DecodeException)
        {
            currentState = StreamState.DECODE_ERROR;

            try
            {
                deliveryStream?.Close();
            }
            catch (IOException)
            {
                // Best-effort close: the decode failure below is the error that gets reported.
            }

            // TODO: There is currently no automatic rejection, release, etc. of
            // the delivery. The user is expected to apply a disposition in
            // response to this error to trigger the desired outcome. Auto
            // settlement with a configured outcome could be added in the
            // future.
            throw ClientExceptionSupport.CreateNonFatalOrPassthrough(failure);
        }
    }

    return this;
}