src/Proton.Client/Client/Implementation/ClientStreamReceiverMessage.cs (675 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.IO;
using Apache.Qpid.Proton.Buffer;
using Apache.Qpid.Proton.Client.Exceptions;
using Apache.Qpid.Proton.Codec;
using Apache.Qpid.Proton.Codec.Decoders;
using Apache.Qpid.Proton.Codec.Decoders.Primitives;
using Apache.Qpid.Proton.Engine;
using Apache.Qpid.Proton.Logging;
using Apache.Qpid.Proton.Types;
using Apache.Qpid.Proton.Types.Messaging;
namespace Apache.Qpid.Proton.Client.Implementation
{
public sealed class ClientStreamReceiverMessage : IStreamReceiverMessage
{
private static readonly IProtonLogger LOG = ProtonLoggerFactory.GetLogger<ClientStreamReceiverMessage>();
private readonly ClientStreamReceiver receiver;
private readonly ClientStreamDelivery delivery;
private readonly Stream deliveryStream;
private readonly IIncomingDelivery protonDelivery;
private readonly IStreamDecoder protonDecoder;
private readonly IStreamDecoderState decoderState;
private Header header;
private DeliveryAnnotations deliveryAnnotations;
private MessageAnnotations annotations;
private Properties properties;
private ApplicationProperties applicationProperties;
private Footer footer;
private StreamState currentState = StreamState.IDLE;
private MessageBodyInputStream bodyStream;
internal ClientStreamReceiverMessage(ClientStreamReceiver receiver, ClientStreamDelivery delivery, Stream deliveryStream)
{
this.receiver = receiver;
this.delivery = delivery;
this.deliveryStream = deliveryStream;
this.protonDelivery = delivery.ProtonDelivery;
this.protonDecoder = ProtonStreamDecoderFactory.Create();
this.decoderState = protonDecoder.NewDecoderState();
}
public IStreamDelivery Delivery => delivery;
public IStreamReceiver Receiver => receiver;
public bool Aborted => protonDelivery?.IsAborted ?? false;
public bool Completed => (!protonDelivery?.IsPartial ?? false) && (!protonDelivery?.IsAborted ?? false);
public uint MessageFormat
{
get => protonDelivery?.MessageFormat ?? 0;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiverMessage");
}
public IProtonBuffer Encode(IDictionary<string, object> deliveryAnnotations)
{
throw new ClientUnsupportedOperationException("Cannot encode from an StreamReceiverMessage instance.");
}
#region AMQP Header Access API
public Header Header
{
get => EnsureStreamDecodedTo(StreamState.HEADER_READ).header;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public bool Durable
{
get => Header?.Durable ?? Header.DEFAULT_DURABILITY;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public byte Priority
{
get => Header?.Priority ?? Header.DEFAULT_PRIORITY;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public uint TimeToLive
{
get => Header?.TimeToLive ?? Header.DEFAULT_TIME_TO_LIVE;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public bool FirstAcquirer
{
get => Header?.FirstAcquirer ?? Header.DEFAULT_FIRST_ACQUIRER;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public uint DeliveryCount
{
get => Header?.DeliveryCount ?? Header.DEFAULT_DELIVERY_COUNT;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
#endregion
#region AMQP Header Access API
public Properties Properties
{
get => EnsureStreamDecodedTo(StreamState.PROPERTIES_READ).properties;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public object MessageId
{
get => Properties?.MessageId;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public byte[] UserId
{
get
{
byte[] userId = null;
if (Properties != null && Properties.UserId != null)
{
userId = new byte[Properties.UserId.ReadableBytes];
Properties.UserId.CopyInto(Properties.UserId.ReadOffset, userId, 0, userId.LongLength);
}
return userId;
}
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public string To
{
get => Properties?.To;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public string Subject
{
get => Properties?.Subject;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public string ReplyTo
{
get => Properties?.ReplyTo;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public object CorrelationId
{
get => Properties?.CorrelationId;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public string ContentType
{
get => Properties?.ContentType;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public string ContentEncoding
{
get => Properties?.ContentEncoding;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public ulong AbsoluteExpiryTime
{
get => Properties?.AbsoluteExpiryTime ?? 0;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public ulong CreationTime
{
get => Properties?.CreationTime ?? 0;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public string GroupId
{
get => Properties?.GroupId;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public uint GroupSequence
{
get => Properties?.GroupSequence ?? 0;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public string ReplyToGroupId
{
get => Properties?.ReplyToGroupId;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
#endregion
#region Internal Delivery Annotations access for use by the managing delivery object
internal DeliveryAnnotations DeliveryAnnotations =>
EnsureStreamDecodedTo(StreamState.DELIVERY_ANNOTATIONS_READ).deliveryAnnotations;
#endregion
#region AMQP Message Annotations Access API
public MessageAnnotations Annotations
{
get => EnsureStreamDecodedTo(StreamState.MESSAGE_ANNOTATIONS_READ).annotations;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public bool HasAnnotations
{
get
{
EnsureStreamDecodedTo(StreamState.MESSAGE_ANNOTATIONS_READ);
return annotations?.Value?.Count > 0;
}
}
public IMessage<Stream> ForEachAnnotation(Action<string, object> consumer)
{
if (HasAnnotations)
{
foreach (KeyValuePair<Symbol, object> entry in annotations.Value)
{
consumer.Invoke(entry.Key.ToString(), entry.Value);
}
}
return this;
}
public object GetAnnotation(string key)
{
object result = null;
Annotations?.Value?.TryGetValue(Symbol.Lookup(key), out result);
return result;
}
public IMessage<Stream> SetAnnotation(string key, object value)
{
throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public bool HasAnnotation(string key)
{
return Annotations?.Value?.ContainsKey(Symbol.Lookup(key)) ?? false;
}
public object RemoveAnnotation(string key)
{
throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
#endregion
#region AMQP Application Properties Access API
public ApplicationProperties ApplicationProperties
{
get => EnsureStreamDecodedTo(StreamState.APPLICATION_PROPERTIES_READ).applicationProperties;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public bool HasProperties
{
get
{
EnsureStreamDecodedTo(StreamState.APPLICATION_PROPERTIES_READ);
return applicationProperties?.Value?.Count > 0;
}
}
public IMessage<Stream> ForEachProperty(Action<string, object> consumer)
{
if (HasProperties)
{
foreach (KeyValuePair<string, object> entry in applicationProperties.Value)
{
consumer.Invoke(entry.Key, entry.Value);
}
}
return this;
}
public object GetProperty(string key)
{
object result = null;
ApplicationProperties?.Value?.TryGetValue(key, out result);
return result;
}
public IMessage<Stream> SetProperty(string key, object value)
{
throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public bool HasProperty(string key)
{
return ApplicationProperties?.Value?.ContainsKey(key) ?? false;
}
public object RemoveProperty(string key)
{
throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
#endregion
#region AMQP Footer Access API
public Footer Footer
{
get => EnsureStreamDecodedTo(StreamState.FOOTER_READ).footer;
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public bool HasFooters
{
get
{
EnsureStreamDecodedTo(StreamState.BODY_READABLE);
if (currentState != StreamState.FOOTER_READ)
{
if (currentState == StreamState.DECODE_ERROR)
{
throw new ClientException("Cannot read Footer due to decoding error in message payload");
}
else
{
throw new ClientIllegalStateException("Cannot read message Footer until message body fully read");
}
}
return footer?.Value?.Count > 0;
}
}
public IMessage<Stream> ForEachFooter(Action<string, object> consumer)
{
if (HasFooters)
{
foreach (KeyValuePair<Symbol, object> entry in footer.Value)
{
consumer.Invoke(entry.Key.ToString(), entry.Value);
}
}
return this;
}
public object GetFooter(string key)
{
object result = null;
Footer?.Value?.TryGetValue(Symbol.Lookup(key), out result);
return result;
}
public IMessage<Stream> SetFooter(string key, object value)
{
throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public bool HasFooter(string key)
{
return Footer?.Value?.ContainsKey(Symbol.Lookup(key)) ?? false;
}
public object RemoveFooter(string key)
{
throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
#endregion
#region AMQP Message body access API
public Stream Body
{
get
{
if (currentState > StreamState.BODY_READABLE)
{
if (currentState == StreamState.DECODE_ERROR)
{
throw new ClientException("Cannot read body due to decoding error in message payload");
}
else if (bodyStream != null)
{
throw new ClientIllegalStateException("Cannot read body from message whose body has already been read.");
}
}
EnsureStreamDecodedTo(StreamState.BODY_READABLE);
return bodyStream;
}
set => throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public IAdvancedMessage<Stream> ForEachBodySection(Action<ISection> consumer)
{
throw new ClientUnsupportedOperationException("Cannot iterate all body sections from a StreamReceiverMessage instance.");
}
public IAdvancedMessage<Stream> AddBodySection(ISection section)
{
throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public IAdvancedMessage<Stream> ClearBodySections()
{
throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
public IEnumerable<ISection> GetBodySections()
{
throw new ClientUnsupportedOperationException("Cannot iterate all body sections from a StreamReceiverMessage instance.");
}
public IAdvancedMessage<Stream> SetBodySections(IEnumerable<ISection> section)
{
throw new ClientUnsupportedOperationException("Cannot write to a StreamReceiveMessage");
}
#endregion
#region Private stream receiver message implementation
/// <summary>
/// Used to locate where in the stream read precess the message is currently
/// </summary>
private enum StreamState
{
IDLE,
HEADER_READ,
DELIVERY_ANNOTATIONS_READ,
MESSAGE_ANNOTATIONS_READ,
PROPERTIES_READ,
APPLICATION_PROPERTIES_READ,
BODY_PENDING,
BODY_READABLE,
FOOTER_READ,
DECODE_ERROR
}
private void CheckClosedOrAborted()
{
if (receiver.IsClosed)
{
throw new ClientIllegalStateException("The parent Receiver instance has already been closed.");
}
if (Aborted)
{
throw new ClientIllegalStateException("The incoming delivery was aborted.");
}
}
private ClientStreamReceiverMessage EnsureStreamDecodedTo(StreamState desiredState)
{
CheckClosedOrAborted();
while (currentState < desiredState)
{
try
{
IStreamTypeDecoder decoder;
try
{
decoder = protonDecoder.ReadNextTypeDecoder(deliveryStream, decoderState);
}
catch (DecodeEOFException)
{
currentState = StreamState.FOOTER_READ;
break;
}
Type typeClass = decoder.DecodesType;
if (typeClass == typeof(Header))
{
header = (Header)decoder.ReadValue(deliveryStream, decoderState);
currentState = StreamState.HEADER_READ;
}
else if (typeClass == typeof(DeliveryAnnotations))
{
deliveryAnnotations = (DeliveryAnnotations)decoder.ReadValue(deliveryStream, decoderState);
currentState = StreamState.DELIVERY_ANNOTATIONS_READ;
}
else if (typeClass == typeof(MessageAnnotations))
{
annotations = (MessageAnnotations)decoder.ReadValue(deliveryStream, decoderState);
currentState = StreamState.MESSAGE_ANNOTATIONS_READ;
}
else if (typeClass == typeof(Properties))
{
properties = (Properties)decoder.ReadValue(deliveryStream, decoderState);
currentState = StreamState.PROPERTIES_READ;
}
else if (typeClass == typeof(ApplicationProperties))
{
applicationProperties = (ApplicationProperties)decoder.ReadValue(deliveryStream, decoderState);
currentState = StreamState.APPLICATION_PROPERTIES_READ;
}
else if (typeClass == typeof(AmqpSequence))
{
currentState = StreamState.BODY_READABLE;
if (bodyStream == null)
{
bodyStream = new AmqpSequenceInputStream(this);
}
}
else if (typeClass == typeof(AmqpValue))
{
currentState = StreamState.BODY_READABLE;
if (bodyStream == null)
{
bodyStream = new AmqpValueInputStream(this);
}
}
else if (typeClass == typeof(Data))
{
currentState = StreamState.BODY_READABLE;
if (bodyStream == null)
{
bodyStream = new DataSectionInputStream(this);
}
}
else if (typeClass == typeof(Footer))
{
footer = (Footer)decoder.ReadValue(deliveryStream, decoderState);
currentState = StreamState.FOOTER_READ;
}
else
{
throw new ClientMessageFormatViolationException("Incoming message carries unknown Section");
}
}
catch (Exception ex) when (ex is ClientMessageFormatViolationException or DecodeException)
{
currentState = StreamState.DECODE_ERROR;
if (deliveryStream != null)
{
try
{
deliveryStream.Close();
}
catch (IOException)
{
}
}
// TODO: At the moment there is no automatic rejection or release etc
// of the delivery. The user is expected to apply a disposition in
// response to this error that initiates the desired outcome. We
// could look to add auto settlement with a configured outcome in
// the future.
throw ClientExceptionSupport.CreateNonFatalOrPassthrough(ex);
}
}
return this;
}
#endregion
#region Message Body Input Stream implementation
internal abstract class MessageBodyInputStream : Stream
{
protected readonly Stream rawInputStream;
protected readonly ClientStreamReceiverMessage message;
protected bool closed;
protected uint remainingSectionBytes = 0;
public MessageBodyInputStream(ClientStreamReceiverMessage message)
{
this.message = message;
this.rawInputStream = message.deliveryStream;
ValidateAndScanNextSection();
}
public override bool CanRead => true;
public override bool CanSeek => rawInputStream.CanSeek;
public override bool CanWrite => false;
public override long Length => rawInputStream.Length;
public override long Position
{
get => rawInputStream.Position;
set => rawInputStream.Position = value;
}
public abstract Type BodyTypeClass { get; }
public override void Close()
{
try
{
// This will check is another body section is present or if there was a footer and if
// a Footer is present it will be decoded and the message payload should be fully consumed
// at that point. Otherwise the underlying raw InputStream will handle the task of
// discarding pending bytes for the message to ensure the receiver does not still on
// waiting for session window to be opened.
if (remainingSectionBytes == 0)
{
message.EnsureStreamDecodedTo(StreamState.FOOTER_READ);
}
}
catch (ClientException e)
{
throw new IOException("Caught error while attempting to advance past remaining message body", e);
}
finally
{
closed = true;
rawInputStream.Close();
base.Close();
}
}
public override void Flush()
{
}
public override int ReadByte()
{
CheckClosed();
while (true)
{
if (remainingSectionBytes == 0 && !TryMoveToNextBodySection())
{
return -1; // Cannot read any further.
}
else
{
remainingSectionBytes--;
return rawInputStream.ReadByte();
}
}
}
public override int Read(Span<byte> buffer)
{
CheckClosed();
int bytesRead = 0;
for (; bytesRead < buffer.Length; ++bytesRead)
{
if (remainingSectionBytes == 0 && !TryMoveToNextBodySection())
{
break; // We are at the end of the body sections
}
int result = ReadByte();
if (result >= 0)
{
buffer[bytesRead] = (byte)result;
}
else
{
break;
}
}
return bytesRead;
}
public override int Read(byte[] buffer, int offset, int count)
{
CheckClosed();
int bytesRead = 0;
while (bytesRead != count)
{
if (remainingSectionBytes == 0 && !TryMoveToNextBodySection())
{
bytesRead = bytesRead > 0 ? bytesRead : 0;
break; // We are at the end of the body sections
}
int readChunk = (int)Math.Min(remainingSectionBytes, count - bytesRead);
int actualRead = rawInputStream.Read(buffer, offset + bytesRead, readChunk);
if (actualRead > 0)
{
bytesRead += actualRead;
remainingSectionBytes -= (uint)actualRead;
}
}
return bytesRead;
}
public override long Seek(long offset, SeekOrigin origin)
{
return rawInputStream.Seek(offset, origin);
}
public override void SetLength(long value)
{
throw new NotSupportedException("Cannot set length on an incoming streamed message Stream");
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException("Cannot write to an incoming streamed message Stream");
}
protected void CheckClosed()
{
if (closed)
{
throw new IOException("Stream was closed previously");
}
}
protected abstract void ValidateAndScanNextSection();
protected bool TryMoveToNextBodySection()
{
try
{
if (message.currentState != StreamState.FOOTER_READ)
{
message.currentState = StreamState.BODY_PENDING;
message.EnsureStreamDecodedTo(StreamState.BODY_READABLE);
if (message.currentState == StreamState.BODY_READABLE)
{
ValidateAndScanNextSection();
return true;
}
}
return false;
}
catch (ClientException e)
{
throw new IOException(e.Message, e);
}
}
}
internal class DataSectionInputStream : MessageBodyInputStream
{
public DataSectionInputStream(ClientStreamReceiverMessage message) : base(message)
{
}
public override Type BodyTypeClass => typeof(Data);
protected override void ValidateAndScanNextSection()
{
IStreamTypeDecoder typeDecoder =
message.protonDecoder.ReadNextTypeDecoder(rawInputStream, message.decoderState);
if (typeDecoder.DecodesType == typeof(IProtonBuffer))
{
LOG.Trace("Data Section of size {0} ready for read.", remainingSectionBytes);
IBinaryTypeDecoder binaryDecoder = (IBinaryTypeDecoder)typeDecoder;
remainingSectionBytes = (uint)binaryDecoder.ReadSize(rawInputStream, message.decoderState);
}
else if (typeDecoder.DecodesType == typeof(void))
{
// Null body in the Data section which can be skipped.
LOG.Trace("Data Section with no Binary payload read and skipped.");
remainingSectionBytes = 0;
}
else
{
throw new DecodeException("Unknown payload in body of Data Section encoding.");
}
}
}
internal class AmqpSequenceInputStream : MessageBodyInputStream
{
public AmqpSequenceInputStream(ClientStreamReceiverMessage message) : base(message)
{
}
public override Type BodyTypeClass => typeof(System.Collections.IList);
protected override void ValidateAndScanNextSection()
{
throw new DecodeException("Cannot read the payload of an AMQP Sequence payload.");
}
}
internal class AmqpValueInputStream : MessageBodyInputStream
{
public AmqpValueInputStream(ClientStreamReceiverMessage message) : base(message)
{
}
public override Type BodyTypeClass => typeof(void);
protected override void ValidateAndScanNextSection()
{
throw new DecodeException("Cannot read the payload of an AMQP Value payload.");
}
}
}
#endregion
}