in src/Proton.TestPeer/Driver/FrameDecoder.cs [308:397]
/// <summary>
/// Parses the portion of a frame that follows the size field and dispatches the result to the
/// frame handler as either a heartbeat (empty body), an AMQP performative or a SASL performative.
/// </summary>
/// <remarks>
/// The first byte read from <paramref name="input"/> is interpreted as the data offset, the second
/// as the frame type, and the following two bytes as the channel. The decoder is transitioned back
/// to the frame size parsing stage before the frame handler is invoked.
/// </remarks>
/// <param name="input">The stream to read the frame from; it must support reading and seeking
/// (both <c>Position</c> and <c>Length</c> are used).</param>
/// <exception cref="ArgumentException">
/// Thrown when the decoded frame body is not a described type, when the decoded body consumed more
/// bytes than the frame declared, or when the frame type is neither AMQP nor SASL.
/// </exception>
/// <exception cref="Exception">
/// Thrown when the codec fails while decoding the frame body; the original failure is the inner exception.
/// </exception>
internal override void Parse(Stream input)
{
    // The data offset byte counts 4-byte words, hence the shift by two to get a byte offset.
    uint dataOffset = (uint)((input.ReadByte() << 2) & 0x3FF);
    uint frameSize = (uint)(this.frameSize + FRAME_SIZE_BYTES);

    ValidateDataOffset(dataOffset, frameSize);

    int type = input.ReadByte() & 0xFF;
    ushort channel = input.ReadUnsignedShort();

    // note that this skips over the extended header if it's present
    if (dataOffset != 8)
    {
        input.Position = input.Position + dataOffset - 8;
    }

    uint frameBodySize = frameSize - dataOffset;
    byte[] payload = null;

    // A frame without a body is a heartbeat: nothing to decode, report it and stop.
    if (frameBodySize == 0)
    {
        decoder.logger.LogTrace("{} Read: CH[{}] : Heartbeat [{}]", frameHandler.Name, channel, payload);
        decoder.TransitionToFrameSizeParsingStage();
        frameHandler.HandleHeartbeat(frameSize, channel);
        return;
    }

    // Stream.Position is a long; keep it as such so the later size arithmetic cannot be truncated.
    long frameBodyStartIndex = input.Position;
    object val;

    // The codec is cleared on every exit path (decode failure, unexpected data type or success)
    // so that a failed parse does not leave previously decoded state behind in it.
    try
    {
        try
        {
            decoder.codec.Decode(input);
        }
        catch (Exception e)
        {
            throw new Exception("Decoder failed reading remote input:", e);
        }

        DataType dataType = decoder.codec.DataType;
        if (dataType != DataType.Described)
        {
            throw new ArgumentException(
                "Frame body type expected to be " + DataType.Described + " but was: " + dataType);
        }

        val = decoder.codec.GetDescribedType();
    }
    finally
    {
        decoder.codec.Clear();
    }

    // Whatever remains of the declared frame body after the performative is the payload. Only the
    // bytes belonging to this frame are read so that any following frame is left in the stream.
    if (input.Position < input.Length)
    {
        long consumed = input.Position - frameBodyStartIndex;
        long payloadSize = frameBodySize - consumed;

        // Guard against the decoded body running past the declared frame body, which would
        // otherwise wrap around into a bogus (huge or negative) read length.
        if (payloadSize < 0)
        {
            throw new ArgumentException(string.Format(
                "Decoded frame body consumed {0} bytes which exceeds the declared frame body size of {1}",
                consumed, frameBodySize));
        }

        if (payloadSize > 0)
        {
            payload = input.ReadBytes((int)payloadSize);
        }
    }

    if (type == AMQP_FRAME_TYPE)
    {
        PerformativeDescribedType performative = (PerformativeDescribedType)val;
        decoder.logger.LogTrace("{} Read: CH[{}] : {} [{}]", frameHandler.Name, channel, performative, payload);
        decoder.TransitionToFrameSizeParsingStage();
        frameHandler.HandlePerformative(frameSize, performative, channel, payload);
    }
    else if (type == SASL_FRAME_TYPE)
    {
        SaslDescribedType performative = (SaslDescribedType)val;
        decoder.logger.LogTrace("{} Read: {} [{}]", frameHandler.Name, performative, payload);
        decoder.TransitionToFrameSizeParsingStage();
        frameHandler.HandleSaslPerformative(frameSize, performative, channel, payload);
    }
    else
    {
        throw new ArgumentException(string.Format("unknown frame type: {0}", type));
    }
}